Client-Server application using thread pool - implementation suggestions



doctore
22nd July 2009, 13:03
Hello all!

I am trying to implement a server application using a thread pool. I'm using QTcpSocket to handle the connections between clients and server.

I have a very rough sketch and would like your opinions on what I could or should change to reach the best solution.

So far I have a WorkerThread class that is in charge of processing client requests. I also have a ThreadPool class that inherits QTcpServer and reimplements incomingConnection(int socket_descriptor).

ThreadPool has a QList of WorkerThreads that are started in the constructor (ThreadPool(QObject *parent = 0, ...)).


ThreadPool



#include <QDataStream>
#include <QList>
#include <QTcpServer>
#include <QTcpSocket>
#include <QWaitCondition>

// WorkerThread (shown further down) must be declared before this class.

const unsigned int max_worker_threads = 5; // example value; the original sketch left it unset
QWaitCondition wait;                       // shared with the worker threads

class ThreadPool : public QTcpServer
{
    Q_OBJECT   // required for the slots below

public:
    ThreadPool(QObject *parent = 0)
        : QTcpServer(parent),
          block_size(0),
          active_threads(0) {

        // create and start the workers
        for (unsigned int i = 0; i < max_worker_threads; ++i) {
            WorkerThread *worker = new WorkerThread(this);
            connect(worker, SIGNAL(thread_finished(QTcpSocket *)),
                    this, SLOT(worker_thread_finished(QTcpSocket *)));
            worker->start();
            worker_threads.append(worker);
        }
    }

    ~ThreadPool() {
    }

//signals:

public slots:
    void pass_job() {
        QTcpSocket *_socket = static_cast<QTcpSocket *>(sender());

        QDataStream in(_socket);
        in.setVersion(QDataStream::Qt_4_4);

        // read the size prefix only once per message
        if (block_size == 0) {
            if (_socket->bytesAvailable() < (int)sizeof(quint16))
                return;
            in >> block_size;
        }

        if (_socket->bytesAvailable() < block_size) {
            return;
        }

        // there is enough data in the stream

        // acquire lock
        //mutex.lock();
        if (active_threads < max_worker_threads) {
            active_threads++;
            pending_requests.append(_socket);
        }
        else {
            // add "job" to the list
            pending_requests.append(_socket);
        }
        // release lock
        //mutex.unlock();

        // wake one worker thread to process the request
        wait.wakeOne();
    }

    void worker_thread_finished(QTcpSocket *socket) {
        //mutex.lock();
        active_threads--;
        pending_requests.removeAll(socket);
        //mutex.unlock();
        // respond to the next request
        serve_next_request();
    }

private:
    void incomingConnection(int socket_descriptor) {
        QTcpSocket *socket = new QTcpSocket(this);
        socket->setSocketDescriptor(socket_descriptor);
        client_list.append(socket);
        connect(socket, SIGNAL(readyRead()), this, SLOT(pass_job()));
    }

    void serve_next_request() {
        if (!pending_requests.isEmpty() && active_threads < max_worker_threads) {
            wait.wakeOne();
        }
    }

    QList<WorkerThread *> worker_threads;
    QList<QTcpSocket *> client_list;
    QList<QTcpSocket *> pending_requests;

    //QMutex mutex;
    quint16 block_size;
    unsigned int active_threads;
    // ...
};

WorkerThread



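#include <QMutex>
#include <QTcpSocket>
#include <QThread>
#include <QWaitCondition>

extern QWaitCondition wait;   // the shared condition defined next to ThreadPool

class WorkerThread : public QThread
{
    Q_OBJECT   // required for the signal below

public:
    WorkerThread(QObject *parent = 0) : QThread(parent) {
        // do initialization
    }

    ~WorkerThread() {
    }

    void run() {
        while (true) {
            mutex.lock();
            // sleep until ThreadPool calls wait.wakeOne()
            wait.wait(&mutex);

            // read data from socket and process request

            // no socket is available here yet; handing it over is the open question
            emit thread_finished(0);
            mutex.unlock();
        }
    }

signals:
    // matches the signature ThreadPool connects to
    void thread_finished(QTcpSocket *socket);

private:
    QMutex mutex;
    // other variables
};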


(I know the code is very incomplete.)
Now I have this problem: how can I pass the socket to the worker thread so that it can process the request?

Any comments to my "implementation" are greatly appreciated!

Thank you very much.
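
For reference, the size-prefix check in pass_job() can be looked at in isolation. The following is a minimal sketch in plain standard C++, not Qt code: take_message is a hypothetical helper, the byte buffer stands in for the socket, and the frame layout (a 2-byte big-endian length followed by the payload) is an assumption chosen to resemble a quint16 prefix.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: tries to remove one complete message from the front
// of `buffer`. Assumed frame layout: 2-byte big-endian length, then payload.
// Returns true and fills `payload` only when a whole frame has arrived.
bool take_message(std::string &buffer, std::string &payload) {
    const std::size_t header = 2;
    if (buffer.size() < header)
        return false;                      // length prefix not complete yet

    const std::size_t length =
        (static_cast<std::size_t>(static_cast<unsigned char>(buffer[0])) << 8) |
         static_cast<std::size_t>(static_cast<unsigned char>(buffer[1]));
    if (buffer.size() < header + length)
        return false;                      // payload not complete yet

    payload = buffer.substr(header, length);
    buffer.erase(0, header + length);      // keep any bytes of the next frame
    return true;
}
```

The caller would append newly received bytes to the buffer and call take_message in a loop until it returns false, which keeps one pending length per connection rather than one shared by all clients.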

yogeshgokul
22nd July 2009, 13:24
Have you seen the Threaded Fortune Server Qt example? I think it is very similar.

doctore
22nd July 2009, 13:42
Thanks for your reply.

But I don't think that the problems are very similar.

The Threaded Fortune Server, as the name suggests, uses threads, but in a different manner. For each new connection a new thread is created; the client request is answered and the thread terminates.

Now, what I want is something like this:

A list of threads that sleep (on a QWaitCondition) while there are no requests to process. When a new request arrives, one of the threads is woken up. The request is processed and the thread goes back to sleep.

wysota
22nd July 2009, 14:15
Have a look at QThreadPool and QRunnable. I think they are exactly for you.

doctore
22nd July 2009, 15:30
Have a look at QThreadPool (http://doc.trolltech.com/latest/qthreadpool.html) and QRunnable (http://doc.trolltech.com/latest/qrunnable.html). I think they are exactly for you.


Thank you!

I did as you suggested, and read the docs on QThreadPool and QRunnable.

I have this sketch and a few doubts.



#include <QDebug>
#include <QList>
#include <QRunnable>
#include <QTcpServer>
#include <QTcpSocket>
#include <QThread>
#include <QThreadPool>

const unsigned int num_threads = 5;


// QRunnable is not a QObject, so QObject is inherited as well to allow signals.
class RequestProcessor : public QObject, public QRunnable
{
    Q_OBJECT

public:
    RequestProcessor(int descriptor)
        : socket_descriptor(descriptor) {
    }

signals:
    void process_finished();

private:
    void run() {
        // created here so that the socket lives in the pool thread
        QTcpSocket socket;
        socket.setSocketDescriptor(socket_descriptor);
        // identify type of request
        // access database
        // send response
        qDebug() << "in thread: " << QThread::currentThread();
        emit process_finished();
    }

    // private variables
    int socket_descriptor;
};


class WServer : public QTcpServer
{
    Q_OBJECT

public:
    WServer(QObject *parent = 0) : QTcpServer(parent) {
        thread_pool = new QThreadPool(this);
        thread_pool->setMaxThreadCount(num_threads);
    }

public slots:
    void process_request() {
        socket = static_cast<QTcpSocket *>(sender());
        RequestProcessor *request_processor =
            new RequestProcessor(socket->socketDescriptor());
        connect(request_processor, SIGNAL(process_finished()),
                this, SLOT(process_finished()));
        thread_pool->start(request_processor);
    }

    void process_finished() {
        // process_finished
    }

private:
    void incomingConnection(int socket_descriptor) {
        QTcpSocket *_socket = new QTcpSocket(this);
        _socket->setSocketDescriptor(socket_descriptor);
        client_list.append(_socket);
        // delete the socket (not the server) once the client disconnects
        connect(_socket, SIGNAL(disconnected()),
                _socket, SLOT(deleteLater()));
        connect(_socket, SIGNAL(readyRead()),
                this, SLOT(process_request()));
    }

    QTcpSocket *socket;
    QThreadPool *thread_pool;
    QList<QTcpSocket *> client_list;
};

(I should start by saying that I'm not in front of a computer with Qt installed, and that the "code" I've written is certainly not functional, :wink:)

The final application should query a MySQL database in response to client requests and send the appropriate response.

The main doubts I have are:

1 - Should I create one connection to the database per thread (possibly in the run() function, or in the constructor of the class), or is one connection (created in the WServer class) sufficient?

2 - Can I access member variables of the WServer class in the run() function of the QRunnable instances (with the access protected by a QMutex)? For instance, if one of the clients requests the number of clients connected at this moment, the run() method needs to access client_list.size().

wysota
22nd July 2009, 15:50
1 - Should I create one connection to the database per thread (possibly in the run() function, or in the constructor of the class), or is one connection (created in the WServer class) sufficient?
You need a dedicated connection for each thread. I'm not sure that creating the connections in run() is a good solution if you're going to use QRunnable. As far as I understand it, a single thread can run more than one runnable. If that is the case, it would be best to store the connections outside the runnable, so that a runnable that runs on a thread which previously handled another runnable can pick up the connection the previous runnable created. That's purely for efficiency, so you can ignore it and have the connection created and destroyed within the runnable.


2 - Can I access member variables of the WServer class in the run() function of the QRunnable instances (with the access protected by a QMutex)? For instance, if one of the clients requests the number of clients connected at this moment, the run() method needs to access client_list.size().

Well... that depends. If you design it properly, then yes. The risk is that improper locking can cause a deadlock. The controlling thread should stay responsive at all times, but as long as you don't block it, you'll be safe.

doctore
22nd July 2009, 16:13
You need a dedicated connection for each thread. I'm not sure that creating the connections in run() is a good solution if you're going to use QRunnable. As far as I understand it, a single thread can run more than one runnable. If that is the case, it would be best to store the connections outside the runnable, so that a runnable that runs on a thread which previously handled another runnable can pick up the connection the previous runnable created. That's purely for efficiency, so you can ignore it and have the connection created and destroyed within the runnable.

Could you elaborate on this, please?

Also, when you say that it's best to store the connections outside the runnable, where precisely are you referring to, and how would I tell the runnable instance which connection to use when accessing the database?

Thank you very much for all the information.

wysota
22nd July 2009, 19:38
There are two obvious possibilities:

One is storing the connection in the runnable.

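#include <QRunnable>
#include <QSqlDatabase>
#include <QString>

class MyRunnable : public QRunnable {
public:
    void run() {
        QString name;
        {
            QSqlDatabase db = QSqlDatabase::addDatabase("...", "...");
            name = db.connectionName();
            doSomethingWith(db);
        }   // db must go out of scope before the connection is removed
        QSqlDatabase::removeDatabase(name);
    }
};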

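The other is to have a map of per-thread connections:

#include <QMap>
#include <QMutex>
#include <QMutexLocker>
#include <QRunnable>
#include <QSqlDatabase>
#include <QThread>

QMap<Qt::HANDLE, QSqlDatabase> databases;
QMutex databases_mutex;   // added here: the map is shared between pool threads

class MyRunnable : public QRunnable {
public:
    void run() {
        Qt::HANDLE curThr = QThread::currentThreadId();
        QSqlDatabase db;
        {
            QMutexLocker locker(&databases_mutex);
            if (databases.contains(curThr)) {
                // reuse the connection an earlier runnable opened on this thread
                db = databases[curThr];
            } else {
                db = QSqlDatabase::addDatabase(..., ...);
                databases[curThr] = db;
            }
        }
        // use db here
    }
};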

In this case you'd need some code to clean up the databases after the threads are destroyed. QThreadStorage can probably be used for that.

Of course the second approach only makes sense if threads are not destroyed immediately after the runnable ends but are instead reused for future runnables. That needs to be verified first.

doctore
22nd July 2009, 22:09

Once more thank you very much!

One final thing: I created a small example using QThreadPool and QRunnable.
The app is just a dialog with 4 buttons. When any of the buttons is clicked, an instance of the MyRunnable class is created and passed to the start() function of the thread pool.


The run() function consists of a loop that prints the text of the clicked button.



void MyRunnable::run() {
    for (int i = 0; i < 10000; i++)
        qDebug() << button_text;
}
Now, if I click on "BUTTON A" followed by "BUTTON B" and "BUTTON C", 10000 lines of "BUTTON A" are printed, followed by 10000 lines of "BUTTON B" and then 10000 lines of "BUTTON C":



BUTTON A
BUTTON A
...
BUTTON A
BUTTON B
BUTTON B
...
BUTTON B
BUTTON C
BUTTON C
...
BUTTON C
Shouldn't the output be more like:


BUTTON A
BUTTON B
BUTTON A
BUTTON C
BUTTON B
.....
It seems that the requests are processed sequentially when they should be processed "simultaneously". Any ideas?

wysota
22nd July 2009, 22:58
Did you set the desired number of threads in the pool? If not, and you have a single-core machine, the pool will contain a single thread, so other jobs are queued until the previous ones finish.
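
The effect of the thread count on the output order can be reproduced with a small experiment. This is a sketch in standard C++ rather than QThreadPool; run_jobs is a hypothetical function that runs one job per label on a given number of worker threads and returns the lines in the order they were written.

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Runs one job per label on `worker_count` threads. Each job appends its
// label `lines_per_job` times to a shared log, which is returned.
std::vector<std::string> run_jobs(unsigned int worker_count,
                                  const std::vector<std::string> &labels,
                                  int lines_per_job) {
    std::vector<std::string> log;
    std::mutex log_mutex;
    std::atomic<std::size_t> next_job(0);

    auto worker = [&] {
        for (;;) {
            // claim the next queued job, if any is left
            const std::size_t job = next_job.fetch_add(1);
            if (job >= labels.size())
                return;
            for (int i = 0; i < lines_per_job; ++i) {
                std::lock_guard<std::mutex> lock(log_mutex);
                log.push_back(labels[job]);
            }
        }
    };

    std::vector<std::thread> workers;
    for (unsigned int i = 0; i < worker_count; ++i)
        workers.emplace_back(worker);
    for (std::thread &t : workers)
        t.join();
    return log;
}
```

With worker_count set to 1, the single thread finishes each job before claiming the next, so the log is strictly grouped by label. With 3 workers the jobs run concurrently and the lines may interleave, although the exact order then depends on scheduling.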

doctore
22nd July 2009, 23:52
:o

I forgot to do that.
After adding a call to setMaxThreadCount(3), everything works perfectly.

Thank you!