ERROR with QSemaphores on a real time display application



^Nisok^
9th January 2012, 21:49
Hey guys!

I have some major issues with the synchronisation of two threads. It is the common producer-consumer problem, but the speed of the two threads is variable and depends on an external device.

The producer constantly reads datagrams from a UDP server and, after some validation, stores them in a boost::circular_buffer.


producer(QSemaphore *freeBytes, QSemaphore *usedBytes, *BUFFER)
while (true)
{
    if (udpSocket.waitForReadyRead(10) && !paused)
    {
        while (udpSocket.hasPendingDatagrams())
        {
            datagramSize = udpSocket.pendingDatagramSize();
            datagram.resize(datagramSize);
            udpSocket.readDatagram(datagram.data(), datagramSize);

            hexDatagram.resize(datagramSize * 2);

            hexDatagramSize = hexDatagram.size();
            hexDatagram = datagram.toHex();
            blocksAcquired = datagramSize / 1280; // integer division, truncates

            READING_CHECKING();
            if (OK_FILL_A_TEMP_BUFFER())
                if (CERTAIN_AMOUNT_OF_INFORMATION)
                {
                    freeBytes->acquire(1); // Pointer to a QSemaphore on the main thread
                    for (j = 0; j < 320; j++)
                    {
                        iBuffer->push_back(tempBuffer[j]);
                    }
                    tempIndex = 0;
                    usedBytes->release(1); // Pointer to a QSemaphore on the main thread
                }
        }
        datagram.clear();
        hexDatagram.clear();
    }
}


The consumer checks usedBytes->available() and starts processing the data once that value exceeds a certain level.



while (true)
{
    availablePacks = usedBytes->available();

    if (availablePacks > 20)
    {
        for (i = 0; i < availablePacks - 5; i++)
        {
            usedBytes->acquire(1);

            // One pack holds 320 samples: 80 for each of the four channels.
            for (j = 0; j < 80; j++)
            {
                cha[j] = iBuffer->front();
                iBuffer->pop_front();

                chb[j] = iBuffer->front();
                iBuffer->pop_front();

                chc[j] = iBuffer->front();
                iBuffer->pop_front();

                chd[j] = iBuffer->front();
                iBuffer->pop_front();
            }
            //printf("Con: %d \n",iBuffer->size());
            freeBytes->release(1);

            for (j = 0; j < 80; j++)
            {
                PROCESSING();
            }
        }
    }
}


But the consumer keeps overrunning the producer, which leads to a SIGSEGV error.

Added after 12 minutes:

I should also note that the buffer is 200 times larger than the transferred packets, and that I have only 100 semaphores, in order to make sure somehow that the threads won't overlap. Even so, in the end they do.

wysota
9th January 2012, 23:41
I would assume some of your magic-number calculations are incorrect. I don't see the point of "if(availablePacks>20)" either.

^Nisok^
10th January 2012, 10:14
Thank you for your reply.
What do you mean magic numbers??

Because I don't need to display the data as fast as they come, I wait to have gathered several packages. But this argument can be removed....

wysota
10th January 2012, 11:04
What do you mean magic numbers??

5, 20, 80, 320, 1280

^Nisok^
10th January 2012, 13:08
I am afraid that the numbers are correct. The application runs even for several hours before the crash.

Any other idea??

wysota
10th January 2012, 13:58
I don't see how the amount of time the application runs for is related to any of those numbers. The application crashes when there are conditions met which cause a buffer underrun. The only time related aspect is that the overall probability of the fault manifesting itself increases as the time passes.

For instance you have a statement: blocksAcquired = datagramSize/1280;

Since datagramSize is an integer, I assume blocksAcquired is also an integer. If datagramSize is not constant, it might sometimes have a size that cannot be divided by 1280. When this happens, you leak blocks (since if datagramSize = 1281 then blocksAcquired = 1, whereas it is likely you'd expect it to be 2). You have to check such border cases. If your application crashes, it is not a direct fault of a buffer underrun, since the semaphore should block an attempt to read from an empty buffer. So first check where exactly the crash occurs (on reading from the buffer or on processing the data) using a debugger. And simplify your code: if you don't need the strange conditions, throw them away.
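To illustrate the border case described above, here is a minimal sketch comparing truncating division with rounding up. The helper names wholeBlocks and blocksNeeded are hypothetical and not part of the original code, and whether rounding up is actually the right behaviour depends on how the datagrams are laid out.

```cpp
#include <cstddef>

// Truncating division, as in "blocksAcquired = datagramSize/1280".
// Any trailing partial block is silently dropped.
constexpr std::size_t wholeBlocks(std::size_t bytes, std::size_t blockSize) {
    return bytes / blockSize;
}

// Hypothetical alternative: number of blocks needed to hold all bytes,
// counting a trailing partial block as one more block.
// Assumes blockSize is non-zero.
constexpr std::size_t blocksNeeded(std::size_t bytes, std::size_t blockSize) {
    return (bytes + blockSize - 1) / blockSize;
}
```

With a 1281-byte datagram and 1280-byte blocks, wholeBlocks gives 1 and blocksNeeded gives 2; for an exact multiple such as 1280 both give 1.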

As a side note, I would advise you to manipulate the semaphores in terms of bytes instead of blocks, if you later read or write bytes and not whole blocks. You'll have fewer places where you can make a mistake in your code.

^Nisok^
10th January 2012, 15:44
I followed your suggestions and I can say that for the past 2 hours it hasn't crashed.
But still I am a bit nervous ;) ...

Because till now all the crashes took place randomly.
The debugger wasn't helpful at all.

Thank you for your help !!

I will update the thread if necessary...

wysota
10th January 2012, 15:47
If it crashes, come back with a debugger backtrace :)

^Nisok^
10th January 2012, 16:01
One important note for all the other developers who will read this thread: boost::circular_buffer's push_back() checks the new size of the buffer using size().
The size of the buffer is stored in a particular place in memory.
So if a consumer thread periodically checks the size of the buffer, you have to use a mutex to guard against concurrent access.

wysota
10th January 2012, 16:06
The consumer should never check the size of the buffer. That's what you have a semaphore for, right? And the value will be strictly for information purposes only anyway as it can change immediately after being read.

^Nisok^
11th January 2012, 12:17
Yep... after several hours it crashed again. Here is the debugger information:

It crashed at this point:


iBuffer->pop_front();


on the pop_front() at the destroy_item


void pop_front() {
    BOOST_CB_ASSERT(!empty()); // check for empty buffer (front element not available)
    destroy_item(m_first);
    increment(m_first);
    --m_size;
}


at invalidate_iterators


void destroy_item(pointer p) {
    m_alloc.destroy(p);
#if BOOST_CB_ENABLE_DEBUG
    invalidate_iterators(iterator(this, p));
    std::memset(p, cb_details::UNINITIALIZED, sizeof(value_type));
#endif
}


at remove(p,previous)


void invalidate_iterators(const Iterator& it) {
    const debug_iterator_base* previous = 0;
    for (const debug_iterator_base* p = m_iterators; p != 0; p = p->next()) {
        if (((Iterator*)p)->m_it == it.m_it) {
            p->invalidate();
            remove(p, previous);
            continue;
        }
        previous = p;
    }
}


at m_iterators = m_iterators->next();


    //! Remove the current iterator from the iterator chain.
    void remove(const debug_iterator_base* current,
                const debug_iterator_base* previous) const {
        if (previous == 0)
            m_iterators = m_iterators->next();
        else
            previous->set_next(current->next());
    }
};




inline const debug_iterator_base* debug_iterator_base::next() const { return m_next; }

wysota
11th January 2012, 13:08
Are you protecting the buffer itself from concurrent access? Or is it thread-safe already? Because if not then it might be happening that you are modifying the buffer from two threads at once causing it to lose coherence. Surround the calls to push_back and pop_front with QMutex::lock() and QMutex::unlock() and all should be fine.
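As a rough sketch of that suggestion, the wrapper below takes a lock around every push and pop. It uses std::mutex and std::deque from the standard library as stand-ins for QMutex and boost::circular_buffer so that it compiles without Qt or Boost; the class name LockedBuffer and its methods are hypothetical.

```cpp
#include <deque>
#include <mutex>

// Hypothetical wrapper: every access to the container happens under one mutex,
// so a push from one thread and a pop from another can never interleave.
class LockedBuffer {
public:
    void push(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(value);
    }

    // Returns false instead of touching an empty container.
    bool tryPop(int& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty())
            return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<int> items_;
};
```

With Qt the same shape would presumably use QMutex, either with explicit lock()/unlock() calls as described above or with QMutexLocker.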

^Nisok^
11th January 2012, 13:31
It's not a thread-safe buffer.

If I put mutexes (which I had) what is the purpose of the semaphores?

wysota
11th January 2012, 14:19
To prevent buffer underruns/overruns and get rid of the producer and consumer constantly checking whether there is work available for them.

The semaphore doesn't grant exclusive access. It makes sure no more than N threads can access the section (where N is the value of the semaphore).
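The division of labour between the two primitives can be sketched as follows: two counting semaphores track free and used slots, while a separate mutex protects the container itself. This is only an illustration under assumptions, not the code from this thread. The Semaphore class is a minimal stand-in for QSemaphore built on the standard library, std::deque stands in for boost::circular_buffer, and BoundedQueue and runDemo are hypothetical names.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Minimal counting semaphore (stand-in for QSemaphore).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    void release() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

class BoundedQueue {
public:
    explicit BoundedQueue(int capacity) : freeSlots_(capacity), usedSlots_(0) {}

    void put(int value) {
        freeSlots_.acquire();                    // blocks while the buffer is full
        {
            std::lock_guard<std::mutex> lock(m_); // exclusive access to items_
            items_.push_back(value);
        }
        usedSlots_.release();                    // one more item available
    }

    int take() {
        usedSlots_.acquire();                    // blocks while the buffer is empty
        int value;
        {
            std::lock_guard<std::mutex> lock(m_);
            value = items_.front();
            items_.pop_front();
        }
        freeSlots_.release();                    // one more slot free
        return value;
    }

private:
    Semaphore freeSlots_;
    Semaphore usedSlots_;
    std::mutex m_;
    std::deque<int> items_;
};

// Pushes 0..n-1 through an 8-slot queue and returns the sum the consumer saw.
long long runDemo(int n) {
    BoundedQueue queue(8);
    long long sum = 0;
    std::thread producer([&] { for (int i = 0; i < n; ++i) queue.put(i); });
    std::thread consumer([&] { for (int i = 0; i < n; ++i) sum += queue.take(); });
    producer.join();
    consumer.join();
    return sum;
}
```

Here the consumer never inspects the buffer size or a semaphore's available count; it simply blocks in take() until an item exists, and the mutex is held only for the short push or pop.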