Results 1 to 3 of 3

Thread: Problem with QThread being interrupted

  1. #1
    Join Date
    Feb 2011
    Posts
    8
    Qt products
    Qt5
    Platforms
    Unix/X11 Windows

    Default Problem with QThread being interrupted

    I am building a program to stream video packets over the network. My code functions properly for the most part, but the part of my code that I use for timing has problems. Frequently datagrams are sent 50-200+ ms late. I am using a qthread spinning in a while loop that checks a qElapsedTimer. Any gap between datagrams that is greater then 200ms causes problems with the mux hardware downstream, so this problem is unacceptable. I have tested to make sure the send datagram function is executed in the correct thread. What am I doing wrong?

    Header File:
    Qt Code:
    1. #ifndef STREAM_H
    2. #define STREAM_H
    3.  
    4. #include "QtCore"
    5. #include <QUdpSocket>
    6. #include "QThread"
    7.  
    8. typedef QList< QByteArray > BufferList;
    9. Q_DECLARE_METATYPE(BufferList);
    10. Q_DECLARE_METATYPE(QHostAddress);
    11.  
    12. class Worker : public QObject
    13. {
    14. Q_OBJECT
    15.  
    16. public:
    17. explicit Worker();
    18. ~Worker();
    19. bool loop;
    20.  
    21. public slots:
    22. void send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);
    23.  
    24. signals:
    25. void done_streaming();
    26.  
    27. private:
    28. void save_file(BufferList datagram_buffer);
    29. QUdpSocket *udp_streaming_socket;
    30. int datagram_index , socket_state;
    31.  
    32. QElapsedTimer elapsed_timer;
    33. QHostAddress ip_stream_address;
    34. qint16 ip_stream_port;
    35. };
    36.  
    37. class stream : public QObject
    38. {
    39. Q_OBJECT
    40.  
    41. QThread workerThread;
    42. public:
    43. explicit stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent);
    44. ~stream();
    45.  
    46. public slots:
    47. void send_udp_packet();
    48. void make_udp_packet(QByteArray packet);
    49. void done_with_worker();
    50. signals:
    51. void start_stream(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);
    52. void done_with_stream();
    53. private:
    54. QHostAddress ip_stream_address;
    55. qint16 ip_stream_port;
    56.  
    57. BufferList datagram_buffer;
    58. QByteArray datagram, packet;
    59. Worker *worker;
    60. qint64 timer_freq;
    61. int packet_index , KbitRate , pkts_PerDgram , packet_size , dgram_size;
    62. };
    63. #endif // STREAM_H
    To copy to clipboard, switch view to plain text mode 

    Streaming Code:
    Qt Code:
    1. #include "stream.h"
    2. #include "windows.h"
    3. /// ========================================================================================================
    4. stream::stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent) :
    5. QObject(parent)
    6. {
    7. qRegisterMetaType<QHostAddress>("QHostAddress");
    8. qRegisterMetaType<BufferList>("BufferList");
    9.  
    10. KbitRate = kBitRate;
    11. ip_stream_address = stream_addr;
    12. ip_stream_port = stream_port;
    13.  
    14. timer_freq = 8*pkt_size*pktsPerDgram; // 8 bits per byte, ms between packets.
    15. timer_freq = timer_freq*1000000;
    16. timer_freq = timer_freq/(kBitRate);
    17.  
    18. worker = new Worker;
    19.  
    20. connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
    21. connect(this, &stream::start_stream , worker, &Worker::send_datagrams );
    22. connect(worker,SIGNAL(done_streaming()),this,SLOT(done_with_worker()));
    23. worker->moveToThread(&workerThread);
    24. workerThread.start(QThread::TimeCriticalPriority);
    25.  
    26. // Prepare constants to build datagram.
    27. pkts_PerDgram = pktsPerDgram;
    28. packet_size = pkt_size;
    29. dgram_size = pkt_size*pktsPerDgram;
    30. qDebug()<< "thread id = " << QThread::currentThreadId();
    31. //qDebug()<< "workerthread id = "<< workerThread.currentThreadId();
    32. qDebug()<< "priority is " <<workerThread.priority();
    33. packet.fill('1',packet_size);
    34. datagram.clear();
    35. packet_index=0;
    36. }
    37.  
    38. stream::~stream()
    39. {
    40. qDebug("stopping stream");
    41. worker->loop=false;
    42. workerThread.quit();
    43. workerThread.wait();
    44. }
    45.  
    46. void stream::make_udp_packet(QByteArray packet)
    47. {
    48. // QByteArray((char*)pktBuffer,packet_size);
    49. if (packet_index < pkts_PerDgram)
    50. {
    51. datagram.append(packet);
    52. packet_index++;
    53.  
    54. if(packet_index == pkts_PerDgram)
    55. {
    56. datagram_buffer.append( datagram );
    57. datagram.clear();
    58. packet_index=0;
    59. }
    60. }
    61. }
    62.  
    63. void stream::send_udp_packet()
    64. {
    65. qDebug()<< "stream thread id = " << QThread::currentThreadId();
    66. start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
    67. }
    68.  
    69. void stream::done_with_worker()
    70. {
    71. qDebug("done with worker");
    72. emit done_with_stream();
    73. }
    74.  
    75. Worker::Worker()
    76. {
    77. loop=true;
    78. qDebug("Constructor");
    79. }
    80.  
    81. Worker::~Worker()
    82. {
    83. qDebug("Closing Socket");
    84. udp_streaming_socket->close();
    85. }
    86. void Worker::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer)
    87. {
    88. udp_streaming_socket = new QUdpSocket(this); // switch to QUdpSocket and remove winsock when Qt is patched
    89.  
    90. ip_stream_address = stream_addr;
    91. ip_stream_port = stream_port;
    92. datagram_index = 0;
    93. qint64 start_time, end_time;
    94. elapsed_timer.start();
    95. qDebug()<< "worker thread id = " << QThread::currentThreadId();
    96. //qDebug()<< "thread priority = " << QThread::Priority;
    97. while(loop)
    98. {
    99. while(elapsed_timer.nsecsElapsed() <= timer_freq)
    100. {
    101. // spin until time to send next packet
    102. }
    103. if(datagram_buffer.size()<=datagram_index)
    104. {
    105. emit done_streaming();
    106. save_file(datagram_buffer);
    107. qDebug("done with stream");
    108. break;
    109. }
    110. if(datagram_buffer.size()>datagram_index)
    111. {
    112. start_time = elapsed_timer.nsecsElapsed();
    113. socket_state = udp_streaming_socket->writeDatagram(datagram_buffer.at(datagram_index).data(), datagram_buffer.at(datagram_index).size() ,ip_stream_address , ip_stream_port);
    114. udp_streaming_socket->waitForBytesWritten();
    115. //end_time = elapsed_timer.nsecsElapsed();
    116. if(start_time - timer_freq > 50000)
    117. {
    118. qDebug()<< "Late by = " << start_time - timer_freq << " ns at datagram " << datagram_index ;
    119. }
    120. elapsed_timer.restart(); // start elapsed timer
    121. if(socket_state<=0)
    122. {
    123. qDebug()<< "Socket error "<< udp_streaming_socket->error();
    124. }
    125. datagram_index++;
    126. }
    127. }
    128. udp_streaming_socket->close();
    129. qDebug()<< "worker thread id = " << QThread::currentThreadId();
    130. //qDebug()<< "thread priority = " << QThread::Priority;
    131. }
    132.  
    133. void Worker::save_file(BufferList datagram_buffer)
    134. {
    135. QFile file("c:\\3abn\\qudp_test.ts");
    136. if(file.open(QIODevice::WriteOnly))
    137. {
    138. QDataStream out(&file);
    139. for(int i=0; i<datagram_buffer.size();i++)
    140. out.writeRawData(datagram_buffer.at(i).data(),datagram_buffer.at(i).size());
    141. file.close();
    142. }
    143. }
    To copy to clipboard, switch view to plain text mode 

  2. #2
    Join Date
    Feb 2011
    Posts
    8
    Qt products
    Qt5
    Platforms
    Unix/X11 Windows

    Default Re: Problem with QThread being interrupted

    Might this have to do with the process priority? (see here)

    I noticed this becoming a problem most when I moved from a very simple GUI to one with a system tray icon and other stuff going on in the main GUI thread. I need real time priority for the worker thread, but not for the GUI. I can enforce that this will be running on a multi core machine, so consuming all cycles of one core for this isn't a problem. Time accuracy is most important, although going with a real time OS is something I would like to avoid. Is there a cross platform way to manage the process priority, or do I need to make blocks of code for each platform I want to support? Also, if I set the process priority as real time will that basically mean I will be tying up at least two CPU cores even if I demote the thread priority of the GUI thread? It seems a little wrong to assign real time priority to the whole process of a program with a system tray icon...

  3. #3
    Join Date
    Jan 2006
    Location
    Warsaw, Poland
    Posts
    33,363
    Thanks
    3
    Thanked 5,012 Times in 4,791 Posts
    Qt products
    Qt3 Qt4 Qt5 Qt/Embedded
    Platforms
    Unix/X11 Windows Android Maemo/MeeGo
    Wiki edits
    10

    Default Re: Problem with QThread being interrupted

    What is the point of the while loop? Why aren't you using a timer? Why are you using waitForBytesWritten?
    Your biological and technological distinctiveness will be added to our own. Resistance is futile.

    Please ask Qt related questions on the forum and not using private messages or visitor messages.


Similar Threads

  1. Replies: 1
    Last Post: 4th October 2012, 14:49
  2. Replies: 5
    Last Post: 26th January 2011, 19:12
  3. QThread Problem
    By MrShahi in forum Qt Programming
    Replies: 1
    Last Post: 8th July 2009, 10:14
  4. Qthread Problem
    By sroger in forum Qt Programming
    Replies: 0
    Last Post: 2nd March 2009, 07:35
  5. QThread problem
    By MrShahi in forum Qt Programming
    Replies: 6
    Last Post: 15th July 2008, 12:28

Tags for this Thread

Bookmarks

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •  
Digia, Qt and their respective logos are trademarks of Digia Plc in Finland and/or other countries worldwide.