
Originally Posted by xavierda
However, in doing so I came across another problem (typical!)
My implementation of my queue-processing thread has a continuously running loop (i.e. run() keeps going until the thread is killed). Is there a way to mix this approach with the event loop (i.e. calling exec())? I need the event loop so that I can have a QUdpSocket in the same thread to receive packets back from my device. I was hoping to just hook up the readyRead signal using connect(), but realised I can't call exec() AND have an endless run() loop. So my question now is: is it best to simply have another thread specifically for receiving, with another QUdpSocket?
Alrighty - still no takers, maybe some code will help move discussion along! 
After some reading here on the forums and further experimentation, I modified my code and broke it down into a self-contained sample. Goal: buffer an incoming UDP stream so that I can send the received packets on at a later time (i.e. offset by a small delay).
In answer to my own question above, I used another thread. It was doing my head in, as I thought I would need to scatter multiple QUdpSockets all over the place. After some thought, I used just one. So I defined a MyUDPSocket class which sets up a basic QUdpSocket and has a MySocketBuffer member variable.
// myudpsocket.h
#ifndef MYUDPSOCKET_H
#define MYUDPSOCKET_H

#include <QThread>
#include <QMutex>
#include <QUdpSocket>
#include "mysocketbuffer.h"

class MyUDPSocket: public QThread
{
    Q_OBJECT

public:
    MyUDPSocket();
    ~MyUDPSocket();
    void run();

public slots:
    void sendPacket(QByteArray packetData);
    void readPackets();
    void bufferDepleted();

private:
    QUdpSocket *udpSkt;
    MySocketBuffer *mySktBuf;
    QMutex mutex;
};

#endif // MYUDPSOCKET_H

// mysocketbuffer.h
#ifndef MYSOCKETBUFFER_H
#define MYSOCKETBUFFER_H

#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QQueue>

class MySocketBuffer : public QThread
{
    Q_OBJECT

public:
    MySocketBuffer();
    ~MySocketBuffer();
    void run();
    void addPacket(QByteArray *packetData);

signals:
    void sendDelayedPacket(QByteArray);
    void packetQueueDepleted();

private:
    /* Threading variables */
    QMutex mutex;
    QWaitCondition packetAdded;
    QQueue<QByteArray *> packetQueue;
};

#endif // MYSOCKETBUFFER_H
Now for the implementation. MyUDPSocket simply kicks off the buffer and connects the readyRead() signal as usual for reading in UDP packets. When a packet is received, it adds it to the secondary thread's packet queue (implemented as a QQueue of QByteArray pointers).
// myudpsocket.cpp
#include <QDebug>
#include <QHostAddress>
#include <QUdpSocket>
#include "myudpsocket.h"
#include "mysocketbuffer.h"

MyUDPSocket::MyUDPSocket()
    : udpSkt(0), mySktBuf(0) // so the destructor is safe even if run() never started
{
    qDebug() << "Setting up MyUDPSocket...";
}

MyUDPSocket::~MyUDPSocket()
{
    // Stop the event loop and wait for run() to return before cleaning up
    exit();
    wait();
    delete mySktBuf;
    delete udpSkt;
    qDebug() << "MyUDPSocket Destroyed!";
}

void MyUDPSocket::run()
{
    udpSkt = new QUdpSocket();
    mySktBuf = new MySocketBuffer();
    udpSkt->bind(51200);
    connect(udpSkt, SIGNAL(readyRead()), this, SLOT(readPackets()));
    connect(mySktBuf, SIGNAL(sendDelayedPacket(QByteArray)), this, SLOT(sendPacket(QByteArray)));
    connect(mySktBuf, SIGNAL(packetQueueDepleted()), this, SLOT(bufferDepleted()));
    mySktBuf->start();
    qDebug() << "Starting MyUDPSocket event loop...";
    // Setup event loop to receive packets
    exec();
}

void MyUDPSocket::readPackets()
{
    //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
    while (udpSkt->hasPendingDatagrams())
    {
        QByteArray datagram;
        datagram.resize(udpSkt->pendingDatagramSize());
        QHostAddress sender;
        quint16 senderPort;
        udpSkt->readDatagram(datagram.data(), datagram.size(),
                             &sender, &senderPort);
        qDebug() << "Received a packet! Adding it to queue...";
        // The buffer thread takes ownership of this heap copy and deletes it
        mySktBuf->addPacket(new QByteArray(datagram));
    }
}

void MyUDPSocket::sendPacket(QByteArray packetData)
{
    qDebug() << "Queued packet send...";
    //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
    udpSkt->writeDatagram(packetData, QHostAddress("10.1.4.1"), 51200);
}

void MyUDPSocket::bufferDepleted()
{
    qDebug() << "MySocketBuffer Depleted!";
}
All MySocketBuffer does is wait until there are packets in the queue, then send a signal back to the first thread with the packet data; otherwise it goes to sleep. This thread's run() method is an endless loop, so by splitting the code into two threads I can use the exec() pattern in MyUDPSocket (to receive data) alongside the endless-loop approach in MySocketBuffer, which the first thread drives through the addPacket() method.
// mysocketbuffer.cpp
#include <QDebug>
#include <QMutexLocker>
#include "mysocketbuffer.h"

// A null pointer in the queue tells run() to stop
QByteArray * const endOfDataPackets = 0;

MySocketBuffer::MySocketBuffer()
{
    qDebug() << "Setting up MySocketBuffer...";
}

MySocketBuffer::~MySocketBuffer()
{
    // Delete all the queued packets
    {
        QMutexLocker locker(&mutex);
        while (!packetQueue.isEmpty())
            delete packetQueue.dequeue();
        packetQueue.enqueue(endOfDataPackets);
        packetAdded.wakeOne();
    }
    wait();
    qDebug() << "MySocketBuffer Destroyed!";
}

void MySocketBuffer::run()
{
    qDebug() << "MySocketBuffer running...";
    QByteArray *packet = 0;
    forever {
        {
            QMutexLocker locker(&mutex);
            // Re-check in a loop so we never dequeue from an empty queue after a wake-up
            while (packetQueue.isEmpty())
                packetAdded.wait(&mutex);
            packet = packetQueue.dequeue();
            qDebug() << "Packets Left:" << packetQueue.count();
            if (packet == endOfDataPackets)
                break;
        }
        // Simulate some packet processing, which introduces small delay
        msleep(21);
        //sleep(1);
        emit sendDelayedPacket(*packet);
        delete packet;
        {
            QMutexLocker locker(&mutex);
            if (packetQueue.isEmpty())
                emit packetQueueDepleted();
        }
    }
}

void MySocketBuffer::addPacket(QByteArray *packetData)
{
    QMutexLocker locker(&mutex);
    packetQueue.enqueue(packetData);
    packetAdded.wakeOne();
}
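For anyone who wants to play with the buffering idea without a Qt install, here is a minimal sketch of the same pattern (a locked queue, a wait condition, and a null "end of data" sentinel) using only the C++ standard library. It is not the code from my attachment: DelayBuffer, relayAll() and the 2 ms delay are made-up names and values for illustration, std::mutex and std::condition_variable stand in for QMutex and QWaitCondition, and the handler is called directly on the worker thread instead of going through a signal.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for MySocketBuffer (assumption: plain std::thread, no Qt).
class DelayBuffer {
public:
    typedef std::function<void(const std::string&)> Handler;

    DelayBuffer(Handler onPacket, std::chrono::milliseconds delay)
        : onPacket_(std::move(onPacket)), delay_(delay),
          worker_([this] { run(); }) {}

    ~DelayBuffer() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.clear();            // drop anything still pending
            queue_.push_back(nullptr); // null sentinel, like endOfDataPackets
        }
        packetAdded_.notify_one();
        worker_.join();
    }

    void addPacket(const std::string& data) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::unique_ptr<std::string>(new std::string(data)));
        }
        packetAdded_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::unique_ptr<std::string> packet;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // The predicate is re-checked after every wake-up.
                packetAdded_.wait(lock, [this] { return !queue_.empty(); });
                packet = std::move(queue_.front());
                queue_.pop_front();
            }
            if (!packet)
                break;                          // sentinel reached
            std::this_thread::sleep_for(delay_); // simulated processing delay
            onPacket_(*packet);
        }
    }

    Handler onPacket_;
    std::chrono::milliseconds delay_;
    std::mutex mutex_;
    std::condition_variable packetAdded_;
    std::deque<std::unique_ptr<std::string> > queue_;
    std::thread worker_; // declared last so it starts after the other members exist
};

// Pushes every input packet through a DelayBuffer and returns them in delivery order.
std::vector<std::string> relayAll(const std::vector<std::string>& input) {
    std::vector<std::string> delivered;
    std::mutex m;
    std::condition_variable done;
    {
        DelayBuffer buffer(
            [&](const std::string& p) {
                std::lock_guard<std::mutex> lock(m);
                delivered.push_back(p);
                done.notify_one();
            },
            std::chrono::milliseconds(2));
        for (std::size_t i = 0; i < input.size(); ++i)
            buffer.addPacket(input[i]);
        std::unique_lock<std::mutex> lock(m);
        done.wait(lock, [&] { return delivered.size() == input.size(); });
        assert(delivered.size() == input.size());
    } // lock is released first, then the buffer destructor stops the worker
    return delivered;
}
```

Calling relayAll() with a few strings should hand them back in the same order, each one delayed by roughly the configured amount. Like my Qt version, the destructor throws away anything still queued, so a caller that cares about every packet has to wait for delivery before letting the buffer go out of scope, which is what relayAll() does.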
I run this in a Linux VM; the IP address 10.1.4.1 is an alias for my host machine. I set up VLC media player to stream an MP3 file from my host machine to my VM. The app queues the packets and sends them back, also on port 51200. I point another instance of VLC at this port - it works! The app quits after 30 seconds (this is mainly to test the cleanup code, also included).
// Sample output
Setting up MyUDPSocket...
Setting up MySocketBuffer...
MySocketBuffer running...
Starting MyUDPSocket event loop...
Received a packet! Adding it to queue...
Packets Left: 2
Queued packet send...
Packets Left: 1
Queued packet send...
...
...
Queued packet send...
MySocketBuffer Depleted!
Received a packet! Adding it to queue...
Packets Left: 0
MySocketBuffer Destroyed!
MyUDPSocket Destroyed!
As commented in the code above, I don't think I need to use QMutexLocker to protect the QUdpSocket, as Qt handles it all using its queued slot mechanism, right?
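To show what I mean by "queued", here is a rough, Qt-free sketch of the idea as I understand it: a cross-thread call is not run by the caller but posted to a queue, and one loop thread runs the posted calls one at a time. MiniEventLoop and allCallsRanOnLoopThread() are invented names for this illustration and this is not how Qt implements it internally; it only demonstrates why data touched solely from posted calls does not need its own lock. One thing worth checking in the real code is which thread that loop belongs to: in Qt a queued slot runs in the thread the receiver object lives in, and a QThread object itself lives in the thread that created it.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical miniature event loop (assumption: illustration only, not Qt's design).
class MiniEventLoop {
public:
    MiniEventLoop() : thread_([this] { exec(); }) {}

    ~MiniEventLoop() {
        post(nullptr); // an empty function is treated as the quit request
        thread_.join();
    }

    // Safe to call from any thread; the call itself runs later on the loop thread.
    void post(std::function<void()> call) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            calls_.push_back(std::move(call));
        }
        pending_.notify_one();
    }

    std::thread::id threadId() const { return thread_.get_id(); }

private:
    void exec() {
        for (;;) {
            std::function<void()> call;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                pending_.wait(lock, [this] { return !calls_.empty(); });
                call = std::move(calls_.front());
                calls_.pop_front();
            }
            if (!call)
                return; // quit request
            call();     // runs outside the lock, always on the loop thread
        }
    }

    std::mutex mutex_;
    std::condition_variable pending_;
    std::deque<std::function<void()> > calls_;
    std::thread thread_; // declared last so it starts after the other members exist
};

// Posts calls from several sender threads and reports whether every one of
// them was executed on the loop thread.
bool allCallsRanOnLoopThread(int senders, int callsPerSender) {
    std::vector<std::thread::id> seen; // only ever touched by posted calls
    std::thread::id loopId;
    {
        MiniEventLoop loop;
        loopId = loop.threadId();
        assert(std::this_thread::get_id() != loopId); // the caller is a different thread
        std::vector<std::thread> producers;
        for (int s = 0; s < senders; ++s) {
            producers.emplace_back([&loop, &seen, callsPerSender] {
                for (int i = 0; i < callsPerSender; ++i)
                    loop.post([&seen] { seen.push_back(std::this_thread::get_id()); });
            });
        }
        for (std::size_t i = 0; i < producers.size(); ++i)
            producers[i].join();
    } // the quit request is queued behind every posted call, then the loop is joined
    if (seen.size() != static_cast<std::size_t>(senders) * static_cast<std::size_t>(callsPerSender))
        return false;
    for (std::size_t i = 0; i < seen.size(); ++i)
        if (seen[i] != loopId)
            return false;
    return true;
}
```

Note that the vector `seen` is modified from posted calls only and has no mutex of its own; the only lock in the sketch protects the call queue.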
This has been a good exercise for me to better understand threading and Qt's approach to it in general, so I have added the code here as an attachment.
BUT! Is there a better way? Are there any gotchas in the code above I haven't handled?
Cheers,
Xav.