Protocol Buffer Over Socket in C++

Protocol Buffer over socket in C++

At last I could get it working . I am posting the code here so that one can review and comment on it as well as if some one wants to implement it in c++, this piece of code can help. Its a shabby code my intention was to get Protobuf working in length prefixed manner. I have taken the code of client server from some site which I don't remember and I have modified it to accommodate protobuf. Here the server first peeks into the socket and gets the length of the total packet and then actual socket read is done to read the entire packet. There can be zillion ways to do it but for quick solution I did it in this manner. But I need to find a better way to avoid 2 recv per packets, but in my condition all the messages are of different size, so this is the only way I guess.

Proto file

  message log_packet {
required fixed64 log_time =1;
required fixed32 log_micro_sec =2;
required fixed32 sequence_no =3;
required fixed32 shm_app_id =4;
required string packet_id =5;
required string log_level=6;
required string log_msg=7;
}

Protocol buffer Client Code

#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;

using namespace std;
int main(int argv, char** argc){

/* Coded output stram */

log_packet payload ;

payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");

cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);

int host_port= 1101;
char* host_name="127.0.0.1";

struct sockaddr_in my_addr;

char buffer[1024];
int bytecount;
int buffer_len=0;

int hsock;
int * p_int;
int err;

hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n",errno);
goto FINISH;
}

p_int = (int*)malloc(sizeof(int));
*p_int = 1;

if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
printf("Error setting options %d\n",errno);
free(p_int);
goto FINISH;
}
free(p_int);

my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);

memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = inet_addr(host_name);
if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
if((err = errno) != EINPROGRESS){
fprintf(stderr, "Error connecting socket %d\n", errno);
goto FINISH;
}
}

for (int i =0;i<10000;i++){
for (int j = 0 ;j<10;j++) {

if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
fprintf(stderr, "Error sending data %d\n", errno);
goto FINISH;
}
printf("Sent bytes %d\n", bytecount);
usleep(1);
}
}
delete pkt;

FINISH:
close(hsock);

}

Protocol buffer Server Code

#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>

using namespace std;
using namespace google::protobuf::io;

void* SocketHandler(void*);

int main(int argv, char** argc){

int host_port= 1101;

struct sockaddr_in my_addr;

int hsock;
int * p_int ;
int err;

socklen_t addr_size = 0;
int* csock;
sockaddr_in sadr;
pthread_t thread_id=0;

hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n", errno);
goto FINISH;
}

p_int = (int*)malloc(sizeof(int));
*p_int = 1;

if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
printf("Error setting options %d\n", errno);
free(p_int);
goto FINISH;
}
free(p_int);

my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);

memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = INADDR_ANY ;

if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
goto FINISH;
}
if(listen( hsock, 10) == -1 ){
fprintf(stderr, "Error listening %d\n",errno);
goto FINISH;
}

//Now lets do the server stuff

addr_size = sizeof(sockaddr_in);

while(true){
printf("waiting for a connection\n");
csock = (int*)malloc(sizeof(int));
if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){
printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
pthread_create(&thread_id,0,&SocketHandler, (void*)csock );
pthread_detach(thread_id);
}
else{
fprintf(stderr, "Error accepting %d\n", errno);
}
}

FINISH:
;//oops
}

google::protobuf::uint32 readHdr(char *buf)
{
google::protobuf::uint32 size;
google::protobuf::io::ArrayInputStream ais(buf,4);
CodedInputStream coded_input(&ais);
coded_input.ReadVarint32(&size);//Decode the HDR and get the size
cout<<"size of payload is "<<size<<endl;
return size;
}

void readBody(int csock,google::protobuf::uint32 siz)
{
int bytecount;
log_packet payload;
char buffer [siz+4];//size of the payload and hdr
//Read the entire buffer including the hdr
if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){
fprintf(stderr, "Error receiving data %d\n", errno);
}
cout<<"Second read byte count is "<<bytecount<<endl;
//Assign ArrayInputStream with enough memory
google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
CodedInputStream coded_input(&ais);
//Read an unsigned integer with Varint encoding, truncating to 32 bits.
coded_input.ReadVarint32(&siz);
//After the message's length is read, PushLimit() is used to prevent the CodedInputStream
//from reading beyond that length.Limits are used when parsing length-delimited
//embedded messages
google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
//De-Serialize
payload.ParseFromCodedStream(&coded_input);
//Once the embedded message has been parsed, PopLimit() is called to undo the limit
coded_input.PopLimit(msgLimit);
//Print the message
cout<<"Message is "<<payload.DebugString();

}

void* SocketHandler(void* lp){
int *csock = (int*)lp;

char buffer[4];
int bytecount=0;
string output,pl;
log_packet logp;

memset(buffer, '\0', 4);

while (1) {
//Peek into the socket and get the packet size
if((bytecount = recv(*csock,
buffer,
4, MSG_PEEK))== -1){
fprintf(stderr, "Error receiving data %d\n", errno);
}else if (bytecount == 0)
break;
cout<<"First read byte count is "<<bytecount<<endl;
readBody(*csock,readHdr(buffer));
}

FINISH:
free(csock);
return 0;
}

Protobufer from c++ to c# over Socket

I found out the problem. Was not to fix the length of the received buffer. You have to set yor buffer same as the send message. For example if your message is 32 bytes length, your buffer will be :

byte[] data = new byte[32];
recv = newsock.ReceiveFrom(data, ref Remote);

The problem now is i do not know how length is going to be my message...anyway i found out the reason.

Send protocol buffer data via Socket and determine the class

It is impossible to detect which object was serialized, Protobuf don't do it. But you can handle that using protobuf very easy:

1) Method: just send message that has type and string body. To body you will serialize your objects, and in type you will show which object is serialized:

Something like that:

package MyGreatPackage;

message Pack
{
required bytes packcode = 1;
//code for data/query
required bytes mess = 2;
}

message Data
{
//anything you need to
}

message Query
{
//anything you need to
}

So, you will always send message Pack, where will be defined which object exactly is in "mess" field.

2) Method: protobuf allows this technique to achieve same thing without pack wrapper, look here: https://developers.google.com/protocol-buffers/docs/techniques?hl=ru#union

message OneMessage {
enum Type { FOO = 1; BAR = 2; BAZ = 3; }

// Identifies which field is filled in.
required Type type = 1;

// One of the following will be filled in.
optional Foo foo = 2;
optional Bar bar = 3;
optional Baz baz = 4;
}

So, you can set all classes you may send as optional and determine their types by required parameter.

Still, for me first varians seems better, choose what you like.

C++ Google Protocol Buffers open http socket

Protocol Buffers is just a serialization tool. It will convert your message object into bytes, and will convert bytes back into a message object. The library does not directly implement a way to transport those bytes.

If you have a raw socket, you could wrap it in FileInputStream/FileOutputStream to read/write protobufs from/to it*. Since protobufs are not self-delimiting, you need to write the size as a prefix, followed by the data, and interpret these correctly on the receiving end. See my answer to this other question for code to do that: Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?

Another alternative is to use a higher-level transport library like ZeroMQ, which implements sending and receiving of byte-blob messages. Use protobufs to encode/decode the byte blobs, then hand them off to ZeroMQ for transport.

* This works on Unix, where sockets are file descriptors. On Windows, they aren't, so you'll need to implement the ZeroCopyInputStream/ZeroCopyOutputStream directly in terms of send()/recv(), which is not too hard if you use CopyingInputStreamAdapter/CopyingOutputStreamAdaptor.

how to send classes defined in .proto (protocol-buffers) over a socket

You're not supposed to write the protobuf object itself to the socket. Use the SerializeXXX family of methods to get a sequence of bytes which you can write to the socket.

std::string buf;
data.SerializeToString(&buf);
// now you can write buf.data() to the socket

multiple different protobuf messages in a socket

Protobuf v2.6.0 introduced the oneof keyword for this. Example:

message BigMessage {
oneof message {
SdkHGetRet hgetret = 1;
SdkHPut hput = 2;
...
}
}

oneof ensures that exactly one of the fields is set, and lets you switch() on which one.

Note that even before Protobuf 2.6.0, the best solution would have been a series of optional fields, perhaps with an enum to specify which one is set:

message BigMessage {
enum Type { HGETRET = 0, HPUT = 1, ... }
required Type t = 1;
optional SdkHGetRet hgetret = 2;
optional SdkHPut hput = 3;
...
}


Related Topics



Leave a reply



Submit