Hello all!
I am trying to implement a server application using a thread pool. I'm using QTcpSocket to handle the connections between clients and server.
I have a very rough sketch and I would like to have your opinions, on what I could/should change to reach best solution.
So far I have a WorkerThread that should be in charge of processing clients requests. I also have a ThreadPool class that inherits QTcpServer and reimplements incomingConnection(int socket_descriptor).
ThreadPool has a QList of WorkerThreads that are started in the constructor (ThreadPool(QObject parent = 0 ...).
ThreadPool
const unsigned int max_worker_threads;
{
public:
block_size(0),
active_threads(0) {
foreach(WorkerThread worker, worker_threads) {
worker.start();
connect(&worker,
SIGNAL(thread_finished
(QTcpSocket *)),
this,
SLOT(worker_thread_finished
(QTcpSocket *)));
}
}
~ThreadPool() {
}
//signals:
slots:
void pass_job() {
QTcpSocket *_socket
= static_cast<QTcpSocket
*>
(sender
());
if (block_size == 0) {
if (_socket->bytesAvailable() < (int)sizeof(quint16))
return;
}
in >> block_size;
if (_socket->bytes_available() < block_size) {
return;
}
//there is enough data in the stream
//acquire lock
//mutex.lock();
if (active_threads < max_threads) {
active_threads++;
pending_requests.append(_socket);
}
else {
//add "job" to the list
pending_requests.append(_socket);
}
//release lock
//mutex.unlock();
//wake on worker_thread to process the request
wait.wakeOne();
}
//mutex.lock();
active_threads--;
pending_requests.removeAll(socket);
//mutex.unlock();
//responde to the next request
serve_next_request();
}
private:
void incomingConnection(int socket_descriptor) {
client_list.append(socket);
connect(socket, SIGNAL(readyRead()), this, SLOT(pass_job()));
}
void serve_next_request() {
if (!pending_requests.isEmpty() && active_threads < max_worker_threads) {
wait.wakeOne();
}
}
QList<WorkerThread *> work_threads;
QList<QTcpSocket *> client_list;
QList<QTcpSocket *> pending_requests;
//QMutex mutex;
quint16 block_size;
unsigned int active_threads;
...
};
const unsigned int max_worker_threads;
QWaitCondition wait;
class ThreadPool : public QTcpServer
{
public:
ThreadPool(QObject *parent = 0)
: QTcpServer(parent),
block_size(0),
active_threads(0) {
foreach(WorkerThread worker, worker_threads) {
worker.start();
connect(&worker, SIGNAL(thread_finished(QTcpSocket *)),
this, SLOT(worker_thread_finished(QTcpSocket *)));
}
}
~ThreadPool() {
}
//signals:
slots:
void pass_job() {
QTcpSocket *_socket = static_cast<QTcpSocket *>(sender());
QDataStream in(_socket);
in.setVersion(QDataStream::Qt_4_4);
if (block_size == 0) {
if (_socket->bytesAvailable() < (int)sizeof(quint16))
return;
}
in >> block_size;
if (_socket->bytes_available() < block_size) {
return;
}
//there is enough data in the stream
//acquire lock
//mutex.lock();
if (active_threads < max_threads) {
active_threads++;
pending_requests.append(_socket);
}
else {
//add "job" to the list
pending_requests.append(_socket);
}
//release lock
//mutex.unlock();
//wake on worker_thread to process the request
wait.wakeOne();
}
void worker_thread_finished(QTcpSocket *socket) {
//mutex.lock();
active_threads--;
pending_requests.removeAll(socket);
//mutex.unlock();
//responde to the next request
serve_next_request();
}
private:
void incomingConnection(int socket_descriptor) {
QTcpSocket *socket = new QTcpSocket(socket_descriptor);
client_list.append(socket);
connect(socket, SIGNAL(readyRead()), this, SLOT(pass_job()));
}
void serve_next_request() {
if (!pending_requests.isEmpty() && active_threads < max_worker_threads) {
wait.wakeOne();
}
}
QList<WorkerThread *> work_threads;
QList<QTcpSocket *> client_list;
QList<QTcpSocket *> pending_requests;
//QMutex mutex;
quint16 block_size;
unsigned int active_threads;
...
};
To copy to clipboard, switch view to plain text mode
WorkerThread
class WorkerThread
: public QThread{
public:
//do initialization
}
~WorkerThread() {
}
void run() {
while(1) {
mutex.lock();
wait.wait(&mutex);
//read data from socket and process request
emit thread_finished();
mutex.unlock();
}
}
signals:
void thread_finished();
private:
//other variables
}
class WorkerThread : public QThread
{
public:
WorkerThread(QObject parent = 0) : QThread(parent) {
//do initialization
}
~WorkerThread() {
}
void run() {
while(1) {
mutex.lock();
wait.wait(&mutex);
//read data from socket and process request
emit thread_finished();
mutex.unlock();
}
}
signals:
void thread_finished();
private:
QMutex mutex;
//other variables
}
To copy to clipboard, switch view to plain text mode
(I know the code is very incomplete)
Now I have this problem: How can I pass the socket to the working_thread in order for it to process that request??
Any comments to my "implementation" are greatly appreciated!
Thank you very much.
Bookmarks