Thread: QUdpSocket Buffering - Approach?

  1. #1
    Join Date
    May 2010
    Posts
    8
    Thanks
    3
    Qt products
    Qt4
    Platforms
    Unix/X11 Windows

    QUdpSocket Buffering - Approach?

    Hi there,

    I have been doing some basic network programming using Qt and want to further my knowledge to tackle the following problem. First some background info.

    I have a network accessible device that accepts UDP packets on a particular port. The UDP packets contain specific binary data in a certain order (eg. first byte indicates packet type etc). The device will send responses back to the originating source port - so one can send and receive from the device using a single port. I have this working nicely.

    The setup is like this:

    MyApp <--- UDP ---> MyDevice

    Now, I want to bind to another port on the host, and repackage any incoming data to send to MyDevice like this

    SomeOtherUDPSource <--- UDP ---> MyApp <--- UDP ---> MyDevice

    Now, the problem is this: MyDevice isn't so quick with its processing, so it appears I need to slow the sending rate down. According to the documentation for QAbstractSocket, QUdpSocket does not use any buffering of its own; that is left to the operating system.

    That's okay, but what's the best way to handle it?

    • Sleep the thread for a bit - Correct me if I am wrong, but the problem with this approach is that, depending on the sleep interval, the OS may drop the buffered packet; after all, nothing has read it.
    • Implement a queue producer/consumer approach - multi thread approach, one thread (say incoming) adds the packets to a queue which is then processed more slowly by another thread (outgoing). Here, incoming is receiving faster than outgoing so overruns will occur eventually but this is now a matter of the amount of memory for the queue, correct? And a simple overrun check could be added I suppose.
    • Have a timer with a slot that sends a packet on the incoming queue every Xms - slightly easier than the threaded approach?
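
The third option (release one queued packet per timer tick, all in a single thread) can be sketched as below. This is only an illustration in plain standard C++, not Qt code from this thread: `PacedSender`, `enqueue()`, `onTick()` and the send callback are hypothetical names, and packets are modelled as `std::string` rather than `QByteArray`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper (not from the thread): holds incoming packets and
// releases at most one of them per timer tick.
class PacedSender {
public:
    using SendFn = std::function<void(const std::string &)>;

    explicit PacedSender(SendFn send) : send_(std::move(send)) {
        assert(send_ && "a send callback is required");
    }

    // Call from the receive handler whenever a packet arrives.
    void enqueue(std::string packet) { queue_.push_back(std::move(packet)); }

    // Call once per timer interval. Sends the oldest queued packet, if any,
    // and returns true when a packet was actually sent.
    bool onTick() {
        if (queue_.empty())
            return false;
        send_(queue_.front());
        queue_.pop_front();
        return true;
    }

    // Number of packets still waiting to be sent.
    std::size_t pending() const { return queue_.size(); }

private:
    SendFn send_;
    std::deque<std::string> queue_;
};
```

In a Qt application the natural mapping would presumably be to call `enqueue()` from the slot connected to `readyRead()` and `onTick()` from a slot connected to `QTimer::timeout()`. Because everything then runs in one event loop, no locking is needed, but the queue is still unbounded.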


    I'd rather get the design right before launching into implementation, so I would like to hear thoughts and opinions on a good way to approach this...

    Cheers,

    Xav.

  2. #2
    Join Date
    Jan 2006
    Location
    Belgium
    Posts
    1,938
    Thanked 268 Times in 268 Posts
    Qt products
    Qt4
    Platforms
    Unix/X11 Windows
    Wiki edits
    20

    Re: QUdpSocket Buffering - Approach?

    Not an answer to your question I'm afraid, but I'm a little confused. Why use UDP? UDP is good for streaming media for example, where packets may drop.
    Can't these devices work via TCP?

  3. The following user says thank you to tbscope for this useful post:

    xavierda (26th May 2010)

  4. #3

    Re: QUdpSocket Buffering - Approach?

    Quote: Originally posted by tbscope
    Not an answer to your question I'm afraid, but I'm a little confused. Why use UDP? UDP is good for streaming media for example, where packets may drop.
    Can't these devices work via TCP?

    They're an embedded device - one can telnet to them and poke around, so TCP is available in that sense. However, they expose a UDP-based API to manipulate their functionality and it's this I am trying to use. I cannot change its implementation; if I had access to the VxWorks developer tools I could, in theory, change it and put some TCP-based API in its place, but for now I am using what is already there. The device is actually a radio, and I can embed a certain amount of data in a UDP packet which, when sent in the right format to the right port, will cause it to be sent over the air (where another radio performs the reverse operation and dumps it out its Ethernet port...)

    I'm leaning towards the queue approach using multiple threads - if it can be done using a single thread, I'm happy to do that too but in order to slow the rate of sending I think a couple of threads are called for.

  5. #4

    Re: QUdpSocket Buffering - Approach?

    Quote: Originally posted by xavierda
    Hi there,

    • Implement a queue producer/consumer approach - multi thread approach, one thread (say incoming) adds the packets to a queue which is then processed more slowly by another thread (outgoing). Here, incoming is receiving faster than outgoing so overruns will occur eventually but this is now a matter of the amount of memory for the queue, correct? And a simple overrun check could be added I suppose.

    Quote: Originally posted by xavierda
    I'm leaning towards the queue approach using multiple threads - if it can be done using a single thread, I'm happy to do that too but in order to slow the rate of sending I think a couple of threads are called for.

    Okay, I managed to split the work across 2 threads tonight: the main thread has the QUdpSocket and another thread has a QQueue member variable. In the slot connected to readyRead in the main thread, I add the received packet to the queue in my other thread. This other thread uses a combination of QMutex and QWaitCondition to wake up when there are packets waiting in the queue and go to sleep when it's empty. In processing my queue I introduced a small delay (to simulate the processing time for the embedded device) and it acts as expected: packets are queued as they come in, then processed at a slower rate. The queue just grows and grows, and I suspect it will eventually fail once memory is exhausted, but it's sufficient for my needs at this stage.
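
One way to keep such a queue from growing without limit is to cap its size and count overruns. The sketch below is an assumption-laden illustration in plain standard C++ (with `std::mutex` and `std::condition_variable` standing in for QMutex and QWaitCondition); `BoundedPacketQueue` and its drop-oldest policy are hypothetical choices, not something taken from the code in this thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical bounded packet queue (names are illustrative only).
class BoundedPacketQueue {
public:
    explicit BoundedPacketQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity_ > 0);
    }

    // Producer side: never blocks. When the queue is full, the oldest
    // packet is dropped and counted as an overrun.
    void push(std::string packet) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (queue_.size() == capacity_) {
                queue_.pop_front();
                ++overruns_;
            }
            queue_.push_back(std::move(packet));
        }
        notEmpty_.notify_one();
    }

    // Consumer side: blocks until a packet is available or close() has been
    // called. Returns false once the queue is closed and fully drained.
    bool pop(std::string &out) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty())
            return false;
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    // Wakes any waiting consumer so that it can finish cleanly.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        notEmpty_.notify_all();
    }

    // Number of packets dropped so far because the queue was full.
    std::size_t overruns() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return overruns_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::deque<std::string> queue_;
    std::size_t capacity_;
    std::size_t overruns_ = 0;
    bool closed_ = false;
};
```

Dropping the oldest packet suits data that goes stale quickly; dropping the newest, or blocking the producer, are equally valid policies depending on what the device expects. Whichever is chosen, the overrun counter makes the loss visible instead of silent.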

    However, in doing so I came across another problem (typical!)

    My implementation of my queue processing thread has a continuously running loop (ie. run() keeps going until the thread is killed) - is there a way to mix this approach with the event loop (ie. calling exec())? I want the event loop so that I can have a QUdpSocket in the same thread to receive packets back from my device. I was hoping to just plumb the readyRead signal using connect() but realised I can't call exec() AND have an endless run() loop. So my question now is, is it best to simply have another thread specifically for receiving with another QUdpSocket?

    So something like this:

    ---> incoming UDP ---> QUdpSocket [Main Thread ]
    Main Thread ---> addPacket (packetQueue) ---> QUdpSocket.write(packet) ---> Device [Queue Processing Thread]
    Main Thread <-- emit PacketReceived(packet) <--- QUdpSocket.readyRead() <--- Device [Incoming Packet Thread]

    Or, given that I don't have to pause/slow down in processing the responses, just receive the packets in the main thread? What if I want to queue them too? For example, on receiving packets I keep the last five in the queue to, say, calculate statistics on the data and display them to the user. I guess I'm trying to implement 2-way arbitrary, queued sending and receiving in such a way that the main thread is always running... Suggestions and thoughts?
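
Keeping only the last five received packets for statistics could look roughly like the following. This is a hedged sketch in plain standard C++; `RecentPackets` and `meanSize()` are hypothetical names, and mean payload size is just a placeholder for whatever statistic is actually wanted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical helper: remembers only the most recent `limit` packets.
class RecentPackets {
public:
    explicit RecentPackets(std::size_t limit) : limit_(limit) {
        assert(limit_ > 0);
    }

    // Stores a packet, forgetting the oldest one when the window is full.
    void add(std::string packet) {
        if (packets_.size() == limit_)
            packets_.pop_front();
        packets_.push_back(std::move(packet));
    }

    std::size_t count() const { return packets_.size(); }

    // Example statistic: mean payload size in bytes over the retained packets.
    double meanSize() const {
        if (packets_.empty())
            return 0.0;
        std::size_t total = 0;
        for (const std::string &p : packets_)
            total += p.size();
        return static_cast<double>(total) / static_cast<double>(packets_.size());
    }

private:
    std::size_t limit_;
    std::deque<std::string> packets_;
};
```

If the window is only touched from the thread that receives the responses, it needs no locking; sharing it with a GUI thread would require a mutex or a signal that carries a copy of the computed statistic.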

    Cheers,

    Xav.

  6. #5

    Re: QUdpSocket Buffering - Solution (Long Post!)

    Quote: Originally posted by xavierda
    However, in doing so I came across another problem (typical!)

    My implementation of my queue processing thread has a continuously running loop (ie. run() keeps going until the thread is killed) - is there a way to mix this approach with the event loop (ie. calling exec())? I want the event loop so that I can have a QUdpSocket in the same thread to receive packets back from my device. I was hoping to just plumb the readyRead signal using connect() but realised I can't call exec() AND have an endless run() loop. So my question now is, is it best to simply have another thread specifically for receiving with another QUdpSocket?

    Alrighty - still no takers, maybe some code will help move discussion along!

    After some reading here on the forums and further experimentation I modified my code and broke it down into a self-contained sample. Goal: Buffer an incoming UDP stream such that I can send the received packets at a later time (ie. offset by a small delay.)

    In answer to my own question above, I used another thread. It was doing my head in as I thought I would need to scatter multiple QUdpSockets all over the place. After some thought, I used just one. So I defined a MyUDPSocket class which sets up a basic QUdpSocket with a MySocketBuffer member variable.

    Qt Code:
    // myudpsocket.h
    #ifndef MYUDPSOCKET_H
    #define MYUDPSOCKET_H

    #include <QThread>
    #include <QMutex>
    #include <QUdpSocket>
    #include "mysocketbuffer.h"

    class MyUDPSocket: public QThread
    {
        Q_OBJECT

    public:
        MyUDPSocket();
        ~MyUDPSocket();
        void run();

    public slots:
        void sendPacket(QByteArray packetData);
        void readPackets();
        void bufferDepleted();

    private:
        QUdpSocket *udpSkt;
        MySocketBuffer *mySktBuf;
        QMutex mutex;
    };

    #endif // MYUDPSOCKET_H

    // mysocketbuffer.h
    #ifndef MYSOCKETBUFFER_H
    #define MYSOCKETBUFFER_H

    #include <QThread>
    #include <QMutex>
    #include <QWaitCondition>
    #include <QQueue>

    class MySocketBuffer : public QThread
    {
        Q_OBJECT

    public:
        MySocketBuffer();
        ~MySocketBuffer();

        void run();
        void addPacket(QByteArray *packetData);

    signals:
        void sendDelayedPacket(QByteArray);
        void packetQueueDepleted();

    private:
        /* Threading variables */
        QMutex mutex;
        QWaitCondition packetAdded;
        QQueue<QByteArray *> packetQueue;
    };

    #endif // MYSOCKETBUFFER_H

    Now for the implementation. MyUDPSocket simply kicks off the buffer and connects the readyRead() signal as per usual for reading in UDP packets. When a packet is received, it adds it to the secondary thread's packet queue (implemented as a QQueue of QByteArrays).

    Qt Code:
    // myudpsocket.cpp
    #include <QDebug>
    #include <QUdpSocket>
    #include "myudpsocket.h"
    #include "mysocketbuffer.h"

    MyUDPSocket::MyUDPSocket()
    {
        qDebug() << "Setting up MyUDPSocket...";
    }

    MyUDPSocket::~MyUDPSocket()
    {
        exit();
        wait();
        delete mySktBuf;
        delete udpSkt;

        qDebug() << "MyUDPSocket Destroyed!";
    }

    void MyUDPSocket::run()
    {
        udpSkt = new QUdpSocket();
        mySktBuf = new MySocketBuffer();

        udpSkt->bind(51200);

        connect(udpSkt, SIGNAL(readyRead()), this, SLOT(readPackets()));
        connect(mySktBuf, SIGNAL(sendDelayedPacket(QByteArray)), this, SLOT(sendPacket(QByteArray)));
        connect(mySktBuf, SIGNAL(packetQueueDepleted()), this, SLOT(bufferDepleted()));

        mySktBuf->start();

        qDebug() << "Starting MyUDPSocket event loop...";

        // Setup event loop to receive packets
        exec();
    }

    void MyUDPSocket::readPackets()
    {
        //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
        while (udpSkt->hasPendingDatagrams())
        {
            QByteArray datagram;
            datagram.resize(udpSkt->pendingDatagramSize());
            QHostAddress sender;
            quint16 senderPort;

            udpSkt->readDatagram(datagram.data(), datagram.size(),
                                 &sender, &senderPort);

            qDebug() << "Received a packet! Adding it to queue...";
            mySktBuf->addPacket(new QByteArray(datagram));
        }
    }

    void MyUDPSocket::sendPacket(QByteArray packetData)
    {
        qDebug() << "Queued packet send...";
        //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
        udpSkt->writeDatagram(packetData, QHostAddress("10.1.4.1"), 51200);
    }

    void MyUDPSocket::bufferDepleted()
    {
        qDebug() << "MySocketBuffer Depleted!";
    }

    All MySocketBuffer does is wait until there are packets in the queue, then send a signal back to the first thread with the packet data; otherwise it goes to sleep. This thread's run() method is an endless loop, so by partitioning the code into 2 threads I can use the exec() pattern in MyUDPSocket (to receive data) alongside this approach, which places the buffer thread under the control of the first one through the addPacket() method.

    Qt Code:
    // mysocketbuffer.cpp
    #include <QDebug>
    #include <QMutexLocker>
    #include "mysocketbuffer.h"

    QByteArray * const endOfDataPackets = 0;

    MySocketBuffer::MySocketBuffer()
    {
        qDebug() << "Setting up MySocketBuffer...";
    }

    MySocketBuffer::~MySocketBuffer()
    {
        // Delete all the queued packets
        {
            QMutexLocker locker(&mutex);
            while (!packetQueue.isEmpty())
                delete packetQueue.dequeue();
            packetQueue.enqueue(endOfDataPackets);
            packetAdded.wakeOne();
        }
        wait();
        qDebug() << "MySocketBuffer Destroyed!";
    }

    void MySocketBuffer::run()
    {
        qDebug() << "MySocketBuffer running...";

        QByteArray *packet = 0;

        forever {
            {
                QMutexLocker locker(&mutex);

                if (packetQueue.isEmpty())
                    packetAdded.wait(&mutex);
                packet = packetQueue.dequeue();
                qDebug() << "Packets Left:" << packetQueue.count();
                if (packet == endOfDataPackets)
                    break;
            }

            // Simulate some packet processing, which introduces small delay
            msleep(21);
            //sleep(1);
            emit sendDelayedPacket(*packet);
            delete packet;

            {
                QMutexLocker locker(&mutex);
                if (packetQueue.isEmpty())
                    emit packetQueueDepleted();
            }
        }
    }

    void MySocketBuffer::addPacket(QByteArray *packetData)
    {
        QMutexLocker locker(&mutex);
        packetQueue.enqueue(packetData);
        packetAdded.wakeOne();
    }

    I run this in a Linux VM - the IP address of 10.1.4.1 is an alias for my host machine. I set up VLC media player to stream an MP3 file from my host machine to my VM. It queues the packets and sends them back, also on port 51200. I point another instance of VLC to this port - it works! The app quits after 30 seconds (this is mainly to test cleanup code, also included).

    Qt Code:
    // Sample output
    Setting up MyUDPSocket...
    Setting up MySocketBuffer...
    MySocketBuffer running...
    Starting MyUDPSocket event loop...
    Received a packet! Adding it to queue...
    Packets Left: 2
    Queued packet send...
    Packets Left: 1
    Queued packet send...
    ...
    ...
    Queued packet send...
    MySocketBuffer Depleted!
    Received a packet! Adding it to queue...
    Packets Left: 0
    MySocketBuffer Destroyed!
    MyUDPSocket Destroyed!

    As commented in the code above, I don't think I need to use QMutexLocker to protect the QUdpSocket as Qt handles it all using its queued slot mechanism, right?

    This has been a good exercise for me to better understand threading and Qt's approach to it in general, so I have added the code here as an attachment.

    BUT! Is there a better way? Are there any gotchas in the code above I haven't handled?

    Cheers,

    Xav.

  7. #6
    Join Date
    May 2010
    Posts
    24
    Thanked 8 Times in 7 Posts
    Qt products
    Qt4
    Platforms
    Unix/X11 Windows

    Re: QUdpSocket Buffering - Solution (Long Post!)

    Although MyUDPSocket represents a thread, its slots are not automatically executed in that thread.
    You have to move the QThread object itself to the thread with QObject::moveToThread:
    Qt Code:
    MyUDPSocket::MyUDPSocket()
    {
        qDebug() << "Setting up MyUDPSocket...";
        moveToThread(this);
    }

    And since QByteArray is basically already a ref-counted pointer to an internal buffer, you should use constant references (const QByteArray &) rather than pointers to pass it through signals/slots.

  8. The following user says thank you to alexisdm for this useful post:

    xavierda (31st May 2010)

  9. #7

    Re: QUdpSocket Buffering - Solution (Long Post!)

    Quote: Originally posted by alexisdm
    Although MyUDPSocket represents a thread, its slots are not automatically executed in that thread.

    Ah! I read a lot about that and got confused very quickly. I went back to my code and added some more debug information to better appreciate what is going on. Essentially I just print out the thread address using QThread::currentThread(). It worked before because I had the normal event loop set up in my main thread, like this:

    Qt Code:
    #include <QCoreApplication>
    #include <QTimer>
    #include "myudpsocket.h"

    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        // Quit after 30 seconds so the cleanup code gets exercised
        QTimer::singleShot(30000, &a, SLOT(quit()));
        MyUDPSocket myUDPSocket;
        myUDPSocket.start();
        return a.exec();
    }

    So before adding moveToThread(), I'd get output like:

    Qt Code:
    Thread: QThread(0x99f1400) In main...
    Thread: QThread(0x99f1400) Setting up MyUDPSocket...
    Thread: MyUDPSocket(0xbf96d1d8) Setting up MySocketBuffer...
    Thread: MyUDPSocket(0xbf96d1d8) Starting MyUDPSocket event loop...
    Thread: MySocketBuffer(0x9a0def0) MySocketBuffer running...
    // Events for QUdpSocket are processed in main thread!
    Thread: QThread(0x99f1400) Received a packet! Adding it to queue...
    Thread: MySocketBuffer(0x9a0def0) Packets Left: 0
    Thread: QThread(0x99f1400) Queued packet send...

    After moveToThread:

    Qt Code:
    Thread: QThread(0x95ca400) In main...
    Thread: QThread(0x95ca400) Setting up MyUDPSocket...
    Thread: MyUDPSocket(0xbfb580e8) Setting up MySocketBuffer...
    Thread: MySocketBuffer(0x95d2b50) MySocketBuffer running...
    Thread: MyUDPSocket(0xbfb580e8) Starting MyUDPSocket event loop...
    // The MyUDPSocket thread is now handling all the events! :)
    Thread: MyUDPSocket(0xbfb580e8) Received a packet! Adding it to queue...
    Thread: MySocketBuffer(0x95d2b50) Packets Left: 0

    The moveToThread call is not required for MySocketBuffer as it is already created in the context of MyUDPSocket's thread, right? Just want to be sure I understand this...

    Quote: Originally posted by alexisdm
    And since QByteArray is basically already a ref-counted pointer to an internal buffer, you should use constant references (const QByteArray &) rather than pointer to pass it through signals/slots.

    Yep, tired eyes and compiler errors threw me on that. Had to read a bit about const declaration and const correctness to understand that better - so my knowledge was increased there too!

    I've updated the code and attached it to this post so others can learn. I'm now looking at expanding on this code so that I can plug in different queue processors - for example, one queue will slow sending down as per the code in this post, while another will manipulate the data somehow. Looks like the Strategy design pattern will come in handy for this... But my previous question still stands: Is there a more efficient way to queue packets? Or is this a reasonable approach?
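
The pluggable queue processors mentioned above might be structured along these lines. This is only a sketch of the Strategy pattern in plain standard C++; `PacketProcessor`, `DelayProcessor`, `ReverseProcessor` and `QueueWorker` are hypothetical names, and byte reversal is a stand-in for whatever data manipulation is actually needed.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <utility>

// Hypothetical strategy interface: each processor decides what happens to a
// packet between being dequeued and being sent.
class PacketProcessor {
public:
    virtual ~PacketProcessor() = default;
    virtual std::string process(std::string packet) = 0;
};

// Strategy 1: pass the data through unchanged after a fixed delay.
class DelayProcessor : public PacketProcessor {
public:
    explicit DelayProcessor(std::chrono::milliseconds delay) : delay_(delay) {}
    std::string process(std::string packet) override {
        std::this_thread::sleep_for(delay_);
        return packet;
    }

private:
    std::chrono::milliseconds delay_;
};

// Strategy 2: manipulate the payload (reversing the bytes as a placeholder).
class ReverseProcessor : public PacketProcessor {
public:
    std::string process(std::string packet) override {
        std::reverse(packet.begin(), packet.end());
        return packet;
    }
};

// The worker only knows the interface, so strategies can be swapped freely.
class QueueWorker {
public:
    explicit QueueWorker(std::unique_ptr<PacketProcessor> processor)
        : processor_(std::move(processor)) {
        assert(processor_ != nullptr);
    }

    void setProcessor(std::unique_ptr<PacketProcessor> processor) {
        assert(processor != nullptr);
        processor_ = std::move(processor);
    }

    // Applies the current strategy to one dequeued packet.
    std::string handle(std::string packet) {
        return processor_->process(std::move(packet));
    }

private:
    std::unique_ptr<PacketProcessor> processor_;
};
```

In the code from this thread, the call to `handle()` would presumably replace the `msleep(21)` step in the buffer thread's loop, so the queueing and wake-up logic stays the same no matter which processor is plugged in.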

    Cheers,

    Xav.