#ifndef SYNAPSE_H
#define SYNAPSE_H
#include <QtNetwork>
#include "packet.h"
Q_OBJECT
public:
Synapse
(QObject *parent_,
QTcpSocket *tcpSocket_,
int queuelimit_, uint32_t blocksize_
);
Synapse
(QObject *parent_,
QHostAddress address_, quint16 port_,
int queuelimit_, uint32_t blocksize_
);
QHostAddress get_peeraddress
(void){return tcpSocket
->peerAddress
();
} QString errorString
(void){return errorStr;
} bool is_connected(void){return connected;}
protected:
void run();
public slots:
void close(void);
bool add_packet(Packet *packet);
private slots:
void receive_packet();
bool send_packet(Packet *packet);
void send_next_packet(void);
private:
Packet *curPacket;
QList<Packet*> packet_queue;
int queuetlimit;
uint32_t curSize, maxSize, blockSize;
bool connected, sending;
void ident_packet();
signals:
void get_next_packet(void);
};
#endif // SYNAPSE_H
#ifndef SYNAPSE_H
#define SYNAPSE_H
#include <QtNetwork>
#include "packet.h"
/**
 * Synapse owns the bookkeeping for one TCP connection to a peer.
 *
 * Outgoing Packet objects are queued through add_packet() and written to the
 * socket in chunks of at most blockSize bytes; incoming bytes are collected in
 * receive_packet() until the size announced in the packet descriptor is
 * reached.
 */
class Synapse : public QThread{
    Q_OBJECT
public:
    /** Adopts an existing socket; the object is marked as connected. */
    Synapse(QObject *parent_, QTcpSocket *tcpSocket_, int queuelimit_, uint32_t blocksize_);
    /** Connects to address_:port_; check is_connected() and errorString() afterwards. */
    Synapse(QObject *parent_, QHostAddress address_, quint16 port_, int queuelimit_, uint32_t blocksize_);

    // The accessors only read state, so they are const-qualified and can be
    // used through const references as well.
    /** Address of the remote end as reported by the socket. */
    QHostAddress get_peeraddress(void) const {return tcpSocket->peerAddress();}
    /** Error text stored when the connecting constructor failed. */
    QString errorString(void) const {return errorStr;}
    /** Value of the connected flag set by the constructors. */
    bool is_connected(void) const {return connected;}

protected:
    /** QThread entry point; sets up the signal/slot connections. */
    void run();

public slots:
    void close(void);
    /** Queues a packet for sending; returns false when it is rejected. */
    bool add_packet(Packet *packet);

private slots:
    void receive_packet();
    bool send_packet(Packet *packet);
    void send_next_packet(void);

private:
    QTcpSocket *tcpSocket;          // socket used for all reads and writes
    QObject *parent;                // object whose destroyed() signal triggers close()
    Packet *curPacket;              // packet currently being received
    QList<Packet*> packet_queue;    // packets waiting to be sent
    int queuetlimit;                // limit checked by add_packet()
    uint32_t curSize, maxSize, blockSize;
    bool connected, sending;
    QString errorStr;
    void ident_packet();

signals:
    void send_dbgmsg(QString);
    void get_next_packet(void);
};
#endif // SYNAPSE_H
To copy to clipboard, switch view to plain text mode
#include "synapse.h"
/**
 * Builds a Synapse around an existing socket.
 *
 * @param parent_     object whose destroyed() signal is later wired to close()
 * @param tcpSocket_  socket used for all traffic
 * @param queuelimit_ limit applied by add_packet()
 * @param blocksize_  maximum number of bytes written per chunk by send_packet()
 */
Synapse::Synapse(QObject *parent_, QTcpSocket *tcpSocket_, int queuelimit_, uint32_t blocksize_){
    tcpSocket = tcpSocket_;
    parent = parent_;
    // receive_packet() tests this pointer before a packet has been started,
    // so it must not be left uninitialised.
    curPacket = NULL;
    queuetlimit = queuelimit_;
    blockSize = blocksize_;
    maxSize = curSize = 0;
    sending = false;
    connected = true;
}
/**
 * Builds a Synapse that opens its own connection to address_:port_.
 *
 * On failure the socket's error text is stored (see errorString()) and the
 * object stays in the not-connected state.
 *
 * @param parent_     object whose destroyed() signal is later wired to close()
 * @param address_    host to connect to
 * @param port_       port to connect to
 * @param queuelimit_ limit applied by add_packet()
 * @param blocksize_  maximum number of bytes written per chunk by send_packet()
 */
Synapse::Synapse(QObject *parent_, QHostAddress address_, quint16 port_, int queuelimit_, uint32_t blocksize_){
    parent = parent_;
    // Give every member a defined value before anything can fail, so the
    // failure path below does not leave indeterminate state behind.
    curPacket = NULL;
    queuetlimit = queuelimit_;
    blockSize = blocksize_;
    maxSize = curSize = 0;
    sending = false;
    connected = false;

    // The socket has to exist before it can be used; without this allocation
    // connectToHost() would be called through an uninitialised pointer.
    tcpSocket = new QTcpSocket(this);
    tcpSocket->connectToHost(address_,port_);
    // Blocks until connected or until the wait gives up
    // (the original author observed the timeout here).
    if (!tcpSocket->waitForConnected()){
        errorStr = tcpSocket->errorString();
        return;
    }
    connected = true;
}
void Synapse::run(){
if (!connected) return;
QObject::connect(this,
SIGNAL(get_next_packet
()),
this,
SLOT(send_next_packet
()));
QObject::connect(tcpSocket,
SIGNAL(readyRead
()),
this,
SLOT(receive_packet
()));
QObject::connect(tcpSocket,
SIGNAL(disconnected
()),
this,
SLOT(close
()));
QObject::connect(parent,
SIGNAL(destroyed
()),
this,
SLOT(close
()));
}
void Synapse::close(){
//disconnect everything
emit send_dbgmsg("Synapse: disconnected");
if (connected){
tcpSocket->close();
QObject::disconnect(tcpSocket,
SIGNAL(readyRead
()),
this,
SLOT(receive_packet
()));
QObject::disconnect(this,
SIGNAL(get_next_packet
()),
this,
SLOT(send_next_packet
()));
QObject::disconnect(tcpSocket,
SIGNAL(disconnected
()),
this,
SLOT(close
()));
QObject::disconnect(parent,
SIGNAL(destroyed
()),
this,
SLOT(close
()));
if(maxSize != 0 && maxSize != 0){
curPacket->clear();
}
for (int i = 0;i<packet_queue.size();i++){
packet_queue[i]->clear();
delete packet_queue[i];
}
packet_queue.clear();
}
this->deleteLater();
}
void Synapse::receive_packet(){
qint64 blocksize = tcpSocket->bytesAvailable();
if (blocksize == 0){
emit send_dbgmsg("Synapse: No data avaliable on socket");
return;
}
//allocate the buffer
char *data = new char[blocksize];
//read form socket
if (tcpSocket->read(data,blocksize) != blocksize){
emit send_dbgmsg
(QString("Synapse: %1").
arg(tcpSocket
->errorString
()));
delete data;
return;
}
//check if this is a new package
if (curSize == 0 && blocksize >= DESCRIPTOR_SIZE){
//new packet
curPacket = new Packet(data,blocksize);
maxSize = curPacket->get_packet_descriptor().Packetsize;
curSize = curPacket->get_size();//blocksize;
}else{
//write packet block to stream
if(curPacket){
curPacket->serialize_DATA(data,blocksize);
curSize = curPacket->get_size();//blocksize;
}else{
emit send_dbgmsg("Synapse: Invalid reference to packet");
delete data;
curSize = maxSize = 0;
return;
}
}
if(curPacket){
if(curSize == maxSize && maxSize > 0){
//stop receiving packets, digest current pne
QObject::disconnect(tcpSocket,
SIGNAL(readyRead
()),
this,
SLOT(receive_packet
()));
delete data;
if(curPacket->get_packet_descriptor().Version != DESCRIPTOR_VERSION || curPacket->is_descriptor_checksum_valid() == false){
emit send_dbgmsg
(QString("Synapse: DescriptorVersion - got %1 expected %2 PacketDescriptor-Checksum - got %3 expected %4").
arg(curPacket
->get_packet_descriptor
().
Version).
arg(DESCRIPTOR_VERSION
).
arg(curPacket
->compute_descriptor_checksum
()).
arg(curPacket
->get_packet_descriptor
().
Checksum));
curSize = maxSize = 0;
delete data;
return;
}
curPacket->uncompress_packet();
ident_packet();
return;
}else if(curSize > maxSize){
delete data;
emit send_dbgmsg
(QString("Synapse: Received corrupted packet - got %1 expected %2").
arg(curSize
).
arg(maxSize
));
curSize = maxSize = 0;
return;
}
}
}
/**
 * Finalizes a packet and writes it to the socket in chunks of at most
 * blockSize bytes, waiting for each chunk to be written.
 *
 * @param packet packet to send; ownership stays with the caller
 * @return false when packet is null, when blockSize is 0 while there is data
 *         to send, or when a write fails; true once every byte was handed to
 *         the socket
 */
bool Synapse::send_packet(Packet *packet){
    if(!packet) return false;
    packet->finalize_packet();
    const qint64 size = packet->get_size();
    emit send_dbgmsg(QString("Synapse: Sending Format %1 Type %2 Size %3 ID %4 Checksum %5").arg(packet->get_packet_descriptor().Format).arg(packet->get_packet_descriptor().Type).arg(size).arg(packet->get_packet_descriptor().JobID).arg(packet->get_packet_descriptor().Checksum));

    // With a zero block size the chunk loop could never make progress.
    if (size > 0 && blockSize == 0) return false;

    qint64 offset = 0;
    while (offset < size){
        const qint64 remaining = size - offset;
        const qint64 chunk = (remaining < blockSize) ? remaining : static_cast<qint64>(blockSize);
        const qint64 written = tcpSocket->write(packet->get_data()+offset,chunk);
        // -1 signals an error; 0 would mean no progress and must not loop forever
        if (written <= 0) return false;
        tcpSocket->waitForBytesWritten();
        // advance by what was actually accepted, in case of a short write
        offset += written;
    }
    return true;
}
void Synapse::ident_packet(void){
Packet::PacketDescriptor descriptor = curPacket->get_packet_descriptor();
emit send_dbgmsg
(QString("Synapse: Received Format %1 Type %2 Size %3 ID %4 Checksum %5").
arg(descriptor.
Format).
arg(descriptor.
Type).
arg(descriptor.
Packetsize).
arg(descriptor.
JobID).
arg(descriptor.
Checksum));
switch(descriptor.Type){
case Packet::UNKNOWN:
break;
case Packet::CHAT_MSG:
emit send_dbgmsg
(QString("Synapse: CHAT_MSG %1").
arg(curPacket
->deserialize_CHAT_MSG
()));
break;
case Packet::REQ_PEERLIST:
break;
//submit to PeerManager and PeerManager sends PeerList backto this Peer to send it
case Packet::REP_PEERLIST:
break;
//submit to PeerManager to add to PeerList
case Packet::FILE:
//debuggin methode
emit send_dbgmsg("Synapse: Writing test.bin...");
std::fbytestream stream; // create the new file
stream.open("test.bin",std::ios::out | std::ios::binary);
if(!stream.is_open()) return;
stream.put<std::bytes>(curPacket->get_content());
stream.flush();
stream.close();
emit send_dbgmsg("Synapse: Writing test.bin was successfull");
break;
}
//add to request list...
curSize = maxSize = 0;
//start receiving again after adding this to the request handler
QObject::connect(tcpSocket,
SIGNAL(readyRead
()),
this,
SLOT(receive_packet
()));
}
/**
 * Appends a packet to the send queue and starts sending when the queue was
 * idle.
 *
 * The queue may hold up to queuetlimit packets. The previous check
 * (size()+1 >= limit) rejected one packet too early, so a limit of 1 could
 * never accept anything.
 *
 * @param packet packet to queue; rejected when null
 * @return true when the packet was queued, false when it was rejected
 */
bool Synapse::add_packet(Packet *packet){
    //add readwrite lock
    if(!packet) return false;
    if(packet_queue.size() >= queuetlimit) return false;
    packet_queue.append(packet);
    if (packet_queue.size() == 1 && !sending) emit get_next_packet();
    return true;
}
void Synapse::send_next_packet(void){
//add readwrite lock
if(!packet_queue.isEmpty()){
sending = true;
if (send_packet(packet_queue[0])){
//if sending was successfull remove packet from queue
packet_queue[0]->clear();
delete packet_queue[0];
packet_queue.removeFirst();
}else{
//try again two times
if (!send_packet(packet_queue[0])) send_packet(packet_queue[0]);
//remove it whether it worked or not
packet_queue[0]->clear();
delete packet_queue[0];
packet_queue.removeFirst();
}
//reenter the loop
if(!packet_queue.isEmpty()) emit get_next_packet();
sending = false;
}
}
#include "synapse.h"
/**
 * Builds a Synapse around an existing socket.
 *
 * @param parent_     object whose destroyed() signal is later wired to close()
 * @param tcpSocket_  socket used for all traffic
 * @param queuelimit_ limit applied by add_packet()
 * @param blocksize_  maximum number of bytes written per chunk by send_packet()
 */
Synapse::Synapse(QObject *parent_, QTcpSocket *tcpSocket_, int queuelimit_, uint32_t blocksize_){
    tcpSocket = tcpSocket_;
    parent = parent_;
    // receive_packet() tests this pointer before a packet has been started,
    // so it must not be left uninitialised.
    curPacket = NULL;
    queuetlimit = queuelimit_;
    blockSize = blocksize_;
    maxSize = curSize = 0;
    sending = false;
    connected = true;
}
/**
 * Builds a Synapse that opens its own connection to address_:port_.
 *
 * On failure the socket's error text is stored (see errorString()) and the
 * object stays in the not-connected state.
 *
 * @param parent_     object whose destroyed() signal is later wired to close()
 * @param address_    host to connect to
 * @param port_       port to connect to
 * @param queuelimit_ limit applied by add_packet()
 * @param blocksize_  maximum number of bytes written per chunk by send_packet()
 */
Synapse::Synapse(QObject *parent_, QHostAddress address_, quint16 port_, int queuelimit_, uint32_t blocksize_){
    parent = parent_;
    // Give every member a defined value before anything can fail, so the
    // failure path below does not leave indeterminate state behind.
    curPacket = NULL;
    queuetlimit = queuelimit_;
    blockSize = blocksize_;
    maxSize = curSize = 0;
    sending = false;
    connected = false;

    tcpSocket = new QTcpSocket(this);
    tcpSocket->connectToHost(address_,port_);
    // Blocks until connected or until the wait gives up
    // (the original author observed the timeout here).
    if (!tcpSocket->waitForConnected()){
        errorStr = tcpSocket->errorString();
        return;
    }
    connected = true;
}
void Synapse::run(){
if (!connected) return;
QObject::connect(this,SIGNAL(get_next_packet()),this,SLOT(send_next_packet()));
QObject::connect(tcpSocket,SIGNAL(readyRead()),this,SLOT(receive_packet()));
QObject::connect(tcpSocket,SIGNAL(disconnected()),this,SLOT(close()));
QObject::connect(parent,SIGNAL(destroyed()),this,SLOT(close()));
}
void Synapse::close(){
//disconnect everything
emit send_dbgmsg("Synapse: disconnected");
if (connected){
tcpSocket->close();
QObject::disconnect(tcpSocket,SIGNAL(readyRead()),this,SLOT(receive_packet()));
QObject::disconnect(this,SIGNAL(get_next_packet()),this,SLOT(send_next_packet()));
QObject::disconnect(tcpSocket,SIGNAL(disconnected()),this,SLOT(close()));
QObject::disconnect(parent,SIGNAL(destroyed()),this,SLOT(close()));
if(maxSize != 0 && maxSize != 0){
curPacket->clear();
}
for (int i = 0;i<packet_queue.size();i++){
packet_queue[i]->clear();
delete packet_queue[i];
}
packet_queue.clear();
}
this->deleteLater();
}
void Synapse::receive_packet(){
qint64 blocksize = tcpSocket->bytesAvailable();
if (blocksize == 0){
emit send_dbgmsg("Synapse: No data avaliable on socket");
return;
}
//allocate the buffer
char *data = new char[blocksize];
//read form socket
if (tcpSocket->read(data,blocksize) != blocksize){
emit send_dbgmsg(QString("Synapse: %1").arg(tcpSocket->errorString()));
delete data;
return;
}
//check if this is a new package
if (curSize == 0 && blocksize >= DESCRIPTOR_SIZE){
//new packet
curPacket = new Packet(data,blocksize);
maxSize = curPacket->get_packet_descriptor().Packetsize;
curSize = curPacket->get_size();//blocksize;
}else{
//write packet block to stream
if(curPacket){
curPacket->serialize_DATA(data,blocksize);
curSize = curPacket->get_size();//blocksize;
}else{
emit send_dbgmsg("Synapse: Invalid reference to packet");
delete data;
curSize = maxSize = 0;
return;
}
}
if(curPacket){
if(curSize == maxSize && maxSize > 0){
//stop receiving packets, digest current pne
QObject::disconnect(tcpSocket,SIGNAL(readyRead()),this,SLOT(receive_packet()));
delete data;
if(curPacket->get_packet_descriptor().Version != DESCRIPTOR_VERSION || curPacket->is_descriptor_checksum_valid() == false){
emit send_dbgmsg(QString("Synapse: DescriptorVersion - got %1 expected %2 PacketDescriptor-Checksum - got %3 expected %4").arg(curPacket->get_packet_descriptor().Version).arg(DESCRIPTOR_VERSION).arg(curPacket->compute_descriptor_checksum()).arg(curPacket->get_packet_descriptor().Checksum));
curSize = maxSize = 0;
delete data;
return;
}
curPacket->uncompress_packet();
ident_packet();
return;
}else if(curSize > maxSize){
delete data;
emit send_dbgmsg(QString("Synapse: Received corrupted packet - got %1 expected %2").arg(curSize).arg(maxSize));
curSize = maxSize = 0;
return;
}
}
}
/**
 * Finalizes a packet and writes it to the socket in chunks of at most
 * blockSize bytes, waiting for each chunk to be written.
 *
 * @param packet packet to send; ownership stays with the caller
 * @return false when packet is null, when blockSize is 0 while there is data
 *         to send, or when a write fails; true once every byte was handed to
 *         the socket
 */
bool Synapse::send_packet(Packet *packet){
    if(!packet) return false;
    packet->finalize_packet();
    const qint64 size = packet->get_size();
    emit send_dbgmsg(QString("Synapse: Sending Format %1 Type %2 Size %3 ID %4 Checksum %5").arg(packet->get_packet_descriptor().Format).arg(packet->get_packet_descriptor().Type).arg(size).arg(packet->get_packet_descriptor().JobID).arg(packet->get_packet_descriptor().Checksum));

    // With a zero block size the chunk loop could never make progress.
    if (size > 0 && blockSize == 0) return false;

    qint64 offset = 0;
    while (offset < size){
        const qint64 remaining = size - offset;
        const qint64 chunk = (remaining < blockSize) ? remaining : static_cast<qint64>(blockSize);
        const qint64 written = tcpSocket->write(packet->get_data()+offset,chunk);
        // -1 signals an error; 0 would mean no progress and must not loop forever
        if (written <= 0) return false;
        tcpSocket->waitForBytesWritten();
        // advance by what was actually accepted, in case of a short write
        offset += written;
    }
    return true;
}
void Synapse::ident_packet(void){
Packet::PacketDescriptor descriptor = curPacket->get_packet_descriptor();
emit send_dbgmsg(QString("Synapse: Received Format %1 Type %2 Size %3 ID %4 Checksum %5").arg(descriptor.Format).arg(descriptor.Type).arg(descriptor.Packetsize).arg(descriptor.JobID).arg(descriptor.Checksum));
switch(descriptor.Type){
case Packet::UNKNOWN:
break;
case Packet::CHAT_MSG:
emit send_dbgmsg(QString("Synapse: CHAT_MSG %1").arg(curPacket->deserialize_CHAT_MSG()));
break;
case Packet::REQ_PEERLIST:
break;
//submit to PeerManager and PeerManager sends PeerList backto this Peer to send it
case Packet::REP_PEERLIST:
break;
//submit to PeerManager to add to PeerList
case Packet::FILE:
//debuggin methode
emit send_dbgmsg("Synapse: Writing test.bin...");
std::fbytestream stream; // create the new file
stream.open("test.bin",std::ios::out | std::ios::binary);
if(!stream.is_open()) return;
stream.put<std::bytes>(curPacket->get_content());
stream.flush();
stream.close();
emit send_dbgmsg("Synapse: Writing test.bin was successfull");
break;
}
//add to request list...
curSize = maxSize = 0;
//start receiving again after adding this to the request handler
QObject::connect(tcpSocket,SIGNAL(readyRead()),this,SLOT(receive_packet()));
}
/**
 * Appends a packet to the send queue and starts sending when the queue was
 * idle.
 *
 * The queue may hold up to queuetlimit packets. The previous check
 * (size()+1 >= limit) rejected one packet too early, so a limit of 1 could
 * never accept anything.
 *
 * @param packet packet to queue; rejected when null
 * @return true when the packet was queued, false when it was rejected
 */
bool Synapse::add_packet(Packet *packet){
    //add readwrite lock
    if(!packet) return false;
    if(packet_queue.size() >= queuetlimit) return false;
    packet_queue.append(packet);
    if (packet_queue.size() == 1 && !sending) emit get_next_packet();
    return true;
}
void Synapse::send_next_packet(void){
//add readwrite lock
if(!packet_queue.isEmpty()){
sending = true;
if (send_packet(packet_queue[0])){
//if sending was successfull remove packet from queue
packet_queue[0]->clear();
delete packet_queue[0];
packet_queue.removeFirst();
}else{
//try again two times
if (!send_packet(packet_queue[0])) send_packet(packet_queue[0]);
//remove it whether it worked or not
packet_queue[0]->clear();
delete packet_queue[0];
packet_queue.removeFirst();
}
//reenter the loop
if(!packet_queue.isEmpty()) emit get_next_packet();
sending = false;
}
}
To copy to clipboard, switch view to plain text mode
Bookmarks