QSslSocket/QTcpSocket not flushing data by itself



raheelgup
28th May 2014, 15:27
Hi,

I am writing a client and server application (with my own protocol) to transfer huge amounts of data (10 GB+). I need to transfer many files, both small and large.
I am using Qt 4.8.5 as I have to support CentOS and RedHat.

Flushing Issues :
Everything seems to be working except that data written to the QSslSocket is not flushed. It just sits in the socket buffers. I wish it flushed on its own.

In order to flush, I need to explicitly call flush() (which I don't want to do).
Calling flush() often makes the memory usage keep increasing, even though bytesToWrite() and encryptedBytesToWrite() each stay below 20 MB (enforced by my flushing loop, which is commented out in the code below). For example, if the memory usage is 50 MB with 20 MB in bytesToWrite() and 20 MB in encryptedBytesToWrite(), then once the flushing loop completes (to read more data) and both buffers are empty, I would expect the memory to drop to about 10 MB (or to stay reserved for later use). Instead, after transferring something like 100 MB, the total memory usage might be 70 MB with nothing in the buffers. After a 500 MB transfer, it goes close to 150 MB. As soon as the socket is closed or left to read more data, the memory drops to 10 MB again.

Transfer Rate problem :
The transfer rate is extremely slow and I get like 4-5 MBs/second from one local VM to my desktop.
If I just read like 400 MB, it takes 12-13 seconds. With transferring data over the socket, it takes a whopping 80 seconds.

One last note: this protocol object is created inside the main() function, so it runs on the main thread.

Now my code is as follows :



// protocol.h
#ifndef PROTOCOL_H
#define PROTOCOL_H
#include <QtNetwork>

// myproto Thread class
class Protocol : public QObject {

Q_OBJECT

public:

// Constructor
Protocol(QObject *parent = 0);

// Destructor
~Protocol();

// The Socket pointer
QSslSocket * socket;

// Variables for logging in etc
QMap<QString, QString> vars;

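unsigned int bblen; // The len of data in the bbuffer
qint64 dlen; // Length of the packet currently being read (0 = header not read yet); used by socketReadyRead()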

int Compress; // Is the current partition to be compressed ?
unsigned int written; // How many packets have been written
unsigned int maxtx; // The amount of packets that can be written in one shot
int socket_rw_time; // The last time there was a socket read write
int sleepFor; // Will sleep for these many milliseconds
int doneCalled; // Is set to 1 or greater when done is called
QList<quint64> badBlocks; // Any bad blocks found

// Finish the backup process
void done();

// Write the data with the length header
qint64 rite (const QByteArray & data, bool keptAlive = false);

// Start the process to backup - Issues the login command
bool doit();

// Send Data
void sendBlocks();

signals:

void quit();

void finished();

private slots:

// When any data is available, it goes through this
void socketReadyRead();

// Called when the Encryption starts over SSL Sockets
void socketEncrypted();

// More slots

};

#endif




// protocol.cpp
#include "protocol.h"

// The constructor
Protocol::Protocol(QObject *parent) : QObject(parent) {

NULOG2("[Protocol::Protocol] Start");

// Codes

socket = new QSslSocket();
}

// Destructor
Protocol::~Protocol(){

NULOG2("[Protocol::~Protocol] Destructor");

delete socket;

}


// Finish the backup process, even in case of error
void Protocol::done(){

// Had to comment due to large post

}

// Socket write safe as it appends the length to the beginning of the string
qint64 Protocol::rite(const QByteArray & data, bool keptAlive){
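quint16 qlen = ((quint16)data.size()); // NOTE: sizes above 65535 are truncated by this cast
QByteArray len;

// 2-byte length header, high byte first, independent of host byte order
len.append((char)(qlen >> 8));
len.append((char)(qlen & 0xFF));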

qint64 w = socket->write(len + data);
NULOG4("[Protocol::rite] Size : " << data.size() << "Written : " << w);

return w;
}

// Start the whole process
bool Protocol::doit(){

// Get the kernel data
//this->myproto_config();

connect(socket, SIGNAL(readyRead()),
this, SLOT(socketReadyRead()));
connect(socket, SIGNAL(bytesWritten(qint64)),
this, SLOT(socketBytesWritten(qint64)));
connect(socket, SIGNAL(encryptedBytesWritten(qint64)),
this, SLOT(socketencryptedBytesWritten(qint64)));
connect(socket, SIGNAL(sslErrors(QList<QSslError>)),
this, SLOT(sslErrors(QList<QSslError>)));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(serror(QAbstractSocket::SocketError)));
/*connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
this, SLOT(socketStateChanged(QAbstractSocket::SocketState)));*/

// Encrypted
if(true){

NULOG1("[Protocol::doit] Connecting to Secure Server " << vars["hostname"] << "Port :" << vars["port"]);

connect(socket, SIGNAL(encrypted()),
this, SLOT(socketEncrypted()));

// Connect to the server
socket->connectToHostEncrypted(vars["hostname"], vars["port"].toInt());

}
return true;

}

// Called when the connection is fully established
void Protocol::socketEncrypted(){

NULOG1("[Protocol::socketEncrypted] Connection Established with Server");

// This is the myproto protocol
socket->write("myproto\r\n");
socket->waitForBytesWritten();

QByteArray username = vars["username"].toUtf8();
QByteArray password = vars["password"].toUtf8();

// We first need to LOGIN
this->rite("LOGIN "+ username.toBase64() +" "+ password.toBase64());

}

// Called whenever there is something to READ in the buffer
void Protocol::socketReadyRead(){

QByteArray buf; // The buffer of the socket read
qint64 n;
QByteArray len;
QString str;
int Packetlen;

while(socket->bytesAvailable() != 0){

// Find the length of the command
if (dlen == 0) {

if(socket->bytesAvailable() < 2){
NULOG2("[Protocol::socketReadyRead] Less than 2 packets...breaking" << socket->bytesAvailable());
break;
}

len = socket->read(2);

// Was there an error in reading ?
if(len.size() != 2){
NULOG0("[Protocol::socketReadyRead] Could not read the Length of the Packet " << len.size());
return;
}

dlen = (qint64)((((quint8)len[0]) << 8) | (quint8)len[1]);

// The packet cannot be greater than buffer
if(dlen > 8192){
NULOG0("[Protocol::socketReadyRead] The packet cannot be greater than buffer");
return;
}

}

// If there are not many bytes available then break
if(socket->bytesAvailable() < dlen){
break;
}

buf = socket->read(dlen);

// Is the packet of the right size
if(buf.size() != ((int)dlen)){
NULOG0("[Protocol::socketReadyRead] The bytes read" << buf.size() << "dont match the packet size" << dlen);
return;
}

// Clear the string
str = buf;

// Reset the counter
Packetlen = (int)dlen;
dlen = (qint64)0;

// Send data
if (str.startsWith(QLatin1String("gd"), Qt::CaseInsensitive)){

sendBlocks();

// Quit
}else if (str.startsWith(QLatin1String("qu"), Qt::CaseInsensitive)){

done();return;

// Unknown Command
}else{

NULOG0("[Protocol::socketReadyRead] An Unknown Command has happened");

}

}// End of while

}

// Sends the data to the server
void Protocol::sendBlocks(){

// Open File
QFile fp("/path/to/large/file");

if(!fp.open(QIODevice::ReadOnly)){
return; // sendBlocks() is declared void
}

qint64 read = 0;

while(read < fp.size()){

QByteArray finalData = fp.read(4096);

// Stop on a read error or an unexpected end of file
if(finalData.isEmpty()){
break;
}

// Track progress so that the loop terminates
read += finalData.size();

// Write data
this->rite(finalData);

this->bblen++;

// Sleep to reduce read load
if((this->bblen % 2500) == 0 && this->bblen > 0){

NULOG2("[Protocol::sendBlocks] Bytes " << this->socket->encryptedBytesToWrite() << this->socket->bytesToWrite());

/*// The following CODE is used to flush data as for some reason the data is not flushed automatically
if((this->bblen % 25000) == 0){

while(this->socket->encryptedBytesToWrite() > 10000000 || this->socket->bytesToWrite() > 10000000){
this->socket->flush();
}

}*/

}

}

// Wait for more socket read !

return;

}





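// Code to execute protocol in main.cpp
#include <QCoreApplication>
#include "protocol.h"

int main(int argc, char *argv[]){
QCoreApplication app(argc, argv); // The socket signals need a running event loop
Protocol protObj;
protObj.doit();
return app.exec();
}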

yeye_olive
3rd June 2014, 21:49
Your problem seems to be that you try to send all the data at once using write(), which simply enqueues the data in an unbounded buffer. Data will only be sent when you return to the event loop and calling flush() is a poor attempt at fixing a broken design. I suggest you take an asynchronous approach, just like you would use to read data.

You could add members to your class to keep track of how much data you have sent from your file. Simply making the QFile a member of the class may be enough. Then, connect the socket's encryptedBytesWritten() signal to a slot in your class, in which you first call encryptedBytesToWrite() to check how much data is yet to be sent and, if this number goes below a certain threshold, read some chunk of data from the file, write() it to the socket, and return to let control go back to the event loop and have the socket send whatever is waiting in its buffer. That way, you will keep memory usage under control by loading the file on demand as the data is sent over the network.

Note that this asynchronous design will spread through the program: instead of the current (synchronous) sendBlocks() method, you will need a method which starts sending data and returns immediately, and a signal that you emit when the work is done (emit this signal from within the slot connected to encryptedBytesWritten(), after reaching the end of, then closing, the file).
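
The control flow described above can be modelled without Qt. The following is only a sketch under stated assumptions: FakeSocket, ThrottledSender and simulateTransfer are hypothetical names invented for this illustration, and the fake socket merely counts bytes. In the real program the socket would be the QSslSocket, the backlog check would use encryptedBytesToWrite(), and onBytesWritten() would be the body of the slot connected to encryptedBytesWritten().

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Stand-in for the socket: counts bytes queued but not yet "sent".
// `queued` plays the role of QSslSocket::encryptedBytesToWrite().
struct FakeSocket {
    std::size_t queued = 0;      // bytes waiting in the buffer
    std::size_t peakQueued = 0;  // high-water mark of the backlog
    std::size_t totalSent = 0;   // bytes that have left the buffer

    void write(const std::string &chunk) {
        queued += chunk.size();
        if (queued > peakQueued) peakQueued = queued;
    }
    // Simulates the event loop pushing up to n bytes to the network.
    std::size_t drain(std::size_t n) {
        std::size_t sent = n < queued ? n : queued;
        queued -= sent;
        totalSent += sent;
        return sent;
    }
};

class ThrottledSender {
public:
    ThrottledSender(std::istream &in, FakeSocket &sock,
                    std::size_t chunkSize, std::size_t threshold)
        : in_(in), sock_(sock), chunk_(chunkSize), threshold_(threshold) {}

    // Starts the transfer and returns immediately.
    void start() { refill(); }

    // What the slot connected to encryptedBytesWritten() would do.
    void onBytesWritten() { refill(); }

    // True once the input is exhausted and the backlog is empty;
    // this is the point at which a "finished" signal would be emitted.
    bool finished() const { return eof_ && sock_.queued == 0; }

private:
    void refill() {
        std::vector<char> buf(chunk_);
        // Top up only while the backlog is below the threshold.
        while (!eof_ && sock_.queued < threshold_) {
            in_.read(buf.data(), static_cast<std::streamsize>(buf.size()));
            std::streamsize got = in_.gcount();
            if (got > 0)
                sock_.write(std::string(buf.data(), static_cast<std::size_t>(got)));
            if (!in_) eof_ = true;  // short read: end of input (or error)
        }
    }

    std::istream &in_;
    FakeSocket &sock_;
    std::size_t chunk_;
    std::size_t threshold_;
    bool eof_ = false;
};

// Drives a whole transfer and returns the socket statistics.
FakeSocket simulateTransfer(std::size_t fileSize, std::size_t chunkSize,
                            std::size_t threshold, std::size_t drainPerTick) {
    std::istringstream file(std::string(fileSize, 'x'));
    FakeSocket sock;
    ThrottledSender sender(file, sock, chunkSize, threshold);
    sender.start();
    while (!sender.finished()) {
        sock.drain(drainPerTick);  // the event loop sends some data...
        sender.onBytesWritten();   // ...and then the signal fires
    }
    return sock;
}
```

With this structure the backlog never exceeds the threshold plus one chunk, whatever the size of the file, because a new chunk is read only when the backlog has dropped below the threshold.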

By the way, your program does connect encryptedBytesWritten() to some slot whose code you forgot to include in your snippet.

jefftee
10th June 2014, 09:48
raheelgup wrote:
"Transfer Rate problem :
The transfer rate is extremely slow and I get like 4-5 MBs/second from one local VM to my desktop.
If I just read like 400 MB, it takes 12-13 seconds. With transferring data over the socket, it takes a whopping 80 seconds."

You are definitely slowing things down by calling waitForBytesWritten(). Since you are running this all on the main thread, you're causing the main thread to block, which prevents you from reading more data to send. I also see that you are reading data in pretty small chunks of 4096 bytes and writing the same 4096-byte chunks to the socket. You should increase the amount of data read from the file in one read operation and write the same amount of data to the socket. You can benchmark different values to determine the optimal read/write sizes.

Then change your Protocol::socketReadyRead() slot to call readAll() and append the newly read data to an existing QByteArray buffer. You should also make the QByteArray buf a class member so that its contents persist across calls to Protocol::socketReadyRead(), because the slot will not always have all of the sent bytes available at once (in fact, it rarely does when reading/writing lots of data).

Parse the "records" that may be in the QByteArray by looking at the length of data you're prepending in Protocol::rite() and determining whether or not you have a complete data item. If you do, extract that data from the QByteArray, then remove that data from QByteArray using QByteArray::remove(0, len) so that it's no longer in the buffer and the next "record" is at position 0 in the QByteArray, etc.
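
As a rough illustration of that parsing step, here is a minimal sketch that uses std::string in place of QByteArray so that it compiles without Qt. The function name extractRecords is made up for this example. The framing (a 2-byte length, high byte first, followed by the data) is the one Protocol::rite() produces.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Pulls every complete record out of `buffer` and leaves any trailing
// partial record in place for the next call. Each record is a 2-byte
// big-endian length followed by that many payload bytes.
std::vector<std::string> extractRecords(std::string &buffer) {
    std::vector<std::string> records;
    std::size_t pos = 0;
    while (buffer.size() - pos >= 2) {
        std::size_t len =
            (static_cast<std::size_t>(static_cast<unsigned char>(buffer[pos])) << 8) |
            static_cast<std::size_t>(static_cast<unsigned char>(buffer[pos + 1]));
        if (buffer.size() - pos - 2 < len) break;  // payload not complete yet
        records.push_back(buffer.substr(pos + 2, len));
        pos += 2 + len;
    }
    // Drop everything that was consumed in one go; with a QByteArray
    // this corresponds to QByteArray::remove(0, pos).
    buffer.erase(0, pos);
    return records;
}
```

The slot would append whatever readAll() returns to the persistent buffer and then call a function like this one. Removing the consumed bytes once per call, rather than once per record, avoids shifting the buffer repeatedly when many small records arrive together.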

One more comment: I don't see where you are writing the commands 'gd' and 'qu' to the socket in your code, and it is not clear from Protocol::socketReadyRead() how they are framed. I would suggest that you be consistent: each packet of data written by your app should start with a common prefix, namely the length of the data, then the "command", followed by any data specific to the "command" type (which could be nothing). You may also consider using a command like 'dt' (or any other unique value you want) to denote that this is file data. This way you can consistently determine what each packet of data is to be used for.
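
One possible packet layout along those lines is sketched below. The details are assumptions made for illustration, not something stated in the thread: the function name makePacket is hypothetical, commands are taken to be exactly two characters (like 'gd', 'qu' and 'dt'), and the 2-byte length counts the command plus its data. Unlike the cast in Protocol::rite(), oversized packets are rejected instead of being silently truncated.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Builds: [2-byte big-endian length][2-char command][payload]
// where length = command size + payload size.
std::string makePacket(const std::string &command, const std::string &payload) {
    if (command.size() != 2)
        throw std::invalid_argument("command must be exactly 2 characters");

    const std::size_t bodyLen = command.size() + payload.size();
    if (bodyLen > 0xFFFF)
        throw std::length_error("packet body does not fit in a 16-bit length");

    std::string packet;
    packet.reserve(2 + bodyLen);
    packet.push_back(static_cast<char>((bodyLen >> 8) & 0xFF));  // high byte
    packet.push_back(static_cast<char>(bodyLen & 0xFF));         // low byte
    packet += command;
    packet += payload;
    return packet;
}
```

For example, makePacket("dt", chunk) would frame one block of file data, and makePacket("qu", "") a command with no data. The receiver can then read the length, take the first two bytes of the body as the command, and treat the rest as command-specific data.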

You may also consider some type of checksum or hash value of the data so that you can verify the contents on the receiving end. You'll have the length of the data and a checksum/hash that you could compute on the received data to ensure it matches the data that was sent. This is not uncommon in backing up files across a network. You're better off finding out that what you received doesn't match what was sent at the time the data is received, so you could implement a new command (like 'rt' for retry) to retransmit that block of data, etc.
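
As one example of such a check, the sketch below computes the common CRC-32 (the reflected variant with polynomial 0xEDB88320) over a block of data using only the standard library. The function name crc32Of is made up for this illustration, and the choice of CRC-32 is an assumption; any checksum or hash would serve the same purpose. Qt itself offers qChecksum() and QCryptographicHash as ready-made alternatives.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Bitwise CRC-32: initial value 0xFFFFFFFF, reflected polynomial
// 0xEDB88320, final XOR with 0xFFFFFFFF.
std::uint32_t crc32Of(const std::string &data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (std::size_t i = 0; i < data.size(); ++i) {
        crc ^= static_cast<unsigned char>(data[i]);
        for (int bit = 0; bit < 8; ++bit) {
            // If the lowest bit is set, shift and apply the polynomial;
            // otherwise just shift.
            std::uint32_t mask = 0u - (crc & 1u);
            crc = (crc >> 1) ^ (0xEDB88320u & mask);
        }
    }
    return crc ^ 0xFFFFFFFFu;
}
```

The sender would append the 4-byte value to each block, and the receiver would recompute it over the received data and request a retransmission (the 'rt' idea above) when the two values differ. A table-driven version would be faster for large transfers; the bitwise loop is used here only to keep the sketch short.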

Hope this gets you pointed in the right direction.

Regards,

Jeff