PDA

View Full Version : QThread and QQueue



qball2k5
27th July 2006, 22:02
Hey!

I have an application that has three thread.

Thread One: GUI
Thread Two: Messeger, comminicated with a device using an Aardvark (I2C bus)
Thread Three: Decoder, decodes the data from the Messeger, sends it to UI

The problem is that the Decoder seems to have a tendency to stop working, but I don't ever tell it to stop. I can see that the Messeger continuing to send data to te Decoder, but the decoder doesn't process it. Here's how the decoder thread looks.



Decoder::Decoder(QObject *parent)
: QThread(parent)
{
stopped = false;
abort = false;
}

Decoder::~Decoder()
{
mutex.lock();
waitCondition.wakeOne();
mutex.unlock();

wait();
}

void Decoder::run()
{
do
{
mutex.lock();
if (decodeQueue.isEmpty())
waitCondition.wait(&mutex);
mutex.unlock();

if(mutex.tryLock())
{
QStringList packet = decodeQueue.dequeue();
mutex.unlock();

Decode(packet);
}

msleep(2);

}while(!stopped);

if(stopped)
quit();
else
exec();
}

void Decoder::stop()
{
stopped = true;
}

void Decoder::DecodePacket( const QStringList &packet )
{
mutex.lock();
decodeQueue.enqueue(packet);
waitCondition.wakeOne();
mutex.unlock();
}

void Decoder::Decode( QStringList packet )
{
//Decodes packet and emits data to the GUi
}


Any ideas?

Thanks.

Rob

jacek
27th July 2006, 22:42
void Decoder::run()
{
...
if(stopped)
quit();
else
exec();
}
This part is a bit weird.


Decodes packet and emits data to the GUi
Are you sure that you use queued connection for this? Decoder object lives in a thread that created it, while Decoder::run() lives in a new thread --- this will fool automatic connections.

I'm not sure why do you unlock mutex just to try to lock it again.


Decoder::~Decoder()
{
stopped = true;

mutex.lock();
waitCondition.wakeOne();
mutex.unlock();

wait();
}

Decoder::run()
{
while( ! stopped ) {
mutex.lock();

while( decodeQueue.isEmpty() && ! stopped ) {
waitCondition.wait( &mutex );
}

if( ! stopped ) {
QStringList packet = decodeQueue.dequeue();
}

mutex.unlock();

if( ! stopped ) {
Decode( packet );
}
}
}

qball2k5
28th July 2006, 14:34
I'm definately using a queued connection, I set that up for all my signals.

This is the slot that recieves the QStringList from the messager



void Decoder::DecodePacket( const QStringList &packet )
{
mutex.lock();
decodeQueue.enqueue(packet);
waitCondition.wakeOne();
mutex.unlock();
}


In your example, won't the mutex be locked so this will never be able to add a new packet into the queue? Or am I missing something?

As for the wierd part, I don't know why I put that there, thought I needed it. It was in one of the examples.

qball2k5
28th July 2006, 15:26
You know what....it works much better. I have another thread question though.

My application right now is using like 85% of the CPU. I know it's because my messager thread. The messager thread basically has a queue of outgoing messages and if a message needs to be sent, it sends it. Then it checks to see if it needs to read data from the Aardvark (I2C). If it does, it reads it then passes it to the Decoder.

Here's how it looks. It works, just not very efficieintly. Any ideas?



Messager::~Messager()
{
//Destructor
mutex.lock();
waitCondition.wakeOne();
mutex.unlock();

wait();
}

void Messager::run()
{

do
{
if(!packetQueue.isEmpty())
{
if(mutex.tryLock())
{
Packet sendPacket = packetQueue.dequeue();
UINT8 address = addressQueue.dequeue();
UPDATE type = typeQueue.dequeue();

mutex.unlock();

if(SendMessage(sendPacket, address))
emit PacketSent(true, type);
else
emit PacketSent(false, type);
}
}

msleep(pollRate);

if(ReadDevice1)
ReadMessage(D1_ADDRESS);


if(ReadDevice2)
ReadMessage(D2_ADDRESS);


if(ReadDevice3)
ReadMessage(D3_ADDRESS);


if(ReadDevice4)
ReadMessage(D4_ADDRESS);


if (ReadDevice5)
ReadMessage(D5_ADDRESS);

if(ReadDevice6)
ReadMessage(D6_ADDRESS);


if(abort)
stopped = true;

}while(!stopped);

if(stopped)
quit();
else
exec();

}

void Messager::SendPacket(Packet myPacket, UINT8 address, UPDATE Type)
{
QMutexLocker locker(&mutex);

packetQueue.enqueue(myPacket);
addressQueue.enqueue(address);
typeQueue.enqueue(Type);
}

bool Messager::Config(int port, int bitrate)
{
//Configure I2C device
}

bool Messager::SendMessage(Packet outgoingPacket, UINT8 addr)
{
//Write packet to I2C bus
}

bool Messager::ReadMessage(UINT8 address)
{
//Read data from I2C bus, emit PacketRecieved signal to Decoder
}

void Messager::EnableDevice1Status(bool Enabled)
{
mutex.lock();
ReadDevice1 = Enabled;
mutex.unlock();
}



It needs to read only when told to and send any message that gets put into the queue. I'm still new to threads. I'd really appreciate any ideas.

If this is confusing, I can attempt to elaborate better.

Thanks,

Rob

jacek
28th July 2006, 15:30
I'm definately using a queued connection, I set that up for all my signals.

This is the slot that recieves the QStringList from the messager
When you use queued connections, signals behave like events that are delivered to event loop that runs in a thread that owns the receiver.

There are two problems:
your thread doesn't have a running event loop (and it doesn't have to), so you can't use queued connections to send signals to objects that live inside it. You have to use direct connections, but with proper locking mechanism to avoid problems.
Decoder was created in the GUI thread, so all events (including those that realize queued connections) will go through the event loop in the GUI thread. If you block the GUI, you will also block the Decoder.

In other words, try using a direct connection when you connect to Decoder::DecodePacket() (but Decoder should be still connected to the GUI using queued connection).


In your example, won't the mutex be locked so this will never be able to add a new packet into the queue?
No, it shouldn't. QWaitCondition::wait() unlocks the mutex, waits and locks it again.


As for the wierd part, I don't know why I put that there, thought I needed it. It was in one of the examples.
You exit the while loop only when stopped == true, so the else clause is never executed.

jacek
28th July 2006, 15:40
My application right now is using like 85% of the CPU. I know it's because my messager thread. The messager thread basically has a queue of outgoing messages and if a message needs to be sent, it sends it. Then it checks to see if it needs to read data from the Aardvark (I2C). If it does, it reads it then passes it to the Decoder.
Sounds like it's a job for QSocketNotifier.

To avoid problems, IMO, you should split the implementation into two classes: Messenger and MessengerThread.

MessengerThread should look like this:
void MessengerThread::run()
{
Messenger m;
// setup connections or whatever
exec();
}
while Messenger (derived from QObject) should use QSocketNotifiers to monitor the devices.

Of course you can do this using a single class, but you must watch out for connection types.

qball2k5
28th July 2006, 16:06
So....

I'm starting the two threads from the GUI thread, called DeckSimulator.

I call:



messager.start(QThread::TimeCriticalPriority);
decoder.start(QThread::LowPriority);


in the constructor of DeckSimulator

That's starts them on there own thread. Right? Or do I need to start the thread when I call SendPacket(from Messager) and DecodePacket(from Decoder). Like in the Mandalbrot example.

jacek
28th July 2006, 17:32
That's starts them on there own thread. Right?
Yes, but only the run() method will be in a new thread. messager and decoder objects will live in the GUI thread, as you instantiate them before the new thread is created.


Or do I need to start the thread when I call SendPacket(from Messager) and DecodePacket(from Decoder). Like in the Mandalbrot example.
No, you don't have to.

qball2k5
31st July 2006, 20:16
Sounds like it's a job for QSocketNotifier.

It's really just one device, the Aardvark device, but different address to read using that device.

It still looks like it locks up after a while and doesn't work. Could this be a stack issue?

It just stops decoding, even though I can see the reads being done. I see the packet signal getting emitted and the DecodePacket slot getting called by that emitted signal, via breakpoints...i put breakpoints in the run function and it's not doing anything. the Run function isn't going, but the stopped variable is false, so it should continue to run.

Any ideas?

jacek
31st July 2006, 21:21
It's really just one device, the Aardvark device, but different address to read using that device.
It doesn't matter how many devices you have, QSocketNotifier should notify you when there is something to read, but of course you need a file descriptor for this.


It just stops decoding, even though I can see the reads being done.
How about these?

Decoder::~Decoder()
{
waitCondition.wakeOne();
wait();
}

void Decoder::DecodePacket( const QStringList &packet )
{
mutex.lock();
decodeQueue.enqueue(packet);
mutex.unlock();

waitCondition.wakeOne();
}

qball2k5
2nd August 2006, 14:27
It only seems to happen when I minimize it and the bring it back?

Do I need to do something special for this?

jacek
2nd August 2006, 22:25
It only seems to happen when I minimize it and the bring it back?
Are you sure that signal/slot connections between decoder and reader threads are direct? If not, all data will go through GUI thread's event loop.

Anyway, how much data you receive from that device? Maybe you don't need 3 threads for this?

qball2k5
3rd August 2006, 19:09
I've tried it both ways and still no dice.

The data is read every 64Hz, and has can recieve anywhere from 4 bytes of raw data to 512 bytes of raw data. The data is capture in an array of integer values, then copied into a QStringList, then emitted to the decoder. The Messager thread actually works faster then 64 Hz. The messager use to do the decoding at the same time, but I was having lots of slow downs and crashes.

I figured putting the decoder into it's own thread, then having the messager send it packets to decode, then it would make the application more efficient.

So the Messager fills the Decoder queue, then the Decoder decodes the data and emits a signal to the UI to update a particular GUI element. The Decoder queue could get backed up, considering the average length of a message is about 12-18 bytes.

Why does the decoder thread stop working when the application gets minimized that really seems to be te issue.

DeckSimulator, the UI, has the two objects, decoder and messager. They are started in the constructor of DeckSimulator, I'm running out of ideas. It doesn't fail when it stays non-minimized.

jacek
3rd August 2006, 19:37
The data is read every 64Hz, and has can recieve anywhere from 4 bytes of raw data to 512 bytes of raw data.
Well 64 Hz is a bit too much for a single-threaded application.


DeckSimulator, the UI, has the two objects, decoder and messager. They are started in the constructor of DeckSimulator, I'm running out of ideas. It doesn't fail when it stays non-minimized.
Which Qt version do you use? Anyway, I've checked the task tracker and I didn't found anything connected with this issue.

Maybe you could prepare a minimal, compilable example that reproduces the problem (without all the GUI and decoding stuff, "messager" can send the same data all the time)? This way it will be easier to search for possible solutions.

qball2k5
4th August 2006, 19:01
I'm using 4.1.3

I can see if I can rip out some of the sensitive things. I'm battling a crash right now.

The application crashes when I switch between tabs....my GUI has tabs, each with lots of stuff on them, when I toggle between them, I get an index of range problem. I think I know why...it's a problem with my decoder.

qball2k5
7th August 2006, 15:04
Here's a compiled stripped down Messager. I connect the decoder to the PacketRecieved signal, then the decoder is connected to the GUI.

messager.cpp


#include <QMutexLocker>
#include <QFile>
#include <QTextStream>
#include <QtDebug>
#include <QStringList>
#include <iostream>
#include <QMetaType>
#include <QTime>

#include "messager.h"

using std::cout;

Messager::Messager(QObject *parent)
: QThread(parent)
{
stopped = false;
abort = false;
ReadDevice = false;

pollRate = 3;
//Constructor
}

Messager::~Messager()
{
//Destructor
mutex.lock();
waitCondition.wakeOne();
mutex.unlock();

wait();
}

void Messager::run()
{

do
{
//This code is for sending a packet
/* if(!packetQueue.isEmpty())
{
if(mutex.tryLock())
{
Packet sendPacket = packetQueue.dequeue();
UINT8 address = addressQueue.dequeue();
UPDATE type = typeQueue.dequeue();

mutex.unlock();

if(SendMessage(sendPacket, address))
emit PacketSent(true, type);
else
emit PacketSent(false, type);
}
} */

msleep(pollRate);


if(ReadDevice)
ReadMessage(0);

if(abort)
stopped = true;

}while(!stopped);

if(stopped)
quit();
else
exec();

}

void Messager::stop()
{
stopped = true;
}

/*void Messager::SendPacket(Packet myPacket, UINT8 address, UPDATE Type)
{
QMutexLocker locker(&mutex);

packetQueue.enqueue(myPacket);
addressQueue.enqueue(address);
typeQueue.enqueue(Type);
}

bool Messager::Config(int port, int bitrate)
{
QMutexLocker locker(&mutex);

// Open the port
ahandle = aa_open(port);

if (ahandle <= 0)
{
return false;
}

aa_configure(ahandle, AA_CONFIG_GPIO_I2C);
aa_i2c_pullup(ahandle, AA_I2C_PULLUP_BOTH);
aa_target_power(ahandle, AA_TARGET_POWER_NONE);

// Set the bitrate
bitrate = aa_i2c_bitrate(ahandle, bitrate);

return true;
}

bool Messager::SendMessage(Packet outgoingPacket, UINT8 addr)
{
QMutexLocker locker(&mutex);

int status = 0;
int retries = 0;
UINT16 aacount;

status = aa_i2c_write_ext(ahandle, addr, AA_I2C_NO_FLAGS, (2*outgoingPacket.GetLength())+4, outgoingPacket.GetData(), &aacount);

if(status != 0) // Try again
{
++retries;

status = aa_i2c_write_ext(ahandle, addr, AA_I2C_NO_FLAGS, (2*outgoingPacket.GetLength())+4, outgoingPacket.GetData(), &aacount);

if(status != 0)
{
qDebug() << "Message Sending Failure!\n";
return false;
}

else
return true;

}
else
return true;
}*/

bool Messager::ReadMessage(int address)
{
QStringList packet;
QString formatter;
int i;

for(i = 0; i < 15; i ++)
packet.append(formatter.number(0xFF));


emit PacketRecieved( packet );

return true;

/* UINT8 buffer[511];
UINT16 actualSize = 0;
UINT8* curBuf = buffer;
UINT16 actualRead = 0;
INT32 status = 0;
UINT8 payloadSzWords = 0;
UINT16 curSize = 4;
UINT16 messageID = 0;
int i = 0;
int retries = 0;
int start = 0;
bool ok;

status = aa_i2c_read_ext(ahandle, address, AA_I2C_NO_FLAGS, curSize, curBuf, &actualRead);

if ((0 != status) )
{
++retries;
status = aa_i2c_read_ext(ahandle, address, AA_I2C_NO_FLAGS, curSize, curBuf, &actualRead);
}
else if ( actualRead != curSize)
{
return false;
}
else // the correct number of bytes were received
{
payloadSzWords = curBuf[0];
actualSize = actualRead;

if ( 0 != payloadSzWords )
{
curSize = 2 * (UINT16(payloadSzWords));
actualSize += curSize;
curBuf = &buffer[actualRead];
status = aa_i2c_read_ext(ahandle, address, AA_I2C_NO_FLAGS, curSize, curBuf, &actualRead);

if ((0 != status))
{
++retries;
status = aa_i2c_read_ext(ahandle, address, AA_I2C_NO_FLAGS, curSize, curBuf, &actualRead);
}
else if ( actualRead != curSize)
{
status = -7;
}
else if(status ==0)
{
for(i = 0; i < actualSize; i++)
{
packet.append(formatter.sprintf("%02x ", buffer[i] & 0xff));
}

emit PacketRecieved( packet );
}
else
return false;
}
else
return false;
}

return true;*/
}

void Messager::EnableDeviceStatus(bool Enabled)
{
mutex.lock();
ReadDevice = Enabled;
mutex.unlock();
}


And here's messager.h


#ifndef MESSAGER_H
#define MESSAGER_H

#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QQueue>

class Messager : public QThread
{
Q_OBJECT

public:
Messager(QObject *parent = 0);
~Messager();
void stop();
void EnableDeviceStatus(bool Enabled);

signals:
//void PacketSent(bool Success, UPDATE changeUI);
void PacketRecieved(const QStringList &packet);

public slots:
//void SendPacket(Packet myPacket, UINT8 address, UPDATE MessageType);

protected:
void run();

private:
//bool SendMessage(Packet outgoingPacket, UINT8 address);
bool ReadMessage(int address);

/* QQueue <Packet> packetQueue;
QQueue <UINT8> addressQueue;
QQueue <UPDATE> typeQueue;
UPDATE MessageType;
UINT8 addr;

Aardvark ahandle;

*/
volatile bool stopped;
bool abort;
int pollRate;

bool ReadDevice;

QMutex mutex;
QWaitCondition waitCondition;

};

#endif


There you go, do you want a striped down decoder too?

jacek
7th August 2006, 15:15
There you go, do you want a striped down decoder too?
Actually I was thinking about something that I could compile and test.

qball2k5
8th August 2006, 15:15
See attached code, It's basically a one button interface (which is about 1/20 of the amount of data). Messager emits two possible messages that will cause 3 of the dots to flash. The messages are sent to the decoder and then the decoder updates the UI. The code pretty much speaks for itself.

I don't see the problem.

I connect the decoder object to the GUI, normally, with multiple signals to different slots? Could that be the problem, should the decoder be attached to the GUI via one signal instead of perhaps the 5-6 connections, I currently use.

jacek
8th August 2006, 19:12
Messager emits two possible messages that will cause 3 of the dots to flash.
Yes, they're flashing, but they're flashing all the time (even after I minimize and restore it). So the question is: how do I reproduce the bad behavior? Or maybe it just works on my system?


I connect the decoder object to the GUI, normally, with multiple signals to different slots? Could that be the problem, should the decoder be attached to the GUI via one signal instead of perhaps the 5-6 connections, I currently use.
No, it shouldn't be a problem.

qball2k5
15th August 2006, 16:32
I've talked to some people....mainly the people I work with....and they just won't minimize the application. Problem sort of fixed.

I might switch it to a single connection between the decoder and UI and see if that works.

This is a difficult problem to debug considering my app feeds off data from an external device.

The code I gave you had one connection, it really has 6....and will have 8 once completed.
I'm going to try and bring the 6 connection back, then simulate the messaging and see if the problem returns.

I'll post that code when I can...thanks for all your on going help