Problem in synchronization between QThreads



sraju
19th November 2014, 07:51
Dear Experts,

I am working on an application that exchanges data between Qt threads and a Windows console application. I am developing both applications with MS VS 2008 and the Qt plugin on Windows.

The design is as follows:


A Windows "NamedPipe" is used as the IPC mechanism for exchanging data between the two applications.
ApplicationA

Main GUI application designed using Qt and it has two dedicated QThreads for read and write operations on NamedPipe.
Creates the Windows "NamedPipe" on startup using

m_shPipeHandle = CreateNamedPipe(
    lpszPipename, // pipe name
    PIPE_ACCESS_DUPLEX, // read/write access
    PIPE_TYPE_MESSAGE | // message type pipe
    PIPE_READMODE_MESSAGE | // message-read mode
    PIPE_WAIT, // blocking mode
    PIPE_UNLIMITED_INSTANCES, // max. instances
    BUFFER_SIZE, // output buffer size
    BUFFER_SIZE, // input buffer size
    NMPWAIT_USE_DEFAULT_WAIT, // client time-out
    NULL); // default security attribute
Launches "ApplicationB" using

QProcess::startDetached("ApplicationB.exe");

Starts two Qt Threads (one for read operation and another for write operation) using


m_pReaderThread = new QThread;
m_pReader = new CWorker();
m_pWriterThread = new QThread;
m_pWriter = new CWorker();

m_pReader->moveToThread(m_pReaderThread);
m_pWriter->moveToThread(m_pWriterThread);

connect(m_pReaderThread, SIGNAL(started()), m_pReader, SLOT(doReadData()));
connect(m_pReader, SIGNAL(finishedWork()), m_pReaderThread, SLOT(quit()));
connect(m_pReader, SIGNAL(finishedWork()), m_pReader, SLOT(deleteLater()));
connect(m_pReaderThread, SIGNAL(finished()), m_pReaderThread, SLOT(deleteLater()));

connect(m_pWriterThread, SIGNAL(started()), m_pWriter, SLOT(doWriteData()));
connect(m_pWriter, SIGNAL(finishedWork()), m_pWriterThread, SLOT(quit()));
connect(m_pWriter, SIGNAL(finishedWork()), m_pWriter, SLOT(deleteLater()));
connect(m_pWriterThread, SIGNAL(finished()), m_pWriterThread, SLOT(deleteLater()));

connect(m_pSampleUi->btnStartStop, SIGNAL(clicked()), this, SLOT(sltToggleButton()));

m_pReaderThread->start();
m_pWriterThread->start();


`CWorker::doReadData()` looks as follows:


void CWorker::doReadData()
{
    char szBuffer[BUFFER_SIZE];
    DWORD cbBytes;
    bool bResult = false;

    ConnectNamedPipe(CWorker::m_shPipeHandle, NULL);

    do {
        // Read client message
        bResult = ReadFile(
            m_shPipeHandle, // handle to pipe
            szBuffer, // buffer to receive data
            BUFFER_SIZE, // size of buffer
            &cbBytes, // number of bytes read
            NULL); // not overlapped I/O

        if ((!bResult) || (!cbBytes))
        {
            qDebug()<<"\nError occurred while reading from the client:"<<GetLastError();
            // TODO: Action to be taken on failure
        }
        else
        {
            qDebug()<<"Client sent following message:"<<szBuffer;
            // Inform the main UI thread so it can update UI elements or take the necessary action.
            emit dataAvailable(QString(szBuffer).toInt());

            // Update the shared m_enumCmd so that the writer thread writes the acknowledgement
            // to the NamedPipe, where "ApplicationB" can read and process it.
            m_Mutex.lock();
            m_enumCmd = COMMAND_ACK_READ;
            m_Mutex.unlock();

            // Release the semaphore so that the writer thread can acquire it and start writing the data (ACK) to the pipe.
            m_Sem.release();
        }
    } while(m_bReadThreadControl);
}


`CWorker::doWriteData()` looks as follows:


void CWorker::doWriteData()
{
    DWORD cbBytes;
    bool bResult = false;
    char szBuffer[BUFFER_SIZE];

    do
    {
        m_Sem.acquire();

        qDebug()<<"\nInside the Write Loop\n";

        switch(m_enumCmd)
        {
        case COMMAND_PAUSE_SERVICE:
            sprintf(szBuffer, "%d", COMMAND_PAUSE_SERVICE);
            break;
        case COMMAND_RESUME_SERVICE:
            sprintf(szBuffer, "%d", COMMAND_RESUME_SERVICE);
            break;
        case COMMAND_ACK_READ:
            sprintf(szBuffer, "%d", COMMAND_ACK_READ);
            break;
        case COMMAND_UNKNOWN:
        default:
            szBuffer[0] = '\0';
            break;
        }

        bResult = WriteFile(
            m_shPipeHandle, // handle to pipe
            szBuffer, // buffer to write from
            strlen(szBuffer) + 1, // number of bytes to write, include the NULL
            &cbBytes, // number of bytes written
            NULL);

        if ((!bResult) || (!cbBytes))
        {
            qDebug()<<"\nError occurred while writing to the client:"<<GetLastError();
            // TODO: Action to be taken on failure
        }
        else
        {
            qDebug()<<"Written data is: "<<szBuffer;
        }

    } while (m_bWriteThreadControl);
}
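
The reader-to-writer hand-off above keeps only one pending command in `m_enumCmd`. Assuming `m_Sem` and `m_enumCmd` are shared between the two workers, two commands posted before the writer wakes (for example an acknowledgement and a pause request) could overwrite each other even though the semaphore counts both releases. A minimal sketch of one alternative, a small FIFO of commands guarded by a mutex and a condition variable, is given here in standard C++11. The class name `CommandQueue`, its methods, and the numeric enum values are hypothetical and not taken from the original code. With the VS 2008 toolchain mentioned in the post, `QMutex`, `QWaitCondition`, and `QQueue` would be the closest Qt counterparts.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical command IDs; the real values are not shown in the post.
enum Command {
    COMMAND_UNKNOWN = 0,
    COMMAND_PAUSE_SERVICE,
    COMMAND_RESUME_SERVICE,
    COMMAND_ACK_READ
};

// Thread-safe FIFO of pending commands (illustrative, not the original design).
class CommandQueue {
public:
    CommandQueue() : m_closed(false) {}

    // Called by any producer (reader thread, UI thread). Never blocks for long.
    void push(Command command)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push_back(command);
        }
        m_ready.notify_one();
    }

    // Signals shutdown: blocked consumers wake up once the queue is drained.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_closed = true;
        }
        m_ready.notify_all();
    }

    // Called by the consumer (writer thread). Blocks until a command is
    // available; returns false once the queue is closed and empty.
    bool pop(Command &command)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return m_closed || !m_items.empty(); });
        if (m_items.empty())
            return false;
        command = m_items.front();
        m_items.pop_front();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::deque<Command> m_items;
    bool m_closed;
};
```

In this arrangement the reader thread and the UI would call `push()`, the writer loop would call `pop()` and write each command to the pipe, and `close()` would take over the role of the `m_bWriteThreadControl` flag as the shutdown signal.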









ApplicationB

It is a normal Windows console application.
Connects to the named pipe using


//Connect to the server pipe using CreateFile()
hPipe = CreateFile(
    lpszPipename, // pipe name
    GENERIC_READ | GENERIC_WRITE, // read and write access
    0, // no sharing
    NULL, // default security attributes
    OPEN_EXISTING, // opens existing pipe
    FILE_ATTRIBUTE_NORMAL, // default attributes
    NULL); // no template file

Waits for input from the user, writes that data to the pipe, and then waits for the acknowledgement from the server.


while (1)
{
    // Reset the buffer
    szBuffer[0] = 0;

    // Wait for the user input and update msgId;

    // Store the msg id value to the szBuffer.
    sprintf(szBuffer, "%d", msgId);
    szBuffer[strlen(szBuffer)] = 0;

    //Send the message to server
    BOOL bResult = WriteFile(
        hPipe, // handle to pipe
        szBuffer, // buffer to write from
        strlen(szBuffer) + 1, // number of bytes to write, include the NULL
        &cbBytes, // number of bytes written
        NULL); // not overlapped I/O

    if ((!bResult) || (!cbBytes))
    {
        std::cout<<"\nError occurred while writing to the server:"<<GetLastError();
    }
    else
    {
        std::cout<<"\nWriteFile() was successful.";
        std::cout<<"\nWritten "<<szBuffer<<" to the Pipe\n";
    }

    //Read server response
    bResult = ReadFile(
        hPipe, // handle to pipe
        szBuffer, // buffer to receive data
        BUFFER_SIZE,
        &cbBytes, // number of bytes read
        NULL); // not overlapped I/O

    if ((!bResult) )//|| (0 == cbBytes))
    {
        std::cout<<"\nError occurred while reading from the server:"<<GetLastError();
    }
    else
    {
        std::cout<<"\nReadFile() was successful.";
        std::cout<<"\nServer sent the following message:"<<szBuffer<<std::endl;
    }
}
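
Both sides above exchange a message ID as decimal text followed by a terminating NUL. The sketch below shows one way such a message could be formatted and parsed with explicit bounds checks, so that the receiver does not depend on the sender having included the terminator. The helper names `encodeMessage` and `decodeMessage` are hypothetical and not part of the original code. `std::snprintf` assumes a C++11 compiler; it is not available in VS 2008, which offers the non-standard `_snprintf` instead.

```cpp
#include <cerrno>
#include <climits>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>

// Formats id as decimal text into buffer.
// Returns the number of bytes to send (text plus terminating NUL),
// or 0 if the text does not fit into capacity bytes.
std::size_t encodeMessage(int id, char *buffer, std::size_t capacity)
{
    const int written = std::snprintf(buffer, capacity, "%d", id);
    if (written < 0 || static_cast<std::size_t>(written) >= capacity)
        return 0;
    return static_cast<std::size_t>(written) + 1;
}

// Parses length received bytes as a decimal integer.
// The data is copied and terminated locally, so a missing NUL from the
// sender cannot cause a read past the end of the received bytes.
bool decodeMessage(const char *data, std::size_t length, int &id)
{
    char text[32];
    if (length == 0 || length >= sizeof(text))
        return false;
    std::memcpy(text, data, length);
    text[length] = '\0';

    errno = 0;
    char *end = 0;
    const long value = std::strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE)
        return false; // empty, trailing garbage, or out of range for long
    if (value < INT_MIN || value > INT_MAX)
        return false; // does not fit into an int
    id = static_cast<int>(value);
    return true;
}
```

The return value of `encodeMessage` could be passed as the byte count to `WriteFile`, and `decodeMessage` could be given the receive buffer together with the `cbBytes` value reported by `ReadFile`.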




Questions:
My intention is to dedicate one thread exclusively to read operations and another exclusively to write operations. Even when the reader thread needs to send data (say, an acknowledgement), it should notify the writer thread (by releasing the semaphore) instead of writing itself.

a. Is there anything wrong with the way I am synchronizing the read and write operations on the Qt application side?
b. Will there be any problem if we try to synchronize the operations between the QProcess (console app) and the QThreads?
c. How do I synchronize the read/write operations between the Qt application and the console application without any delay?
d. Can you please suggest a better way to achieve this functionality?

wysota
19th November 2014, 09:40
Can't you just use QLocalSocket?