darksaga
20th August 2007, 15:10
hi all,
in my application I'd like to split the processing of some of my functions into several threads.
Several "workerthreads" should run simultaneously, increasing speed, especially on multicore systems and when accessing databases.
The "workerthreads" should be observed and managed by an "observerthread".
in general it should look like:
worker1---
|
worker2-----<--observer---<---maineventLoop
|
worker3---
etc.
I've already implemented a minimal example, and would like to hear your opinions on how to improve my idea in terms of speed, thread safety and adaptability.
I'm sure you'll find some glitches :rolleyes: , but it's the first time I've used the Qt threading system in an extended way...
simple main
#include "QueryThread.h"
#include <QApplication>
int main(int argc, char *argv[])
{
QApplication app(argc, argv);
QueryThread ob(2);
ob.start();
return app.exec();
}
observer
#ifndef QUERYTHREAD_H
#define QUERYTHREAD_H
#include <QThread>
#include <QList>
#include <QWaitCondition>
#include <QMutex>
#include <WorkerThread.h>
class QueryThread : public QThread
{
Q_OBJECT
public:
QueryThread(int maxWorkers = 1, QObject *parent = 0) : QThread(parent){
_maxWorkers = maxWorkers;
_currentWorkers = 0;
}
~QueryThread (void){
for (int i = 0; i < _list.size(); i++){
_list.at(i)->terminate();
_list.at(i)->wait();
delete _list.at(i);
}
_list.clear();
}
void run(){
for (int i = 0; i < 10; i++){
_mutex.lock();
if(_currentWorkers >= _maxWorkers)
_wait.wait(&_mutex);
qDebug("Query %d", i);
createWorker();
_mutex.unlock();
}
}
public slots:
void workerThreadFinsihed(int id){
QMutex mutex;
mutex.lock();
_currentWorkers--;
freeList();
_wait.wakeOne();
mutex.unlock();
}
signals:
void queryFinished();
private:
void createWorker(void){
static int testId = 0;//testid
WorkerThread *work = new WorkerThread(testId++);
_currentWorkers++;
connect(work, SIGNAL(workFinished(int)), this, SLOT(workerThreadFinsihed(int)));
_list.append(work);
work->start();
}
void freeList(void){
for (int i = 0; i < _list.size(); i++){
if (_list.at(i)->isFinished()){
delete _list.at(i);
_list.removeAt(i);
}
}
}
QList<WorkerThread*> _list;
QWaitCondition _wait;
QMutex _mutex;
int _maxWorkers;
int _currentWorkers;
};
#endif
worker
#ifndef WORKERTHREAD_H
#define WORKERTHREAD_H
#include <QThread>
/**
 * Demo worker thread: logs five iterations, pausing one second after each,
 * and then announces completion through workFinished().
 */
class WorkerThread : public QThread
{
    Q_OBJECT

public:
    /**
     * @param id     Identifier used in log output and passed to workFinished().
     * @param parent Optional QObject parent.
     */
    WorkerThread(int id = 0, QObject *parent = 0)
        : QThread(parent), _id(id)
    {
    }

    /** Logs the deletion of this worker. */
    ~WorkerThread(void)
    {
        qDebug("Worker %d delete", _id);
    }

    /** Thread body: five logged one-second steps, then workFinished(). */
    void run()
    {
        int step = 0;
        while (step < 5) {
            qDebug("Worker %d, iteration %d", _id, step);
            sleep(1);
            ++step;
        }
        emit workFinished(_id);
    }

signals:
    /** Emitted at the end of run() with this worker's identifier. */
    void workFinished(int id);

private:
    int _id;  // identifier supplied at construction
};
#endif
regards
in my application I'd like to split the processing of some of my functions into several threads.
Several "workerthreads" should run simultaneously, increasing speed, especially on multicore systems and when accessing databases.
The "workerthreads" should be observed and managed by an "observerthread".
in general it should look like:
worker1---
|
worker2-----<--observer---<---maineventLoop
|
worker3---
etc.
I've already implemented a minimal example, and would like to hear your opinions on how to improve my idea in terms of speed, thread safety and adaptability.
I'm sure you'll find some glitches :rolleyes: , but it's the first time I've used the Qt threading system in an extended way...
simple main
#include "QueryThread.h"
#include <QApplication>
int main(int argc, char *argv[])
{
QApplication app(argc, argv);
QueryThread ob(2);
ob.start();
return app.exec();
}
observer
#ifndef QUERYTHREAD_H
#define QUERYTHREAD_H
#include <QThread>
#include <QList>
#include <QWaitCondition>
#include <QMutex>
#include <WorkerThread.h>
class QueryThread : public QThread
{
Q_OBJECT
public:
QueryThread(int maxWorkers = 1, QObject *parent = 0) : QThread(parent){
_maxWorkers = maxWorkers;
_currentWorkers = 0;
}
~QueryThread (void){
for (int i = 0; i < _list.size(); i++){
_list.at(i)->terminate();
_list.at(i)->wait();
delete _list.at(i);
}
_list.clear();
}
void run(){
for (int i = 0; i < 10; i++){
_mutex.lock();
if(_currentWorkers >= _maxWorkers)
_wait.wait(&_mutex);
qDebug("Query %d", i);
createWorker();
_mutex.unlock();
}
}
public slots:
void workerThreadFinsihed(int id){
QMutex mutex;
mutex.lock();
_currentWorkers--;
freeList();
_wait.wakeOne();
mutex.unlock();
}
signals:
void queryFinished();
private:
void createWorker(void){
static int testId = 0;//testid
WorkerThread *work = new WorkerThread(testId++);
_currentWorkers++;
connect(work, SIGNAL(workFinished(int)), this, SLOT(workerThreadFinsihed(int)));
_list.append(work);
work->start();
}
void freeList(void){
for (int i = 0; i < _list.size(); i++){
if (_list.at(i)->isFinished()){
delete _list.at(i);
_list.removeAt(i);
}
}
}
QList<WorkerThread*> _list;
QWaitCondition _wait;
QMutex _mutex;
int _maxWorkers;
int _currentWorkers;
};
#endif
worker
#ifndef WORKERTHREAD_H
#define WORKERTHREAD_H
#include <QThread>
/**
 * Demo worker thread: logs five iterations, pausing one second after each,
 * and then announces completion through workFinished().
 */
class WorkerThread : public QThread
{
    Q_OBJECT

public:
    /**
     * @param id     Identifier used in log output and passed to workFinished().
     * @param parent Optional QObject parent.
     */
    WorkerThread(int id = 0, QObject *parent = 0)
        : QThread(parent), _id(id)
    {
    }

    /** Logs the deletion of this worker. */
    ~WorkerThread(void)
    {
        qDebug("Worker %d delete", _id);
    }

    /** Thread body: five logged one-second steps, then workFinished(). */
    void run()
    {
        int step = 0;
        while (step < 5) {
            qDebug("Worker %d, iteration %d", _id, step);
            sleep(1);
            ++step;
        }
        emit workFinished(_id);
    }

signals:
    /** Emitted at the end of run() with this worker's identifier. */
    void workFinished(int id);

private:
    int _id;  // identifier supplied at construction
};
#endif
regards