PDA

View Full Version : 1 producer 14 Consumers problem



tsyahoo
20th May 2009, 09:53
Hi everyone.

We are trying to develop an application involving 1 producer and 14 consumers simultaneously accessing a circular buffers.

The producer is entitled to produce data bytes and store them into several buffers (depending on the kind of data it produces). The consumers are entitled to retrieve data bytes from the buffer and use them to perform operations such as graphic display and file data logging.

Basically there are the producer and a pair of consumers per circular buffer; one consumer for graphic display and another one for file data logging. There is a total of 7 buffers each of which refers to a specific physical data acquisition channel.

The concurrent access to the circular buffer is protected by a pair of semaphores, and one mutex (that we believe can be removed).

The application run smoothly for a limited number of channels but it crashes when handling all the 7 channels.

Running in debug mode QT4.5 under Windows XP - SP3, we observe that the application get stuck during the call to tryAcquire, and more specifically to the QMutexLocker locker(&d->mutex); statement of it.

Do you think that there is a maximum amount of semaphores that can be used within an application ?
Do you have any suggestion on things that need to be considered while dealing with threads, semaphores, mutexes and such ?

Thanks to you all.

wysota
20th May 2009, 23:08
I'm sorry to say that but your code doesn't make much sense... To handle a circular buffer, regardless of the number of consumers you only need one semaphore and optionally one mutex (but you can avoid having it for example by using QAtomicPointer).

Here is a rough implementation of a circular buffer (let's assume it holds integers, for simplicity):

class CircularBuffer {
public:
CircularBuffer(int size), m_sem(0) {
m_data = new int(size);
m_firstToRead = 0;
m_firstToWrite = 0;
m_size = size;
}
int read() {
m_sem.acquire(1);
m_mutex.lock(); // protect m_firstToRead
int val = m_data[m_firstToRead];
m_firstToRead = (m_firstToRead+1) % m_size;
m_mutex.unlock();
return val;
}
void write(int val) {
m_data[m_firstToWrite] = val;
m_firstToWrite = (m_firstToWrite+1) % m_size;
m_sem.release(1);
}
private:
QSemaphore m_sem;
int m_firstToRead;
int m_firstToWrite;
int m_size;
QMutex m_mutex;
};

If you want more than one producer, you need to protect m_firstToWrite as well.

If you need more buffers, or a buffer of buffers, just repeat the pattern. Allocating an array of arrays of semaphores is probably a very bad idea.

tsyahoo
22nd May 2009, 11:04
Hi there. Thank yopu very much for you helpful comment. I appreciated it very much, especially the part where you say that what I write does not make sense.

But let's move forward and let's take the discussion on technical issues rather than personal judgments.

The idea underlying my class is to have a circular buffer able to serve one producer and several consumers. All the consumers must be able to read each byte produced by the producer, and the producer must wait until all the consumers have retrieved the produced data. I do not see anything like that in your implementation.

Furthermore, there are no arrays of arrays of semaphores. There is one array of pointers to semaphores. Each semaphore is related to a different consumer, and since all the consumers live a separate life, there must be an independent semaphore for each one of them.

Thank you anyway.

wysota
22nd May 2009, 20:24
The idea underlying my class is to have a circular buffer able to serve one producer and several consumers. All the consumers must be able to read each byte produced by the producer, and the producer must wait until all the consumers have retrieved the produced data. I do not see anything like that in your implementation.

You forgot to mention that in your post :)

What you want is broadcast, not a synchronized access to data (and you can do it with events). And you don't need semaphores for that. If you want to keep it the way it is, there is QAtomicInt that can be incremented atomically which is sufficient for your task.

Still you need a single semaphore to protect the buffer itself but then you only need to make sure all consumers accessed the data before overwriting it. You can do it in one of two ways:

1. each item in the buffer has its own semaphore that gets released (with the value of 1) by each consumer after reading the data and gets acquired (with the value of 14 or whatever the amount of consumers you have per cell) by the producer. This makes sure the producer overwrites data in a cell only if all consumers have already accessed it.

2. each item in the buffer has a counter that gets incremented when a consumer accessed it. The producer has to wait until the value of that counter equals the number of consumers (+1 if the producer itself also increments the buffer to provide consistency). This can also be implemented using a mutex and a wait condition to make sure the producer only reads the counter if its value has changed recently.

Right now I can think of at least one more way involving two semaphores and a bunch of QAtomicInt objects to solve the task. One semaphore protects the buffer from overrunning, the other from underrunning. One is released by the producer each time it writes an item, the other is released by the last consumer accessing the item. Consumers use the atomic integer to check if they are the last one to read the data and if they should release the semaphore.

Here is the supposed code for implementing the consumer:



int currentIndex = ...;

underrunSemaphore.acquire(1);
consumeData(data[currentIndex]);
int val = data[currentIndex].counter.fetchAndAddOrdered(1);
if(val==1){
// I'm the second (last) consumer to access the data
overrunSemaphore.release(1);
}
currentIndex++;