Thread: How to correctly use signal/slot in a data streaming application?

  1. #1
    Qt products: Qt5. Platforms: Unix/X11.

    I am using signals and slots in a multithreaded stock trading system. Apart from the main GUI thread, I have a data feed API thread and a data processing thread. The data feed API thread monitors the market data and emits a signal whenever data is received. The data processing thread has slots that listen for these signals and simply print the message they carry. The signals are emitted very frequently, about 100 per second. Currently, with the default connection type, my slots work for only 1 to 2 minutes and then stop responding to signals. With Qt::DirectConnection the slots keep working, but they run in the data API thread, which is not what I want. The code related to the signal/slot communication follows. Please kindly help, thanks.

    PosixIBClient::tickPrice() and tickSize() run in the data feed API thread:

    Qt Code:
    void PosixIBClient::tickPrice( TickerId tickerId, TickType field, double price, int canAutoExecute)
    {
        qDebug() << "PosixIBClient: tickPrice() in thread: " << QThread::currentThreadId();
        qDebug() << "tickPrice():" << tickerId <<", TickType:" << field << ", price:" << price;

        emit DummySignalTicked(QString::number(tickerId));
    }

    void PosixIBClient::tickSize( TickerId tickerId, TickType field, int size)
    {
        emit DummySignalTicked(QString::number(tickerId));
    }

    TestSignalSlot runs in the data processing thread:

    Qt Code:
    #ifndef TESTSIGNALSLOT_H
    #define TESTSIGNALSLOT_H

    #include <QObject>
    #include <QThread>

    class TestSignalSlot : public QObject
    {
        Q_OBJECT
    public:
        explicit TestSignalSlot(QObject *parent = 0);
        virtual ~TestSignalSlot();

        QThread *pThread;

    signals:

    public slots:
        void onDummySignalTraded(const QString &msg);
        void onDummySignalTicked(const QString &msg);
    };

    #endif // TESTSIGNALSLOT_H

    Qt Code:
    #include "testsignalslot.h"
    #include <QDebug>

    TestSignalSlot::TestSignalSlot(QObject *parent) : QObject(parent)
    {
        pThread = new QThread;
        pThread->start();
        moveToThread( pThread );
    }

    TestSignalSlot::~TestSignalSlot()
    {
        pThread->exit( 0 );
        pThread->wait();
        delete pThread;
    }

    void TestSignalSlot::onDummySignalTraded(const QString &msg)
    {
        qDebug() << "TestSignalSlot::onDummySignalTraded():" << msg;
    }

    void TestSignalSlot::onDummySignalTicked(const QString &msg)
    {
        qDebug() << "TestSignalSlot::onDummySignalTicked() in thread: " << QThread::currentThreadId();
        qDebug() << "TestSignalSlot::onDummySignalTicked():" << msg;
    }

    The signals and slots are connected as follows:

    Qt Code:
    connect(pPosixIBClient, SIGNAL(DummySignalTraded(const QString&)), pTestSignalSlot, SLOT(onDummySignalTraded(const QString&)));
    connect(pPosixIBClient, SIGNAL(DummySignalTicked(const QString&)), pTestSignalSlot, SLOT(onDummySignalTicked(const QString&)));

  2. #2

    The only thing that is definitely problematic is calling moveToThread() after starting the thread.

    Have you checked whether your receiver object gets deleted?

    Cheers,
    _

  3. #3

    Quote, originally posted by anda_skoa:
        The only thing that is definitely problematic is calling moveToThread() after starting the thread.

        Have you checked whether your receiver object gets deleted?

    Thanks. The TestSignalSlot object is deleted in the main GUI class destructor.

    Qt Code:
    TestAppWidget::~TestAppWidget()
    {
        if(!pPosixIBClient) delete pPosixIBClient;
        if(!pTestSignalSlot) delete pTestSignalSlot;

        if(!pDataMsgProcessor) delete pDataMsgProcessor;
    }
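A side note on the destructor above: each guard reads `if(!p) delete p;`, so `delete` runs only when the pointer is already null, which does nothing. As posted, none of the three objects would actually be deleted there. Since `delete` on a null pointer is defined to be a no-op, no guard is needed at all. A minimal standalone sketch in plain C++ follows; the `Tracked` type and both helper functions are hypothetical names introduced only for this illustration.

```cpp
#include <cassert>

// Hypothetical stand-in for any heap-allocated object; counts destructions.
struct Tracked {
    static int destroyed;
    ~Tracked() { ++destroyed; }
};
int Tracked::destroyed = 0;

// Mirrors the guard from the destructor above: delete is reached only when
// p is null, where it does nothing, so a live object is never destroyed.
void destroyWithInvertedGuard(Tracked *p)
{
    if (!p) delete p;
}

// delete on a null pointer is a no-op, so no guard is required.
void destroyUnguarded(Tracked *p)
{
    delete p;
}
```

Calling `destroyWithInvertedGuard()` on a live object leaves `Tracked::destroyed` unchanged, while `destroyUnguarded()` increments it and is also safe to call with `nullptr`.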

    Maybe I should not use a member variable for the QThread. I will move pThread to the main GUI class. Thanks.


    Added after 20 minutes:


    I tried moving the QThread out to my GUI class, but that still does not solve the problem. onDummySignalTicked() and onDummySignalTraded() work for only about 1 minute, and then the slots stop being called. If I instead let the signals be emitted without those slots connected, the signals keep being emitted as expected. So I believe it is my TestSignalSlot class that causes the signal/slot communication to stop.

    Qt Code:
    TestAppWidget::TestAppWidget(QWidget *parent)
        : QWidget(parent)
    {
        .......
        ......
        .........

        pThreadTestSignalSlot = new QThread();
        pThreadPosixIBClient= new QThread();

        pPosixIBClient = new PosixIBClient();
        pDataMsgProcessor = new MessageProcessor(pPosixIBClient);

        pTestSignalSlot = new TestSignalSlot();

        pPosixIBClient->moveToThread(pThreadPosixIBClient);
        pTestSignalSlot->moveToThread(pThreadTestSignalSlot);

        pThreadPosixIBClient->start();
        pThreadTestSignalSlot->start();
    }
    Last edited by polluxus; 9th October 2015 at 02:59.

  4. #4

    Add a timer to TestSignalSlot and make it trigger a log output continuously.
    If that stops as well, then the event loop of the thread running TestSignalSlot got blocked somehow.

    Cheers,
    _

  5. #5

    Quote, originally posted by anda_skoa:
        Add a timer to TestSignalSlot and make it trigger a log output continuously.
        If that stops as well, then the event loop of the thread running TestSignalSlot got blocked somehow.

    Thanks for the advice. I have already done a similar test. It looks like the problem is in the data API. In the PosixIBClient class I have a slot that runs an infinite loop checking the socket. If market data is available, tickPrice(), tickSize() etc. get called, and DummySignalTicked is then emitted. For the test, I simply commented out all the code in the infinite loop and replaced it with emit DummySignalTicked(msg). TestSignalSlot::onDummySignalTicked() worked without any problem, so the receiver thread is not blocked. The cause of the problem might be related to the socket reading in the API.

    I am still checking what happens in the market data socket reading. Why do the signals work normally without a slot connected or with a direct connection, while the default connection causes blocking? Any suggestions?

  6. #6

    Because a direct connection works like an ordinary function call, it does not need an event loop. A queued connection requires a running event loop on the receiving end. I think this infinite loop blocks the event loop.
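The difference can be sketched without Qt. In the standard-library sketch below, a direct call runs the slot immediately on the caller's thread, while a queued call is only stored; it runs later on the receiver's thread, and only as long as that thread keeps draining its queue. `MiniEventLoop`, `post()`, `quitAndWait()` and `callDirect()` are hypothetical names for this illustration and not Qt API; Qt's real event loop is considerably more involved.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical, heavily simplified stand-in for a receiver thread's event loop.
class MiniEventLoop {
public:
    MiniEventLoop() : worker_([this] { run(); }) {}
    ~MiniEventLoop() { quitAndWait(); }

    // "Queued" delivery: store the call; the loop thread runs it later.
    void post(std::function<void()> call)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push(std::move(call));
        }
        wakeUp_.notify_one();
    }

    // Run everything already posted, then stop the loop thread.
    void quitAndWait()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quit_ = true;
        }
        wakeUp_.notify_one();
        if (worker_.joinable())
            worker_.join();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            wakeUp_.wait(lock, [this] { return quit_ || !pending_.empty(); });
            if (pending_.empty())
                return; // quit requested and nothing left to deliver
            std::function<void()> call = std::move(pending_.front());
            pending_.pop();
            lock.unlock();
            call(); // a call that never returns would stall all later deliveries
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeUp_;
    std::queue<std::function<void()>> pending_;
    bool quit_ = false;
    std::thread worker_; // declared last so the other members exist before it starts
};

// "Direct" delivery: an ordinary function call on the caller's thread.
inline void callDirect(const std::function<void()> &call) { call(); }
```

A callable passed to `callDirect()` observes the caller's thread id, whereas one passed to `post()` observes the id of the loop's worker thread. If a posted callable entered an infinite loop, nothing posted after it would ever run, which is the situation described above.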

  7. #7

    Quote, originally posted by Lesiok:
        Because a direct connection works like an ordinary function call, it does not need an event loop. A queued connection requires a running event loop on the receiving end. I think this infinite loop blocks the event loop.

    Thanks Lesiok. But the thing is, if I bypass the data API socket checking and simply emit DummySignalTicked(msg) in the infinite loop in PosixIBClient::onProcessMessages(), the receiver thread works fine. In processMessages() the API function pClient->onReceive() is called to read the market data buffer, and PosixIBClient::tickPrice(), tickSize() and tickString() serve as the data API callback functions. I just do not understand why onReceive() could cause the receiver thread to get blocked.


    Qt Code:
    void PosixIBClient::onProcessMessages()
    {
        while (isConnected())
        {
            emit DummySignalTicked(QDateTime::currentDateTime().toString("hh:mm:ss:zzz"));
            //processMessages();
        }
    }

    void PosixIBClient::processMessages()
    {
        fd_set readSet, writeSet, errorSet;
        struct timeval tval;
        tval.tv_usec = 0;
        tval.tv_sec = 0;
        time_t now = time(NULL);
        if( sleepDeadline > 0) {
            // initialize timeout with m_sleepDeadline - now
            tval.tv_sec = sleepDeadline - now;
            qDebug() << "PosixClient::processMessages - tv_sec:" << tval.tv_sec;
        }

        if( pClient->fd() >= 0 )
        {
            FD_ZERO( &readSet);
            writeSet = readSet;
            FD_SET( pClient->fd(), &readSet);
            if( !pClient->isOutBufferEmpty()) FD_SET( pClient->fd(), &writeSet);
            FD_SET( pClient->fd(), &errorSet);
            int ret = select( pClient->fd() + 1, &readSet, &writeSet, NULL, &tval);
            if( ret == 0) return;
            if( ret < 0) { // error
                disconnect();
                return;
            }
            if( pClient->fd() < 0) return;
            if( FD_ISSET( pClient->fd(), &writeSet)) {
                // socket is ready for writing
                qDebug() << "PosixClient::processMessages: onSend()";
                pClient->onSend();
            }
            if( pClient->fd() < 0) return;
            if( FD_ISSET( pClient->fd(), &readSet)) {
                // socket is ready for reading
                //qDebug() << "PosixClient::processMessages: onReceive()";
                pClient->onReceive();
            }
        }
    }
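Two details in processMessages() may be worth double-checking. These are observations on the posted code, not confirmed causes of the problem: errorSet is modified with FD_SET without ever being cleared with FD_ZERO (and is never passed to select()), and tval.tv_sec becomes negative once sleepDeadline lies in the past, which select() may reject as an invalid timeout. Below is a minimal POSIX sketch of a readiness check that clears the set it uses and clamps the timeout. `waitReadable` and `timeoutSec` are hypothetical names, not part of the data API.

```cpp
#include <cassert>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: returns 1 if fd is readable, 0 on timeout, -1 on error.
// A negative timeoutSec is clamped to 0, which makes select() poll and return at once.
int waitReadable(int fd, long timeoutSec)
{
    if (fd < 0 || fd >= FD_SETSIZE)
        return -1;

    fd_set readSet;
    FD_ZERO(&readSet);      // always clear a set before adding descriptors
    FD_SET(fd, &readSet);

    struct timeval tval;
    tval.tv_sec = timeoutSec > 0 ? timeoutSec : 0;
    tval.tv_usec = 0;

    int ret = select(fd + 1, &readSet, nullptr, nullptr, &tval);
    if (ret <= 0)
        return ret;         // 0: timeout, -1: error (errno was set by select)
    return FD_ISSET(fd, &readSet) ? 1 : 0;
}
```

On an empty pipe, `waitReadable(readEnd, 0)` returns 0; after one byte has been written to the other end it returns 1.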

    The following is onReceive():

    Qt Code:
    void EPosixClientSocket::onReceive()
    {
        if( !handleSocketError()) return;
        checkMessages();
    }

    bool EClientSocketBase::checkMessages()
    {
        qDebug() <<"EClientSocketBase::checkMessages-------------------------";
        if( !isSocketOK())
            return false;

        if( bufferedRead() <= 0) {
            return false;
        }

        const char* beginPtr = &m_inBuffer[0];
        const char* ptr = beginPtr;
        const char* endPtr = ptr + m_inBuffer.size();

        try {
            while( (m_connected ? processMsg( ptr, endPtr)
                                : processConnectAck( ptr, endPtr)) > 0) {
                if( (ptr - beginPtr) >= (int)m_inBuffer.size())
                {
                    break;
                }
            }
        }
        catch (...) {
            CleanupBuffer( m_inBuffer, (ptr - beginPtr));
            throw;
        }

        CleanupBuffer( m_inBuffer, (ptr - beginPtr));
        return true;
    }

    int EClientSocketBase::processMsg(const char*& beginPtr, const char* endPtr)
    {
        // process a single message from the buffer;
        // return number of bytes consumed
        qDebug() <<"EClientSocketBase::processMsg-------------------------";
        assert( beginPtr && beginPtr < endPtr);

        try {

            const char* ptr = beginPtr;

            int msgId;
            DECODE_FIELD( msgId);

            switch( msgId) {
                case TICK_PRICE:
                {
                    int version;
                    int tickerId;
                    int tickTypeInt;
                    double price;

                    int size;
                    int canAutoExecute;

                    DECODE_FIELD( version);
                    DECODE_FIELD( tickerId);
                    DECODE_FIELD( tickTypeInt);
                    DECODE_FIELD( price);

                    DECODE_FIELD( size); // ver 2 field
                    DECODE_FIELD( canAutoExecute); // ver 3 field

                    m_pEWrapper->tickPrice( tickerId, (TickType)tickTypeInt, price, canAutoExecute);

                    // process ver 2 fields
                    {
                        TickType sizeTickType = NOT_SET;
                        switch( (TickType)tickTypeInt) {
                            case BID:
                                sizeTickType = BID_SIZE;
                                break;
                            case ASK:
                                sizeTickType = ASK_SIZE;
                                break;
                            case LAST:
                                sizeTickType = LAST_SIZE;
                                break;
                            default:
                                break;
                        }
                        if( sizeTickType != NOT_SET)
                            m_pEWrapper->tickSize( tickerId, sizeTickType, size);
                    }

                    break;
                }

                case TICK_SIZE:
                {
                    int version;
                    int tickerId;
                    int tickTypeInt;
                    int size;

                    DECODE_FIELD( version);
                    DECODE_FIELD( tickerId);
                    DECODE_FIELD( tickTypeInt);
                    DECODE_FIELD( size);

                    m_pEWrapper->tickSize( tickerId, (TickType)tickTypeInt, size);
                    break;
                }

                ............
            }
        }

  8. #8

    Quote, originally posted by Lesiok:
        I think this infinite loop blocks the event loop.

    It would block the event loop of the PosixIBClient thread, which seems to be used only as a sender, so it would not require an event loop at all.

    Aside from the question of whether it is a good idea to use blocking socket I/O instead of a nicely event-driven approach, blocking the socket thread should not have any impact on the other thread.

    In any case, running the application in the debugger and inspecting the call stack of the receiver thread when it gets stuck should also give valuable information on why it is no longer processing events.

    Cheers,
    _

  9. #9

    I debugged into the data API and found that it is the data API's socket reading that somehow gets blocked. When I emit signals in PosixIBClient, I get errno == EWOULDBLOCK in EPosixClientSocket::handleSocketError(). In that case, checkMessages() in EPosixClientSocket::onReceive() is never called, so the signals in tickPrice() and tickSize() are never emitted either. But I still do not quite understand what caused the API to block. Is the socket buffer too small because of the signal emission? Any suggestions would be appreciated.


    Qt Code:
    void EPosixClientSocket::onReceive()
    {
        if( !handleSocketError())
        {
            qDebug() << "EPosixClientSocket::onReceive(): in handleSocketError";
            return;
        }

        checkMessages();
    }

    bool EPosixClientSocket::handleSocketError()
    {
        // no error
        if( errno == 0)
            return true;

        // Socket is already connected
        if( errno == EISCONN) {
            return true;
        }

        if( errno == EWOULDBLOCK)
        {
            qDebug() << "errno == EWOULDBLOCK";
            return false;
        }

        if( errno == ECONNREFUSED) {
            qDebug() << "errno == ECONNREFUSED";
            getWrapper()->error( NO_VALID_ID, CONNECT_FAIL.code(), CONNECT_FAIL.msg());
        }
        else {
            qDebug() << "errno != ECONNREFUSED";
            getWrapper()->error( NO_VALID_ID, SOCKET_EXCEPTION.code(),
                SOCKET_EXCEPTION.msg() + strerror(errno));
        }
        // reset errno
        errno = 0;
        qDebug() << "eDisconnect() called";
        eDisconnect();
        return false;
    }


    Added after 1 hour 42 minutes:


    To make the signal/slot connection work, I simply let EPosixClientSocket::handleSocketError() return true if errno == EWOULDBLOCK.

    But what caused errno == EWOULDBLOCK? I still have no idea.
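One possible explanation, which the thread does not confirm: handleSocketError() inspects errno without first checking whether a socket call actually failed. errno is only meaningful immediately after a call has reported failure; successful calls are not required to reset it, so a value left behind by any earlier call in the same thread can still be there. Emitting a queued signal involves locking and waking the receiver's event loop, and a system call made along the way in the sender thread could plausibly leave EAGAIN behind, which has the same value as EWOULDBLOCK on Linux. That would also fit the observation that the problem disappears with a direct connection. The POSIX sketch below shows the pattern of consulting errno only when read() returns -1. `ReadStatus` and `readChecked` are hypothetical names for this illustration.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

enum class ReadStatus { Data, WouldBlock, Closed, Error };

// Hypothetical helper: reads once from a non-blocking descriptor and classifies
// the outcome. errno is consulted only after read() has reported failure.
ReadStatus readChecked(int fd, char *buf, std::size_t len, ssize_t &bytesRead)
{
    bytesRead = read(fd, buf, len);
    if (bytesRead > 0)
        return ReadStatus::Data;       // success: whatever errno holds is stale
    if (bytesRead == 0)
        return ReadStatus::Closed;     // orderly end of stream
    if (errno == EAGAIN || errno == EWOULDBLOCK)
        return ReadStatus::WouldBlock; // nothing to read right now, try again later
    return ReadStatus::Error;          // a genuine failure
}
```

With this shape, a stale EWOULDBLOCK left in errno by unrelated code does not prevent data from being processed, because the return value of read() decides first.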
    Last edited by polluxus; 10th October 2015 at 17:52.
