raheelgup
28th May 2014, 14:27
Hi,
I am writing a Client and Server application (with my own protocol) to transfer huge amounts of data (like 10GB+). I need to transfer lots of files which may be small and big.
I am using Qt 4.8.5 as I have to support CentOS and RedHat.
Flushing Issues :
Everything seems to be working, except that the data written to the QSslSocket is not flushed; it just sits in the socket buffers. I would like it to be flushed on its own.
In order to flush the data, I need to call flush() explicitly. (I don't want to call flush() explicitly, though.)
Calling the flush() function often, causes the memory usage to keep on increasing although the bytestowrite and "encrypted bytes to write" are below 20MB (as per my flushing loop which is commented) e.g. if the memory usage is 50 MB with 20 MB in bytestowrite and 20 MB in "encrypted bytes to write", when the flushing loop completes (to read more data) and there is no data in the bytestowrite nor in "encrypted bytes to write", the memory should have dropped to close to 10 MB (or it could reserve the memory for next usage). But after transferring something like 100MB its total memory usage might be 70 MB with nothing in the buffers. After 500 MB transfer, it goes close to 150MB of memory usage. And as soon as the socket is closed or left to read for more data, the memory drops to 10 MB again.
Transfer Rate problem :
The transfer rate is extremely slow: I get only about 4-5 MB/s from a local VM to my desktop.
If I just read like 400 MB, it takes 12-13 seconds. With transferring data over the socket, it takes a whopping 80 seconds.
One last note: this protocol object is created inside the main() function, and hence it is running on the main thread.
Now my code is as follows :
// protocol.h
#ifndef PROTOCOL_H
#define PROTOCOL_H
#include <QtNetwork>
// Client side of the custom "myproto" transfer protocol. The object owns the
// QSslSocket, frames outgoing packets (rite) and parses incoming
// length-prefixed packets (socketReadyRead).
// NOTE(review): socketReadyRead() reads and writes a member named `dlen` that
// is not declared in this class as posted - presumably trimmed; confirm.
// NOTE(review): doit() connects to the slots socketBytesWritten(qint64),
// socketencryptedBytesWritten(qint64), sslErrors(QList<QSslError>) and
// serror(QAbstractSocket::SocketError); none of them is declared below
// (presumably hidden behind "More slots") - confirm.
// NOTE(review): sendBlocks() is declared void here, but its posted definition
// contains `return false;` / `return true;`.
class Protocol : public QObject {
Q_OBJECT
public:
// Constructor
Protocol(QObject *parent = 0);
// Destructor
~Protocol();
// The Socket pointer (allocated in the constructor, deleted in the destructor)
QSslSocket * socket;
// Variables for logging in etc. The visible code reads the keys
// "hostname", "port", "username" and "password".
QMap<QString, QString> vars;
unsigned int bblen; // The len of data in the bbuffer (sendBlocks() increments it once per packet written)
int Compress; // Is the current partition to be compressed ?
unsigned int written; // How many packets have been written
unsigned int maxtx; // The amount of packets that can be written in one shot
int socket_rw_time; // The last time there was a socket read write
int sleepFor; // Will sleep for these many milliseconds
int doneCalled; // Is set to 1 or greater when done is called
QList<quint64> badBlocks; // Any bad blocks found
// Finish the backup process
void done();
// Write the data with the length header (2 bytes) in front of it;
// returns the result of the socket write
qint64 rite (const QByteArray & data, bool keptAlive = false);
// Start the process to backup: connects the socket signals and opens the
// encrypted connection. The LOGIN command itself is sent from
// socketEncrypted() once the connection is encrypted.
bool doit();
// Send Data: streams a file to the server in 4096-byte packets
void sendBlocks();
signals:
void quit();
void finished();
private slots:
// When any data is available, it goes through this
void socketReadyRead();
// Called when the Encryption starts over SSL Sockets
void socketEncrypted();
// More slots
};
#endif
// protocol.cpp
#include "protocol.h"
// The constructor.
// Zero-initialises every scalar member so that counters such as bblen (which
// sendBlocks() increments and tests) never start from an indeterminate value,
// then allocates the SSL socket that the destructor deletes.
// The initialiser list follows the declaration order of the members.
Protocol::Protocol(QObject *parent)
    : QObject(parent),
      socket(0),
      bblen(0),
      Compress(0),
      written(0),
      maxtx(0),
      socket_rw_time(0),
      sleepFor(0),
      doneCalled(0) {
    NULOG2("[Protocol::Protocol] Start");
    socket = new QSslSocket();
}
// Destructor: logs the teardown and frees the socket object held in `socket`.
Protocol::~Protocol()
{
    NULOG2("[Protocol::~Protocol] Destructor");
    delete this->socket;
}
// Finish the backup process, even in case of error.
// NOTE: the body was left out of this listing by the author, so what the
// function actually does cannot be seen here. socketReadyRead() calls it
// when it dispatches the quit command.
void Protocol::done(){
// Had to comment due to large post
}
// Writes one protocol packet to the socket: a 2-byte big-endian length header
// followed by the payload (the same layout socketReadyRead() decodes).
//
// data      - payload; at most 65535 bytes, because the header is 16 bits wide.
// keptAlive - currently unused; kept so existing callers keep compiling.
//
// Returns the result of QSslSocket::write() for header + payload (so a fully
// accepted packet reports data.size() + 2), or -1 when the payload does not
// fit into the 16-bit length field; nothing is written in that case.
qint64 Protocol::rite(const QByteArray & data, bool keptAlive){
    Q_UNUSED(keptAlive);

    // The size used to be cast straight to quint16, which silently truncated
    // oversized payloads and desynchronised the framing on the receiver.
    if (data.size() > 0xFFFF) {
        NULOG4("[Protocol::rite] Payload too large for 16-bit length header : " << data.size());
        return -1;
    }
    const quint16 qlen = static_cast<quint16>(data.size());

    // Build the header with shifts rather than by reading the bytes of qlen
    // through a char*: the pointer trick only yields big-endian output on
    // little-endian hosts.
    QByteArray packet;
    packet.reserve(data.size() + 2);
    packet.append(static_cast<char>((qlen >> 8) & 0xFF));
    packet.append(static_cast<char>(qlen & 0xFF));
    packet.append(data);

    const qint64 w = socket->write(packet);
    NULOG4("[Protocol::rite] Size : " << data.size() << "Written : " << w);
    return w;
}
// Start the whole process
bool Protocol::doit(){
// Get the kernel data
//this->myproto_config();
connect(socket, SIGNAL(readyRead()),
this, SLOT(socketReadyRead()));
connect(socket, SIGNAL(bytesWritten(qint64)),
this, SLOT(socketBytesWritten(qint64)));
connect(socket, SIGNAL(encryptedBytesWritten(qint64)),
this, SLOT(socketencryptedBytesWritten(qint64)));
connect(socket, SIGNAL(sslErrors(QList<QSslError>)),
this, SLOT(sslErrors(QList<QSslError>)));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(serror(QAbstractSocket::SocketError)));
/*connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)) ,
this, SLOT(socketStateChanged(QAbstractSocket::SocketSta te)));*/
// Encrypted
if(true){
NULOG1("[Protocol::doit] Connecting to Secure Server " << vars["hostname"] << "Port :" << vars["port"]);
connect(socket, SIGNAL(encrypted()),
this, SLOT(socketEncrypted()));
// Connect to the server
socket->connectToHostEncrypted(vars["hostname"], vars["port"].toInt());
}
return true;
}
// Called when the connection is fully established
void Protocol::socketEncrypted(){
NULOG1("[Protocol::socketEncrypted] Connection Established with Server");
// This is the myproto protocol
socket->write("myproto\r\n");
socket->waitForBytesWritten();
QByteArray username = vars["username"].toUtf8();
QByteArray password = vars["password"].toUtf8();
// We first need to LOGIN
this->rite("LOGIN "+ username.toBase64() +" "+ password.toBase64());
}
// Called whenever there is something to READ in the buffer
void Protocol::socketReadyRead(){
QByteArray buf; // The buffer of the socket read
qint64 n;
QByteArray len;
QString str;
int Packetlen;
while(socket->bytesAvailable() != 0){
// Find the length of the command
if (dlen == 0) {
if(socket->bytesAvailable() < 2){
NULOG2("[Protocol::socketReadyRead] Less than 2 packets...breaking" << socket->bytesAvailable());
break;
}
len = socket->read(2);
// Was there an error in reading ?
if(len.size() != 2){
NULOG0("[Protocol::socketReadyRead] Could not read the Length of the Packet " << len.size());
return;
}
dlen = (qint64)((((quint8)len[0]) << 8) | (quint8)len[1]);
// The packet cannot be greater than buffer
if(dlen > 8192){
NULOG0("[Protocol::socketReadyRead] The packet cannot be greater than buffer");
return;
}
}
// If there are not many bytes available then break
if(socket->bytesAvailable() < dlen){
break;
}
buf = socket->read(dlen);
// Is the packet of the right size
if(buf.size() != ((int)dlen)){
NULOG0("[Protocol::socketReadyRead] The bytes read" << n << "dont match the packet size" << dlen);
return;
}
// Clear the string
str = buf;
// Reset the counter
Packetlen = (int)dlen;
dlen = (qint64)0;
// Send data
if (str[0].toLower() == QChar('gd')){
sendBlocks();
// Quit
}else if (str[0].toLower() == QChar('qu')){
done();return;
// Unknown Command
}else{
NULOG0("[Protocol::socketReadyRead] An Unknown Command has happened");
}
}// End of while
}
// Sends the data to the server
void Protocol::sendBlocks(){
// Open File
QFile fp("/path/to/large/file");
if(!fp.open(QIODevice::ReadOnly)){
return false;
}
qint64 read = 0;
while(fp.size() != read){
QByteArray finalData = fp.read(4096);
// Write data
this->rite(finalData);
this->bblen++;
// Sleep to reduce read load
if((this->bblen % 2500) == 0 && this->bblen > 0){
NULOG2("[Protocol::sendBlocks] Bytes " << this->socket->encryptedBytesToWrite() << this->socket->bytesToWrite());
/*// The following CODE is used to flush data as for some reason the data is not flushed automatically
if((this->bblen % 25000) == 0){
while(this->socket->encryptedBytesToWrite() > 10000000 || this->socket->bytesToWrite() > 10000000){
this->socket->flush();
}
}*/
}
}
// Wait for more socket read !
return true;
}
// Code to execute protocol in main.cpp
int main(){
Protocol protObj;
protObj.doit();
}
I am writing a Client and Server application (with my own protocol) to transfer huge amounts of data (like 10GB+). I need to transfer lots of files which may be small and big.
I am using Qt 4.8.5 as I have to support CentOS and RedHat.
Flushing Issues :
Everything seems to be working, except that the data written to the QSslSocket is not flushed; it just sits in the socket buffers. I would like it to be flushed on its own.
In order to flush the data, I need to call flush() explicitly. (I don't want to call flush() explicitly, though.)
Calling the flush() function often, causes the memory usage to keep on increasing although the bytestowrite and "encrypted bytes to write" are below 20MB (as per my flushing loop which is commented) e.g. if the memory usage is 50 MB with 20 MB in bytestowrite and 20 MB in "encrypted bytes to write", when the flushing loop completes (to read more data) and there is no data in the bytestowrite nor in "encrypted bytes to write", the memory should have dropped to close to 10 MB (or it could reserve the memory for next usage). But after transferring something like 100MB its total memory usage might be 70 MB with nothing in the buffers. After 500 MB transfer, it goes close to 150MB of memory usage. And as soon as the socket is closed or left to read for more data, the memory drops to 10 MB again.
Transfer Rate problem :
The transfer rate is extremely slow: I get only about 4-5 MB/s from a local VM to my desktop.
If I just read like 400 MB, it takes 12-13 seconds. With transferring data over the socket, it takes a whopping 80 seconds.
One last note: this protocol object is created inside the main() function, and hence it is running on the main thread.
Now my code is as follows :
// protocol.h
#ifndef PROTOCOL_H
#define PROTOCOL_H
#include <QtNetwork>
// Client side of the custom "myproto" transfer protocol. The object owns the
// QSslSocket, frames outgoing packets (rite) and parses incoming
// length-prefixed packets (socketReadyRead).
// NOTE(review): socketReadyRead() reads and writes a member named `dlen` that
// is not declared in this class as posted - presumably trimmed; confirm.
// NOTE(review): doit() connects to the slots socketBytesWritten(qint64),
// socketencryptedBytesWritten(qint64), sslErrors(QList<QSslError>) and
// serror(QAbstractSocket::SocketError); none of them is declared below
// (presumably hidden behind "More slots") - confirm.
// NOTE(review): sendBlocks() is declared void here, but its posted definition
// contains `return false;` / `return true;`.
class Protocol : public QObject {
Q_OBJECT
public:
// Constructor
Protocol(QObject *parent = 0);
// Destructor
~Protocol();
// The Socket pointer (allocated in the constructor, deleted in the destructor)
QSslSocket * socket;
// Variables for logging in etc. The visible code reads the keys
// "hostname", "port", "username" and "password".
QMap<QString, QString> vars;
unsigned int bblen; // The len of data in the bbuffer (sendBlocks() increments it once per packet written)
int Compress; // Is the current partition to be compressed ?
unsigned int written; // How many packets have been written
unsigned int maxtx; // The amount of packets that can be written in one shot
int socket_rw_time; // The last time there was a socket read write
int sleepFor; // Will sleep for these many milliseconds
int doneCalled; // Is set to 1 or greater when done is called
QList<quint64> badBlocks; // Any bad blocks found
// Finish the backup process
void done();
// Write the data with the length header (2 bytes) in front of it;
// returns the result of the socket write
qint64 rite (const QByteArray & data, bool keptAlive = false);
// Start the process to backup: connects the socket signals and opens the
// encrypted connection. The LOGIN command itself is sent from
// socketEncrypted() once the connection is encrypted.
bool doit();
// Send Data: streams a file to the server in 4096-byte packets
void sendBlocks();
signals:
void quit();
void finished();
private slots:
// When any data is available, it goes through this
void socketReadyRead();
// Called when the Encryption starts over SSL Sockets
void socketEncrypted();
// More slots
};
#endif
// protocol.cpp
#include "protocol.h"
// The constructor.
// Zero-initialises every scalar member so that counters such as bblen (which
// sendBlocks() increments and tests) never start from an indeterminate value,
// then allocates the SSL socket that the destructor deletes.
// The initialiser list follows the declaration order of the members.
Protocol::Protocol(QObject *parent)
    : QObject(parent),
      socket(0),
      bblen(0),
      Compress(0),
      written(0),
      maxtx(0),
      socket_rw_time(0),
      sleepFor(0),
      doneCalled(0) {
    NULOG2("[Protocol::Protocol] Start");
    socket = new QSslSocket();
}
// Destructor: logs the teardown and frees the socket object held in `socket`.
Protocol::~Protocol()
{
    NULOG2("[Protocol::~Protocol] Destructor");
    delete this->socket;
}
// Finish the backup process, even in case of error.
// NOTE: the body was left out of this listing by the author, so what the
// function actually does cannot be seen here. socketReadyRead() calls it
// when it dispatches the quit command.
void Protocol::done(){
// Had to comment due to large post
}
// Writes one protocol packet to the socket: a 2-byte big-endian length header
// followed by the payload (the same layout socketReadyRead() decodes).
//
// data      - payload; at most 65535 bytes, because the header is 16 bits wide.
// keptAlive - currently unused; kept so existing callers keep compiling.
//
// Returns the result of QSslSocket::write() for header + payload (so a fully
// accepted packet reports data.size() + 2), or -1 when the payload does not
// fit into the 16-bit length field; nothing is written in that case.
qint64 Protocol::rite(const QByteArray & data, bool keptAlive){
    Q_UNUSED(keptAlive);

    // The size used to be cast straight to quint16, which silently truncated
    // oversized payloads and desynchronised the framing on the receiver.
    if (data.size() > 0xFFFF) {
        NULOG4("[Protocol::rite] Payload too large for 16-bit length header : " << data.size());
        return -1;
    }
    const quint16 qlen = static_cast<quint16>(data.size());

    // Build the header with shifts rather than by reading the bytes of qlen
    // through a char*: the pointer trick only yields big-endian output on
    // little-endian hosts.
    QByteArray packet;
    packet.reserve(data.size() + 2);
    packet.append(static_cast<char>((qlen >> 8) & 0xFF));
    packet.append(static_cast<char>(qlen & 0xFF));
    packet.append(data);

    const qint64 w = socket->write(packet);
    NULOG4("[Protocol::rite] Size : " << data.size() << "Written : " << w);
    return w;
}
// Start the whole process
bool Protocol::doit(){
// Get the kernel data
//this->myproto_config();
connect(socket, SIGNAL(readyRead()),
this, SLOT(socketReadyRead()));
connect(socket, SIGNAL(bytesWritten(qint64)),
this, SLOT(socketBytesWritten(qint64)));
connect(socket, SIGNAL(encryptedBytesWritten(qint64)),
this, SLOT(socketencryptedBytesWritten(qint64)));
connect(socket, SIGNAL(sslErrors(QList<QSslError>)),
this, SLOT(sslErrors(QList<QSslError>)));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(serror(QAbstractSocket::SocketError)));
/*connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)) ,
this, SLOT(socketStateChanged(QAbstractSocket::SocketSta te)));*/
// Encrypted
if(true){
NULOG1("[Protocol::doit] Connecting to Secure Server " << vars["hostname"] << "Port :" << vars["port"]);
connect(socket, SIGNAL(encrypted()),
this, SLOT(socketEncrypted()));
// Connect to the server
socket->connectToHostEncrypted(vars["hostname"], vars["port"].toInt());
}
return true;
}
// Called when the connection is fully established
void Protocol::socketEncrypted(){
NULOG1("[Protocol::socketEncrypted] Connection Established with Server");
// This is the myproto protocol
socket->write("myproto\r\n");
socket->waitForBytesWritten();
QByteArray username = vars["username"].toUtf8();
QByteArray password = vars["password"].toUtf8();
// We first need to LOGIN
this->rite("LOGIN "+ username.toBase64() +" "+ password.toBase64());
}
// Called whenever there is something to READ in the buffer
void Protocol::socketReadyRead(){
QByteArray buf; // The buffer of the socket read
qint64 n;
QByteArray len;
QString str;
int Packetlen;
while(socket->bytesAvailable() != 0){
// Find the length of the command
if (dlen == 0) {
if(socket->bytesAvailable() < 2){
NULOG2("[Protocol::socketReadyRead] Less than 2 packets...breaking" << socket->bytesAvailable());
break;
}
len = socket->read(2);
// Was there an error in reading ?
if(len.size() != 2){
NULOG0("[Protocol::socketReadyRead] Could not read the Length of the Packet " << len.size());
return;
}
dlen = (qint64)((((quint8)len[0]) << 8) | (quint8)len[1]);
// The packet cannot be greater than buffer
if(dlen > 8192){
NULOG0("[Protocol::socketReadyRead] The packet cannot be greater than buffer");
return;
}
}
// If there are not many bytes available then break
if(socket->bytesAvailable() < dlen){
break;
}
buf = socket->read(dlen);
// Is the packet of the right size
if(buf.size() != ((int)dlen)){
NULOG0("[Protocol::socketReadyRead] The bytes read" << n << "dont match the packet size" << dlen);
return;
}
// Clear the string
str = buf;
// Reset the counter
Packetlen = (int)dlen;
dlen = (qint64)0;
// Send data
if (str[0].toLower() == QChar('gd')){
sendBlocks();
// Quit
}else if (str[0].toLower() == QChar('qu')){
done();return;
// Unknown Command
}else{
NULOG0("[Protocol::socketReadyRead] An Unknown Command has happened");
}
}// End of while
}
// Sends the data to the server
void Protocol::sendBlocks(){
// Open File
QFile fp("/path/to/large/file");
if(!fp.open(QIODevice::ReadOnly)){
return false;
}
qint64 read = 0;
while(fp.size() != read){
QByteArray finalData = fp.read(4096);
// Write data
this->rite(finalData);
this->bblen++;
// Sleep to reduce read load
if((this->bblen % 2500) == 0 && this->bblen > 0){
NULOG2("[Protocol::sendBlocks] Bytes " << this->socket->encryptedBytesToWrite() << this->socket->bytesToWrite());
/*// The following CODE is used to flush data as for some reason the data is not flushed automatically
if((this->bblen % 25000) == 0){
while(this->socket->encryptedBytesToWrite() > 10000000 || this->socket->bytesToWrite() > 10000000){
this->socket->flush();
}
}*/
}
}
// Wait for more socket read !
return true;
}
// Code to execute protocol in main.cpp
int main(){
Protocol protObj;
protObj.doit();
}