PDA

View Full Version : Reading from sockets in a multithreaded program



KoosKoets
4th April 2007, 09:42
Hi folks,

I'm new to Qt, so please forgive any ignorance... Note that I did read whatever I could find on the net and the Qt documentation on sockets and threading, but I must be overlooking something...

I need to develop a distributed system of applications. One of these will use a GUI, so our choice was to develop everything with Qt. Communication between these applications will be on TCP based sockets. The apps will have to use multithreading for their tasks and communication processing.

I started off with a server and a client. The server sends packets to the client and the client just dumps these. The connection set-up between server and client works fine. The sending of the data by the server works fine also.

For the client I've made a Connection class derived from QThread. The run function of this QThread waits forReadyRead(-1), after which the data is streamed into a container. Whenever a container is read completely, it is placed in a QQueue (guarded with a QMutex) and a QSemaphore is set.

The main loop blocks on the QSemphore. Whenever data is available the container is taken from the QQueue (guarded with a QMutex), it is dumped and deleted.

At least, that is what I desire.

The client does receive data. The main loop does dump this data. But for no apparent reason the process of reading/dumping stops. It seems like my threads are not scheduled anymore. Sometimes immedately, sometimes only in the end. In DDD it runs without problems :(

I assume I'm doing something wrong with this QThread business, but it could have to do with this waitForReadRead(-1) also. To be honest, I'm lost.

I've tried also to use an additional Consumer class, with its own QThread which blocks on the semaphore->acquire. This didn't solve my problem either :(

Below, some excerpts of my code. I hope somebody is able to point my in the right direction...

main of client

int main(int argc, char *argv[])
{
QCoreApplication lApplication(argc, argv);

Connection *lClient = new Connection(QHostAddress("172.16.140.32"), 5001);
QSemaphore *lSema = lClient->getSemaphore();
CCSDS *lPacket; /// container
int i = 0;

lClient->start(QThread::LowPriority);

while (true) {
lSema->acquire();
lPacket = lClient->recvCCSDS();
printf("%d\n", ++i);
if (lPacket) {
lPacket->dump();
delete lPacket;
}
}
}

Most of the "magic" happens in this Connection class:



Connection::Connection(const QHostAddress aAddress, const qint16 aPort)
: QThread(), mSocket(NULL), mQueue(), mMutex(), mSemaphore(0)
{
/// create the socket
mSocket = new QTcpSocket();
mSocket->setTextModeEnabled(false);
mSocket->connectToHost(aAddress, aPort);
mSocket->waitForConnected(-1):
}

void Connection::run()
{
int lCount = 0;
int lBufLength = 0; /// size of the buffer not yet read
int lPacketLength = 0; /// length of current CCSDS packet
int lReadLength = 0; /// length of data read
int lStart = 0;
char *lData = NULL;
CCSDS *lPacket = NULL;
QDataStream lStream(mSocket);

lStream.setVersion(QDataStream::Qt_4_2);

while (true) {
if (!mSocket || mSocket->state() == QAbstractSocket::UnconnectedState)
return;

while (lBufLength < CCSDS_PRIM_SIZE) {
if (!mSocket->waitForReadyRead(-1))
return;

lBufLength = mSocket->bytesAvailable();
}

while (lBufLength) {
lBufLength = mSocket->bytesAvailable();
lPacket = new CCSDS();
lData = lPacket->data();
lStart = 0;

/// read only the primary header
lReadLength = lStream.readRawData(lData, CCSDS_PRIM_SIZE);

if (lReadLength == -1)
return;

/// adapt lBufLength and lStart with part which has been read
lStart += lReadLength;
lBufLength -= lReadLength;

/// primary header available: determine packet length
lPacketLength = 1 + (lData[4] << 8) + lData[5];

while (lBufLength < lPacketLength) {
if (!mSocket->waitForReadyRead(-1))
return;

lBufLength += mSocket->bytesAvailable();
}

lReadLength = lStream.readRawData(&lData[lStart], MIN(lPacketLength,lBufLength));

if (lReadLength == -1) {
return;
}

lStart += lReadLength;
lBufLength -= lReadLength;

/// packet received: put on queue
mMutex.lock();
mQueue.enqueue(lPacket);
mMutex.unlock();
lPacket = NULL;
lCount++;
}
/// inform user: set semaphore
mSemaphore.release(lCount);
lCount = 0;
}
}

CCSDS *Connection::recvCCSDS()
{
CCSDS *lPacket = NULL;

/// critical section
mMutex.lock();
if (!mQueue.isEmpty()) {
lPacket = mQueue.dequeue();
}

mMutex.unlock();
return lPacket;
}

high_flyer
4th April 2007, 10:05
It could be a mutex deadlock.
Try using tryLock() instead of lock() and see if it still gets stuck.
If it doesn't get stuck anymore, you know on which path you have to go.

danadam
4th April 2007, 11:26
I'm not 100% sure of this, cause I have no way to check it now, but:

You create QTcpSocket in Connection constructor, so it belongs to main thread. In Connection thread you call waitForRead() methods, so Connection thread is blocked until some data is available in socket. The following part is the one I'm not sure. I think that even when you use waitFor...() methods in Connection thread, the QSocket is still processed by the thread it lives in. So in order to receive any data (and as a result to unblock Connection thread) the main thread has to do some job. But it may happen that it will be blocked on acquire() and you get deadlock. Main thread waits for Connection thread to release semaphore and Connection thread waits for Main thread to process socket and deliver some data.

Try to move QSocket creation to run() method of Connection class and see if it helps.

KoosKoets
4th April 2007, 11:28
> deadlock

Good hint: it seems to have to do with the semaphores... My results improve if I change the
lSema->acquire(); into
if (!lSema->tryAcquire()) continue; in the main loop of the client.

Still, I fail to see what I'm doing wrong... Besides, sometimes I seem to hit this deadlock situation anyhow.

Reading the documentation on QMutex and QSemapore indicate that these objects are thread-safe and that the acquire() and lock() functions block until they obtain the semaphore or lock respectively.

I've entered QT += thread network in mij .pro file. Is there something else required?

KoosKoets
4th April 2007, 12:01
Main thread waits for Connection thread to release semaphore and Connection thread waits for Main thread to process socket and deliver some data.

Try to move QSocket creation to run() method of Connection class and see if it helps.

This seems a plausible explanation. However, after trying this I still mange get the 'deadlock'. Only in about 5% of my tries, all data is received without problems.

danadam
4th April 2007, 12:17
I think I found the reason. 58th line of Connection code in the first post is:
lBufLength += mSocket->bytesAvailable();I guess it should be:
lBufLength = mSocket->bytesAvailable();

KoosKoets
4th April 2007, 12:27
I guess it should be:
lBufLength = mSocket->bytesAvailable();

Good catch: it is indeed wrong. However, my deadlock remains...

high_flyer
4th April 2007, 12:59
Good catch: it is indeed wrong. However, my deadlock remains...
Why don't you try tryLock()?
It is easy, and it will tell you on the spot if its a mutex deadlock that you have...

KoosKoets
4th April 2007, 14:21
Why don't you try tryLock()?

I did try this, but still my problem remains.

I've stripped my code even further. Upon reception of a packet it is deleted right away (it is not placed in a mutex guarded QQueue, no semaphore release, no semaphore acquire): just reception of data and deletion. The creation of the QTcpSocket is done in the run() function. My problem remains...

It is like the waitForReadyRead(-1) sometimes doesn't wake up even when data is present at the socket. :crying:

moowy
4th April 2007, 20:43
I have a problem with using Qt networking. I want to establish a connection between a server and clients. The client is using a chalkboard and is using server to send coordinates of the lines to draw to them. For the drawing part I am using Irrlecht (because the program is not going to be the chalboard (this is just for testing), but a real 3d environment.

I have problem getting the communication to work. I would really appreciate if someone could look at the code (I attached) and please give me some clue.

On the server side I have:


Server::Server(QObject *parent) : QTcpServer(parent)
{
/* maximum number of players */
setMaxPendingConnections(8);
m_connectedClients=-1; // none connected clients
connect(this,SIGNAL(newConnection()),SLOT(newConne ctions()));

}
void Server::send(int socket,int x1, int y1, int x2, int y2)
{
// I use qhash to keep information about all clients and socketDescriptors
QHashIterator<int, Server_thread*> i(m_clients);
while (i.hasNext()) {
i.next();
if(i.key() != socket) // don't send this to the client which send the info
i.value()->send(x1,y1,x2,y2);
}
}

/* accept incoming connections */
void Server::newConnections()
{
while(hasPendingConnections()) {
QTcpSocket *socket = nextPendingConnection();
Server_thread *thread = new Server_thread(socket, this);
// add the client and socketDescriptor
m_clients[socket->socketDescriptor()] = thread;
/* delete the thread that has finished */
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
thread->start();
}
}

The thread for clients...


Server_thread::Server_thread(QTcpSocket *socket, Server *parent)
: QThread(parent)
{
m_parent = parent;
m_socket = socket;
connect(this,SIGNAL(address(QString)),this,SLOT(ne w_connection(QString)));
connect(m_socket,SIGNAL(readyRead()),this,SLOT(rea d()));
m_socketDescriptor = m_socket->socketDescriptor();

emit address(m_socket->peerAddress().toString());

/* number of clients++ */
m_parent->incNrClients();

nextBlockSize=0;
}


void Server_thread::run()
{
// just to keep the thread active...
while(m_socket && m_socket->state() == QAbstractSocket::ConnectedState) {

}

m_socket->disconnectFromHost();
m_socket->waitForDisconnected(1000);
}

void Server_thread::read()
{
QDataStream in(m_socket);
in.setVersion(QDataStream::Qt_4_2);
/* read data */
if(nextBlockSize==0) {
if(m_socket->bytesAvailable() < sizeof(quint16))
return;

in >> nextBlockSize;
}
if(m_socket->bytesAvailable() < nextBlockSize)
return;

QString koordinate;
int x1,y1,x2,y2;

in >> koordinate;

sscanf(koordinate.toStdString().c_str(),"%d %d %d %d",&x1,&y1,&x2,&y2);

m_parent->send(m_socketDescriptor,x1,y1,x2,y2);
}

void Server_thread::send(int x1, int y1, int x2, int y2)
{
QString string;
string += QString(to_string<int>(x1).c_str());
string += " ";
string += QString(to_string<int>(y1).c_str());
string += " ";
string += QString(to_string<int>(x2).c_str());
string += " ";
string += QString(to_string<int>(y2).c_str());


QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_2);
out << (quint16)0;
out << string;
out.device()->seek(0);
out << (quint16)(block.size() - sizeof(quint16));

m_socket->write(block);

}


On the client side I have:


using namespace irr;
using namespace core;
using namespace scene;
using namespace video;

//event listener -> this works for sure
class ChalkboardEventReceiver : public IEventReceiver
{
public:
ChalkboardEventReceiver(Client *c)
: mouseDown(false), connection(NULL)
{
connection = c;
}

bool OnEvent(SEvent event)
{
if(event.EventType == EET_MOUSE_INPUT_EVENT) {
if(event.MouseInput.Event == EMIE_LMOUSE_PRESSED_DOWN) {
x = event.MouseInput.X;
y = event.MouseInput.Y;
mouseDown = true;

return true;
}
else if(event.MouseInput.Event == EMIE_LMOUSE_LEFT_UP) {
connection->AddLineLocal(x, y, event.MouseInput.X, event.MouseInput.Y);
connection->SendLineToServer(x, y, event.MouseInput.X, event.MouseInput.Y);
mouseDown = false;

return true;
}
else if(mouseDown && event.MouseInput.Event == EMIE_MOUSE_MOVED) {
connection->AddLineLocal(x, y, event.MouseInput.X, event.MouseInput.Y);
connection->SendLineToServer(x, y, event.MouseInput.X, event.MouseInput.Y);
x = event.MouseInput.X;
y = event.MouseInput.Y;

return true;
}
}

return false;
}


protected:
s32 x, y;
bool mouseDown;
Client *connection;
};

int main(int argc, char *argv[])
{
QApplication app(argc, argv);

Client client;
ChalkboardEventReceiver receiver(&client);
client.init(&receiver);

app.connect(&app, SIGNAL(lastWindowClosed()), &app, SLOT(quit()));
return app.exec();

}

Client::Client(QWidget *parent) : QWidget(parent)
{
connect(&m_socket, SIGNAL(readyRead()), this, SLOT(HandlePacket()));
connect(&m_socket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(displayError(QAbstractSocket::SocketError)));
m_socket.connectToHost("127.0.0.1",3210);
nextBlockSize = 0;
// Wait for the init() call
device = 0;
// Default
driverType = irr::video::EDT_OPENGL;
}

void Client::SendLineToServer(s32 x1, s32 y1, s32 x2, s32 y2)
{

QString string;
string += QString(to_string<int>(x1).c_str());
string += " ";
string += QString(to_string<int>(y1).c_str());
string += " ";
string += QString(to_string<int>(x2).c_str());
string += " ";
string += QString(to_string<int>(y2).c_str());

QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_2);
out << (quint16)0;
out << string;
out.device()->seek(0);

//calculate the size of the block
out << (quint16)(block.size() - sizeof(quint16));

m_socket.write(block);
}

void Client::HandlePacket()
{
QDataStream in(&m_socket);
in.setVersion(QDataStream::Qt_4_2);

if (nextBlockSize == 0) {
if (m_socket.bytesAvailable() < (int)sizeof(quint16))
return;

in >> nextBlockSize;
}

if (m_socket.bytesAvailable() < nextBlockSize)
return;

QString string;
in >> string;

s32 x1,y1,x2,y2;
const char *str = string.toStdString().c_str();
sscanf(str,"%d %d %d %d",&x1,&y1,&x2,&y2);

AddLineLocal(x1, y1, x2, y2);
}



I'm sorry for posting so much code here (I attached the whole source code). I would really appreciate some help.