QUdpSocket Buffering - Approach?



xavierda
25th May 2010, 15:09
Hi there,

I have been doing some basic network programming using Qt and want to further my knowledge to tackle the following problem. First some background info.

I have a network accessible device that accepts UDP packets on a particular port. The UDP packets contain specific binary data in a certain order (eg. first byte indicates packet type etc). The device will send responses back to the originating source port - so one can send and receive from the device using a single port. I have this working nicely.

The setup is like this:

MyApp <--- UDP ---> MyDevice

Now, I want to bind to another port on the host, and repackage any incoming data to send to MyDevice like this

SomeOtherUDPSource <--- UDP ---> MyApp <--- UDP ---> MyDevice

Now, the problem is this: MyDevice isn't so quick with its processing, so it appears I need to slow the sending rate down. According to the documentation for QAbstractSocket (http://doc.trolltech.com/latest/qabstractsocket.html#setReadBufferSize), QUdpSocket does not do any buffering of its own; that is left to the operating system.

That's okay, but what's the best way to handle it?


1. Sleep the thread for a bit - Correct me if I am wrong, but the problem with this approach is that, depending on the sleep interval, the OS may drop buffered packets, since nothing has read them yet.
2. Implement a queue producer/consumer approach - a multi-threaded approach where one thread (say incoming) adds the packets to a queue, which is then processed more slowly by another thread (outgoing). Here, incoming is receiving faster than outgoing, so overruns will occur eventually, but this is now a matter of how much memory the queue may use, correct? And a simple overrun check could be added, I suppose.
3. Have a timer with a slot that sends one packet from the incoming queue every X ms - slightly easier than the threaded approach?
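
A minimal sketch of the timer-driven option, written in plain standard C++ rather than Qt so that it builds without the framework. The names `PacedQueue`, `push` and `poll` are made up for illustration; in a Qt application `poll` would presumably be called from a slot connected to a QTimer's timeout() signal, and the returned packet handed to the socket. The idea is only that at most one queued packet is released per interval, all on one thread.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

using Packet = std::vector<std::uint8_t>;
using Clock = std::chrono::steady_clock;

// Hypothetical helper: releases at most one queued packet per interval.
class PacedQueue {
public:
    explicit PacedQueue(std::chrono::milliseconds interval)
        : interval_(interval), sentAny_(false)
    {
        assert(interval.count() > 0);
    }

    // Called whenever a datagram arrives.
    void push(Packet p) { queue_.push_back(std::move(p)); }

    // Called periodically (for example from a timer callback).
    // Returns true and fills 'out' if a packet is due to be sent.
    bool poll(Clock::time_point now, Packet &out)
    {
        if (queue_.empty())
            return false;
        if (sentAny_ && now - lastSend_ < interval_)
            return false;               // too soon after the previous send
        out = std::move(queue_.front());
        queue_.pop_front();
        lastSend_ = now;
        sentAny_ = true;
        return true;
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::chrono::milliseconds interval_;
    std::deque<Packet> queue_;
    Clock::time_point lastSend_;
    bool sentAny_;
};
```

Passing the current time in as a parameter keeps the pacing logic independent of any particular timer, which also makes it easy to exercise without real delays.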


I'd rather get the design right before launching into implementation, so I would like to hear thoughts and opinions on a good way to approach this...

Cheers,

Xav.

tbscope
25th May 2010, 16:33
Not an answer to your question I'm afraid, but I'm a little confused. Why use UDP? UDP is good for streaming media for example, where packets may drop.
Can't these devices work via TCP?

xavierda
25th May 2010, 23:04
They're an embedded device - one can telnet to them and poke around, so TCP is available in that sense. However, they expose a UDP-based API to manipulate their functionality, and it's this I am trying to use. I cannot change its implementation; if I had access to the VxWorks developer tools I could, in theory, change it and put some TCP-based API in its place, but for now I am using what is already there. The device is actually a radio, and I can embed a certain amount of data in a UDP packet which, when sent in the right format to the right port, will cause it to be sent over the air (where another radio performs the reverse operation and dumps it out of its Ethernet port...)

I'm leaning towards the queue approach using multiple threads - if it can be done using a single thread, I'm happy to do that too but in order to slow the rate of sending I think a couple of threads are called for.

xavierda
26th May 2010, 15:24
Hi there,

Okay, I managed to split the work across 2 threads tonight: the main thread has the QUdpSocket and another thread has a QQueue member variable. In the slot connected to readyRead in the main thread, I add the received packet to the queue in my other thread. This other thread uses a combination of QMutex and QWaitCondition to wake up when there are packets waiting in the queue and go to sleep when it's empty. In processing my queue I introduced a small delay (to simulate the processing time of the embedded device) and it acts as expected, that is, packets are queued as they come in, then processed at a slower rate :) The queue just grows and grows, and I suspect it will eventually fail once memory is exhausted, but it's sufficient for my needs at this stage.
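
One way to stop the queue growing without limit is to cap it and count overruns. The following is a sketch only, in standard C++ (std::mutex and std::condition_variable standing in for QMutex and QWaitCondition); `BoundedPacketQueue`, its drop-oldest policy and the `dropped()` counter are assumptions for illustration, not part of the code posted in this thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

using Packet = std::vector<std::uint8_t>;

// Hypothetical bounded queue: when full, the oldest packet is discarded.
class BoundedPacketQueue {
public:
    explicit BoundedPacketQueue(std::size_t capacity)
        : capacity_(capacity), dropped_(0)
    {
        assert(capacity > 0);
    }

    // Producer side. Returns false if an old packet had to be dropped.
    bool push(Packet p)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        const bool fits = queue_.size() < capacity_;
        if (!fits) {
            queue_.pop_front();         // overrun: discard the oldest packet
            ++dropped_;
        }
        queue_.push_back(std::move(p));
        notEmpty_.notify_one();
        return fits;
    }

    // Consumer side. Blocks until a packet is available.
    Packet pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks the condition after every wake-up,
        // so a spurious wake-up cannot lead to reading an empty queue.
        notEmpty_.wait(lock, [this] { return !queue_.empty(); });
        Packet p = std::move(queue_.front());
        queue_.pop_front();
        return p;
    }

    std::size_t dropped() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return dropped_;
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::deque<Packet> queue_;
    std::size_t dropped_;
};
```

Dropping the oldest packet suits a live stream, where stale data is worth less than fresh data; rejecting the new packet instead would be an equally valid policy for other traffic.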

However, in doing so I came across another problem (typical!)

My implementation of my queue processing thread has a continuously running loop (i.e. run() keeps going until the thread is killed) - is there a way to mix this approach with the event loop (i.e. calling exec())? I want the event loop so that I can have a QUdpSocket in the same thread to receive packets back from my device. I was hoping to just plumb the readyRead signal using connect(), but realised I can't call exec() AND have an endless run() loop. So my question now is, is it best to simply have another thread specifically for receiving, with another QUdpSocket?

So something like this:

---> incoming UDP ---> QUdpSocket [Main Thread ]
Main Thread ---> addPacket (packetQueue) ---> QUdpSocket.write(packet) ---> Device [Queue Processing Thread]
Main Thread <-- emit PacketReceived(packet) <--- QUdpSocket.readyRead() <--- Device [Incoming Packet Thread]

Or, given that I don't have to pause/slow down in processing the responses, just receive the packets in the main thread? What if I want to queue them too? For example, on receiving packets I keep the last five in the queue to, say, calculate statistics on the data and display them to the user. I guess I'm trying to implement 2-way arbitrary, queued sending and receiving in such a way that the main thread is always running... Suggestions and thoughts?
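
Keeping only the last few responses for statistics can be sketched as a small fixed-size window. This is an illustration in standard C++; `RecentWindow` is a hypothetical name, and packet size is used as the statistic purely as an example, since the thread does not say which values would be measured.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical helper: remembers the sizes of the most recent packets.
class RecentWindow {
public:
    explicit RecentWindow(std::size_t maxSize) : maxSize_(maxSize)
    {
        assert(maxSize > 0);
    }

    // Record one received packet, evicting the oldest entry when full.
    void add(std::size_t packetBytes)
    {
        if (sizes_.size() == maxSize_)
            sizes_.pop_front();
        sizes_.push_back(packetBytes);
    }

    // Mean packet size over the window, or 0 if nothing was recorded yet.
    double meanSize() const
    {
        if (sizes_.empty())
            return 0.0;
        double sum = 0.0;
        for (std::size_t s : sizes_)
            sum += static_cast<double>(s);
        return sum / static_cast<double>(sizes_.size());
    }

    std::size_t count() const { return sizes_.size(); }

private:
    std::size_t maxSize_;
    std::deque<std::size_t> sizes_;
};
```

If the window is only touched from the thread that receives the responses, no locking is needed; sharing it with a display thread would require a mutex around add() and meanSize().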

Cheers,

Xav.

xavierda
30th May 2010, 15:29
Alrighty - still no takers, maybe some code will help move discussion along! ;)

After some reading here on the forums and further experimentation I modified my code and broke it down into a self-contained sample. Goal: Buffer an incoming UDP stream such that I can send the received packets at a later time (ie. offset by a small delay.)

In answer to my own question above, I used another thread. It was doing my head in as I thought I would need to scatter multiple QUdpSockets all over the place. After some thought, I use just one. So I defined a MyUDPSocket class which sets up a basic QUdpSocket with a MySocketBuffer member variable.



// myudpsocket.h
#ifndef MYUDPSOCKET_H
#define MYUDPSOCKET_H

#include <QThread>
#include <QMutex>
#include <QUdpSocket>
#include "mysocketbuffer.h"

class MyUDPSocket: public QThread
{
    Q_OBJECT

public:
    MyUDPSocket();
    ~MyUDPSocket();
    void run();

public slots:
    void sendPacket(QByteArray packetData);
    void readPackets();
    void bufferDepleted();

private:
    QUdpSocket *udpSkt;
    MySocketBuffer *mySktBuf;
    QMutex mutex;
};

#endif // MYUDPSOCKET_H

// mysocketbuffer.h
#ifndef MYSOCKETBUFFER_H
#define MYSOCKETBUFFER_H

#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QQueue>

class MySocketBuffer : public QThread
{
    Q_OBJECT

public:
    MySocketBuffer();
    ~MySocketBuffer();

    void run();
    void addPacket(QByteArray *packetData);

signals:
    void sendDelayedPacket(QByteArray);
    void packetQueueDepleted();

private:
    /* Threading variables */
    QMutex mutex;
    QWaitCondition packetAdded;
    QQueue<QByteArray *> packetQueue;
};

#endif // MYSOCKETBUFFER_H


Now for the implementation. MyUDPSocket simply kicks off the buffer and connects the readyRead() signal as per usual for reading in UDP packets. When a packet is received, it adds it to the secondary thread's packet queue (implemented as a QQueue of QByteArrays).



// myudpsocket.cpp
#include <QDebug>
#include <QHostAddress>
#include <QUdpSocket>
#include "myudpsocket.h"
#include "mysocketbuffer.h"

MyUDPSocket::MyUDPSocket()
    : udpSkt(0), mySktBuf(0) // null until run() creates them, so the destructor is safe
{
    qDebug() << "Setting up MyUDPSocket...";
}

MyUDPSocket::~MyUDPSocket()
{
    // Stop the event loop and wait for run() to return before cleaning up
    exit();
    wait();
    delete mySktBuf;
    delete udpSkt;

    qDebug() << "MyUDPSocket Destroyed!";
}

void MyUDPSocket::run()
{
    udpSkt = new QUdpSocket();
    mySktBuf = new MySocketBuffer();

    udpSkt->bind(51200);

    connect(udpSkt, SIGNAL(readyRead()), this, SLOT(readPackets()));
    connect(mySktBuf, SIGNAL(sendDelayedPacket(QByteArray)), this, SLOT(sendPacket(QByteArray)));
    connect(mySktBuf, SIGNAL(packetQueueDepleted()), this, SLOT(bufferDepleted()));

    mySktBuf->start();

    qDebug() << "Starting MyUDPSocket event loop...";

    // Setup event loop to receive packets
    exec();
}

void MyUDPSocket::readPackets()
{
    //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
    // Drain every datagram that is currently pending on the socket
    while (udpSkt->hasPendingDatagrams())
    {
        QByteArray datagram;
        datagram.resize(udpSkt->pendingDatagramSize());
        QHostAddress sender;
        quint16 senderPort;

        udpSkt->readDatagram(datagram.data(), datagram.size(),
                             &sender, &senderPort);

        qDebug() << "Received a packet! Adding it to queue...";
        mySktBuf->addPacket(new QByteArray(datagram));
    }
}

void MyUDPSocket::sendPacket(QByteArray packetData)
{
    qDebug() << "Queued packet send...";
    //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
    udpSkt->writeDatagram(packetData, QHostAddress("10.1.4.1"), 51200);
}

void MyUDPSocket::bufferDepleted()
{
    qDebug() << "MySocketBuffer Depleted!";
}


All MySocketBuffer does is wait until there are packets in the queue, then send a signal back to the first thread with the packet data; otherwise it goes to sleep. This thread's run() method is an endless loop, so by partitioning the code into 2 threads I can use the exec() pattern in MyUDPSocket (to receive data) alongside this approach, which places the buffer thread under the control of the first one through the addPacket() method.



// mysocketbuffer.cpp
#include <QDebug>
#include <QMutexLocker>
#include "mysocketbuffer.h"

// Null sentinel: tells run() that no more packets will arrive
QByteArray * const endOfDataPackets = 0;

MySocketBuffer::MySocketBuffer()
{
    qDebug() << "Setting up MySocketBuffer...";
}

MySocketBuffer::~MySocketBuffer()
{
    // Delete all the queued packets, then queue the sentinel to stop run()
    {
        QMutexLocker locker(&mutex);
        while (!packetQueue.isEmpty())
            delete packetQueue.dequeue();
        packetQueue.enqueue(endOfDataPackets);
        packetAdded.wakeOne();
    }
    wait();
    qDebug() << "MySocketBuffer Destroyed!";
}

void MySocketBuffer::run()
{
    qDebug() << "MySocketBuffer running...";

    QByteArray *packet = 0;

    forever {
        {
            QMutexLocker locker(&mutex);

            // Loop rather than a single check, so that a wake-up with an
            // empty queue never reaches dequeue()
            while (packetQueue.isEmpty())
                packetAdded.wait(&mutex);
            packet = packetQueue.dequeue();
            qDebug() << "Packets Left:" << packetQueue.count();
            if (packet == endOfDataPackets)
                break;
        }

        // Simulate some packet processing, which introduces small delay
        msleep(21);
        //sleep(1);
        emit sendDelayedPacket(*packet);
        delete packet;

        {
            QMutexLocker locker(&mutex);
            if (packetQueue.isEmpty())
                emit packetQueueDepleted();
        }
    }
}

void MySocketBuffer::addPacket(QByteArray *packetData)
{
    QMutexLocker locker(&mutex);
    packetQueue.enqueue(packetData);
    packetAdded.wakeOne();
}


I run this in a Linux VM - the IP address 10.1.4.1 is an alias for my host machine. I set up VLC media player to stream an MP3 file from my host machine to my VM. The app queues the packets and sends them back, also on port 51200. I point another instance of VLC at this port - it works! The app quits after 30 seconds (this is mainly to test the cleanup code, also included).



// Sample output
Setting up MyUDPSocket...
Setting up MySocketBuffer...
MySocketBuffer running...
Starting MyUDPSocket event loop...
Received a packet! Adding it to queue...
Packets Left: 2
Queued packet send...
Packets Left: 1
Queued packet send...
...
...
Queued packet send...
MySocketBuffer Depleted!
Received a packet! Adding it to queue...
Packets Left: 0
MySocketBuffer Destroyed!
MyUDPSocket Destroyed!


As commented in the code above, I don't think I need to use QMutexLocker to protect the QUdpSocket, as Qt handles it all using its queued slot mechanism, right?

This has been a good exercise for me to better understand threading and Qt's approach to it in general, so I have added the code here as an attachment.

BUT! Is there a better way? Are there any gotchas in the code above I haven't handled?

Cheers,

Xav.

alexisdm
31st May 2010, 15:00
Although MyUDPSocket represents a thread, its slots are not automatically executed in that thread.
You have to move the QThread object itself to the thread with QObject::moveToThread:

MyUDPSocket::MyUDPSocket()
{
    qDebug() << "Setting up MyUDPSocket...";
    moveToThread(this);
}


And since QByteArray is basically already a ref-counted pointer to an internal buffer, you should use constant references (const QByteArray &) rather than pointers to pass it through signals/slots.

xavierda
31st May 2010, 16:11
Ah! I read a lot about that and got confused very quickly. I went back to my code and added some more debug information to better appreciate what is going on. Essentially I just print out the thread address using QThread::currentThread() (http://doc.trolltech.com/latest/qthread.html#currentThread). It worked before because I had the normal event loop set up in my main thread, like this:



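#include <QCoreApplication>
#include <QTimer>
#include "myudpsocket.h"

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    // Quit after 30 seconds so that the cleanup code gets exercised
    QTimer::singleShot(30000, &a, SLOT(quit()));
    MyUDPSocket myUDPSocket;
    myUDPSocket.start();
    return a.exec();
}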

So before adding moveToThread(), I'd get output like:


Thread: QThread(0x99f1400) In main...
Thread: QThread(0x99f1400) Setting up MyUDPSocket...
Thread: MyUDPSocket(0xbf96d1d8) Setting up MySocketBuffer...
Thread: MyUDPSocket(0xbf96d1d8) Starting MyUDPSocket event loop...
Thread: MySocketBuffer(0x9a0def0) MySocketBuffer running...
// Events for QUdpSocket are processed in main thread!
Thread: QThread(0x99f1400) Received a packet! Adding it to queue...
Thread: MySocketBuffer(0x9a0def0) Packets Left: 0
Thread: QThread(0x99f1400) Queued packet send...

After moveToThread:


Thread: QThread(0x95ca400) In main...
Thread: QThread(0x95ca400) Setting up MyUDPSocket...
Thread: MyUDPSocket(0xbfb580e8) Setting up MySocketBuffer...
Thread: MySocketBuffer(0x95d2b50) MySocketBuffer running...
Thread: MyUDPSocket(0xbfb580e8) Starting MyUDPSocket event loop...
// The MyUDPSocket thread is now handling all the events! :)
Thread: MyUDPSocket(0xbfb580e8) Received a packet! Adding it to queue...
Thread: MySocketBuffer(0x95d2b50) Packets Left: 0

The moveToThread call is not required for MySocketBuffer as it is already created in the context of MyUDPSocket's thread, right? Just want to be sure I understand this...


Yep, tired eyes and compiler errors threw me on that. Had to read a bit about const declaration (http://duramecho.com/ComputerInformation/WhyHowCppConst.html) and const correctness (http://www.parashift.com/c++-faq-lite/const-correctness.html) to understand that better - so my knowledge was increased there too!

I've updated the code and attached it to this post so others can learn. I'm now looking at expanding on this code so that I can plug in different queue processors - for example, one queue will slow sending down as per the code in this post, while another will manipulate the data somehow. Looks like the Strategy design pattern will come in handy for this... But my previous question still stands: Is there a more efficient way to queue packets? Or is this a reasonable approach?
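
A rough sketch of how pluggable queue processors might look with the Strategy pattern, in standard C++ without Qt. Every name here (`PacketProcessor`, `PassThrough`, `XorMask`, `PacketPipeline`) is hypothetical, and the XOR transform is only a stand-in for "manipulate the data somehow".

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

using Packet = std::vector<std::uint8_t>;

// Strategy interface: one operation applied to each dequeued packet.
struct PacketProcessor {
    virtual ~PacketProcessor() {}
    virtual Packet process(const Packet &in) = 0;
};

// Strategy 1: forward the packet unchanged.
struct PassThrough : PacketProcessor {
    Packet process(const Packet &in) override { return in; }
};

// Strategy 2: example transformation, XOR every byte with a mask.
struct XorMask : PacketProcessor {
    explicit XorMask(std::uint8_t mask) : mask_(mask) {}
    Packet process(const Packet &in) override
    {
        Packet out(in);
        for (std::uint8_t &b : out)
            b = static_cast<std::uint8_t>(b ^ mask_);
        return out;
    }
    std::uint8_t mask_;
};

// Context: owns the current strategy and lets it be swapped at run time.
class PacketPipeline {
public:
    explicit PacketPipeline(std::unique_ptr<PacketProcessor> p)
        : processor_(std::move(p))
    {
        assert(processor_ != nullptr);
    }

    void setProcessor(std::unique_ptr<PacketProcessor> p)
    {
        assert(p != nullptr);
        processor_ = std::move(p);
    }

    Packet handle(const Packet &in) { return processor_->process(in); }

private:
    std::unique_ptr<PacketProcessor> processor_;
};
```

In the threaded design from this thread, the consumer loop would call something like handle() on each dequeued packet before emitting it, so the queueing code stays the same whichever processor is plugged in.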

Cheers,

Xav.