Protocol Buffer over socket in C++
At last I could get it working . I am posting the code here so that one can review and comment on it as well as if some one wants to implement it in c++, this piece of code can help. Its a shabby code my intention was to get Protobuf working in length prefixed manner. I have taken the code of client server from some site which I don't remember and I have modified it to accommodate protobuf. Here the server first peeks into the socket and gets the length of the total packet and then actual socket read is done to read the entire packet. There can be zillion ways to do it but for quick solution I did it in this manner. But I need to find a better way to avoid 2 recv per packets, but in my condition all the messages are of different size, so this is the only way I guess.
Proto file
message log_packet {
required fixed64 log_time =1;
required fixed32 log_micro_sec =2;
required fixed32 sequence_no =3;
required fixed32 shm_app_id =4;
required string packet_id =5;
required string log_level=6;
required string log_msg=7;
}
Protocol buffer Client Code
#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
using namespace std;
int main(int argv, char** argc){
/* Coded output stram */
log_packet payload ;
payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");
cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);
int host_port= 1101;
char* host_name="127.0.0.1";
struct sockaddr_in my_addr;
char buffer[1024];
int bytecount;
int buffer_len=0;
int hsock;
int * p_int;
int err;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n",errno);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
printf("Error setting options %d\n",errno);
free(p_int);
goto FINISH;
}
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = inet_addr(host_name);
if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
if((err = errno) != EINPROGRESS){
fprintf(stderr, "Error connecting socket %d\n", errno);
goto FINISH;
}
}
for (int i =0;i<10000;i++){
for (int j = 0 ;j<10;j++) {
if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
fprintf(stderr, "Error sending data %d\n", errno);
goto FINISH;
}
printf("Sent bytes %d\n", bytecount);
usleep(1);
}
}
delete pkt;
FINISH:
close(hsock);
}
Protocol buffer Server Code
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
using namespace std;
using namespace google::protobuf::io;
void* SocketHandler(void*);
int main(int argv, char** argc){
int host_port= 1101;
struct sockaddr_in my_addr;
int hsock;
int * p_int ;
int err;
socklen_t addr_size = 0;
int* csock;
sockaddr_in sadr;
pthread_t thread_id=0;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n", errno);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
printf("Error setting options %d\n", errno);
free(p_int);
goto FINISH;
}
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = INADDR_ANY ;
if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
goto FINISH;
}
if(listen( hsock, 10) == -1 ){
fprintf(stderr, "Error listening %d\n",errno);
goto FINISH;
}
//Now lets do the server stuff
addr_size = sizeof(sockaddr_in);
while(true){
printf("waiting for a connection\n");
csock = (int*)malloc(sizeof(int));
if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){
printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
pthread_create(&thread_id,0,&SocketHandler, (void*)csock );
pthread_detach(thread_id);
}
else{
fprintf(stderr, "Error accepting %d\n", errno);
}
}
FINISH:
;//oops
}
google::protobuf::uint32 readHdr(char *buf)
{
google::protobuf::uint32 size;
google::protobuf::io::ArrayInputStream ais(buf,4);
CodedInputStream coded_input(&ais);
coded_input.ReadVarint32(&size);//Decode the HDR and get the size
cout<<"size of payload is "<<size<<endl;
return size;
}
void readBody(int csock,google::protobuf::uint32 siz)
{
int bytecount;
log_packet payload;
char buffer [siz+4];//size of the payload and hdr
//Read the entire buffer including the hdr
if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){
fprintf(stderr, "Error receiving data %d\n", errno);
}
cout<<"Second read byte count is "<<bytecount<<endl;
//Assign ArrayInputStream with enough memory
google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
CodedInputStream coded_input(&ais);
//Read an unsigned integer with Varint encoding, truncating to 32 bits.
coded_input.ReadVarint32(&siz);
//After the message's length is read, PushLimit() is used to prevent the CodedInputStream
//from reading beyond that length.Limits are used when parsing length-delimited
//embedded messages
google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
//De-Serialize
payload.ParseFromCodedStream(&coded_input);
//Once the embedded message has been parsed, PopLimit() is called to undo the limit
coded_input.PopLimit(msgLimit);
//Print the message
cout<<"Message is "<<payload.DebugString();
}
void* SocketHandler(void* lp){
int *csock = (int*)lp;
char buffer[4];
int bytecount=0;
string output,pl;
log_packet logp;
memset(buffer, '\0', 4);
while (1) {
//Peek into the socket and get the packet size
if((bytecount = recv(*csock,
buffer,
4, MSG_PEEK))== -1){
fprintf(stderr, "Error receiving data %d\n", errno);
}else if (bytecount == 0)
break;
cout<<"First read byte count is "<<bytecount<<endl;
readBody(*csock,readHdr(buffer));
}
FINISH:
free(csock);
return 0;
}
Protobufer from c++ to c# over Socket
I found out the problem. Was not to fix the length of the received buffer. You have to set yor buffer same as the send message. For example if your message is 32 bytes length, your buffer will be :
byte[] data = new byte[32];
recv = newsock.ReceiveFrom(data, ref Remote);
The problem now is i do not know how length is going to be my message...anyway i found out the reason.
Send protocol buffer data via Socket and determine the class
It is impossible to detect which object was serialized, Protobuf don't do it. But you can handle that using protobuf very easy:
1) Method: just send message that has type and string body. To body you will serialize your objects, and in type you will show which object is serialized:
Something like that:
package MyGreatPackage;
message Pack
{
required bytes packcode = 1;
//code for data/query
required bytes mess = 2;
}
message Data
{
//anything you need to
}
message Query
{
//anything you need to
}
So, you will always send message Pack, where will be defined which object exactly is in "mess" field.
2) Method: protobuf allows this technique to achieve same thing without pack wrapper, look here: https://developers.google.com/protocol-buffers/docs/techniques?hl=ru#union
message OneMessage {
enum Type { FOO = 1; BAR = 2; BAZ = 3; }
// Identifies which field is filled in.
required Type type = 1;
// One of the following will be filled in.
optional Foo foo = 2;
optional Bar bar = 3;
optional Baz baz = 4;
}
So, you can set all classes you may send as optional and determine their types by required parameter.
Still, for me first varians seems better, choose what you like.
C++ Google Protocol Buffers open http socket
Protocol Buffers is just a serialization tool. It will convert your message object into bytes, and will convert bytes back into a message object. The library does not directly implement a way to transport those bytes.
If you have a raw socket, you could wrap it in FileInputStream
/FileOutputStream
to read/write protobufs from/to it*. Since protobufs are not self-delimiting, you need to write the size as a prefix, followed by the data, and interpret these correctly on the receiving end. See my answer to this other question for code to do that: Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?
Another alternative is to use a higher-level transport library like ZeroMQ, which implements sending and receiving of byte-blob messages. Use protobufs to encode/decode the byte blobs, then hand them off to ZeroMQ for transport.
* This works on Unix, where sockets are file descriptors. On Windows, they aren't, so you'll need to implement the ZeroCopyInputStream
/ZeroCopyOutputStream
directly in terms of send()
/recv()
, which is not too hard if you use CopyingInputStreamAdapter
/CopyingOutputStreamAdaptor
.
how to send classes defined in .proto (protocol-buffers) over a socket
You're not supposed to write the protobuf object itself to the socket. Use the SerializeXXX
family of methods to get a sequence of bytes which you can write to the socket.
std::string buf;
data.SerializeToString(&buf);
// now you can write buf.data() to the socket
multiple different protobuf messages in a socket
Protobuf v2.6.0 introduced the oneof
keyword for this. Example:
message BigMessage {
oneof message {
SdkHGetRet hgetret = 1;
SdkHPut hput = 2;
...
}
}
oneof
ensures that exactly one of the fields is set, and lets you switch()
on which one.
Note that even before Protobuf 2.6.0, the best solution would have been a series of optional fields, perhaps with an enum to specify which one is set:
message BigMessage {
enum Type { HGETRET = 0, HPUT = 1, ... }
required Type t = 1;
optional SdkHGetRet hgetret = 2;
optional SdkHPut hput = 3;
...
}
Related Topics
How Can Duff's Device Code Be Compiled
When Does a Std::Vector Reallocate Its Memory Array
How to Make Sure That Std::Random_Shuffle Always Produces a Different Result
Displacement Map Filter in Opencv
How to Generate and Run Native Code Dynamically
How to #Include When There Is a Circular Dependency
Protected Data in Parent Class Not Available in Child Class
C++: Unresolved External Symbol _Sprintf and _Sscanf in Visual Studio 2015
Difference Between Size_T and Std::Size_T
How Would One Call Std::Forward on All Arguments in a Variadic Function
Restrict Passed Parameter to a String Literal
How to Read/Write Std::String Values From/To Binary Files
Checking If a Folder Exists (And Creating Folders) in Qt, C++
How to Resume Input Stream After Stopped by Eof in C++
How to Typedef a Type and the Same Type's Pointer
The Difference Between C and C++ Regarding the ++ Operator
How to Get Position of a Certain Element in Strings Vector, to Use It as an Index in Ints Vector
What Happens When an Exception Goes Unhandled in a Multithreaded C++11 Program