KoosKoets
4th April 2007, 09:42
Hi folks,
I'm new to Qt, so please forgive any ignorance... Note that I did read whatever I could find on the net and the Qt documentation on sockets and threading, but I must be overlooking something...
I need to develop a distributed system of applications. One of these will have a GUI, so we chose to develop everything with Qt. The applications will communicate over TCP sockets, and they will have to use multithreading for their tasks and for processing the communication.
I started off with a server and a client. The server sends packets to the client and the client just dumps these. The connection set-up between server and client works fine. The sending of the data by the server works fine also.
For the client I've made a Connection class derived from QThread. The run function of this QThread blocks in waitForReadyRead(-1), after which the data is streamed into a container. Whenever a container has been read completely, it is placed in a QQueue (guarded with a QMutex) and a QSemaphore is released.
The main loop blocks on the QSemaphore. Whenever data is available, the container is taken from the QQueue (guarded with the same QMutex), dumped and deleted.
At least, that is what I desire.
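To make the intended hand-off concrete, here is a minimal sketch of the same producer/consumer idea in standard C++ rather than Qt. It is not my actual code: it swaps the QSemaphore for a std::condition_variable, and Packet, PacketQueue and runDemo are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the CCSDS container; it only carries a sequence number.
struct Packet {
    int sequence;
};

// Thread-safe FIFO: push() never blocks, pop() blocks until an item is available.
class PacketQueue {
public:
    void push(Packet packet) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(packet);
        }
        ready_.notify_one();  // wake one waiting consumer
    }

    Packet pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate guards against spurious wake-ups and missed notifications.
        ready_.wait(lock, [this] { return !queue_.empty(); });
        Packet packet = queue_.front();
        queue_.pop();
        return packet;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<Packet> queue_;
};

// Starts one producer thread that pushes `count` packets and consumes them on
// the calling thread; returns the sequence numbers in the order they arrived.
std::vector<int> runDemo(int count) {
    assert(count >= 0);
    PacketQueue queue;
    std::thread producer([&queue, count] {
        for (int i = 0; i < count; ++i)
            queue.push(Packet{i});
    });
    std::vector<int> received;
    for (int i = 0; i < count; ++i)
        received.push_back(queue.pop().sequence);
    producer.join();
    return received;
}
```

Calling runDemo(100) from main should hand all 100 packets from the producer thread to the consumer in order. The point of the sketch is that the consumer is woken once per queued item, which is the behaviour I expect from releasing the semaphore once per packet.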
The client does receive data, and the main loop does dump it. But for no apparent reason the reading/dumping stops. It seems as if my threads are no longer scheduled: sometimes immediately, sometimes only towards the end. In DDD it runs without problems :(
I assume I'm doing something wrong with this QThread business, but it could also have to do with waitForReadyRead(-1). To be honest, I'm lost.
I've also tried using an additional Consumer class, with its own QThread that blocks on semaphore->acquire(). This didn't solve my problem either :(
Below are some excerpts of my code. I hope somebody is able to point me in the right direction...
main of client:
int main(int argc, char *argv[])
{
    QCoreApplication lApplication(argc, argv);
    Connection *lClient = new Connection(QHostAddress("172.16.140.32"), 5001);
    QSemaphore *lSema = lClient->getSemaphore();
    CCSDS *lPacket; /// container
    int i = 0;
    lClient->start(QThread::LowPriority);
    while (true) {
        lSema->acquire();
        lPacket = lClient->recvCCSDS();
        printf("%d\n", ++i);
        if (lPacket) {
            lPacket->dump();
            delete lPacket;
        }
    }
}
Most of the "magic" happens in this Connection class:
Connection::Connection(const QHostAddress aAddress, const qint16 aPort)
    : QThread(), mSocket(NULL), mQueue(), mMutex(), mSemaphore(0)
{
    /// create the socket
    mSocket = new QTcpSocket();
    mSocket->setTextModeEnabled(false);
    mSocket->connectToHost(aAddress, aPort);
    mSocket->waitForConnected(-1);
}
void Connection::run()
{
    int lCount = 0;
    int lBufLength = 0; /// size of the buffer not yet read
    int lPacketLength = 0; /// length of current CCSDS packet
    int lReadLength = 0; /// length of data read
    int lStart = 0;
    char *lData = NULL;
    CCSDS *lPacket = NULL;
    QDataStream lStream(mSocket);
    lStream.setVersion(QDataStream::Qt_4_2);
    while (true) {
        if (!mSocket || mSocket->state() == QAbstractSocket::UnconnectedState)
            return;
        while (lBufLength < CCSDS_PRIM_SIZE) {
            if (!mSocket->waitForReadyRead(-1))
                return;
            lBufLength = mSocket->bytesAvailable();
        }
        while (lBufLength) {
            lBufLength = mSocket->bytesAvailable();
            lPacket = new CCSDS();
            lData = lPacket->data();
            lStart = 0;
            /// read only the primary header
            lReadLength = lStream.readRawData(lData, CCSDS_PRIM_SIZE);
            if (lReadLength == -1)
                return;
            /// adapt lBufLength and lStart with part which has been read
            lStart += lReadLength;
            lBufLength -= lReadLength;
            /// primary header available: determine packet length
            lPacketLength = 1 + (lData[4] << 8) + lData[5];
            while (lBufLength < lPacketLength) {
                if (!mSocket->waitForReadyRead(-1))
                    return;
                lBufLength += mSocket->bytesAvailable();
            }
            lReadLength = lStream.readRawData(&lData[lStart], MIN(lPacketLength,lBufLength));
            if (lReadLength == -1) {
                return;
            }
            lStart += lReadLength;
            lBufLength -= lReadLength;
            /// packet received: put on queue
            mMutex.lock();
            mQueue.enqueue(lPacket);
            mMutex.unlock();
            lPacket = NULL;
            lCount++;
        }
        /// inform user: set semaphore
        mSemaphore.release(lCount);
        lCount = 0;
    }
}
CCSDS *Connection::recvCCSDS()
{
    CCSDS *lPacket = NULL;
    /// critical section
    mMutex.lock();
    if (!mQueue.isEmpty()) {
        lPacket = mQueue.dequeue();
    }
    mMutex.unlock();
    return lPacket;
}
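Independently of the threading question, the framing that run() is meant to perform can be sketched without any sockets. This is a simplified illustration in standard C++, not the code I am running: it assumes a 6-byte primary header (my CCSDS_PRIM_SIZE is not shown above), and kPrimaryHeaderSize, dataFieldLength and extractPackets are hypothetical names. It treats the bytes as unsigned, because with a plain char buffer a length byte of 0x80 or above can be sign-extended on platforms where char is signed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumption: the primary header is 6 bytes long.
constexpr std::size_t kPrimaryHeaderSize = 6;

// Length of the data that follows the header: 1 + the big-endian value in
// header bytes 4 and 5. Unsigned bytes avoid sign extension for values >= 0x80.
std::size_t dataFieldLength(const std::uint8_t* header) {
    return 1u + ((static_cast<std::size_t>(header[4]) << 8) | header[5]);
}

// Removes every complete packet from the front of `buffer` and returns them.
// Incomplete trailing bytes stay in `buffer` until more data has been appended.
std::vector<std::vector<std::uint8_t>> extractPackets(std::vector<std::uint8_t>& buffer) {
    std::vector<std::vector<std::uint8_t>> packets;
    std::size_t offset = 0;
    while (buffer.size() - offset >= kPrimaryHeaderSize) {
        const std::size_t total = kPrimaryHeaderSize + dataFieldLength(&buffer[offset]);
        if (buffer.size() - offset < total)
            break;  // header seen, but the rest of the packet has not arrived yet
        const auto first = buffer.begin() + static_cast<std::ptrdiff_t>(offset);
        packets.emplace_back(first, first + static_cast<std::ptrdiff_t>(total));
        offset += total;
    }
    assert(offset <= buffer.size());
    buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(offset));
    return packets;
}
```

The idea would be to append whatever the socket delivers to one buffer and call extractPackets after each read, so the number of bytes still pending is always taken from the buffer itself instead of being tracked in a separate counter.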