#ifndef STREAM_H
#define STREAM_H
#include "QtCore"
#include <QUdpSocket>
#include "QThread"
Q_DECLARE_METATYPE(BufferList);
{
Q_OBJECT
public:
explicit Worker();
~Worker();
bool loop;
public slots:
void send_datagrams
(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer
);
signals:
void done_streaming();
private:
void save_file(BufferList datagram_buffer);
int datagram_index , socket_state;
QElapsedTimer elapsed_timer;
qint16 ip_stream_port;
};
{
Q_OBJECT
public:
explicit stream
(QHostAddress stream_addr, qint16 stream_port,
int kBitRate,
int pktsPerDgram,
int pkt_size,
QObject *parent
);
~stream();
public slots:
void send_udp_packet();
void done_with_worker();
signals:
void start_stream
(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer
);
void done_with_stream();
private:
qint16 ip_stream_port;
BufferList datagram_buffer;
Worker *worker;
qint64 timer_freq;
int packet_index , KbitRate , pkts_PerDgram , packet_size , dgram_size;
};
#endif // STREAM_H
#ifndef STREAM_H
#define STREAM_H
#include "QtCore"
#include <QUdpSocket>
#include "QThread"
typedef QList< QByteArray > BufferList;
Q_DECLARE_METATYPE(BufferList);
Q_DECLARE_METATYPE(QHostAddress);
class Worker : public QObject
{
Q_OBJECT
public:
explicit Worker();
~Worker();
bool loop;
public slots:
void send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);
signals:
void done_streaming();
private:
void save_file(BufferList datagram_buffer);
QUdpSocket *udp_streaming_socket;
int datagram_index , socket_state;
QElapsedTimer elapsed_timer;
QHostAddress ip_stream_address;
qint16 ip_stream_port;
};
class stream : public QObject
{
Q_OBJECT
QThread workerThread;
public:
explicit stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent);
~stream();
public slots:
void send_udp_packet();
void make_udp_packet(QByteArray packet);
void done_with_worker();
signals:
void start_stream(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);
void done_with_stream();
private:
QHostAddress ip_stream_address;
qint16 ip_stream_port;
BufferList datagram_buffer;
QByteArray datagram, packet;
Worker *worker;
qint64 timer_freq;
int packet_index , KbitRate , pkts_PerDgram , packet_size , dgram_size;
};
#endif // STREAM_H
To copy to clipboard, switch view to plain text mode
#include "stream.h"
#include "windows.h"
/// ========================================================================================================
stream
::stream(QHostAddress stream_addr, qint16 stream_port,
int kBitRate,
int pktsPerDgram,
int pkt_size,
QObject *parent
) :{
qRegisterMetaType<QHostAddress>("QHostAddress");
qRegisterMetaType<BufferList>("BufferList");
KbitRate = kBitRate;
ip_stream_address = stream_addr;
ip_stream_port = stream_port;
timer_freq = 8*pkt_size*pktsPerDgram; // 8 bits per byte, ms between packets.
timer_freq = timer_freq*1000000;
timer_freq = timer_freq/(kBitRate);
worker = new Worker;
connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
connect(this, &stream::start_stream , worker, &Worker::send_datagrams );
connect(worker,SIGNAL(done_streaming()),this,SLOT(done_with_worker()));
worker->moveToThread(&workerThread);
workerThread.
start(QThread::TimeCriticalPriority);
// Prepare constants to build datagram.
pkts_PerDgram = pktsPerDgram;
packet_size = pkt_size;
dgram_size = pkt_size*pktsPerDgram;
qDebug
()<<
"thread id = " <<
QThread::currentThreadId();
//qDebug()<< "workerthread id = "<< workerThread.currentThreadId();
qDebug()<< "priority is " <<workerThread.priority();
packet.fill('1',packet_size);
datagram.clear();
packet_index=0;
}
stream::~stream()
{
qDebug("stopping stream");
worker->loop=false;
workerThread.quit();
workerThread.wait();
}
{
// QByteArray((char*)pktBuffer,packet_size);
if (packet_index < pkts_PerDgram)
{
datagram.append(packet);
packet_index++;
if(packet_index == pkts_PerDgram)
{
datagram_buffer.append( datagram );
datagram.clear();
packet_index=0;
}
}
}
void stream::send_udp_packet()
{
qDebug
()<<
"stream thread id = " <<
QThread::currentThreadId();
start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
}
void stream::done_with_worker()
{
qDebug("done with worker");
emit done_with_stream();
}
Worker::Worker()
{
loop=true;
qDebug("Constructor");
}
Worker::~Worker()
{
qDebug("Closing Socket");
udp_streaming_socket->close();
}
void Worker
::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer
) {
udp_streaming_socket
= new QUdpSocket(this);
// switch to QUdpSocket and remove winsock when Qt is patched
ip_stream_address = stream_addr;
ip_stream_port = stream_port;
datagram_index = 0;
qint64 start_time, end_time;
elapsed_timer.start();
qDebug
()<<
"worker thread id = " <<
QThread::currentThreadId();
//qDebug()<< "thread priority = " << QThread::Priority;
while(loop)
{
while(elapsed_timer.nsecsElapsed() <= timer_freq)
{
// spin until time to send next packet
}
if(datagram_buffer.size()<=datagram_index)
{
emit done_streaming();
save_file(datagram_buffer);
qDebug("done with stream");
break;
}
if(datagram_buffer.size()>datagram_index)
{
start_time = elapsed_timer.nsecsElapsed();
socket_state = udp_streaming_socket->writeDatagram(datagram_buffer.at(datagram_index).data(), datagram_buffer.at(datagram_index).size() ,ip_stream_address , ip_stream_port);
udp_streaming_socket->waitForBytesWritten();
//end_time = elapsed_timer.nsecsElapsed();
if(start_time - timer_freq > 50000)
{
qDebug()<< "Late by = " << start_time - timer_freq << " ns at datagram " << datagram_index ;
}
elapsed_timer.restart(); // start elapsed timer
if(socket_state<=0)
{
qDebug()<< "Socket error "<< udp_streaming_socket->error();
}
datagram_index++;
}
}
udp_streaming_socket->close();
qDebug
()<<
"worker thread id = " <<
QThread::currentThreadId();
//qDebug()<< "thread priority = " << QThread::Priority;
}
void Worker::save_file(BufferList datagram_buffer)
{
QFile file("c:\\3abn\\qudp_test.ts");
{
for(int i=0; i<datagram_buffer.size();i++)
out.writeRawData(datagram_buffer.at(i).data(),datagram_buffer.at(i).size());
file.close();
}
}
#include "stream.h"
#include "windows.h"
/// ========================================================================================================
stream::stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent) :
QObject(parent)
{
qRegisterMetaType<QHostAddress>("QHostAddress");
qRegisterMetaType<BufferList>("BufferList");
KbitRate = kBitRate;
ip_stream_address = stream_addr;
ip_stream_port = stream_port;
timer_freq = 8*pkt_size*pktsPerDgram; // 8 bits per byte, ms between packets.
timer_freq = timer_freq*1000000;
timer_freq = timer_freq/(kBitRate);
worker = new Worker;
connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
connect(this, &stream::start_stream , worker, &Worker::send_datagrams );
connect(worker,SIGNAL(done_streaming()),this,SLOT(done_with_worker()));
worker->moveToThread(&workerThread);
workerThread.start(QThread::TimeCriticalPriority);
// Prepare constants to build datagram.
pkts_PerDgram = pktsPerDgram;
packet_size = pkt_size;
dgram_size = pkt_size*pktsPerDgram;
qDebug()<< "thread id = " << QThread::currentThreadId();
//qDebug()<< "workerthread id = "<< workerThread.currentThreadId();
qDebug()<< "priority is " <<workerThread.priority();
packet.fill('1',packet_size);
datagram.clear();
packet_index=0;
}
stream::~stream()
{
qDebug("stopping stream");
worker->loop=false;
workerThread.quit();
workerThread.wait();
}
void stream::make_udp_packet(QByteArray packet)
{
// QByteArray((char*)pktBuffer,packet_size);
if (packet_index < pkts_PerDgram)
{
datagram.append(packet);
packet_index++;
if(packet_index == pkts_PerDgram)
{
datagram_buffer.append( datagram );
datagram.clear();
packet_index=0;
}
}
}
void stream::send_udp_packet()
{
qDebug()<< "stream thread id = " << QThread::currentThreadId();
start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
}
void stream::done_with_worker()
{
qDebug("done with worker");
emit done_with_stream();
}
Worker::Worker()
{
loop=true;
qDebug("Constructor");
}
Worker::~Worker()
{
qDebug("Closing Socket");
udp_streaming_socket->close();
}
void Worker::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer)
{
udp_streaming_socket = new QUdpSocket(this); // switch to QUdpSocket and remove winsock when Qt is patched
ip_stream_address = stream_addr;
ip_stream_port = stream_port;
datagram_index = 0;
qint64 start_time, end_time;
elapsed_timer.start();
qDebug()<< "worker thread id = " << QThread::currentThreadId();
//qDebug()<< "thread priority = " << QThread::Priority;
while(loop)
{
while(elapsed_timer.nsecsElapsed() <= timer_freq)
{
// spin until time to send next packet
}
if(datagram_buffer.size()<=datagram_index)
{
emit done_streaming();
save_file(datagram_buffer);
qDebug("done with stream");
break;
}
if(datagram_buffer.size()>datagram_index)
{
start_time = elapsed_timer.nsecsElapsed();
socket_state = udp_streaming_socket->writeDatagram(datagram_buffer.at(datagram_index).data(), datagram_buffer.at(datagram_index).size() ,ip_stream_address , ip_stream_port);
udp_streaming_socket->waitForBytesWritten();
//end_time = elapsed_timer.nsecsElapsed();
if(start_time - timer_freq > 50000)
{
qDebug()<< "Late by = " << start_time - timer_freq << " ns at datagram " << datagram_index ;
}
elapsed_timer.restart(); // start elapsed timer
if(socket_state<=0)
{
qDebug()<< "Socket error "<< udp_streaming_socket->error();
}
datagram_index++;
}
}
udp_streaming_socket->close();
qDebug()<< "worker thread id = " << QThread::currentThreadId();
//qDebug()<< "thread priority = " << QThread::Priority;
}
void Worker::save_file(BufferList datagram_buffer)
{
QFile file("c:\\3abn\\qudp_test.ts");
if(file.open(QIODevice::WriteOnly))
{
QDataStream out(&file);
for(int i=0; i<datagram_buffer.size();i++)
out.writeRawData(datagram_buffer.at(i).data(),datagram_buffer.at(i).size());
file.close();
}
}
To copy to clipboard, switch view to plain text mode
Bookmarks