Thread: QUdpSocket Buffering - Approach?

  1. #1

    Re: QUdpSocket Buffering - Solution (Long Post!)

    Originally posted by xavierda:

    However, in doing so I came across another problem (typical!)

    My implementation of my queue processing thread has a continuously running loop (i.e. run() keeps going until the thread is killed). Is there a way to mix this approach with the event loop (i.e. calling exec())? I want the event loop so that I can have a QUdpSocket in the same thread to receive packets back from my device. I was hoping to just plumb the readyRead signal using connect() but realised I can't call exec() AND have an endless run() loop. So my question now is: is it best to simply have another thread specifically for receiving, with another QUdpSocket?

    Alrighty - still no takers, maybe some code will help move discussion along!

    After some reading here on the forums and further experimentation I modified my code and broke it down into a self-contained sample. Goal: buffer an incoming UDP stream so that I can send the received packets at a later time (i.e. offset by a small delay).
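
    To make the goal concrete, here is a minimal, Qt-free sketch of one way to offset packets by a fixed delay: stamp each packet with a release time and hand it back only once that time has passed. DelayBuffer, push() and popReady() are hypothetical names, not part of the attached code, and the clock is passed in as a plain millisecond count so the behaviour is easy to check. This is a different technique from the per-packet msleep() used in the code further down.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not from the posted code): holds each packet until a
// fixed delay has elapsed since it arrived.
class DelayBuffer {
public:
    explicit DelayBuffer(std::int64_t delayMs) : delayMs_(delayMs) {}

    // Store a packet together with the time at which it may be released.
    void push(std::string packet, std::int64_t nowMs) {
        queue_.emplace_back(nowMs + delayMs_, std::move(packet));
    }

    // Return, in arrival order, every packet whose release time has passed.
    std::vector<std::string> popReady(std::int64_t nowMs) {
        std::vector<std::string> ready;
        while (!queue_.empty() && queue_.front().first <= nowMs) {
            ready.push_back(std::move(queue_.front().second));
            queue_.pop_front();
        }
        return ready;
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::int64_t delayMs_;
    std::deque<std::pair<std::int64_t, std::string>> queue_;
};
```

    In a Qt version the nowMs value could come from a timer; that wiring is left out here on purpose.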

    In answer to my own question above, I used another thread. It was doing my head in as I thought I would need to scatter multiple QUdpSockets all over the place. After some thought, I use just one. So I defined a MyUDPSocket class which sets up a basic QUdpSocket with a MySocketBuffer member variable.

    Qt Code:
    // myudpsocket.h
    #ifndef MYUDPSOCKET_H
    #define MYUDPSOCKET_H

    #include <QByteArray>
    #include <QThread>
    #include <QMutex>
    #include <QUdpSocket>
    #include "mysocketbuffer.h"

    class MyUDPSocket : public QThread
    {
        Q_OBJECT

    public:
        MyUDPSocket();
        ~MyUDPSocket();
        void run();

    public slots:
        void sendPacket(QByteArray packetData);
        void readPackets();
        void bufferDepleted();

    private:
        QUdpSocket *udpSkt;
        MySocketBuffer *mySktBuf;
        QMutex mutex;
    };

    #endif // MYUDPSOCKET_H

    // mysocketbuffer.h
    #ifndef MYSOCKETBUFFER_H
    #define MYSOCKETBUFFER_H

    #include <QByteArray>
    #include <QThread>
    #include <QMutex>
    #include <QWaitCondition>
    #include <QQueue>

    class MySocketBuffer : public QThread
    {
        Q_OBJECT

    public:
        MySocketBuffer();
        ~MySocketBuffer();

        void run();
        void addPacket(QByteArray *packetData);

    signals:
        void sendDelayedPacket(QByteArray);
        void packetQueueDepleted();

    private:
        /* Threading variables */
        QMutex mutex;
        QWaitCondition packetAdded;
        QQueue<QByteArray *> packetQueue;
    };

    #endif // MYSOCKETBUFFER_H

    Now for the implementation. MyUDPSocket simply kicks off the buffer and connects the readyRead() signal as per usual for reading in UDP packets. When a packet is received, it adds it to the secondary thread's packet queue (implemented as a QQueue of QByteArrays).

    Qt Code:
    // myudpsocket.cpp
    #include <QDebug>
    #include <QHostAddress>
    #include <QUdpSocket>
    #include "myudpsocket.h"
    #include "mysocketbuffer.h"

    MyUDPSocket::MyUDPSocket()
        : udpSkt(0), mySktBuf(0) // stay null until run() creates them
    {
        qDebug() << "Setting up MyUDPSocket...";
    }

    MyUDPSocket::~MyUDPSocket()
    {
        // Stop the event loop, wait for run() to return, then clean up
        exit();
        wait();
        delete mySktBuf;
        delete udpSkt;

        qDebug() << "MyUDPSocket Destroyed!";
    }

    void MyUDPSocket::run()
    {
        udpSkt = new QUdpSocket();
        mySktBuf = new MySocketBuffer();

        udpSkt->bind(51200);

        connect(udpSkt, SIGNAL(readyRead()), this, SLOT(readPackets()));
        connect(mySktBuf, SIGNAL(sendDelayedPacket(QByteArray)), this, SLOT(sendPacket(QByteArray)));
        connect(mySktBuf, SIGNAL(packetQueueDepleted()), this, SLOT(bufferDepleted()));

        mySktBuf->start();

        qDebug() << "Starting MyUDPSocket event loop...";

        // Setup event loop to receive packets
        exec();
    }

    void MyUDPSocket::readPackets()
    {
        //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
        while (udpSkt->hasPendingDatagrams())
        {
            QByteArray datagram;
            datagram.resize(udpSkt->pendingDatagramSize());
            QHostAddress sender;
            quint16 senderPort;

            udpSkt->readDatagram(datagram.data(), datagram.size(),
                                 &sender, &senderPort);

            qDebug() << "Received a packet! Adding it to queue...";
            mySktBuf->addPacket(new QByteArray(datagram));
        }
    }

    void MyUDPSocket::sendPacket(QByteArray packetData)
    {
        qDebug() << "Queued packet send...";
        //QMutexLocker locker(&mutex); // <-- This doesn't seem necessary (Queued Connection?)
        udpSkt->writeDatagram(packetData, QHostAddress("10.1.4.1"), 51200);
    }

    void MyUDPSocket::bufferDepleted()
    {
        qDebug() << "MySocketBuffer Depleted!";
    }

    All MySocketBuffer does is wait until there are packets in the queue, then send a signal back to the first thread with the packet data; otherwise it goes to sleep. This thread's run() method is an endless loop, so by partitioning the code into two threads I can use the exec() pattern in MyUDPSocket (to receive data) alongside this approach, which puts the buffer thread under the control of the first one through the addPacket() method.

    Qt Code:
    // mysocketbuffer.cpp
    #include <QDebug>
    #include <QMutexLocker>
    #include "mysocketbuffer.h"

    // A null pointer in the queue tells run() to stop
    QByteArray * const endOfDataPackets = 0;

    MySocketBuffer::MySocketBuffer()
    {
        qDebug() << "Setting up MySocketBuffer...";
    }

    MySocketBuffer::~MySocketBuffer()
    {
        // Delete all the queued packets, then queue the stop marker
        {
            QMutexLocker locker(&mutex);
            while (!packetQueue.isEmpty())
                delete packetQueue.dequeue();
            packetQueue.enqueue(endOfDataPackets);
            packetAdded.wakeOne();
        }
        wait();
        qDebug() << "MySocketBuffer Destroyed!";
    }

    void MySocketBuffer::run()
    {
        qDebug() << "MySocketBuffer running...";

        QByteArray *packet = 0;

        forever {
            {
                QMutexLocker locker(&mutex);

                // Re-check after every wake-up so that an early or spurious
                // wake-up never dequeues from an empty queue
                while (packetQueue.isEmpty())
                    packetAdded.wait(&mutex);
                packet = packetQueue.dequeue();
                qDebug() << "Packets Left:" << packetQueue.count();
                if (packet == endOfDataPackets)
                    break;
            }

            // Simulate some packet processing, which introduces small delay
            msleep(21);
            //sleep(1);
            emit sendDelayedPacket(*packet);
            delete packet;

            {
                QMutexLocker locker(&mutex);
                if (packetQueue.isEmpty())
                    emit packetQueueDepleted();
            }
        }
    }

    void MySocketBuffer::addPacket(QByteArray *packetData)
    {
        QMutexLocker locker(&mutex);
        packetQueue.enqueue(packetData);
        packetAdded.wakeOne();
    }
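
    For readers without Qt to hand, the same wait-until-there-is-work pattern can be sketched in standard C++. This is an illustration only: std::mutex and std::condition_variable stand in for QMutex and QWaitCondition, PacketQueue and drainOnWorker() are hypothetical names, and a closed flag replaces the null-pointer marker. Unlike the destructor above, this sketch lets the worker finish the queued packets before it stops.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking queue: the consumer sleeps until a packet arrives
// or the queue is closed.
class PacketQueue {
public:
    // Producer side: enqueue a packet and wake the consumer.
    void add(std::string packet) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(packet));
        }
        added_.notify_one();
    }

    // Ask the consumer to stop once the queue has been drained.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        added_.notify_one();
    }

    // Consumer side: blocks until a packet is available.
    // Returns false once the queue is closed and empty.
    bool take(std::string &out) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is re-checked after every wake-up.
        added_.wait(lock, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty())
            return false;
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable added_;
    std::queue<std::string> queue_;
    bool closed_ = false;
};

// Feed the packets to a worker thread and return what it saw, in order.
std::vector<std::string> drainOnWorker(const std::vector<std::string> &input) {
    PacketQueue queue;
    std::vector<std::string> seen;
    std::thread worker([&] {
        std::string packet;
        while (queue.take(packet))
            seen.push_back(packet);
    });
    for (const std::string &packet : input)
        queue.add(packet);
    queue.close();
    worker.join();  // after join() it is safe to read 'seen'
    return seen;
}
```

    Calling drainOnWorker() with three packets returns the same three packets in arrival order, which is the property the buffer thread relies on.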

    I run this in a Linux VM; the IP address 10.1.4.1 is an alias for my host machine. I set up VLC media player to stream an MP3 file from my host machine to my VM. The app queues the packets and sends them back, also on port 51200. I point another instance of VLC at this port - it works! The app quits after 30 seconds (this is mainly to test the cleanup code, also included).

    Qt Code:
    // Sample output
    Setting up MyUDPSocket...
    Setting up MySocketBuffer...
    MySocketBuffer running...
    Starting MyUDPSocket event loop...
    Received a packet! Adding it to queue...
    Packets Left: 2
    Queued packet send...
    Packets Left: 1
    Queued packet send...
    ...
    ...
    Queued packet send...
    MySocketBuffer Depleted!
    Received a packet! Adding it to queue...
    Packets Left: 0
    MySocketBuffer Destroyed!
    MyUDPSocket Destroyed!

    As commented in the code above, I don't think I need to use QMutexLocker to protect the QUdpSocket, as Qt handles it all using its queued slot mechanism, right?
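
    The idea behind that question can be sketched without Qt. With a queued connection the call is not made directly; it is posted to the receiver's thread and runs when that thread's event loop gets to it. The toy event loop below is an assumption-laden illustration of that idea, not how Qt implements it: EventLoopThread, post() and countOnLoopThread() are hypothetical names. The queue itself is locked, but the state touched by the handlers is not, because only the loop thread ever runs them.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a thread that runs an event loop.
class EventLoopThread {
public:
    EventLoopThread() : thread_([this] { run(); }) {}
    ~EventLoopThread() { if (thread_.joinable()) quitAndWait(); }

    // Callable from any thread: queue the work instead of running it here.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // An empty task is the quit request; tasks queued before it still run.
    void quitAndWait() {
        post(std::function<void()>());
        thread_.join();
    }

    std::thread::id id() const { return thread_.get_id(); }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return !tasks_.empty(); });
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            if (!task)
                return;
            task();  // always executed on the loop thread
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    std::thread thread_;  // declared last so the members above exist first
};

struct LoopResult { int handled; bool sameThread; };

// Several sender threads post work; every handler runs on the loop thread.
LoopResult countOnLoopThread(int senders, int perSender) {
    EventLoopThread loop;
    const std::thread::id loopId = loop.id();
    int handled = 0;         // no lock: only touched by the loop thread
    bool sameThread = true;
    std::vector<std::thread> threads;
    for (int i = 0; i < senders; ++i)
        threads.emplace_back([&] {
            for (int j = 0; j < perSender; ++j)
                loop.post([&] {
                    ++handled;
                    sameThread = sameThread && std::this_thread::get_id() == loopId;
                });
        });
    for (std::thread &t : threads)
        t.join();
    loop.quitAndWait();
    return {handled, sameThread};
}
```

    The caveat is the one that matters for the socket as well: this only holds while every access goes through the queue. A direct call from another thread would bypass it.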

    This has been a good exercise for me to better understand threading and Qt's approach to it in general, so I have added the code here as an attachment.

    BUT! Is there a better way? Are there any gotchas in the code above I haven't handled?

    Cheers,

    Xav.

  2. #2

    Re: QUdpSocket Buffering - Solution (Long Post!)

    Although MyUDPSocket represents a thread, its slots are not automatically executed in that thread.
    You have to move the QThread object itself to the thread with QObject::moveToThread:
    Qt Code:
    MyUDPSocket::MyUDPSocket()
    {
        qDebug() << "Setting up MyUDPSocket...";
        moveToThread(this);
    }

    And since QByteArray is basically already a ref-counted pointer to an internal buffer, you should use constant references (const QByteArray &) rather than a pointer to pass it through signals/slots.
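
    A rough, Qt-free sketch of what "ref-counted pointer to an internal buffer" means in practice. SharedBytes is a hypothetical, single-threaded simplification and not QByteArray's real implementation: copies share one buffer, a write detaches first, and a function taking a const reference does not even touch the reference count.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for an implicitly shared byte array.
class SharedBytes {
public:
    explicit SharedBytes(std::string bytes)
        : data_(std::make_shared<std::string>(std::move(bytes))) {}

    // The default copy constructor copies only the pointer and bumps the count.
    const char *constData() const { return data_->data(); }
    std::size_t size() const { return data_->size(); }
    long owners() const { return data_.use_count(); }

    // Writing detaches first, so other copies keep the old contents.
    // (Not thread-safe; good enough for a single-threaded illustration.)
    void setByte(std::size_t index, char value) {
        if (data_.use_count() > 1)
            data_ = std::make_shared<std::string>(*data_);
        (*data_)[index] = value;
    }

private:
    std::shared_ptr<std::string> data_;
};

// Taking a const reference avoids even the reference-count bump.
std::size_t payloadSize(const SharedBytes &packet) {
    return packet.size();
}
```

    Because copying is this cheap, there is little to gain from new/delete on the heap, and the manual ownership bookkeeping goes away.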

  3. The following user says thank you to alexisdm for this useful post:

    xavierda (31st May 2010)

  4. #3

    Re: QUdpSocket Buffering - Solution (Long Post!)

    Originally posted by alexisdm:
    Although MyUDPSocket represents a thread, its slots are not automatically executed in that thread.

    Ah! I read a lot about that and got confused very quickly. I went back to my code and added some more debug information to better appreciate what is going on. Essentially I just print out the current thread using QThread::currentThread(). It worked before because I had the normal event loop set up in my main thread, like this:

    Qt Code:
    #include <QCoreApplication>
    #include <QTimer>
    #include "myudpsocket.h"

    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        // Quit after 30 seconds so the cleanup code gets exercised
        QTimer::singleShot(30000, &a, SLOT(quit()));
        MyUDPSocket myUDPSocket;
        myUDPSocket.start();
        return a.exec();
    }

    So before adding moveToThread(), I'd get output like:

    Qt Code:
    Thread: QThread(0x99f1400) In main...
    Thread: QThread(0x99f1400) Setting up MyUDPSocket...
    Thread: MyUDPSocket(0xbf96d1d8) Setting up MySocketBuffer...
    Thread: MyUDPSocket(0xbf96d1d8) Starting MyUDPSocket event loop...
    Thread: MySocketBuffer(0x9a0def0) MySocketBuffer running...
    // Events for QUdpSocket are processed in main thread!
    Thread: QThread(0x99f1400) Received a packet! Adding it to queue...
    Thread: MySocketBuffer(0x9a0def0) Packets Left: 0
    Thread: QThread(0x99f1400) Queued packet send...

    After moveToThread:

    Qt Code:
    Thread: QThread(0x95ca400) In main...
    Thread: QThread(0x95ca400) Setting up MyUDPSocket...
    Thread: MyUDPSocket(0xbfb580e8) Setting up MySocketBuffer...
    Thread: MySocketBuffer(0x95d2b50) MySocketBuffer running...
    Thread: MyUDPSocket(0xbfb580e8) Starting MyUDPSocket event loop...
    // The MyUDPSocket thread is now handling all the events! :)
    Thread: MyUDPSocket(0xbfb580e8) Received a packet! Adding it to queue...
    Thread: MySocketBuffer(0x95d2b50) Packets Left: 0

    The moveToThread call is not required for MySocketBuffer as it is already created in the context of MyUDPSocket's thread, right? Just want to be sure I understand this...

    Originally posted by alexisdm:
    And since QByteArray is basically already a ref-counted pointer to an internal buffer, you should use constant references (const QByteArray &) rather than a pointer to pass it through signals/slots.

    Yep, tired eyes and compiler errors threw me on that. Had to read a bit about const declaration and const correctness to understand that better - so my knowledge was increased there too!

    I've updated the code and attached it to this post so others can learn. I'm now looking at expanding on this code so that I can plug in different queue processors - for example, one queue will slow sending down as per the code in this post, while another will manipulate the data somehow. Looks like the Strategy design pattern will come in handy for this... But my previous question still stands: is there a more efficient way to queue packets? Or is this a reasonable approach?
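
    As a rough idea of how the Strategy pattern could look here, a minimal Qt-free sketch follows. All names (PacketProcessor, PassThrough, ReverseBytes, PacketPipeline) are hypothetical and std::string stands in for QByteArray; the point is only that the queue code calls one interface and the behaviour behind it can be swapped.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical strategy interface: one implementation per kind of processing.
class PacketProcessor {
public:
    virtual ~PacketProcessor() = default;
    virtual std::string process(const std::string &packet) = 0;
};

// Strategy 1: forward the packet unchanged (a delay could be added here).
class PassThrough : public PacketProcessor {
public:
    std::string process(const std::string &packet) override { return packet; }
};

// Strategy 2: manipulate the data, here by reversing the bytes.
class ReverseBytes : public PacketProcessor {
public:
    std::string process(const std::string &packet) override {
        return std::string(packet.rbegin(), packet.rend());
    }
};

// The context owns whichever strategy is currently plugged in.
class PacketPipeline {
public:
    explicit PacketPipeline(std::unique_ptr<PacketProcessor> processor)
        : processor_(std::move(processor)) {}

    void setProcessor(std::unique_ptr<PacketProcessor> processor) {
        processor_ = std::move(processor);
    }

    std::string handle(const std::string &packet) {
        return processor_->process(packet);
    }

private:
    std::unique_ptr<PacketProcessor> processor_;
};
```

    In the threaded version, swapping the processor while the buffer thread is running would need the same mutex protection as the queue.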

    Cheers,

    Xav.
