PDA

View Full Version : Problem with QThread being interrupted



zcybercomputing
11th November 2014, 02:27
I am building a program to stream video packets over the network. My code functions properly for the most part, but the part that I use for timing has problems: datagrams are frequently sent 50-200+ ms late. I am using a QThread spinning in a while loop that checks a QElapsedTimer. Any gap between datagrams that is greater than 200 ms causes problems with the mux hardware downstream, so this problem is unacceptable. I have tested to make sure the send-datagram function is executed in the correct thread. What am I doing wrong?

Header File:


#ifndef STREAM_H
#define STREAM_H

#include <atomic>

#include "QtCore"
#include <QUdpSocket>
#include "QThread"

// List of byte arrays; stream::make_udp_packet() appends one finished datagram per entry.
typedef QList< QByteArray > BufferList;
// Declare both types to Qt's meta-type system. They are used as arguments of the
// start_stream signal / send_datagrams slot, which cross a thread boundary.
Q_DECLARE_METATYPE(BufferList);
Q_DECLARE_METATYPE(QHostAddress);

/// Sends a list of pre-built datagrams over UDP at a fixed interval.
///
/// The owning `stream` object moves this worker to its own QThread, so
/// send_datagrams() executes there and busy-waits between datagrams.
class Worker : public QObject
{
    Q_OBJECT

public:
    explicit Worker();
    ~Worker();

    /// Keep-running flag polled by send_datagrams().
    /// It is cleared by stream::~stream from a different thread while the worker
    /// thread is reading it, so it has to be atomic; a plain bool is a data race.
    /// Assignment from bool and use in a condition work exactly as before.
    std::atomic<bool> loop;

public slots:
    /// Sends every entry of @p datagram_buffer to @p stream_addr : @p stream_port.
    /// @p timer_freq is the spacing between datagrams; it is compared against
    /// QElapsedTimer::nsecsElapsed(), i.e. it is expressed in nanoseconds.
    void send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);

signals:
    /// Emitted by send_datagrams() once the whole buffer has been sent.
    void done_streaming();

private:
    /// Dumps the datagram payloads to a file on disk.
    void save_file(BufferList datagram_buffer);

    QUdpSocket *udp_streaming_socket;   ///< created inside send_datagrams()
    int datagram_index;                 ///< index of the next datagram to send
    int socket_state;                   ///< result of the last writeDatagram() call

    QElapsedTimer elapsed_timer;        ///< measures the gap since the previous datagram
    QHostAddress ip_stream_address;     ///< destination address of the current run
    qint16 ip_stream_port;              ///< destination port of the current run
};

/// Front end of the streamer: collects packets into datagrams and hands the
/// finished list to a Worker that lives in `workerThread`.
class stream : public QObject
{
    Q_OBJECT

    // Thread that hosts the Worker; started in the constructor.
    QThread workerThread;

public:
    explicit stream(QHostAddress stream_addr,
                    qint16 stream_port,
                    int kBitRate,
                    int pktsPerDgram,
                    int pkt_size,
                    QObject *parent);
    ~stream();

public slots:
    /// Emits start_stream() with the datagrams collected so far.
    void send_udp_packet();
    /// Appends one packet to the datagram under construction.
    void make_udp_packet(QByteArray packet);
    /// Relays the worker's completion as done_with_stream().
    void done_with_worker();

signals:
    /// Connected to Worker::send_datagrams in the constructor.
    void start_stream(QHostAddress stream_addr,
                      qint16 stream_port,
                      qint64 timer_freq,
                      BufferList datagram_buffer);
    void done_with_stream();

private:
    // Destination of the stream.
    QHostAddress ip_stream_address;
    qint16 ip_stream_port;

    // Completed datagrams waiting to be sent.
    BufferList datagram_buffer;
    // Datagram currently being assembled.
    QByteArray datagram;
    // Filled with '1' bytes, packet_size long, by the constructor.
    QByteArray packet;
    Worker *worker;
    // Spacing between datagrams, computed in the constructor.
    qint64 timer_freq;
    // Number of packets already appended to `datagram`.
    int packet_index;
    int KbitRate;
    int pkts_PerDgram;
    int packet_size;
    // packet_size * pkts_PerDgram.
    int dgram_size;
};
#endif // STREAM_H


Streaming Code:

#include "stream.h"
#include "windows.h"
/// ================================================== ================================================== ====
/// Stores the stream parameters, computes the datagram spacing, and starts the
/// worker thread with a Worker moved onto it.
///
/// @param stream_addr  destination address, later forwarded to the worker
/// @param stream_port  destination UDP port, later forwarded to the worker
/// @param kBitRate     target rate; the spacing formula below treats it as
///                     kilobits per second
/// @param pktsPerDgram number of packets packed into one datagram
/// @param pkt_size     size of one packet in bytes
/// @param parent       QObject parent
stream::stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent) :
    QObject(parent)
{
    qRegisterMetaType<QHostAddress>("QHostAddress");
    qRegisterMetaType<BufferList>("BufferList");

    KbitRate = kBitRate;
    ip_stream_address = stream_addr;
    ip_stream_port = stream_port;

    // Spacing between datagrams in NANOSECONDS (the worker compares it against
    // QElapsedTimer::nsecsElapsed()):
    //   bits per datagram / (kBitRate * 1000 bit/s) * 1e9 ns/s
    //     == bits per datagram * 1e6 / kBitRate
    // The product is formed in 64 bits so large sizes cannot overflow int.
    const qint64 bits_per_datagram = static_cast<qint64>(8) * pkt_size * pktsPerDgram;
    if (kBitRate > 0)
    {
        timer_freq = bits_per_datagram * 1000000 / kBitRate;
    }
    else
    {
        // A zero rate would divide by zero; fall back to "no pacing" instead.
        qWarning("stream: invalid bit rate %d, datagram pacing disabled", kBitRate);
        timer_freq = 0;
    }

    worker = new Worker;

    // The worker is deleted when its thread finishes.
    connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
    connect(this, &stream::start_stream, worker, &Worker::send_datagrams);
    connect(worker, &Worker::done_streaming, this, &stream::done_with_worker);
    worker->moveToThread(&workerThread);
    workerThread.start(QThread::TimeCriticalPriority);

    // Prepare constants to build datagram.
    pkts_PerDgram = pktsPerDgram;
    packet_size = pkt_size;
    dgram_size = pkt_size*pktsPerDgram;
    qDebug()<< "thread id = " << QThread::currentThreadId();
    qDebug()<< "priority is " <<workerThread.priority();
    packet.fill('1',packet_size);
    datagram.clear();
    packet_index=0;
}

/// Stops the worker and waits for its thread to exit.
stream::~stream()
{
    qDebug("stopping stream");

    // Clear the flag first: send_datagrams() polls it and only returns to the
    // thread's event loop once it sees false.
    this->worker->loop = false;

    // Ask the thread's event loop to exit, then block until the thread is gone.
    this->workerThread.quit();
    this->workerThread.wait();
}

/// Appends @p packet to the datagram being assembled. Once pkts_PerDgram
/// packets have been collected, the datagram is moved into datagram_buffer and
/// assembly starts over.
void stream::make_udp_packet(QByteArray packet)
{
    // Nothing to do when the current datagram is already full.
    if (packet_index >= pkts_PerDgram)
        return;

    datagram.append(packet);
    ++packet_index;

    // Datagram still incomplete: wait for more packets.
    if (packet_index != pkts_PerDgram)
        return;

    // Datagram complete: queue it and reset the assembly state.
    datagram_buffer.append(datagram);
    datagram.clear();
    packet_index = 0;
}

void stream::send_udp_packet()
{
qDebug()<< "stream thread id = " << QThread::currentThreadId();
start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
}

/// Slot connected to Worker::done_streaming; logs the event and re-emits it
/// as done_with_stream().
void stream::done_with_worker()
{
    qDebug("done with worker");

    emit this->done_with_stream();
}

/// Creates an idle worker: the keep-running flag is set and no socket exists
/// yet. Every member is given a defined value so the destructor is safe even
/// if send_datagrams() is never called.
Worker::Worker()
    : loop(true),
      udp_streaming_socket(nullptr),
      datagram_index(0),
      socket_state(0),
      ip_stream_port(0)
{
    qDebug("Constructor");
}

/// Closes the socket if send_datagrams() created one.
/// The socket is constructed with this worker as its QObject parent, so it is
/// deleted together with the worker; only the close() happens here.
Worker::~Worker()
{
    qDebug("Closing Socket");
    if (udp_streaming_socket)
    {
        udp_streaming_socket->close();
    }
}
/// Sends every entry of @p datagram_buffer to @p stream_addr : @p stream_port,
/// busy-waiting so that consecutive sends start @p timer_freq nanoseconds apart.
///
/// Emits done_streaming() and calls save_file() once the whole buffer has been
/// sent. Returns early, without emitting, when `loop` is cleared.
///
/// @param stream_addr     destination address
/// @param stream_port     destination UDP port
/// @param timer_freq      spacing between datagrams in nanoseconds
/// @param datagram_buffer datagrams to send, in order
void Worker::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer)
{
    udp_streaming_socket = new QUdpSocket(this); // switch to QUdpSocket and remove winsock when Qt is patched

    ip_stream_address = stream_addr;
    ip_stream_port = stream_port;
    datagram_index = 0;
    elapsed_timer.start();
    qDebug()<< "worker thread id = " << QThread::currentThreadId();

    while(loop)
    {
        // Spin until the next datagram is due. The stop flag is checked here as
        // well so a shutdown request does not have to wait out the interval.
        while(loop && elapsed_timer.nsecsElapsed() <= timer_freq)
        {
        }
        if(!loop)
        {
            break;
        }

        if(datagram_index >= datagram_buffer.size())
        {
            emit done_streaming();
            save_file(datagram_buffer);
            qDebug("done with stream");
            break;
        }

        // Restart the timer BEFORE sending so the time spent inside
        // writeDatagram() counts towards the next interval instead of
        // stretching every period by the send duration.
        const qint64 waited = elapsed_timer.nsecsElapsed();
        elapsed_timer.restart();

        const QByteArray &current = datagram_buffer.at(datagram_index);
        // The port is stored as qint16; convert explicitly to the quint16 the
        // socket API expects.
        const qint64 written = udp_streaming_socket->writeDatagram(current.data(), current.size(), ip_stream_address, static_cast<quint16>(ip_stream_port));
        socket_state = static_cast<int>(written);
        // No waitForBytesWritten() here: writeDatagram() has already passed the
        // datagram to the socket layer, so there is nothing buffered to wait
        // for and the call could only add delay to the timing loop.

        if(waited - timer_freq > 50000)
        {
            qDebug()<< "Late by = " << waited - timer_freq << " ns at datagram " << datagram_index ;
        }
        if(written<=0)
        {
            qDebug()<< "Socket error "<< udp_streaming_socket->error();
        }
        datagram_index++;
    }
    udp_streaming_socket->close();
    qDebug()<< "worker thread id = " << QThread::currentThreadId();
}

/// Writes the raw payload of every datagram in @p datagram_buffer, back to
/// back, to the fixed file c:\3abn\qudp_test.ts. Logs and returns if the file
/// cannot be opened for writing.
void Worker::save_file(BufferList datagram_buffer)
{
    QFile file("c:\\3abn\\qudp_test.ts");
    if(!file.open(QIODevice::WriteOnly))
    {
        // Previously a failed open was silently ignored.
        qDebug()<< "Could not open" << file.fileName() << "for writing:" << file.errorString();
        return;
    }

    QDataStream out(&file);
    for(int i=0; i<datagram_buffer.size();i++)
    {
        const QByteArray &current = datagram_buffer.at(i);
        out.writeRawData(current.data(), current.size());
    }
    file.close();
}

zcybercomputing
11th November 2014, 05:29
Might this have to do with the process priority? (see here (http://msdn.microsoft.com/en-us/library/windows/desktop/ms685100(v=vs.85).aspx))

I noticed this becoming a problem mostly when I moved from a very simple GUI to one with a system tray icon and other things going on in the main GUI thread. I need real-time priority for the worker thread, but not for the GUI. I can enforce that this will be running on a multi-core machine, so consuming all cycles of one core for this isn't a problem. Timing accuracy is most important, although going with a real-time OS is something I would like to avoid. Is there a cross-platform way to manage the process priority, or do I need to write blocks of code for each platform I want to support? Also, if I set the process priority to real time, will that basically mean I will be tying up at least two CPU cores even if I demote the thread priority of the GUI thread? It seems a little wrong to assign real-time priority to the whole process of a program with a system tray icon...

wysota
11th November 2014, 09:34
What is the point of the while loop? Why aren't you using a timer? Why are you using waitForBytesWritten?