How to correctly use signal/slot in a data streaming application?



polluxus
8th October 2015, 11:21
I am using signals and slots in a multithreaded stock trading system. Apart from the main GUI thread, I have a data feed API thread and a data processing thread. The data feed API thread monitors the market data and emits signals whenever data is received. The data processing thread has slots that listen to those signals and simply print the message they carry. The signals are emitted very frequently, about 100 per second. Currently, with the default connection type, my slots work for only 1 to 2 minutes and then stop responding to signals. With Qt::DirectConnection the slots keep working, but they run in the data API thread, which is not what I want. The code related to the signal/slot communication follows. Please kindly help, thanks.

PosixIBClient::tickPrice() and tickSize() run in the data feed API thread:


void PosixIBClient::tickPrice( TickerId tickerId, TickType field, double price, int canAutoExecute)
{
qDebug() << "PosixIBClient: tickPrice() in thread: " << QThread::currentThreadId();
qDebug() << "tickPrice():" << tickerId <<", TickType:" << field << ", price:" << price;

emit DummySignalTicked(QString::number(tickerId));
}

void PosixIBClient::tickSize( TickerId tickerId, TickType field, int size)
{
emit DummySignalTicked(QString::number(tickerId));
}



TestSignalSlot - runs in data processing thread


#ifndef TESTSIGNALSLOT_H
#define TESTSIGNALSLOT_H

#include <QObject>
#include <QThread>

class TestSignalSlot : public QObject
{
Q_OBJECT
public:
explicit TestSignalSlot(QObject *parent = 0);
virtual ~TestSignalSlot();

QThread *pThread;
signals:

public slots:
void onDummySignalTraded(const QString &msg);
void onDummySignalTicked(const QString &msg);
};

#endif // TESTSIGNALSLOT_H




#include "testsignalslot.h"
#include <QDebug>

TestSignalSlot::TestSignalSlot(QObject *parent) : QObject(parent)
{
pThread = new QThread;
pThread->start();
moveToThread( pThread );
}
TestSignalSlot::~TestSignalSlot()
{
pThread->exit( 0 );
pThread->wait();
delete pThread;
}

void TestSignalSlot::onDummySignalTraded(const QString &msg)
{
qDebug() << "TestSignalSlot::onDummySignalTraded():" << msg;
}

void TestSignalSlot::onDummySignalTicked(const QString &msg)
{
qDebug() << "TestSignalSlot::onDummySignalTicked() in thread: " << QThread::currentThreadId();
qDebug() << "TestSignalSlot::onDummySignalTicked():" << msg;
}

The signals and slots are connected as follows:


connect(pPosixIBClient, SIGNAL(DummySignalTraded(const QString&)), pTestSignalSlot, SLOT(onDummySignalTraded(const QString&)));
connect(pPosixIBClient, SIGNAL(DummySignalTicked(const QString&)), pTestSignalSlot, SLOT(onDummySignalTicked(const QString&)));

anda_skoa
8th October 2015, 15:21
The only thing that is definitely problematic is the moveToThread after attempting thread start.

Have you checked if your receiver object gets deleted?

Cheers,
_

polluxus
9th October 2015, 02:41
Thanks. The TestSignalSlot object is deleted in the main GUI class destructor.


TestAppWidget::~TestAppWidget()
{
// delete is a no-op on a null pointer, so no null check is needed
delete pPosixIBClient;
delete pTestSignalSlot;

delete pDataMsgProcessor;

}

Maybe I should not use a member variable for the QThread. I will move pThread to the main GUI class. Thanks.

Added after 20 minutes:

I tried moving the QThread out to my GUI class, but that still does not solve the problem. onDummySignalTicked() and onDummySignalTraded() work for only about 1 minute and then signal/slot delivery stops. If I instead let the signals be emitted without those slots connected, they are emitted continuously as expected. So I believe it is my TestSignalSlot class that causes the signal/slot delivery to stop.



TestAppWidget::TestAppWidget(QWidget *parent)
: QWidget(parent)
{
.......
......
.........

pThreadTestSignalSlot = new QThread();
pThreadPosixIBClient= new QThread();

pPosixIBClient = new PosixIBClient();
pDataMsgProcessor = new MessageProcessor(pPosixIBClient);

pTestSignalSlot = new TestSignalSlot();

pPosixIBClient->moveToThread(pThreadPosixIBClient);
pTestSignalSlot->moveToThread(pThreadTestSignalSlot);

pThreadPosixIBClient->start();
pThreadTestSignalSlot->start();
}

anda_skoa
9th October 2015, 12:33
Add a timer to TestSignalSlot and make it trigger a log output continuously.
If that stops as well, then the event loop of the thread running TestSignalSlot got blocked somehow.

Cheers,
_

polluxus
9th October 2015, 17:58
Thanks for the advice. I have already done a similar test. It looks like the problem is in the data API. In the PosixIBClient class I have a slot which runs an infinite loop checking the socket. If market data is available, tickPrice(), tickSize() etc. get called, and DummySignalTicked is then emitted. For the test, I simply commented out all the code in the infinite loop and replaced it with emit DummySignalTicked(msg). TestSignalSlot::onDummySignalTicked() then worked without any problem, so the receiver thread is not blocked. The cause of the problem might be related to socket reading in the API.

I am still checking what happens in the market data socket reading. Why do the signals work normally without a slot connection or with a direct connection, while the default connection causes blocking? Any suggestions?

Lesiok
10th October 2015, 08:31
Because a "direct connection" works like a standard procedure call: it does not need an event loop. A "queued connection" requires a working event loop on the receiving end. I think that this infinite loop blocks the event loop.
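
The difference described in this reply can be modelled in plain C++ without Qt. The sketch below is only an illustration of the idea, not Qt's implementation: ReceiverLoop, post() and queuedCallRunsOnReceiverThread() are hypothetical names. A direct call is an ordinary function call on the caller's thread; a queued call is stored and executed later by the receiver's loop, so it is delivered only while that loop keeps running.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for a receiver thread that runs an event loop.
class ReceiverLoop {
public:
    ReceiverLoop() : worker_([this] { run(); }) {}

    ~ReceiverLoop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quit_ = true;
        }
        wake_.notify_one();
        worker_.join();  // run() drains pending calls before it returns
    }

    // "Queued" delivery: store the call and return at once.
    // The call runs later on the receiver thread, provided run() keeps looping.
    void post(std::function<void()> call) {
        assert(call);  // an empty std::function cannot be invoked
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push(std::move(call));
        }
        wake_.notify_one();
    }

    std::thread::id threadId() const { return worker_.get_id(); }

private:
    void run() {
        for (;;) {
            std::function<void()> call;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return quit_ || !pending_.empty(); });
                if (pending_.empty()) return;  // quit requested, nothing left
                call = std::move(pending_.front());
                pending_.pop();
            }
            call();  // executed outside the lock, on the receiver thread
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::function<void()>> pending_;
    bool quit_ = false;
    std::thread worker_;  // declared last so the members above exist first
};

// Usage: the posted call runs on the receiver thread, not on the caller's.
bool queuedCallRunsOnReceiverThread() {
    std::thread::id ranOn;
    std::thread::id receiver;
    {
        ReceiverLoop loop;
        receiver = loop.threadId();
        loop.post([&ranOn] { ranOn = std::this_thread::get_id(); });
    }  // the destructor joins the worker, so ranOn is set by now
    return ranOn == receiver && ranOn != std::this_thread::get_id();
}
```

If a call executed by run() never returned, every later post() would still succeed but nothing further would be delivered, which is the symptom a blocked receiver event loop would produce.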

polluxus
10th October 2015, 10:12
Thanks Lesiok. But the thing is, if I bypass the data API socket checking and simply emit DummySignalTicked(msg) in the infinite loop in PosixIBClient::onProcessMessages(), the receiver thread works fine. In processMessages(), the API function pClient->onReceive() is called to read the market data buffer, and PosixIBClient::tickPrice(), tickSize() and tickString() are used as data API callback functions. I just do not understand why onReceive() could cause the receiver thread to get blocked.



void PosixIBClient::onProcessMessages()
{
while (isConnected())
{
emit DummySignalTicked(QDateTime::currentDateTime().toString("hh:mm:ss:zzz"));
//processMessages();
}
}

void PosixIBClient::processMessages()
{
fd_set readSet, writeSet, errorSet;
struct timeval tval;
tval.tv_usec = 0;
tval.tv_sec = 0;
time_t now = time(NULL);
if( sleepDeadline > 0) {
// initialize timeout with m_sleepDeadline - now
tval.tv_sec = sleepDeadline - now;
qDebug() << "PosixClient::processMessages - tv_sec:" << tval.tv_sec;
}

if( pClient->fd() >= 0 )
{
FD_ZERO( &readSet);
errorSet = writeSet = readSet; // all three sets start out empty
FD_SET( pClient->fd(), &readSet);
if( !pClient->isOutBufferEmpty()) FD_SET( pClient->fd(), &writeSet);
FD_SET( pClient->fd(), &errorSet);
int ret = select( pClient->fd() + 1, &readSet, &writeSet, NULL, &tval);
if( ret == 0) return;
if( ret < 0) { // error
disconnect();
return;
}
if( pClient->fd() < 0) return;
if( FD_ISSET( pClient->fd(), &writeSet)) {
// socket is ready for writing
qDebug() << "PosixClient::processMessages: onSend()";
pClient->onSend();
}
if( pClient->fd() < 0) return;
if( FD_ISSET( pClient->fd(), &readSet)) {
// socket is ready for reading
//qDebug() << "PosixClient::processMessages: onReceive()";
pClient->onReceive();
}
}
}
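
The select() step in processMessages() can be looked at in isolation. The following is a minimal sketch under stated assumptions, not the IB API's code: waitReadable() and pipeBecomesReadableAfterWrite() are hypothetical helpers, and a pipe stands in for the market data socket. It rebuilds the descriptor set and the timeout on every call, because select() overwrites the set and may modify the timeout. A non-zero timeout also lets the calling loop sleep until data arrives; with tv_sec and tv_usec both 0, as in the code above whenever sleepDeadline is not set, select() returns immediately.

```cpp
#include <cassert>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical helper: true if fd has data to read within timeoutMs milliseconds.
bool waitReadable(int fd, int timeoutMs)
{
    assert(fd >= 0 && fd < FD_SETSIZE);

    fd_set readSet;
    FD_ZERO(&readSet);   // start from an empty set on every call
    FD_SET(fd, &readSet);

    struct timeval tval;  // rebuilt each time: select() may change it
    tval.tv_sec = timeoutMs / 1000;
    tval.tv_usec = (timeoutMs % 1000) * 1000;

    int ret = select(fd + 1, &readSet, nullptr, nullptr, &tval);
    return ret > 0 && FD_ISSET(fd, &readSet);
}

// Usage with a pipe standing in for the socket.
bool pipeBecomesReadableAfterWrite()
{
    int fds[2];
    if (pipe(fds) != 0) return false;

    bool before = waitReadable(fds[0], 10);  // empty pipe: times out
    bool wrote = write(fds[1], "x", 1) == 1;
    bool after = waitReadable(fds[0], 10);   // one byte is pending now

    close(fds[0]);
    close(fds[1]);
    return !before && wrote && after;
}
```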

The following is onReceive():



void EPosixClientSocket::onReceive()
{
if( !handleSocketError()) return;
checkMessages();
}

bool EClientSocketBase::checkMessages()
{
qDebug() <<"EClientSocketBase::checkMessages-------------------------";
if( !isSocketOK())
return false;

if( bufferedRead() <= 0) {
return false;
}

const char* beginPtr = &m_inBuffer[0];
const char* ptr = beginPtr;
const char* endPtr = ptr + m_inBuffer.size();

try {
while( (m_connected ? processMsg( ptr, endPtr)
: processConnectAck( ptr, endPtr)) > 0) {
if( (ptr - beginPtr) >= (int)m_inBuffer.size())
{

break;
}
}
}
catch (...) {
CleanupBuffer( m_inBuffer, (ptr - beginPtr));
throw;
}

CleanupBuffer( m_inBuffer, (ptr - beginPtr));
return true;
}

int EClientSocketBase::processMsg(const char*& beginPtr, const char* endPtr)
{
// process a single message from the buffer;
// return number of bytes consumed
qDebug() <<"EClientSocketBase::processMsg-------------------------";
assert( beginPtr && beginPtr < endPtr);

try {

const char* ptr = beginPtr;

int msgId;
DECODE_FIELD( msgId);

switch( msgId) {
case TICK_PRICE:
{
int version;
int tickerId;
int tickTypeInt;
double price;

int size;
int canAutoExecute;

DECODE_FIELD( version);
DECODE_FIELD( tickerId);
DECODE_FIELD( tickTypeInt);
DECODE_FIELD( price);

DECODE_FIELD( size); // ver 2 field
DECODE_FIELD( canAutoExecute); // ver 3 field

m_pEWrapper->tickPrice( tickerId, (TickType)tickTypeInt, price, canAutoExecute);

// process ver 2 fields
{
TickType sizeTickType = NOT_SET;
switch( (TickType)tickTypeInt) {
case BID:
sizeTickType = BID_SIZE;
break;
case ASK:
sizeTickType = ASK_SIZE;
break;
case LAST:
sizeTickType = LAST_SIZE;
break;
default:
break;
}
if( sizeTickType != NOT_SET)
m_pEWrapper->tickSize( tickerId, sizeTickType, size);
}

break;
}

case TICK_SIZE:
{
int version;
int tickerId;
int tickTypeInt;
int size;

DECODE_FIELD( version);
DECODE_FIELD( tickerId);
DECODE_FIELD( tickTypeInt);
DECODE_FIELD( size);

m_pEWrapper->tickSize( tickerId, (TickType)tickTypeInt, size);
break;
}

............
}
}

anda_skoa
10th October 2015, 10:29
Lesiok wrote: "I think that this infinite loop locks event loop."

It would block the event loop of the PosixIBClient thread, which seems to be only used as a sender so it would not require an event loop at all.

Aside from the question of whether it is a good idea to use blocking socket IO instead of a nicely event-driven one, blocking the socket thread should not have any impact on the other thread.

In any case, running the application in the debugger and inspecting the call stack of the receiver thread when it gets stuck should also give valuable information on why it is no longer processing events.

Cheers,
_

polluxus
10th October 2015, 17:52
I debugged into the data API and found that it is the data API socket reading operation that is somehow blocked. When I emit signals in PosixIBClient, I get errno == EWOULDBLOCK in EPosixClientSocket::handleSocketError(). In that case checkMessages() in EPosixClientSocket::onReceive() is never called, so the signals in tickPrice() and tickSize() are never emitted either. But I still do not quite understand what causes the API to block. Is the socket buffer too small because of the signal emitting? Any suggestion would be appreciated.



void EPosixClientSocket::onReceive()
{
if( !handleSocketError())
{
qDebug() << "EPosixClientSocket::onReceive(): in handleSocketError";
return;
}

checkMessages();
}

bool EPosixClientSocket::handleSocketError()
{
// no error
if( errno == 0)
return true;

// Socket is already connected
if( errno == EISCONN) {
return true;
}

if( errno == EWOULDBLOCK)
{
qDebug() << "errno == EWOULDBLOCK";
return false;

}

if( errno == ECONNREFUSED) {
qDebug() << "errno == ECONNREFUSED";
getWrapper()->error( NO_VALID_ID, CONNECT_FAIL.code(), CONNECT_FAIL.msg());

}
else {
qDebug() << "errno != ECONNREFUSED";
getWrapper()->error( NO_VALID_ID, SOCKET_EXCEPTION.code(),
SOCKET_EXCEPTION.msg() + strerror(errno));
}
// reset errno
errno = 0;
qDebug() << "eDisconnect() called";
eDisconnect();
return false;
}

Added after 1 42 minutes:

To make the signal/slot work, I just let EPosixClientSocket::handleSocketError() return true if errno == EWOULDBLOCK.

But what causes errno == EWOULDBLOCK? I still have no idea.
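
One general point about errno may be relevant to that question. POSIX only gives errno a meaning right after a call that reported failure; successful calls are not required to reset it. handleSocketError() as posted tests errno at the start of onReceive(), with no failing socket call shown immediately before it, so it could be reading a stale value left behind by some other call in the same thread. Whether emitting the signal or the qDebug() output is that other call is an assumption here and is not confirmed anywhere in this thread. The sketch below shows the usual pattern of consulting errno only after the call has returned -1. readOnce(), ReadStatus and staleErrnoIsIgnored() are hypothetical names, and a non-blocking pipe stands in for the socket.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

enum class ReadStatus { Data, WouldBlock, Closed, Error };

// Hypothetical helper, not part of the IB API: one non-blocking read attempt.
ReadStatus readOnce(int fd, char *buf, size_t len, ssize_t &count)
{
    assert(buf != nullptr && len > 0);

    count = ::read(fd, buf, len);
    if (count > 0) return ReadStatus::Data;
    if (count == 0) return ReadStatus::Closed;

    // read() returned -1, so errno now describes this particular call.
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
        return ReadStatus::WouldBlock;  // nothing to read yet, try again later
    return ReadStatus::Error;
}

// Demonstration: an EAGAIN left over from an earlier call does not
// influence how the next, successful read is classified.
bool staleErrnoIsIgnored()
{
    int fds[2];
    if (pipe(fds) != 0) return false;
    fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL) | O_NONBLOCK);

    char buf[8];
    ssize_t count = 0;
    bool first = readOnce(fds[0], buf, sizeof buf, count) == ReadStatus::WouldBlock;

    // errno may still hold EAGAIN here; it no longer matters once a call succeeds.
    bool wrote = write(fds[1], "x", 1) == 1;
    bool second = readOnce(fds[0], buf, sizeof buf, count) == ReadStatus::Data;

    close(fds[0]);
    close(fds[1]);
    return first && wrote && second && count == 1;
}
```

Under that reading, EWOULDBLOCK on a non-blocking descriptor only says that no data was available at that moment, so treating it as "not an error" is consistent with the workaround above.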