I am building a program to stream video packets over the network. My code functions properly for the most part, but the part that handles timing has problems: datagrams are frequently sent 50-200+ ms late. I am using a QThread spinning in a while loop that checks a QElapsedTimer. Any gap between datagrams that is greater than 200 ms causes problems with the mux hardware downstream, so this behaviour is unacceptable. I have tested to make sure the send-datagram function is executed in the correct thread. What am I doing wrong?

Header File:
Qt Code:
  1. #ifndef STREAM_H
  2. #define STREAM_H
  3.  
  4. #include "QtCore"
  5. #include <QUdpSocket>
  6. #include "QThread"
  7.  
  8. typedef QList< QByteArray > BufferList;
  9. Q_DECLARE_METATYPE(BufferList);
  10. Q_DECLARE_METATYPE(QHostAddress);
  11.  
  12. class Worker : public QObject
  13. {
  14. Q_OBJECT
  15.  
  16. public:
  17. explicit Worker();
  18. ~Worker();
  19. bool loop;
  20.  
  21. public slots:
  22. void send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);
  23.  
  24. signals:
  25. void done_streaming();
  26.  
  27. private:
  28. void save_file(BufferList datagram_buffer);
  29. QUdpSocket *udp_streaming_socket;
  30. int datagram_index , socket_state;
  31.  
  32. QElapsedTimer elapsed_timer;
  33. QHostAddress ip_stream_address;
  34. qint16 ip_stream_port;
  35. };
  36.  
  37. class stream : public QObject
  38. {
  39. Q_OBJECT
  40.  
  41. QThread workerThread;
  42. public:
  43. explicit stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent);
  44. ~stream();
  45.  
  46. public slots:
  47. void send_udp_packet();
  48. void make_udp_packet(QByteArray packet);
  49. void done_with_worker();
  50. signals:
  51. void start_stream(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);
  52. void done_with_stream();
  53. private:
  54. QHostAddress ip_stream_address;
  55. qint16 ip_stream_port;
  56.  
  57. BufferList datagram_buffer;
  58. QByteArray datagram, packet;
  59. Worker *worker;
  60. qint64 timer_freq;
  61. int packet_index , KbitRate , pkts_PerDgram , packet_size , dgram_size;
  62. };
  63. #endif // STREAM_H
To copy to clipboard, switch view to plain text mode 

Streaming Code:
Qt Code:
  1. #include "stream.h"
  2. #include "windows.h"
  3. /// ========================================================================================================
  4. stream::stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent) :
  5. QObject(parent)
  6. {
  7. qRegisterMetaType<QHostAddress>("QHostAddress");
  8. qRegisterMetaType<BufferList>("BufferList");
  9.  
  10. KbitRate = kBitRate;
  11. ip_stream_address = stream_addr;
  12. ip_stream_port = stream_port;
  13.  
  14. timer_freq = 8*pkt_size*pktsPerDgram; // 8 bits per byte, ms between packets.
  15. timer_freq = timer_freq*1000000;
  16. timer_freq = timer_freq/(kBitRate);
  17.  
  18. worker = new Worker;
  19.  
  20. connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
  21. connect(this, &stream::start_stream , worker, &Worker::send_datagrams );
  22. connect(worker,SIGNAL(done_streaming()),this,SLOT(done_with_worker()));
  23. worker->moveToThread(&workerThread);
  24. workerThread.start(QThread::TimeCriticalPriority);
  25.  
  26. // Prepare constants to build datagram.
  27. pkts_PerDgram = pktsPerDgram;
  28. packet_size = pkt_size;
  29. dgram_size = pkt_size*pktsPerDgram;
  30. qDebug()<< "thread id = " << QThread::currentThreadId();
  31. //qDebug()<< "workerthread id = "<< workerThread.currentThreadId();
  32. qDebug()<< "priority is " <<workerThread.priority();
  33. packet.fill('1',packet_size);
  34. datagram.clear();
  35. packet_index=0;
  36. }
  37.  
  38. stream::~stream()
  39. {
  40. qDebug("stopping stream");
  41. worker->loop=false;
  42. workerThread.quit();
  43. workerThread.wait();
  44. }
  45.  
  46. void stream::make_udp_packet(QByteArray packet)
  47. {
  48. // QByteArray((char*)pktBuffer,packet_size);
  49. if (packet_index < pkts_PerDgram)
  50. {
  51. datagram.append(packet);
  52. packet_index++;
  53.  
  54. if(packet_index == pkts_PerDgram)
  55. {
  56. datagram_buffer.append( datagram );
  57. datagram.clear();
  58. packet_index=0;
  59. }
  60. }
  61. }
  62.  
  63. void stream::send_udp_packet()
  64. {
  65. qDebug()<< "stream thread id = " << QThread::currentThreadId();
  66. start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
  67. }
  68.  
  69. void stream::done_with_worker()
  70. {
  71. qDebug("done with worker");
  72. emit done_with_stream();
  73. }
  74.  
  75. Worker::Worker()
  76. {
  77. loop=true;
  78. qDebug("Constructor");
  79. }
  80.  
  81. Worker::~Worker()
  82. {
  83. qDebug("Closing Socket");
  84. udp_streaming_socket->close();
  85. }
  86. void Worker::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer)
  87. {
  88. udp_streaming_socket = new QUdpSocket(this); // switch to QUdpSocket and remove winsock when Qt is patched
  89.  
  90. ip_stream_address = stream_addr;
  91. ip_stream_port = stream_port;
  92. datagram_index = 0;
  93. qint64 start_time, end_time;
  94. elapsed_timer.start();
  95. qDebug()<< "worker thread id = " << QThread::currentThreadId();
  96. //qDebug()<< "thread priority = " << QThread::Priority;
  97. while(loop)
  98. {
  99. while(elapsed_timer.nsecsElapsed() <= timer_freq)
  100. {
  101. // spin until time to send next packet
  102. }
  103. if(datagram_buffer.size()<=datagram_index)
  104. {
  105. emit done_streaming();
  106. save_file(datagram_buffer);
  107. qDebug("done with stream");
  108. break;
  109. }
  110. if(datagram_buffer.size()>datagram_index)
  111. {
  112. start_time = elapsed_timer.nsecsElapsed();
  113. socket_state = udp_streaming_socket->writeDatagram(datagram_buffer.at(datagram_index).data(), datagram_buffer.at(datagram_index).size() ,ip_stream_address , ip_stream_port);
  114. udp_streaming_socket->waitForBytesWritten();
  115. //end_time = elapsed_timer.nsecsElapsed();
  116. if(start_time - timer_freq > 50000)
  117. {
  118. qDebug()<< "Late by = " << start_time - timer_freq << " ns at datagram " << datagram_index ;
  119. }
  120. elapsed_timer.restart(); // start elapsed timer
  121. if(socket_state<=0)
  122. {
  123. qDebug()<< "Socket error "<< udp_streaming_socket->error();
  124. }
  125. datagram_index++;
  126. }
  127. }
  128. udp_streaming_socket->close();
  129. qDebug()<< "worker thread id = " << QThread::currentThreadId();
  130. //qDebug()<< "thread priority = " << QThread::Priority;
  131. }
  132.  
  133. void Worker::save_file(BufferList datagram_buffer)
  134. {
  135. QFile file("c:\\3abn\\qudp_test.ts");
  136. if(file.open(QIODevice::WriteOnly))
  137. {
  138. QDataStream out(&file);
  139. for(int i=0; i<datagram_buffer.size();i++)
  140. out.writeRawData(datagram_buffer.at(i).data(),datagram_buffer.at(i).size());
  141. file.close();
  142. }
  143. }
To copy to clipboard, switch view to plain text mode