Linux C/C++ Socket Send in Multi-Thread Code

Linux C/C++ socket send in multi-thread code

It depends upon which primitives you're using to submit data to the socket.

If you're using write(2), send(2), sendto(2), or sendmsg(2) and the size of your message is small enough to fit entirely within the kernel buffers for the socket, then that entire write will be sent as a block without other data being interspersed.

If you're using fwrite(3) (or any other higher-level buffered IO abstraction), then there is a chance that your data will be sent without any other data being interspersed, but I would not rely upon this behavior.

I can't speak to sendfile(2) behavior. I'd like to think that the sendfile(2) operation "writes" the entire contents of the file to the socket before any other write(2) requests on the socket, but the documentation I've read doesn't say a word about it, so you better not make the assumption that it is in any sense "atomic".

The safest mechanism is for only a single thread to ever be submitting data to a socket.

multi thread receiver in single socket

recv doesn't support content-filtered data receiving.

You should distinguish those kind of messages on your own.

How to concurrently send data to multiple servers using C++ socket programming?

Not sure this is the only problem, but given what you shared, the main thread is not waiting for the worker threads to finish their tasks.

Per this answer: When the main thread exits, do other threads also exit?, the process will be terminated when the main thread returns from main().

Proposed fix:

int main() {
// .. your logic here
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
std::thread t(send_data, server_ips[i], port, bufdata);
threads.push_back(std::move(t));
}

for (int i = 0; i < 4; ++i) {
if (threads[i].joinable())
threads[i].join(); // wait for threads[i] to finish
}

// .. clean up
}

socket used in multithreading?

Answering the question:

Having multiple threads listing on the same socket does work, as for recent implementations accept() is thread save.

However one has to take care to check the outcome of all those parallel accept()s as multiple of tehm might return on a client attempting to connect, but only one accept() does this without an error.

Also one could argue this scenario is inefficient, due to those multiple returns.


However these calls

for ( i = 0; i < number_thread; i++ )
{
pthread_create(pid + i, &attr, (void *)fMsgIn, (void*)&fd);
}

to create threads are potential killers, as they pass down to the thread function a reference to a variable local to

void init_accept_req(socketfd fd, int number_thread);

namely fd.

As soon as init_accept_req() has returned, fd is not valid anymore, nor is what the references, which had been passed to the thread functions, are pointing to.

To fix this pass a reference to the listening socket all the way down like so:

void init_accept_req(socketfd * pfd, int number_thread)
{
[...]

for ( i = 0; i < number_thread; i++ )
{
/* control accept requst */
pthread_create(pid + i, &attr, (void *)fMsgIn, (void*) pfd);
}
}

void initDatas(socketfd * pfd)
{
[...]

init_accept_req(pfd, num_accept_req);

[...]

int main(void)
{
int listenfd;

/* initial the message queue. */
/* and start to run ...*/
initDatas(&listenfd);

[...]

Using this approach one only has to make sure main() does end (so that the listening socket listenfd stays valid) as long any of the accepting thread are doing their job.

A solution a bit dirty would be to misuse the thread function's void * typed user-data argument as int and pass down the socket descriptor by value like so:

pthread_create(pid + i, &attr, (void *)fMsgIn, (void*) fd);

Not nice, but feasable as long as sizeof(void*) isn't smaller then sizeof(int).

Is it thread safe to call socket send to different sockets from different threads?

It is perfectly safe for separate threads to call send() to different sockets at the same time.

Like you said, the only time you need to worry is when separate threads are trying to send() to the same socket at the same time, that needs to be coordinated accordingly.

multi-threaded file transfer with socket

I fixed most of the bugs that others have mentioned.

Key points to get multithread/multiclient working:

Eliminate mutex.

Consolidate all arrays previously indexed by socket_index into a new "control" struct. main thread does a malloc for the struct, fills it in, and passes off the struct pointer to the thread.

Remove pthread_join from main thread and run all threads detached. main no longer does any close/cleanup for the client thread.

client thread now does the close/cleanup/free.

Even with all that, the server/client code still needs some work, but now, it does work with multiple simultaneous client connections which I believe was the main issue.

Note: I've answered a similar question before: executing commands via sockets with popen() Pay particular attention to the discussion of the "flag" character.

Anyway, Here's the code. I've cleaned it, annotated the bugs and fixes and wrapped the old/new code with #if 0. Note that some of the "old" code isn't purely original code, but an interim version of mine. [please pardon the gratuitous style cleanup]:


server.c:

/*
Soner
Receive a file over a socket.

Saves it to output.tmp by default.

Interface:

./executable [<port>]

Defaults:

- output_file: output.tmp
- port: 12345
*/

#define _XOPEN_SOURCE 700

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h> /* getprotobyname */
#include <netinet/in.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <unistd.h>

#include <pthread.h>

// NOTE: this consolidates four arrays that were indexed by socket_index
struct client {
socklen_t client_len;
struct sockaddr_in client_address;
int client_sockfd;
pthread_t thread;
};

// NOTE: no longer used/needed for true multiclient
#if 0
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
#endif

// NOTE/BUG: this didn't provide enough space for a 5 digit port + EOS char
#if 0
enum { PORTSIZE = 5 };
#else
enum { PORTSIZE = 6 };
#endif

void *forClient(void *ptr);

void
sig_handler(int signo)
{
if (signo == SIGINT)
printf("!! OUCH, CTRL - C received by server !!\n");
}

int
main(int argc, char **argv)
{
struct addrinfo hints,
*res;
int enable = 1;
//int filefd; // NOTE: this is never initialized/used
int server_sockfd;
unsigned short server_port = 12345u;
char portNum[PORTSIZE];

// NOTE: now all client related data is malloc'ed
#if 0
int socket_index = 0;
#else
struct client *ctl;
#endif

if (argc != 2) {
fprintf(stderr, "Usage ./server <port>\n");
exit(EXIT_FAILURE);
}
server_port = strtol(argv[1], NULL, 10);

memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET; // ipv4
hints.ai_socktype = SOCK_STREAM; // tcp
hints.ai_flags = AI_PASSIVE; // fill in my IP for me

sprintf(portNum, "%d", server_port);
getaddrinfo(NULL, portNum, &hints, &res);

server_sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (server_sockfd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}

if (setsockopt(server_sockfd, SOL_SOCKET, (SO_REUSEPORT | SO_REUSEADDR), &enable, sizeof(enable)) < 0) {
perror("setsockopt(SO_REUSEADDR) failed");
exit(EXIT_FAILURE);
}

if (bind(server_sockfd, res->ai_addr, res->ai_addrlen) == -1) {
perror("bind");
exit(EXIT_FAILURE);
}

if (listen(server_sockfd, 5) == -1) {
perror("listen");
exit(EXIT_FAILURE);
}
fprintf(stderr, "listening on port %d\n", server_port);

// NOTE: we want the threads to run detached so we don't have to wait
// for them to do cleanup -- the thread now does its own close/cleanup
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,1);

while (1) {
// NOTE/BUG: using a fixed list, if you actually let threads detach,
// you don't know which thread completes allowing its control struct
// to be reused
// the solution is to allocate a fresh one, fill it, pass it to the
// thread and let the _thread_ do all the closes and cleanup
#if 0
ctl = &control_list[socket_index];
#else
ctl = malloc(sizeof(struct client));
if (ctl == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
#endif

ctl->client_len = sizeof(ctl->client_address);
puts("waiting for client");

ctl->client_sockfd = accept(server_sockfd,
(struct sockaddr *) &ctl->client_address, &ctl->client_len);

if (ctl->client_sockfd < 0) {
perror("Cannot accept connection\n");
close(server_sockfd);
exit(EXIT_FAILURE);
}

// NOTE: we're running the threads detached now and we're passing down
// extra information just in case the client loop needs it
#if 0
pthread_create(&ctl->thread, NULL, forClient, ctl);
#else
pthread_create(&ctl->thread, &attr, forClient, ctl);
#endif

#if 0
if (BUFSIZ == socket_index) {
socket_index = 0;
}
else {
++socket_index;
}
#endif

// NOTE/BUG: this is why you couldn't do multiple clients at the same
// time -- you are doing a thread join
// but you _had_ to because the main thread didn't know when a thread
// was done with the control struct without the join
#if 0
pthread_join(threads[socket_index], NULL);
close(filefd);
close(client_sockfd[socket_index]);
#endif
}

return EXIT_SUCCESS;
}

void *
forClient(void *ptr)
{
#if 0
int connect_socket = (int) ptr;
#else
struct client *ctl = ptr;
int connect_socket = ctl->client_sockfd;
#endif
int filefd;
ssize_t read_return;
char buffer[BUFSIZ];
char *file_path;
long long file_length;
char receiveFileName[BUFSIZ];

//int ret = 1;

// Thread number means client's id
printf("Thread number %ld\n", pthread_self());

// NOTE: to run parallel threads, this prevents that
#if 0
pthread_mutex_lock(&mutex1);
#endif

// until stop receiving go on taking information
while (recv(connect_socket, receiveFileName, sizeof(receiveFileName), 0)) {
// NOTE/FIX2: now we have the client send us the file length so we
// know when to stop the read loop below
file_length = strtoll(receiveFileName,&file_path,10);

if (*file_path != ',') {
fprintf(stderr,"syntax error in request -- '%s'\n",
receiveFileName);
exit(EXIT_FAILURE);
}
file_path += 1;

fprintf(stderr, "is the file name received? ? => %s [%lld bytes]\n",
file_path,file_length);

// NOTE: if you want to see _why_ sending the length is necessary,
// uncomment this line and the "unable to send two files" bug will
// reappear
//file_length = 1LL << 62;

filefd = open(file_path,
O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
if (filefd == -1) {
perror("open");
exit(EXIT_FAILURE);
}

// NOTE/BUG2/FIX: now we only read up to what we're told to read
// previously, we would keep trying to read, so on the _second_
// send, our read call here would get the data that _should_ have
// gone into the recv above
// in other words, we'd lose synchronization with what the client
// was sending us [and we'd put the second filename into the first
// file as data at the bottom]
for (; file_length > 0; file_length -= read_return) {
read_return = BUFSIZ;
if (read_return > file_length)
read_return = file_length;

read_return = read(connect_socket, buffer, read_return);
if (read_return == -1) {
perror("read");
exit(EXIT_FAILURE);
}
if (read_return == 0)
break;

if (write(filefd, buffer, read_return) == -1) {
perror("write");
exit(EXIT_FAILURE);
}
}

fprintf(stderr,"file complete\n");

// NOTE/BUG: filefd was never closed
#if 1
close(filefd);
#endif
}

#if 0
pthread_mutex_unlock(&mutex1);
#endif

fprintf(stderr, "Client dropped connection\n");

// NOTE: do all client related cleanup here
// previously, the main thread was doing the close, which is why it had
// to do the pthread_join
close(connect_socket);
free(ctl);

// NOTE: this needs a void * value like below
#if 0
pthread_exit(&ret);
#endif

return (void *) 0;
}

client.c:

/*
Soner
Send a file over a socket.

Interface:

./executable [<sever_hostname> [<port>]]

Defaults:

- server_hostname: 127.0.0.1
- port: 12345
*/

#define _XOPEN_SOURCE 700

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <signal.h>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h> /* getprotobyname */
#include <netinet/in.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <unistd.h>

// NOTE/BUG: this didn't provide enough space for a 5 digit port + EOS char
#if 0
enum { PORTSIZE = 5 };
#else
enum { PORTSIZE = 6 };
#endif

// NOTE2: the "volatile" attribute here is critical to proper operation
volatile int signo_taken;

// NOTE/BUG2: don't use BUFSIZ when you really want something else
#define MAXFILES 1000

void
sig_handler(int signo)
{

// NOTE/BUG2/FIX: doing printf within a signal handler is _not_ [AFAIK] a
// safe thing to do because it can foul up the internal structure data of
// stdout if the base task was doing printf/puts and the signal occurred
// in the middle -- there are a number of other restrictions, such as
// _no_ malloc, etc.

// so, just alert the base layer and let it handle things when it's in a
// "safe" state to do so ...
signo_taken = signo;
}

int
main(int argc, char **argv)
{
struct addrinfo hints,
*res;
char *server_hostname = "127.0.0.1";
char file_path[BUFSIZ];
char *server_reply = NULL;
char *user_input = NULL;
char buffer[BUFSIZ];
int filefd;
int sockfd;
struct stat st;
ssize_t read_return;
struct hostent *hostent;
unsigned short server_port = 12345;
char portNum[PORTSIZE];
char remote_file[BUFSIZ];
int select;
char *client_server_files[MAXFILES];
int i = 0;
int j;

// char filename_to_send[BUFSIZ];

if (argc != 3) {
fprintf(stderr, "Usage ./client <ip> <port>\n");
exit(EXIT_FAILURE);
}

server_hostname = argv[1];
server_port = strtol(argv[2], NULL, 10);

/* Prepare hint (socket address input). */
hostent = gethostbyname(server_hostname);
if (hostent == NULL) {
fprintf(stderr, "error: gethostbyname(\"%s\")\n", server_hostname);
exit(EXIT_FAILURE);
}

memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET; // ipv4
hints.ai_socktype = SOCK_STREAM; // tcp
hints.ai_flags = AI_PASSIVE; // fill in my IP for me

sprintf(portNum, "%d", server_port);
getaddrinfo(NULL, portNum, &hints, &res);

sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sockfd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}

/* Do the actual connection. */
if (connect(sockfd, res->ai_addr, res->ai_addrlen) == -1) {
perror("connect");
return EXIT_FAILURE;
}

// NOTE/FIX2: this only needs to be done once, since the desired action is
// to [cleanly] stop the program
signal(SIGINT, sig_handler);

// NOTES:
// (1) instead of using signo_taken as is done, below there are alternate
// ways to handle signals with sigsetjmp and siglongjmp
// (2) but the main reason to _not_ do this is to prevent the handler
// from messing up a file transfer
while (! signo_taken) {
puts("connected to the server");
#if 0
puts("-----------------");
puts("|1 - listLocal| \n|2 - listServer| \n|3 - sendFile| \n|4 - help| \n|5 - exit| ");
puts("-----------------");
#endif

while (! signo_taken) {
// NOTE: not a bug, but it helps the user to output the menu each
// time
#if 1
puts("-----------------");
puts("|1 - listLocal| \n|2 - listServer| \n|3 - sendFile| \n|4 - help| \n|5 - exit| ");
puts("-----------------");
#endif

scanf("%d", &select);

// NOTE: we should check this after _any_ call that requests user
// input (e.g. scanf, fgets(...,stdin), etc.)
if (signo_taken)
break;

switch (select) {
case 1: // list files of client's directory
system("find . -maxdepth 1 -type f | sort");
break;

case 2: // listServer
puts("---- Files btw Server and the Client ----");
for (j = 0; j < i; ++j) {
puts(client_server_files[j]);
}
break;

case 3: // send file
fputs("Enter filename: ",stdout);
fflush(stdout);

memset(file_path, 0, sizeof file_path);
scanf("%s", file_path);

if (signo_taken)
break;

// NOTE/FIX: check the file _before_ sending request to server
// and we [now] want to know the file length so we can send
// that to the server so it will know when to stop receiving
#if 1
filefd = open(file_path, O_RDONLY);
if (filefd == -1) {
perror("open send file");
// exit(EXIT_FAILURE);
break;
}

// get the file's byte length
if (fstat(filefd,&st) < 0) {
perror("stat send file");
// exit(EXIT_FAILURE);
close(filefd);
break;
}
#endif

// send file name to server
memset(remote_file, 0, sizeof(remote_file));
#if 0
sprintf(remote_file, "%s", file_path);
#else
sprintf(remote_file, "%lld,%s",
(long long) st.st_size,file_path);
#endif
send(sockfd, remote_file, sizeof(remote_file), 0);

// NOTE/BUG2: this should be done above to _not_ confuse server
#if 0
filefd = open(file_path, O_RDONLY);
if (filefd == -1) {
perror("open send file");
// exit(EXIT_FAILURE);
break;
}
#endif

while (1) {
read_return = read(filefd, buffer, BUFSIZ);
if (read_return == 0)
break;

if (read_return == -1) {
perror("read");
// exit(EXIT_FAILURE);
break;
}

if (write(sockfd, buffer, read_return) == -1) {
perror("write");
// exit(EXIT_FAILURE);
break;
}
}

close(filefd);

// add files in char pointer array
// NOTE/BUG2: file_path gets overwritten, so we must save it
// here
#if 0
client_server_files[i++] = file_path;
#else
if (i < MAXFILES)
client_server_files[i++] = strdup(file_path);
#endif

puts("file complete");
break;

case 5:
free(user_input);
free(server_reply);
exit(EXIT_SUCCESS);
break;

default:
puts("Wrong selection!");
break;
}

}
}

// NOTE/FIX2: we output this here when it's save to do so
if (signo_taken)
printf("!! OUCH, CTRL - C received on client !!\n");

free(user_input);
free(server_reply);
exit(EXIT_SUCCESS);
}

UPDATE:

I have solved my connection-interruption problem but signal is still occurring. I left two problems more times file sending and signal handling

I have reworked the client signal handling so that it works as expected [which is to print the message and stop the client].

I have also fixed the problem where only one file could be sent. To understand this, consider the actions of both client and server.

To send a file, client prompts for filename, does a send call with the filename in it. It then opens the file and does a read/write loop to send the file data to the server [and then closes the file descriptor].

To receive a file, server does a recv call to get the filename. It then opens the file [for output] and does a read/write to write the data from the socket to the file [and then closes the file descriptor].

Here is the problem: The termination condition for the server's read/write loop is to wait until the read(connect_socket,...) call returns 0. But, it will not return zero [unless the socket has been closed].

So, now the client does a send call to send the second filename. But, the data for this, instead of going into the server's recv call, will merely be part of the read buffer. That is, the second filename will just be appended to the first file as data.

The solution is to have the client tell the server what the file size is. So, instead of the client doing a send of filename, it now does a send of filesize,filename

The server will now decode this filesize and split off the filename in the recv buffer. Now, the server's read/write loop will maintain a count of how many bytes still need to be read and the loop stops when the remaining count hits zero.

There were one or two other minor bugs. I've updated both client.c and server.c with the bug fixes and annotations



Related Topics



Leave a reply



Submit