PDA

View Full Version : multithread thread don't move



lzpmail
25th March 2011, 01:39
hello, i write a program that create 3 threads, and use QWaitCondition and QMutex let's them sync, but when the program run awhile, it stop in enqQue.wait(&mutex), and the program not die, it does not process data, please help me what wrong. thank you

mcosta
25th March 2011, 08:35
if you can post your code

lzpmail
27th March 2011, 04:51
this is my htree run functin code, this problem is the program run awhile, but after stoped in the second run() function's deqWait.wait(&mutQue), and now the dataBuff size is 8192(most big), the dataQue size is 0, please help what wrong. thanks


void UrineThread::run()
{
int readlen, retval, i;
char str[TEMPDATABUFSIZE];
fd_set rfds;
struct timeval tv ;

tv.tv_sec = 0;
tv.tv_usec = 100000;

printf("thread....\n");
while (!stopThread)
{
FD_ZERO(&rfds); // 清空串口接收端口 集
FD_SET(fd,&rfds); // 设置串口接收端口 ›†

while(FD_ISSET(fd, &rfds)) // æ£€æµ‹ä¸²å£æ˜¯å¦æœ‰è¯ å†™åŠ¨ä½œ
{
FD_ZERO(&rfds); // 清空串口接收端口 集 每次循环都要清空 ¼Œå¦åˆ™ä¸ä¼šæ£€æµ‹åˆ° 有变化
FD_SET(fd,&rfds); // 设置串口接收端口 ›†
retval = select(fd+1,&rfds,NULL,NULL,&tv);

if(retval == -1)
{
printf("an error accured.\n");
break;
}
else if(retval) //retval > 0
{
readlen = ::read(fd, str, TEMPDATABUFSIZE); //读取数据

printf("readlen value: %d\n", readlen);
printf("data Buff size : %d\n", dataBuff.size());

for (i=0; i<readlen; ++i)
{
mutex.lock();
while (dataBuff.size() >= DATABUFFSIZE)
buffFull.wait(&mutex); //ç*‰å¾…
dataBuff.enqueue(str);
buffEmpty.wakeAll();
enqWait.wakeAll(); //唤醒所有线程
mutex.unlock();
}
/* str[readlen] = '\0'; //ç»™è¯»å–çš„æ•°æ®åŠ ç» “尾符
dataBuff.append(str); */ //å°†æ•°æ®åŠ å…¥åˆ°ç¼“å ²åŒºä¸*

FD_ZERO(&rfds);
FD_SET(fd,&rfds);

retval = select(fd+1,&rfds,NULL,NULL,&tv); //判æ–*是否还有数æ

if(!retval) //å¦‚æžœæ²¡æœ‰æ•°æ®åˆ™é €å‡ºç¬¬äºŒå±‚å¾ªçŽ¯
{
break;
}
}
}
msleep(60); //æ— æ•°æ®æ—¶ï¼Œç¡çœ 100æ ¯«ç§’, 新增的
}
}

void UrineShowThread::run()
{
bool result;
PacketType currPack;

while (!stopThread)
{
mutQue.lock();
while (packQue.size() < 1)
deqWait.wait(&mutQue);
printf("data Buff size: %d, data Que size: %d\n", dataBuff.size(), packQue.size());
currPack = packQue.dequeue();
enqWait.wakeAll();
buffEmpty.wakeAll();
mutQue.unlock();

result = sHostPackHandler[gHostPackInfo[currPack.ID].type]( currPack );

if (FALSE == result)
{
printf("pack handler fault.\n");
// the command has not been processed, add code here to process it
}
}
}

void UrineProcThread::run()
{
printf("data process.\n"); //在æ*¤å¤„åŠ æ•°æ®å¤„ç ä»£ç 

gHostPackMan.MakePack(); //解包,并å*˜äºŽåŒ…队 列ä¸*
printf("out the make pack recursive\n");
}

void PackMan::MakePack( void )
{
UCHAR currChar;
BOOL result;
// repeat untill no data in receive buffer
while (!stopThread) //åªæœ‰å½“æ‰€æœ‰æ•°æ®é ƒ½å¤„理后才退出
{
mutex.lock();
// if (dataBuff.size() < 1)
while (dataBuff.size() < 1)
buffEmpty.wait(&mutex);
currChar = dataBuff.dequeue(); //从缓冲区ä¸*èŽ·å¾—ä¸ ä¸ªæ•°æ®
buffFull.wakeAll();
mutex.unlock();

// packet ID has been received
if (mPackIdGot)
{
// current byte is a valid packet data
if ( 0x80 <= currChar )
{
// be careful: data stored begin from the second byte
mCurrPack.buffer[mCurrPackLen] = currChar;
++mCurrPackLen;
--mRestByte;

// whole packet has been received,
if ( 0 >= mRestByte )
{
result = UnpackWithCheckSum( &mCurrPack.buffer[0], mCurrPackLen );

if (result) //解包成功
{
mutQue.lock();
while (packQue.size() >= PACKQUEUESIZE)
enqWait.wait(&mutQue);
packQue.enqueue(mCurrPack); //æŠŠè§£çš„åŒ…æ”¾åˆ°é˜Ÿåˆ ä¸*去
deqWait.wakeAll();
mutQue.unlock();
}
else
{
printf("check sum fault.\n");
// mErrorPack ++;
//
// if( NULL != gPtrView )
// {
// ::PostMessage( gPtrView->m_hWnd, WM_COMM_ERROR, mErrorPack, 0 );
// }
}
mPackIdGot = 0;
}
}
// current byte is not a valid packet data, maybe is a packet ID,
// unget it for further analysis
else
{
// there must be a error, because current packet is not integral
//mPacksReceived.Put(Pack_ErrPack);
// mErrorPack ++;
//
// if( NULL != gPtrView )
// {
// ::PostMessage( gPtrView->m_hWnd, WM_COMM_ERROR, mErrorPack, 0 );
// }
printf("the data is fault.\n");
mPackIdGot = 0;

mutex.lock();
while (dataBuff.size() >= DATABUFFSIZE)
buffFull.wait(&mutex); //ç*‰å¾…
dataBuff.prepend(currChar); //currChar 可能是一个包, 重新插入队列
buffEmpty.wakeAll(); //唤醒所有线程
mutex.unlock();
// mUART.mRxCharQue.Unget( );
}
}
// packet ID has not been received
else
{
// check whether currChar is a valid packet ID
if ( ( mMaxPackID > currChar ) && ( 0 < mPackInfo[currChar].len ) )
{
// ****** >>> ****** //
mRestByte = mPackInfo[currChar].len - 1 ;
mCurrPackLen = 1;
mCurrPack.ID = currChar;
mPackIdGot = 1;
printf("rest byte len: %d\n", mRestByte);

// if this kind of packet only has an ID, a whole packet received
if ( 0 == mRestByte )
{
mutQue.lock();
while (packQue.size() >= PACKQUEUESIZE)
enqWait.wait(&mutQue);
packQue.enqueue(mCurrPack); //æŠŠè§£çš„åŒ…æ”¾åˆ°é˜Ÿåˆ ä¸*去
deqWait.wakeAll();
mutQue.unlock();

mPackIdGot = 0;
}
}
// currChar is not a valid packet ID
else
{
printf("fault id: %d\n", currChar);
/* mErrorPack ++;

if( NULL != gPtrView )
{
::PostMessage( gPtrView->m_hWnd, WM_COMM_ERROR, mErrorPack, 0 );
}

// there must be a error, because current packet is not integral
mPacksReceived.Put(Pack_UnknownPack);
*/
}
}
} // while

return;
}

wysota
27th March 2011, 09:43
I hate to spoil your fun using threads but why don't you just use QSocketNotifier instead of all that C code?

lzpmail
27th March 2011, 09:54
yea, when i use thread before, i use the QSocketNotifier to listen the serial file, but i don't know why, the serial data lost sometime, and the interface will be die, so i change to use thread, i have no way.

wysota
27th March 2011, 09:58
Well, you must have been doing something wrong then. QSocketNotifier does more or less the same you're doing here only that it... works. There is no need for any threads here.

lzpmail
27th March 2011, 10:11
thank you, maybe i have something wrong, i paste my code of use QSocketNotifier

Added after 7 minutes:

this i use QSocketNotifier to notifier the serial.

void UrineCheck::checkDataWrite(int m_fd)
{
m_notifier = new QSocketNotifier(m_fd, QSocketNotifier::Read, this);
QObject::connect(m_notifier, SIGNAL(activated(int)), this, SLOT(readSerialData()));
}

static unsigned int bufflen = 2048;
void UrineCheck::readSerialData()
{
int readlen, notReadlen;
readlen = 0; //åˆå§‹åŒ–å·²è¯»æ•°æ®å¤ å°
char str[bufflen];
notReadlen = bufflen - readlen; //åˆå§‹åŒ–æœªè¯»çš„æ•°æ ®å¤§å°
QString string;
QByteArray byteArray, byteHex;

byteArray.clear();
byteHex.clear();
string.clear();

while ((readlen =::read(fd, &str[bufflen-notReadlen], notReadlen)) > 0)
{ //æ²¡æœ‰æ•°æ®å¯è¯»æ—¶æ‰ é€€å‡ºå¾ªçŽ¯
notReadlen -= readlen;

if (0 >= notReadlen)
{
readlen = 0; //初始化
notReadlen = bufflen - readlen;
totalSize += bufflen;
byteArray.clear(); //清除先前的数据
byteHex.clear(); //清空先前的数据
byteHex = byteArray.append(str).toHex(); //先转换为å*—èŠ‚æ•°ç» „,再转换为16进制 •°æ®
if ((index+8)%8 == 0) //æ*¤å¤–设了四个1024å* 节大小的缓冲区
{
index = 0;
totalStr.remove(0, bufflen); //从缓冲区ä¸*åˆ é™¤æ— §æ•°æ®
}
else
{
++index;
}
totalStr.append(byteHex); //å°†æ–°æ•°æ®åŠ å…¥ç¼“å ²åŒºä¸*
}
else
{
str[bufflen-notReadlen] = '\0';
}
textEdit->setPlainText(totalStr.mid(0, bufflen));
usleep(100000); //100毫秒
}

totalSize += bufflen - notReadlen;
byteArray.clear(); //清除先前的数据
byteHex.clear(); //清空先前的数据
byteHex = byteArray.append(str).toHex(); //先转换为å*—èŠ‚æ•°ç» „,再转换为16进制 •°æ®
if ((index+8)%8 == 0) //æ*¤å¤–设了四个1024å* 节大小的缓冲区
{
index = 0;
totalStr.remove(0, bufflen); //从缓冲区ä¸*åˆ é™¤æ— §æ•°æ®
}
else
{
++index;
}
totalStr.append(byteHex); //å°†æ–°æ•°æ®åŠ å…¥ç¼“å ²åŒºä¸*
textEdit->setPlainText(totalStr.mid(0, bufflen));

}

wysota
27th March 2011, 14:05
What kind of device does m_fd represent? Something like /dev/ttySX?

lzpmail
27th March 2011, 14:37
m_fd is type of int, is open the serial file and return an id to stored in m_fd.

the m_fd is same with in read fun's fd.

wysota
27th March 2011, 15:20
I'm not asking about the datatype, I'm asking what kind of stream it represents. Is it more like a fifo or like a regular file. In general you should be able to use it either via QLocalSocket or QFile without any special treatment. Your code should be as simple as:

QLocalSocket *device = new QLocalSocket;
device->setSocketDescriptor(m_fd);
connect(device, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
// ...
void X::onReadyRead() {
QByteArray text = device->readAll();
textEdit->setPlainText(text);
}

The problem with your code is that you are thinking in terms of blocking API whereas Qt prefers non-blocking.

lzpmail
28th March 2011, 02:35
i will remember what you say, to say with you very happy. thanks. because the serial port will receive more data a second, i want to know the interface is will be die.

wysota
28th March 2011, 09:09
because the serial port will receive more data a second, i want to know the interface is will be die.
I have no idea what you mean by that.