Python Sockets Multiple Messages on Same Connection

python sockets multiple messages on same connection

Adding a two types of server-client one is over multi process and the other is asynchronous, they do almost the same thing, the asynchronous one is more robust, read why here:
Threads vs. Async.

My examples:
Using multi process:

import multiprocessing
import socket
import time

HOST = "0.0.0.0"
PORT = 9000


def handle(connection, address):

try:
while True:
data = connection.recv(1024)
connection.sendall(data + ' server time {}'.format(time.time()))
except:
pass
finally:
connection.close()


class Server(object):

def __init__(self, hostname, port):
self.hostname = hostname
self.port = port

def start(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.hostname, self.port))
self.socket.listen(1)

while True:
conn, address = self.socket.accept()
process = multiprocessing.Process(
target=handle, args=(conn, address))
process.daemon = True
process.start()


if __name__ == "__main__":
server = Server(HOST, PORT)
try:
print 'start'
server.start()
except:
print 'something wrong happened, a keyboard break ?'
finally:
for process in multiprocessing.active_children():
process.terminate()
process.join()
print 'Goodbye'

And the client for it :

    import sys
import threading
import time
import socket

SOCKET_AMOUNT = 100
HOST = "localhost"
PORT = 9000


def myclient(ip, port, message):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
sock.sendall(message)
result = sock.recv(1024)
print result + ' final clnt time {}'.format(time.time())
sock.close()

if __name__ == "__main__":
thread_list = []
for i in range(SOCKET_AMOUNT):
msg = "Thread #{}, clnt time {}".format(i, time.time())
client_thread = threading.Thread(
target=myclient, args=(HOST, PORT, msg))
thread_list.append(client_thread)
client_thread.start()

waiting = time.time()
[x.join() for x in thread_list]
done = time.time()
print 'DONE {}. Waiting for {} seconds'.format(done, done-waiting)

The next server is a lot more robust !!! data is not getting lost !!!
the server:

import asyncore
import socket
import time
import logging
import json


class Server(asyncore.dispatcher):

def __init__(self, host, port):

self.logger = logging.getLogger('SERVER')
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(('', port))
self.listen(confjson.get('SERVER_QUEUE_SIZE', None))
self.logger.debug('binding to {}'.format(self.socket.getsockname()))

def handle_accept(self):
socket, address = self.accept()
self.logger.debug('new connection accepted')
EchoHandler(socket)


class EchoHandler(asyncore.dispatcher_with_send):

def handle_read(self):

msg = self.recv(confjson.get('RATE', None))
self.out_buffer = msg
self.out_buffer += ' server recieve: {}'.format(time.time())
if not self.out_buffer:
self.close()


if __name__ == "__main__":

logging.basicConfig(level=logging.DEBUG,
format='%(name)s: %(message)s',
)
with open('config.json', 'r') as jfile:
confjson = json.load(jfile)
try:
logging.debug('Server start')
server = Server(confjson.get('HOST', None),
confjson.get('PORT', None))
asyncore.loop()
except:
logging.error('Something happened,\n'
'if it was not a keyboard break...\n'
'check if address taken, '
'or another instance is running. Exit')
finally:
logging.debug('Goodbye')

And the async client:

import asyncore
import socket
import time
import logging
import json


class Client(asyncore.dispatcher_with_send):

def __init__(self, host, port, message, pk):
self.logger = logging.getLogger('CLIENT')
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.host = host
self.port = port
self.connect((host, port))
self.out_buffer = message
self.clientID = pk
self.logger.debug('Connected #{}'.format(self.clientID))

def handle_close(self):
self.close()

def handle_read(self):
rec_msg = self.recv(confjson.get('RATE', None))
self.logger.debug('#{}, {} back at client {}'.format(self.clientID,
rec_msg,
time.time()
)
)
self.close()


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG,
format='%(name)s: %(message)s',
)

with open('config.json', 'r') as jfile:
confjson = json.load(jfile)
clients = []
for idx in range(confjson.get('SOCKET_AMOUNT', None)):
msg = "Start: {}".format(time.time())
clients.append(Client(confjson.get('HOST', None),
confjson.get('PORT', None),
msg,
idx)
)
start = time.time()
logging.debug(
'Starting async loop for all connections, unix time {}'.format(start))
asyncore.loop()
logging.debug('{}'.format(time.time() - start))

and a small config file:

{
"HOST": "127.0.0.1",
"PORT": 5007,
"RATE": 8096,
"SERVER_QUEUE_SIZE": 16,
"SOCKET_AMOUNT": 100
}

python sockets can't send multiple messages -- data is referenced before assignment--

The problem is that you are calling self.sock.close() after each request without creating a new socket. You will need to create a new socket after each time you close it.

You can solve this by creating a connection per request as follows:

class Connect(object):

def connect(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting to host')
sock.connect(('127.0.0.1',10000))
return sock

def send(self, command):
sock = self.connect()
recv_data = ""
data = True

print('sending: ' + command)
sock.sendall(command)

while data:
data = sock.recv(1024)
recv_data += data
print('received: ' + data)

sock.close()
return recv_data


def main():
connect = Connect()
print connect.send("STATUS")
print connect.send("MEASURE")

Python socket receiving multiple messages at the same time

TCP is a byte-stream protocol. There are no messages but just a bunch of bytes coming in. You must implement a protocol and buffer the data received until you know you have a complete message.

You can use the built-in socket.makefile() method to implement a line-oriented protocol. Example:

server.py

from socket import *

s = socket()
s.bind(('',5000))
s.listen(1)

while True:
c,a = s.accept()
print(f'connect: {a}')
read = c.makefile('r')
write = c.makefile('w')

with c,read,write:
while True:
data = read.readline()
if not data: break
cmd = data.strip()
print(f'cmd: {cmd}')
if cmd == 'LIST':
write.write('C1\nC2\nC3\nDONE\n')
write.flush()

print(f'disconnect: {a}')

client.py

from socket import *

s = socket()
s.connect(('localhost',5000))
read = s.makefile('r',)
write = s.makefile('w')

def send(cmd):
print(cmd)
write.write(cmd + '\n')
write.flush()

with s,read,write:
send('TEST')
send('LIST')
while True:
data = read.readline()
if not data: break
item = data.strip()
if item == 'DONE': break
print(f'item: {item}')
send('OTHER')

Server Output:

connect: ('127.0.0.1', 13338)
cmd: TEST
cmd: LIST
cmd: OTHER
disconnect: ('127.0.0.1', 13338)

Client Output:

TEST
LIST
item: C1
item: C2
item: C3
OTHER

When keeping a client socket open to send several messages, why does the program give an error when there is a big delay in messages?

Your server reads exactly once from the client and receives up to 72 bytes then closes the connection. If the client can send all 10 messages fast enough it won't get an error. Read about the Nagle Algorithm which:

inhibit[s] the sending of new TCP segments when new outgoing data arrives from the user if any previously transmitted data on the connection remains unacknowledged.

Your server will never receive all the messages (total 91 bytes) but the client won't error if it gets them all sent.

A small change to the server makes sure it receives all the data (no matter the delay) by waiting until the client closes the connection:

with connection:
while True:
data = connection.recv(72)
if not data: break # empty data means client closed connection.
print('Client says "{}"'.format(data))

This still won't receive messages one-by-one. TCP is a streaming protocol so the messages get concatenated together. You need to add buffering and a means to extract only complete messages from it. Example (Python 3.6+):

client.py

import socket

host = '127.0.0.1'
port = 65432

with socket.socket() as s:
s.connect((host, port))
for i in range(10):
# Use newline as message delimiter.
s.sendall(f'message {i+1}\n'.encode())

server.py

import socket

with socket.socket() as s:
s.bind(('127.0.0.1', 65432))
s.listen()

while True:
connection, address = s.accept()

with connection:
data = b''
while True:
chunk = connection.recv(16)
if not chunk: break # client closed connection
data += chunk # buffer input
while b'\n' in data: # break off messages and display
msg,_,data = data.partition(b'\n')
print(f'Client says "{msg.decode()}"')
if data:
print(f'Incomplete message: {data}')

Output:

Client says "message 1"
Client says "message 2"
Client says "message 3"
Client says "message 4"
Client says "message 5"
Client says "message 6"
Client says "message 7"
Client says "message 8"
Client says "message 9"
Client says "message 10"

Python - Socket Communication, multiple messages

The problem is, that you only read the first message from each open connection before moving on to the next. The accept() methods waits for a new connection and gives you the information needed when a new one comes in. the recv() method on the other hand, receives data from a existing connection and waits if there is none. If you want to receive multiple messages from a single client, you can just wait for the first connection and then wait for data with recv(). This could look like this:

s.listen(1)
conn, addr = s.accept()
while True:
data = conn.recv(BUFFER_SIZE)
if data == "Comms Shutdown":
print_write("------ REMOTE SHUTDOWN ------")
conn.close()
raise SystemExit
else:
print_write("[COMMS] " + str(addr) + " says: " + data)

If you want to be able to also manage multiple clients, you will have to create a thread for each one from a while loop waiting for new connections. This is a bit more complicated:

def client_handler(conn):
while True:
data = conn.recv(BUFFER_SIZE)
if data == "Comms Shutdown":
print_write("------ REMOTE SHUTDOWN ------")
conn.close()
raise SystemExit
# this will kill the server (remove the line above if you don't want that)
else:
print_write("[COMMS] " + str(addr) + " says: " + data)

while True:
s.listen(1)
conn, addr = s.accept()
recv_thread = threading.Thread(target=client_handler, args=(conn, ))
recv_thread.start()

All this code is untested. Be aware, that I omitted the logging part and the socket creation part as well as all imports.



Related Topics



Leave a reply



Submit