Thread Safe Queue container....



jcox23
3rd December 2009, 18:35
Hello,

I started learning about multithreading with Python, then C, by solving a classical producer/consumer problem...

In Python, the Queue class (and its GLib equivalent in C) seemed to be a good solution: a thread-safe queue container where one thread pushes items in and another pops them out...

I'm porting to C++/Qt, and I was surprised to see no such convenience class inside Qt....
QList, QQueue and co are all unsafe regarding threading.

So I wrote a class around QQueue to make it thread safe...(with mutexes)...

But I'm thinking that maybe such a class is absent because there is a more natural way to handle the producer/consumer problem in a "Qt way"....

How would you go about it?

squidge
3rd December 2009, 18:50
Depends on the application. If there were more reads than writes, I'd use QReadLocker and QWriteLocker, as they are designed for this purpose. They also ensure the state of the lock is always well defined.
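
To illustrate the reader/writer idea, here is a minimal sketch. It assumes standard C++14 `std::shared_timed_mutex` as a stand-in for Qt's QReadWriteLock (the lock that QReadLocker and QWriteLocker operate on), so that it builds without Qt. The class `SharedValues` and the helper `demoSharedValues` are hypothetical names chosen for this example only.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Hypothetical read-mostly container guarded by a reader/writer lock.
class SharedValues
{
public:
    void append(int value)
    {
        // Exclusive lock: only one writer, and no readers, at a time.
        std::unique_lock<std::shared_timed_mutex> lock(_lock);
        _values.push_back(value);
    }

    bool contains(int value) const
    {
        // Shared lock: any number of readers may hold it concurrently.
        std::shared_lock<std::shared_timed_mutex> lock(_lock);
        for (int v : _values)
            if (v == value)
                return true;
        return false;
    }

    std::size_t size() const
    {
        std::shared_lock<std::shared_timed_mutex> lock(_lock);
        return _values.size();
    }

private:
    mutable std::shared_timed_mutex _lock;
    std::vector<int> _values;
};

// Usage sketch for the hypothetical class above.
inline void demoSharedValues()
{
    SharedValues values;
    values.append(3);
    assert(values.size() == 1);
    assert(values.contains(3));
    assert(!values.contains(4));
}
```

Note that in a queue every pop modifies the container, so a reader/writer lock probably only pays off for read-only inspection such as counting or searching.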

wysota
3rd December 2009, 23:31
Producer/consumer problem is usually solved using semaphores or wait conditions, depending on the type of pattern you are dealing with (mainly if the number of producer "slots" is limited - i.e. when you produce data into a buffer of limited size).
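
As a rough illustration of the limited-size buffer with wait conditions, here is a minimal sketch. It assumes standard C++ `std::mutex` and `std::condition_variable` in place of QMutex and QWaitCondition so that it compiles without Qt. `BoundedQueue` and `sumThroughQueue` are hypothetical names, not part of Qt or of any code posted in this thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue: enqueue() blocks while the buffer is full,
// dequeue() blocks while it is empty.
template <class T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : _capacity(capacity)
    {
        assert(capacity > 0);
    }

    void enqueue(const T &value)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // Wait until a free slot exists; the predicate guards against spurious wakeups.
        _notFull.wait(lock, [this] { return _items.size() < _capacity; });
        _items.push(value);
        _notEmpty.notify_one();
    }

    T dequeue()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // Wait until at least one item is available.
        _notEmpty.wait(lock, [this] { return !_items.empty(); });
        T value = _items.front();
        _items.pop();
        _notFull.notify_one();
        return value;
    }

private:
    std::queue<T> _items;
    std::size_t _capacity;
    std::mutex _mutex;
    std::condition_variable _notFull;
    std::condition_variable _notEmpty;
};

// Demo helper: a producer thread pushes 1..n through a 2-slot queue
// while the calling thread consumes the values and sums them.
inline int sumThroughQueue(int n)
{
    BoundedQueue<int> queue(2);
    std::thread producer([&queue, n] {
        for (int i = 1; i <= n; ++i)
            queue.enqueue(i);
    });
    int sum = 0;
    for (int i = 0; i < n; ++i)
        sum += queue.dequeue();
    producer.join();
    return sum;
}
```

Because the producer blocks when both slots are taken, nothing is dropped even though the buffer is small.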

Tanuki-no Torigava
3rd December 2009, 23:36
You can use QMutex on critical ops. Check Qt Assistant for QMutex details.

jcox23
4th December 2009, 09:26
Producer/consumer problem is usually solved using semaphores or wait conditions, depending on the type of pattern you are dealing with (mainly if the number of producer "slots" is limited - i.e. when you produce data into a buffer of limited size).

In this particular case, the container cannot be limited in size...even though it will likely have little "pressure" (1:1 - 3:1 consumers : producers, with each pop/push separated by seconds)... it is critical that the information produced by the producer is not lost....

I ended up with:

#ifndef ASYNC_queueH
#define ASYNC_queueH

#include <QMutex>
#include <QQueue>

// QQueue wrapper that takes a mutex around every single operation.
template<class T> class QAsyncQueue
{
public:

    // max == -1 means "no size limit".
    QAsyncQueue(int max = -1)
        : _max(max)
    {
    }

    ~QAsyncQueue()
    {
        clean();
    }

    int count()
    {
        _mutex.lock();
        int count = _queue.count();
        _mutex.unlock();
        return count;
    }

    bool isFull()
    {
        if (-1 == _max)
            return false;

        _mutex.lock();
        int count = _queue.count();
        _mutex.unlock();
        return count >= _max;
    }

    bool isEmpty()
    {
        _mutex.lock();
        bool empty = _queue.isEmpty();
        _mutex.unlock();
        return empty;
    }

    void clean()
    {
        _mutex.lock();
        _queue.clear();
        _mutex.unlock();
    }

    // Appends an item; _max is not checked here.
    void push(const T& t)
    {
        _mutex.lock();
        _queue.enqueue(t);
        _mutex.unlock();
    }

    // Removes and returns the head item; the queue must not be empty.
    T pull()
    {
        _mutex.lock();
        T i = _queue.dequeue();
        _mutex.unlock();
        return i;
    }

private:

    QQueue<T> _queue;
    QMutex _mutex;
    int _max;
};
#endif


probably not state-of-the-art C++ code, but it seems to work... I declare one of these queues as a global or static and the threads share it....

so from your answers so far, Qt is providing classic tools for threading...locking and such must be done by hand... there is no high level class hiding all this...

thanks for the answers so far!

wysota
4th December 2009, 10:17
In this particular case, the container cannot be limited in size...even though it will likely have little "pressure" (1:1 - 3:1 consumers : producers, with each pop/push separated by seconds)...
Bad idea. It's better to use a buffer of limited size with semaphores. You can't predict the order of threads being woken up by the scheduler. It may happen that your producers or consumers get a bit starved and you'll have a problem.

By the way - your queue doesn't protect you from buffer underruns (or overruns). The fact that you call "isEmpty" or "isFull" is completely meaningless as right when you return some other thread may change its state so you may be adding to a full queue or reading from an empty one. So your queue is not thread-safe in its current form.


it is critical that the information produced by the producer is not lost....
Nothing will be lost. The producer will wait until there is space in the buffer.


there is no high level class hiding all this...

Actually there is. You could have used events to push produced data into a queue.

Attached you will find a real thread-safe queue I just implemented.

jcox23
4th December 2009, 10:36
Attached you will find a real thread-safe queue I just implemented.

Thanks, I will study this carefully!....


By the way - your queue doesn't protect you from buffer underruns (or overruns). The fact that you call "isEmpty" or "isFull" is completely meaningless as right when you return some other thread may change its state so you may be adding to a full queue or reading from an empty one. So your queue is not thread-safe in its current form.

I want to be sure to understand this properly...
you're saying that a construct like:

if(!asyncqueue.isEmpty()) asyncqueue.pull()
is not safe?
because between the actual test of emptiness and the actual pull, another thread could have pulled, emptying the queue possibly... and so my pull will fail...
Is that correct?
and that's why you added the semaphore on top of the mutex protection?....

wysota
4th December 2009, 11:12
you're saying that a construct like:

if(!asyncqueue.isEmpty()) asyncqueue.pull()
is not safe?
Yes, that's correct.


because between the actual test of emptiness and the actual pull, another thread could have pulled, emptying the queue possibly... and so my pull will fail...
Exactly.


and that's why you added the semaphore on top of the mutex protection?....

Yes, the semaphore makes sure you can't read from an empty queue or write to a full queue. If you try, you will be blocked until you can. This simplifies the code and improves resource usage as your threads won't be spinning around checking if they can access the queue.
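
The check-then-act race discussed above can also be avoided for a non-blocking caller by doing the emptiness test and the removal inside one critical section. The following is only a sketch under assumptions: it uses `std::mutex` instead of QMutex so that it builds without Qt, it is not the attached implementation, and `LockedQueue`, `tryPull` and `demoTryPull` are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical queue whose tryPull() tests and removes under the same lock.
template <class T>
class LockedQueue
{
public:
    void push(const T &value)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push(value);
    }

    // Returns false if the queue was empty; otherwise copies the head item
    // into 'out', removes it and returns true. No other thread can run
    // between the test and the removal.
    bool tryPull(T &out)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_items.empty())
            return false;
        out = _items.front();
        _items.pop();
        return true;
    }

private:
    std::queue<T> _items;
    std::mutex _mutex;
};

// Usage sketch for the hypothetical class above.
inline void demoTryPull()
{
    LockedQueue<int> queue;
    int value = 0;
    assert(!queue.tryPull(value)); // empty: nothing is pulled
    queue.push(42);
    assert(queue.tryPull(value) && value == 42);
}
```

Unlike the semaphore approach, a caller of `tryPull` still has to decide what to do when it returns false, typically by retrying later.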

jcox23
4th December 2009, 12:36
Yes, that's correct.

Exactly.

Yes, the semaphore makes sure you can't read from an empty queue or write to a full queue. If you try, you will be blocked until you can. This simplifies the code and improves resource usage as your threads won't be spinning around checking if they can access the queue.

Thanks very much for the helpful discussion...

Btw, I got segfaults when trying tryDequeue()...(because I would prefer to have a non blocking behavior)... but don't mind me, maybe I'm not using it right...

wysota
4th December 2009, 13:48
The whole point is to block. Non-blocking variants are in general unreliable. And if you don't block and get a buffer overflow, you make your application unstable because you're constantly spinning in the loop waiting for the buffer to be available thus eating cpu power that could be spent on consuming messages from the buffer.

I can't think of a single situation where you wouldn't want to block (unless blocking was more expensive than a busy loop). If you produce an element and the buffer is full and you don't want to block then what will you do with it? You can only discard it, you can't store it anywhere because you risk falling off a cliff as you are producing elements faster than you can consume them. Blocking is a way to slow down production or consumption in a graceful way.

Tanuki-no Torigava
5th December 2009, 00:16
Just checked the code. I suggest changing lock to tryLock for QMutex. It works better because it is much more predictable with timeouts.

jcox23
6th December 2009, 12:19
I can't think of a single situation where you wouldn't want to block (unless blocking was more expensive than a busy loop). If you produce an element and the buffer is full and you don't want to block then what will you do with it?

that's why I wanted to use a growing container...
the original design was a growing container that is only checked by the consumers (only 1 or 2 threads) about once a second (so no CPU hog)...

but I'm open to new designs...and will checkout how I can deal with the blocking....
(i'm mainly concerned on how to kill a thread that is blocking)


Just checked the code. I do suggest to change lock to tryLock for QMutex. Works better because much more predictable with timeouts.

I'll see how it goes.... but right now the tryAcquire method is causing segfaults....

wysota
6th December 2009, 16:12
(i'm mainly concerned on how to kill a thread that is blocking)
Set a flag that the thread will check right after acquiring a semaphore and release the semaphore so that threads that are being blocked can continue. They should see the flag and exit.
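
A minimal sketch of this shutdown idea follows. It is an assumption-laden variant: it uses a `std::condition_variable` rather than the semaphore described above, so "releasing the semaphore" becomes "notifying all waiters". `ClosableQueue`, `close` and `countUntilClosed` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical queue whose blocked consumers can be released by close().
template <class T>
class ClosableQueue
{
public:
    void enqueue(const T &value)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push(value);
        _wake.notify_one();
    }

    // Blocks until an item arrives or close() has been called.
    // Returns false only when the queue is closed and fully drained.
    bool dequeue(T &out)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _wake.wait(lock, [this] { return _closed || !_items.empty(); });
        if (_items.empty())
            return false; // woken by close(): the caller should leave its loop
        out = _items.front();
        _items.pop();
        return true;
    }

    // Sets the flag and wakes every blocked consumer so it can see the flag.
    void close()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _closed = true;
        _wake.notify_all();
    }

private:
    std::queue<T> _items;
    std::mutex _mutex;
    std::condition_variable _wake;
    bool _closed = false;
};

// Demo helper: a consumer thread counts items until the queue is closed.
inline int countUntilClosed(int itemCount)
{
    ClosableQueue<int> queue;
    int received = 0;
    std::thread consumer([&queue, &received] {
        int value = 0;
        while (queue.dequeue(value))
            ++received;
    });
    for (int i = 0; i < itemCount; ++i)
        queue.enqueue(i);
    queue.close();
    consumer.join(); // the consumer exits cleanly; no thread is killed
    return received;
}
```

In this sketch items already queued when `close()` is called are still delivered, so a clean shutdown does not lose data.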

ferrabras
15th December 2009, 11:10
After checking several interesting threads about data sharing among threads I chose this one to ask for suggestions.

I am used to POSIX C threads, where we can create a queue dynamically and share a pointer to it among other threads, which can push pointers into it or pull them from it.:cool:

I used many times an old C solution where the application creates a queue of struct of pointers with thread id and network data buffer pointer. Upon request the application would create a new thread that would download data from network into a buffer, created dynamically. After download this buffer pointer would be pushed into the queue along with the thread id of the creator. Later a parser thread would process the data according to the thread id, because the parse was URL dependent, and then free the buffer memory.

I was thinking of doing the same thing with Qt using QQueue<QNetworkReply *>, but I read that QObject classes created in one thread shall not have their members called from another thread (I believe it has something to do with the message loop). I also heard that classes from the QtNetwork and QtSql modules shall be used in the same thread they were created in.:eek:

From what you've discussed here, you came up with a thread-safe container, but this queue cannot contain pointers to QObject descendants, can it?:rolleyes:

I didn't want to copy all the data from QNetworkReply into another container not inherited from QObject and then queue it, but I see no other solution.
So far this is what I came up with:
1) the thread would dynamically create a QDataStream, which is not a QObject descendant (is it?), and push all the data from QNetworkReply into it.
2) push the QDataStream pointer into the queue.
3) the parser thread would pull data out of the shared queue and free the QDataStream after parsing it.

I think copying data from one container to another is a waste of memory and time, but it is the only candidate solution I have considered so far.:confused:

I hope you guys can give me some advice before I start coding this solution. Meanwhile I will take care of the parser function of QDataStream!
Thanks a lot in advance!

wysota
15th December 2009, 11:40
You don't need threads to handle multiple network connections with Qt.

ferrabras
15th December 2009, 17:19
From what I know about QNetworkAccessManager, I could connect a void timeout() signal from a timer to a function that requests data, and connect a finished(QNetworkReply *) signal from the manager to the function that would use the data. Maybe I could create a timer and a manager for each URL; this is also a good idea. I'm sure I could eliminate some threads by doing it. Thanks.
Yet I would need the parser/processor to be non blocking so that they don't block the application message loop. So the parser/processor have to be "threaded" and a queue of downloaded data should be available to them. Instead of passing a QNetworkReply pointer I would have to pass a copy of the data itself anyway.

wysota
15th December 2009, 19:24
Yet I would need the parser/processor to be non blocking so that they don't block the application message loop. So the parser/processor have to be "threaded" and a queue of downloaded data should be available to them.

How long do you expect the parser to parse the result? Maybe it's so short it's not worth delegating into another thread. Or maybe you can use Qt Concurrent.

Have you seen the article on Keeping the GUI Responsive?
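
To make the "parse a copy of the data off the GUI thread" idea concrete, here is a rough sketch. It assumes standard C++ `std::async` as a stand-in for Qt Concurrent so that it builds without Qt. `parsePayload` is a hypothetical placeholder for a real parser (it only counts newlines), and `parseInBackground` and `demoParse` are likewise hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <string>
#include <utility>

// Hypothetical stand-in for a real parser: counts newline characters.
inline std::size_t parsePayload(const std::string &payload)
{
    std::size_t lines = 0;
    for (char c : payload)
        if (c == '\n')
            ++lines;
    return lines;
}

// Starts parsing on another thread. The payload is taken by value, so the
// task owns its own copy and the caller may discard the original buffer.
inline std::future<std::size_t> parseInBackground(std::string payload)
{
    return std::async(std::launch::async, parsePayload, std::move(payload));
}

// Usage sketch: get() blocks, so an event-driven program would normally
// collect the result later instead of waiting immediately.
inline void demoParse()
{
    std::future<std::size_t> result = parseInBackground("a\nb\n");
    assert(result.get() == 2);
}
```

Only plain data crosses the thread boundary here, which matches the plan of copying the reply's bytes rather than sharing the reply object itself.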

bob2oneil
4th April 2011, 15:02
Hi wysota, thanks for all that you do to support the Qt community. I just reviewed your fine thread safe queue template. Since this discussion is a few years old, I wonder if there have been any changes to Qt in this time (such as additional thread safe collections or other constructs) that you would utilize for this solution, or if your template implementation is as timely now as when this thread was active? Would you do anything differently with the latest release of Qt?

wysota
4th April 2011, 15:37
There have been no changes to Qt in this regard as far as I know.

kevinp
2nd September 2011, 17:07
wysota, Thanks for your example! Just when I think I'm understanding it, a puzzle: At the end of tryDequeue(), shouldn't it be _semFree.release() instead of _semUsed.release() to note 1 less item in the queue, as in dequeue()?

wysota
4th September 2011, 15:21
wysota, Thanks for your example! Just when I think I'm understanding it, a puzzle: At the end of tryDequeue(), shouldn't it be _semFree.release() instead of _semUsed.release() to note 1 less item in the queue, as in dequeue()?

No. _semFree is released when dequeue() is called. The point of releasing _semUsed is that first you try to acquire it at the beginning of the method to make sure the queue is not empty so then you have to release the lock at the end of the method.

hazelnusse
14th October 2011, 22:02
Is the threadsafequeue.tar.gz link still valid for other people? I just tried to download it and am unable to untar it:


luke@ThinkPad-W510:/tmp$ tar xvf threadsafequeue.tar.gz
tar: This does not look like a tar archive
tar: Skipping to next header
tar: Exiting with failure status due to previous errors


Also, the file size when I download it is 990 bytes, but the forum post lists it as 960 bytes.... Anybody else having this issue?

wysota
14th October 2011, 22:30
It is tar-gzipped.

hazelnusse
14th October 2011, 23:57
It is tar-gzipped.

I am aware that it is tar-gzipped, and I am familiar with working with .tar.gz/.tgz files. `tar xvf` normally gunzips then untars tar-gzipped files just fine, but not this one. To get it to work, I had to do those things in two steps:


$ gunzip threadsafequeue.tar.gz
$ tar x -f threadsafequeue.tar


Which is unusual. Adding the -z option to tar did not fix it either.

wysota
15th October 2011, 08:41
`tar xvf` normally gunzips then untars tar-gzipped files just fine
Does it? I was always sure -z option to tar is responsible for ungzipping.


Adding the -z option to tar did not fix it either.
The file was probably gzipped twice, don't ask me how.

suspiria
5th June 2012, 19:36
FYI, in the copy of wysota's otherwise excellent ThreadSafeQueue that i downloaded, the return statement is missing from head().

perhaps it has been fixed later and i missed the later version.

thanks.