Implementation considerations for data flow pattern



redBeard
14th December 2010, 21:45
I created a signal processing (data flow) engine in Java and now need to convert it to C++ and Qt. It's been a few years since I've done significant C++ programming and this is my first program using Qt.

The Java engine has the following characteristics:

1) consists of a number of data processing elements (some data transform and some display elements)

2) elements each run in their own thread. Connection between engine and elements is using the Observable and Observer Java classes.

3) elements communicate with each other using BlockingQueues.

4) The engine can 'interrupt' each element as they're waiting on the blocking queue for data. Elements (threads) basically wind down in a controlled manner when an interrupt is received.

5) the data 'sent down the pipe' consists of a large number of small messages. Messages are generated by devices every 15 - 30 ms and consist of a dozen numbers.

This all works very well in Java. The problem is my graphics elements (implemented in OpenGL) are too inefficient and the engine/elements are consuming a goodly portion of the CPU.

I'm re-writing the engine and elements in C++ and Qt. I have some basic questions about two main areas: engine<->element connection and element->element communication.

For the engine<->element connection, I'm thinking of just implementing a basic multithreaded structure with the engine starting each element and then doing something to terminate them (and then 'wait()' for them to complete).

For the element->element communication mechanism, I'll use either a basic mutex on an STL queue or a QSemaphore, not sure which.

However, can the engine 'interrupt' a QSemaphore::acquire() method (if I go that route)? Elements will mostly just be waiting for data from their upstream partners. I would like the engine to just interrupt that call and allow them to gracefully wind down.

If I use a mutex on an STL queue, I'll probably use a QWaitCondition and 'wait()' on that object for the upstream partner to add data to the queue. Again, can the engine 'interrupt' threads if they're in the middle of a QWaitCondition::wait() call?

That's about it. I can't seem to find any doc on interrupting Qt-based threads if they're waiting on QSemaphore or QWaitCondition.... thanks for the help.

r

wysota
15th December 2010, 01:37
A short remark. I would be against using threads here unless any of the operations is really time consuming. And even if it is, then I would only delegate this particular task (when it is actually being performed) to a worker thread and I'd keep all other things in one thread. Why? To avoid wasting CPU time on context switching and thread synchronization. Don't use threads just because you can. Use them where doing that gives you an actual, real and significant advantage. In other cases use Qt's event system and signal/slot mechanism to communicate objects together. They will provide appropriate synchronization for you.

redBeard
15th December 2010, 21:55
Hmmm. thanks. I will have at least a couple of threads - one for the main engine and most of the elements and probably one for the device reader elements as they're pretty much reading all of the time, anyway.

I'll look closer at the event and signal/slot mechanisms for communication. The STL queue with a QWaitCondition structure around it seems like a very elegant solution. I have no problems with running a highly parallel application - the current Java incarnation works just fine (except for the graphics pipeline...).

Another quick question: What is the lifecycle of a message block sent from one object to a bunch of other objects using signals/slots?

I'll be sending 1000s of little message blocks from element to element in my engine. The design of the engine is such that an element doesn't know who is receiving its messages - the element-to-element flow is built up at runtime.

So if one element builds up a message block and emits a signal with that block and 10 other elements are connected to that element/signal, are 10 copies of the message block created and delivered, or is one delivered (as a reference)?

I would like to build up a reusable memory pool of these message blocks to cut down on message allocation/destruction.

thanks again. r

wysota
16th December 2010, 09:12
most of the elements and probably one for the device reader elements as they're pretty much reading all of the time, anyway.
There is no such thing as "all the time" in computer science. Threads themselves don't make your application faster, so adding a thread will not make your program able to read more messages per second.


The STL queue with a QWaitCondition structure around it seems like a very elegant solution.
It gives you a single point of synchronization for all threads -- all have to stop here so that one can read or write to the queue. In what way is it different from having one thread? If you are going to manipulate the queue "all the time" as you said earlier, then stopping at the mutex will be occurring "all the time" too.


I have no problems with running a highly parallel application - the current Java incarnation works just fine (except for the graphics pipeline...).
I love when someone says "it works but...".


Another quick question: What is the lifecycle of a message block sent from one object to a bunch of other objects using signals/slots?
The lifespan is the lifespan of the object being sent. If you send a pointer then it will be alive until someone deletes it, if you send an object then it will be alive until it goes out of scope.


I'll be sending 1000s of little message blocks from element to element in my engine. The design of the engine is such that an element doesn't know who is receiving its messages - the element-to-element flow is built up at runtime.
That's what happens with signals and slots too.


So if one element builds up a message block and emits a signal with that block and 10 other elements are connected to that element/signal, are 10 copies of the message block created and delivered, or is one delivered (as a reference)?
It depends whether you are sending a reference or a copy. If you'll be emitting across threads then regardless of whether you send a reference or a copy, a copy will have to be made.


I would like to build up a reusable memory pool of these message blocks to cut down on message allocation/destruction.
I don't know what your blocks look like but the first step I would take is to make the objects implicitly shared. Whether you use some kind of pool for the data or not, that's another issue.
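
For readers unfamiliar with the term, implicit sharing means that copying a message only copies a handle, and the payload is duplicated only when someone writes to a shared copy (copy-on-write). Qt offers QSharedData and QSharedDataPointer for this; the sketch below imitates the idea with std::shared_ptr so it builds without Qt. Payload and Message are hypothetical names, and the twelve doubles are an assumption based on the "dozen numbers" mentioned earlier. It is not safe for two threads writing through the same handle at once.

```cpp
#include <array>
#include <cstddef>
#include <memory>

// Hypothetical payload: one sample of a dozen numbers.
struct Payload {
    std::array<double, 12> values{};
};

class Message {
public:
    Message() : data_(std::make_shared<Payload>()) {}

    // Read access never copies the payload.
    double value(std::size_t i) const { return data_->values[i]; }

    // Write access detaches first if the payload is shared (copy-on-write).
    void setValue(std::size_t i, double v) {
        if (data_.use_count() > 1)
            data_ = std::make_shared<Payload>(*data_);
        data_->values[i] = v;
    }

    // True when both handles refer to the same payload in memory.
    bool sharesPayloadWith(const Message& other) const {
        return data_ == other.data_;
    }

private:
    std::shared_ptr<Payload> data_;
};
```

With a type like this, ten receivers that only read a message would all look at one payload, and a receiver that modifies its copy pays for a private one at that point.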

redBeard
16th December 2010, 17:03
There is no such thing as "all the time" in computer science. Threads themselves don't make your application faster, so adding a thread will not make your program able to read more messages per second.
Here's more information about the application. The engine is connected to medical devices that produce data and make it available to the device's driver at roughly 64Hz frequency. If an application has posted a read to the device driver at the time data is available, the application receives the data. If no application is posting a read to the driver, the data is lost. Losing data is a bad thing. Devices can also take up to 30 seconds to 'settle' from the time they're told to start sending data.

While the device element is reading from the device and collecting data (and more than likely sending the data down the pipeline) the user can be doing other things in the application, which is graphical in nature.

A 'device thread' seems like the way to go in this case. However, I am certainly willing to entertain other application designs to solve this problem.


It gives you a single point of synchronization for all threads -- all have to stop here so that one can read or write to the queue. In what way is it different from having one thread? If you are going to manipulate the queue "all the time" as you said earlier, then stopping at the mutex will be occurring "all the time" too.

In the Java-based implementation, each connection between elements has its own data queue. Are you saying the data-queue-per-element-connection approach is slower/less efficient than the signal/slot mechanism?

The time required to process the data in the flow is different for each element. Some may process data quickly and some may take a bit longer (e.g., graphical display elements). When we built the original engine, we weren't sure of the relative speed of each element as they processed data. We now know that all (current) data processing elements can process a block of data much faster than 15.6 ms.

The exception is the display elements when trying to display 10 or so scenes on stinky small graphics cards at 30 - 35 frames/second. Consequently, we separated the processing of the data in these elements from the 'display refresh' action.


The lifespan is the lifespan of the object being sent. If you send a pointer then it will be alive until someone deletes it, if you send an object then it will be alive until it goes out of scope.

Thanks much for the remainder of your responses.
r

wysota
16th December 2010, 18:01
The engine is connected to medical devices that produce data and make it available to the device's driver at roughly 64Hz frequency.
So not "all the time" but rather "about 60 times per second", which is roughly "every 16ms". If we assume you have a 1GHz RISC CPU (or equivalent), it can do 1E9 cycles per second, which gives 1M cycles per millisecond, meaning that between receiving two messages your machine can perform about 16 million instruction cycles. That's quite a lot.
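
The arithmetic behind that estimate can be written out as a small compile-time check. The 1 GHz clock and the 64 Hz message rate are the figures from the thread; the function name is made up for illustration.

```cpp
#include <cstdint>

// Clock cycles available between two consecutive messages.
constexpr std::int64_t cyclesPerMessage(std::int64_t clockHz,
                                        std::int64_t messagesPerSecond) {
    return clockHz / messagesPerSecond;
}

// 1 GHz clock, 64 messages per second: one message every 15.625 ms,
// which leaves 15,625,000 cycles per message (the "16 million" above).
static_assert(cyclesPerMessage(1'000'000'000, 64) == 15'625'000,
              "cycle budget per message at 1 GHz and 64 Hz");
```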


A 'device thread' seems like the way to go in this case.
"Seems" on what grounds? What is the rationale for using a separate thread just for that? It's not that using a second thread will give significantly more time to your application (unless of course you have more CPU cores than active processes/threads in your system, which is unlikely).


However, I am certainly willing to entertain other application designs to solve this problem.
The thing is you don't need any special design. If you start thinking asynchronously (events, scheduling) instead of synchronously (threads, wait conditions) there will be plenty of time for everything.


In the Java-based implementation, each connection between elements has its own data queue. Are you saying the data-queue-per-element-connection approach is slower/less efficient than the signal/slot mechanism?
I'm saying that when you have a shared resource and the whole operation of threads is based on accessing the resource then threads will almost permanently be locked waiting for the resource. In a broader view you'll get performance similar to using a single thread. It might make sense to use multiple threads if accessing the shared resource is sporadic or is only a percentage of what the thread is doing. Or if you don't care about resources and execution time.


The time required to process the data in the flow is different for each element. Some may process data quickly and some may take a bit longer (e.g., graphical display elements). When we built the original engine, we weren't sure of the relative speed of each element as they processed data. We now know that all (current) data processing elements can process a block of data much faster than 15.6 ms.
Processing the data is a different issue. If you don't care that much whether the data will be processed immediately or in a couple of milliseconds then you can happily offload that to a pool of threads (a thread per core). My main argument is that it doesn't make much sense to read the data in a dedicated thread.

The basic idea of using threads is that the most efficient situation is when you have as many threads (in the system, but we can extrapolate that to a single app too) as you have cores in the machine. This is because that's the situation where the least context switching takes place and the load is perfectly balanced among cores. If you have more (active) threads than execution units then effectively you get less speed because of context switching, scheduling, synchronization and stuff like that. If you have many sleeping threads (like when waiting for data to be available in some device) that do nothing 99% of the time then that's a waste of resources, as you could stuff 100 such threads in a single thread and save on all the scheduling, switching and other things. Plus you save time designing, implementing and testing, and you get less error-prone code, as threading is difficult to debug regardless of anything.
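
As a rough illustration of the "thread per core" idea for the processing step, the sketch below splits a batch of samples into one chunk per core and processes the chunks in parallel. It uses std::thread rather than Qt, it starts the threads afresh on each call instead of keeping a persistent pool, and processBlock is a hypothetical stand-in for a real transform.

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for an expensive per-sample transform.
inline double processBlock(double sample) { return sample * 2.0; }

// Splits `samples` into one contiguous chunk per worker and processes
// the chunks in parallel, using at most one worker per core.
inline std::vector<double> processInParallel(const std::vector<double>& samples) {
    // hardware_concurrency() may return 0 when the core count is unknown.
    const std::size_t cores =
        std::max<std::size_t>(1, std::thread::hardware_concurrency());
    const std::size_t workers =
        std::min(cores, std::max<std::size_t>(1, samples.size()));
    const std::size_t chunk = (samples.size() + workers - 1) / workers;

    std::vector<double> results(samples.size());
    std::vector<std::thread> pool;
    for (std::size_t w = 0; w < workers; ++w) {
        const std::size_t begin = w * chunk;
        const std::size_t end = std::min(samples.size(), begin + chunk);
        if (begin >= end) break;
        // Each worker writes to its own index range, so no locking is needed.
        pool.emplace_back([&samples, &results, begin, end] {
            for (std::size_t i = begin; i < end; ++i)
                results[i] = processBlock(samples[i]);
        });
    }
    for (std::thread& t : pool) t.join();
    return results;
}
```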

redBeard
16th December 2010, 18:22
The thing is you don't need any special design. If you start thinking asynchronously (events, scheduling) instead of synchronously (threads, wait conditions) there will be plenty of time for everything.
Fair enough. I believe the device layer has an async read method. I'll definitely investigate that.

The only other issue is the device startup time. Like I mentioned, some devices take up to 30 seconds to settle once told to start sending data. If I tell the device element to start reading but then the user has to wait 30 seconds before starting the test, they may complain. The engine is started and stopped a half dozen times or more for a single test.

That's a big reason why the current Java-based engine uses a separate thread for the device element. I can start that once at the beginning of the test series and users can run a number of small tests quickly w/out having to wait for the device to start for each test.

wysota
16th December 2010, 21:12
The only other issue is the device startup time. Like I mentioned, some devices take up to 30 seconds to settle once told to start sending data. If I tell the device element to start reading but then the user has to wait 30 seconds before starting the test, they may complain. The engine is started and stopped a half dozen times or more for a single test.
I don't really see what the use of threads has to do with this. If your device requires 30 seconds to "warm up" then data will start flowing after 30 seconds regardless if you use threads or not.


That's a big reason why the current Java-based engine uses a separate thread for the device element. I can start that once at the beginning of the test series and users can run a number of small tests quickly w/out having to wait for the device to start for each test.
I don't see how not using threads changes that.

redBeard
16th December 2010, 22:02
I don't really see what the use of threads has to do with this. If your device requires 30 seconds to "warm up" then data will start flowing after 30 seconds regardless if you use threads or not.
I didn't say warm up, I said settle. They start sending data immediately and up to the first 30 seconds are invalid after they're told to start sending data. And I can't just not read or toss that data; it is used to synchronize the flow of data from the device. I also continually algorithmically re-sync the device data as the engine is running (every 5 seconds) to ensure data integrity.

I'll take a look to make sure the device I/O library allows for async read posting, but I'm pretty sure it does. It's been 18 months since I wrote the code that accesses that library.

wysota
16th December 2010, 22:14
And I can't just not read or toss that data;
I really don't see the problem. The only difference between doing that in a separate thread or not is that you use blocking ::read() or select() and read(). I doubt you even need asynchronous ::read() for that. You only need to support select/poll/whatever... Qt provides a nice layer on top of that.
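
A minimal sketch of the "wait for readiness, then read" pattern on a POSIX system is shown below. It is only an illustration: the application in this thread ships on Windows, where the waiting primitives differ, and readWhenReady is a hypothetical helper name. On Unix, QSocketNotifier is probably the Qt layer being referred to, though that is my assumption.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cstddef>

// Waits up to timeoutMs for `fd` to become readable, then reads once.
// Returns the number of bytes read, 0 on timeout or end-of-file,
// and -1 on error.
inline long readWhenReady(int fd, char* buffer, std::size_t size, int timeoutMs) {
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;
    const int ready = poll(&pfd, 1, timeoutMs);
    if (ready < 0) return -1;  // poll itself failed
    if (ready == 0) return 0;  // nothing arrived within the timeout
    // Data is available (or the peer closed), so read() will not block.
    return static_cast<long>(read(fd, buffer, size));
}
```

Because the wait has a timeout, the same thread can go on to service timers, the GUI or other devices between reads.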

marcvanriet
16th December 2010, 23:09
Just wondering...

Do you communicate with the 'device' using 1 communication link and is the data of the different 'device elements' received in different packets over this communication link ?

Or do you have a separate link to each of the 'device elements' ? In that case, different threads to handle the communication protocol on each link does make sense.

Or you could also have a 'shared' link like an Ethernet connection and have connection-oriented links with each 'device element' of course. Then I guess 1 thread is OK for the Ethernet connection with different handlers for each 'device element' that can all live together in a second thread.

Regards,
Marc

wysota
16th December 2010, 23:33
Or do you have a separate link to each of the 'device elements' ? In that case, different threads to handle the communication protocol on each link does make sense.
How does it make more sense than with one communication channel?

redBeard
17th December 2010, 00:10
Do you communicate with the 'device' using 1 communication link and is the data of the different 'device elements' received in different packets over this communication link ?
I have device elements in the engine for each device. They're connected to the computer via either USB or RS-232. The USB devices have nice packet-based drivers, whereas the RS-232 device is stream-based. I'm reading from both at the same time (er, to be precise, when they're both powered on, sending data, and have data available). Both make data ready to the application at roughly 64Hz frequency. The data available frequency for the RS-232 device varies to some degree; rarely is it exactly 15.6ms between successful data reads.

marcvanriet
17th December 2010, 12:58
How does it ... (using multiple threads) ... make more sense than with one communication channel?

It is IMHO easier to implement a communication protocol (sending messages, waiting for reply, handling retries, ...) if you can use blocking reads and writes. And if you use blocking function calls, you must have a thread for each physical communication channel.

Regards,
Marc

wysota
17th December 2010, 13:48
Using blocking calls has downsides on its own. The first problem is when you want to cancel a pending read() because you want to close the application or simply stop listening for incoming data. Since the thread is already blocked on read() you can either use interrupts and handle that situation in the thread (which causes the whole thing not to be so easy anymore) or you have to forcefully terminate the thread which is also bad. Using asynchronous I/O is as easy as using synchronous I/O, it's just a matter of adjusting yourself to the other approach. It's like driving on the left or right side of the road - if you switch, you feel very odd at first but that's just your impression and not a real difference.
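
To make the cancellation problem concrete: one widely used POSIX technique (not something proposed in this thread) is to wait on the data descriptor and on a second "cancel" descriptor at the same time, so that another part of the program can end the wait by writing a byte to the cancel pipe. The names below are hypothetical and the sketch treats an interrupted poll() as an error for brevity.

```cpp
#include <poll.h>
#include <unistd.h>

enum class WaitResult { DataReady, Cancelled, Error };

// Blocks until dataFd is readable or a byte arrives on cancelFd.
inline WaitResult waitForDataOrCancel(int dataFd, int cancelFd) {
    pollfd fds[2] = {};
    fds[0].fd = cancelFd;
    fds[0].events = POLLIN;
    fds[1].fd = dataFd;
    fds[1].events = POLLIN;
    if (poll(fds, 2, -1) < 0) return WaitResult::Error;
    // A pending cancel request takes priority over pending data.
    if (fds[0].revents & POLLIN) return WaitResult::Cancelled;
    if (fds[1].revents & (POLLIN | POLLHUP)) return WaitResult::DataReady;
    return WaitResult::Error;
}
```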

redBeard
17th December 2010, 16:11
I agree. The only reason I didn't use async I/O in the old engine was because of the mechanism I used to read the devices. The device 'elements' were Java but all I/O had to be done with low-level C-language libraries accessed via JNI layer.

Implementing async I/O in that environment is rather inefficient - I would have to specify a C routine to be called on successful completion of the read but by then the JNI call had returned. Any C routine called back from the I/O library at that point would have to call the Java layer and that is very inefficient.

Testing showed I could call a C-language library from Java via JNI about 1.3 million times/second that included a small (1K bytes) buffer copy. I didn't test the reverse (calling Java from C) because all of the doc I found on it said don't do it....

wysota
17th December 2010, 16:16
Is the device driver layer your own implementation? What system are you running it on?

redBeard
17th December 2010, 16:33
The system drivers come from the device manufacturers.

We ship on Windows systems.

I access the devices via the Windows Device Services library (e.g., CreateFile(), DeviceIoControl(), CloseHandle()). I created a thin JNI package on top of that library.

A quick read of the doc for that library shows that it does, indeed, include async I/O capabilities. However, it also includes this quote:


However, for relatively fast I/O operations, the overhead of processing kernel I/O requests and kernel signals may make asynchronous I/O less beneficial, particularly if many fast I/O operations need to be made. In this case, synchronous I/O would be better.

In other words, someone/something is paying the price for the two different types of I/O. For synchronous I/O I would implement a thread for the device element. For async I/O, the kernel has to perform more work. Which is more efficient? Looks like only a fair amount of testing would answer that question...

wysota
17th December 2010, 16:51
In general the kernel is always more efficient than a user process. I don't know what the authors of the library have in mind, maybe they messed up the implementation somehow. But again, it's highly probable you don't need async I/O at all. It's enough to support the equivalent of select()/poll(), the read itself can then be synchronous since you know it will return immediately because there is some data available.

redBeard
11th January 2011, 21:23
All right, I finally got back to this project and did some timings.

The application: one (data) producer and from one to ten consumers.

The tests: one version uses signals/slots between producer and consumer(s) and another version is a multi-threaded beast that uses semaphore-based control of a shared cyclic buffer (1000 elements in size) with a separate thread per element.

The systems: Linux (kernel 2.6.34, 3.0GHz dual core, 4 GB memory), and Windows Vista (same hardware but running Vista).

test 1: send 10M 8-byte messages from single producer to single consumer using signal/slot mechanism
Linux: 3.7 seconds (2.702M messages/second). Vista: 6.06 seconds (1.651M messages/second).

test 2: send 10M 8-byte messages from single producer to 10 consumers using signal/slot mechanism
Linux: 16.96 seconds (589K messages/second). Vista: 28.47 seconds (351K messages/second)

test 3: send 10M 8-byte messages from single producer to single consumer using semaphore-controlled shared buffer (1000 elements).
Linux: 21.09 seconds (474K messages/second). Vista: 50.9 seconds (196K messages/second).

I didn't even try the single producer-to-multiple consumer test. Didn't seem worth it.....

munch, munch (sound of me eating my shirt...). Thanks for everyone's advice.
r

wysota
12th January 2011, 00:25
Just verify your test is reliable. See what is taking the most time in each situation. It might be that there is a bottleneck somewhere that is easy to fix and that could change your test results.
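
For anyone repeating such measurements, a small timing helper along the following lines may be useful. It is a sketch with made-up names, not the code used for the numbers above; it times a callable with a monotonic clock and converts a message count and elapsed time into messages per second.

```cpp
#include <chrono>
#include <cstdint>

// Runs `work` once and returns the elapsed wall-clock time in seconds.
template <typename Work>
double secondsTaken(Work&& work) {
    const auto start = std::chrono::steady_clock::now();
    work();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration<double>(stop - start).count();
}

// Messages per second for a run of `messages` messages that took `seconds`.
inline double throughput(std::uint64_t messages, double seconds) {
    return static_cast<double>(messages) / seconds;
}
```

Timing the producer side, the delivery mechanism and the consumer side separately with something like this is one way to see where the time goes in each test.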