Python 3.5 Asyncio and Multiple Websocket Servers

You don't need threads to run multiple asyncio tasks - allowing multiple agents to share the same event loop is the strong suit of asyncio. You should be able to replace both thread-based classes with code like this:

loop = asyncio.new_event_loop()
loop.run_until_complete(websockets.serve(vlWebsocketServer, '192.168.1.3', 8777))
loop.run_until_complete(websockets.serve(irWebsocketServer, '192.168.1.3', 8555))
loop.run_forever()

While it is not exactly wrong to mix threads and asyncio, doing so correctly requires care not to mix up the separate asyncio instances. The safe way to use threads with asyncio is loop.run_in_executor(), which runs synchronous code in a separate thread without blocking the event loop and returns an object you can await from the loop.
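For example, a minimal sketch of offloading a blocking call with run_in_executor() (blocking_io here is just a stand-in for any synchronous function):

import asyncio
import time

def blocking_io():
    # Stand-in for any synchronous, blocking call (file I/O, a legacy client library, ...)
    time.sleep(1)
    return 'done'

async def main():
    loop = asyncio.get_event_loop()
    # None means "use the loop's default thread pool"; the loop stays free while the thread works.
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

asyncio.get_event_loop().run_until_complete(main())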

Note: the above code was written prior to the advent of asyncio.run() and manually spins the event loop. In Python 3.7 and later one would probably write something like:

async def main():
    server1 = await websockets.serve(vlWebsocketServer, '192.168.1.3', 8777)
    server2 = await websockets.serve(irWebsocketServer, '192.168.1.3', 8555)
    await asyncio.gather(server1.wait_closed(), server2.wait_closed())

asyncio.run(main())

How to manage websockets across multiple servers / workers

Update (Feb 2017)

Channels was (fortunately) not merged into Django. It will probably remain a great project, but it didn't really belong in Django proper.

Also, I would highly recommend taking a look at Postgres's relatively new, built-in support for pub/sub. It will probably outperform anything else, and building a custom solution atop aiohttp, using Postgres as the backing service, might be your best bet.
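A rough sketch of that approach, assuming asyncpg as the Postgres driver (the DSN and the channel name 'events' are placeholders):

import asyncio
import asyncpg

async def listen_forever():
    # One dedicated connection per worker, subscribed to a Postgres NOTIFY channel.
    conn = await asyncpg.connect('postgresql://user:password@localhost/mydb')

    def on_notify(connection, pid, channel, payload):
        # Fan the payload out to the websocket clients connected to *this* worker.
        print('notification on {}: {}'.format(channel, payload))

    await conn.add_listener('events', on_notify)
    while True:
        await asyncio.sleep(3600)  # keep the connection (and the listener) alive

Any worker can then broadcast with something like SELECT pg_notify('events', 'payload') on its own connection.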

Original

Though not aiohttp, Django Channels, which is likely to be merged into Django 1.10, solves this problem in a very intuitive way, and it's written by Andrew Godwin, the author of Django migrations.

Django Channels abstracts the notion of "many processes on many servers" by creating a routing layer in front of a Django app. This routing layer talks to a backend (e.g., Redis) to maintain shared state among processes, and uses the new ASGI protocol to handle both HTTP requests and WebSockets, delegating each to its respective "consumer" (it ships with a built-in consumer for HTTP requests, and you can write your own for WebSockets).

Django Channels has a concept called Groups, which handles the "broadcast" nature of the problem; that is to say, it allows an event which occurs on a server to trigger messages to clients that are in that Group, regardless of whether they're connected to the same or different process or server.
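As a rough illustration, a consumer in the Channels 1.x style (the API current at the time of writing; the group name 'events' is arbitrary) might look like this:

from channels import Group

def ws_connect(message):
    message.reply_channel.send({'accept': True})  # accept the websocket handshake
    Group('events').add(message.reply_channel)    # subscribe this client to the group

def ws_disconnect(message):
    Group('events').discard(message.reply_channel)

# From anywhere in the project, on any process or server:
# Group('events').send({'text': 'something happened'})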

IMHO, Django Channels is very likely to be abstracted into a more general Python library. There are a couple of other Python libraries that achieve Go-like channels but, as of this writing, nothing noteworthy offers network transparency: the ability for channels to communicate across processes and servers.

Python3 and asyncio: how to implement websocket server as asyncio instance?

Well, I finally implemented a WebServer that runs alongside the asyncio servers in a separate worker process. The WebServer code:

from aiohttp import web

class WebServer(BaseServer):

    def __init__(self, host, port):
        super().__init__(host, port)

    @staticmethod
    async def handle_connection(request: web.Request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        async for msg in ws:
            Logger.debug('[Web Server]: {}'.format(msg))

        return ws

    @staticmethod
    def run():
        app = web.Application()
        # Register the websocket handler so incoming connections actually reach it
        app.router.add_get('/', WebServer.handle_connection)
        web.run_app(app, host=Connection.WEB_SERVER_HOST.value, port=Connection.WEB_SERVER_PORT.value)

And how to run:

from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()
loop = asyncio.get_event_loop()

loop.run_until_complete(
    asyncio.gather(
        login_server.get_instance(),
        world_server.get_instance(),
        # web.run_app() starts its own event loop, so it runs in a separate process
        loop.run_in_executor(executor, WebServer.run)
    )
)
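As a side note: if the goal is only to serve the aiohttp app on the same event loop as the other servers (instead of in a separate process), aiohttp's AppRunner/TCPSite API can be awaited directly. A minimal sketch, assuming a recent aiohttp (3.x) and with host and port as placeholders:

async def start_web_server():
    # Runs the aiohttp app on the already-running loop; no extra process or thread needed.
    app = web.Application()
    app.router.add_get('/', WebServer.handle_connection)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()  # returns immediately; the site keeps serving on the loop

This coroutine could then be added to the asyncio.gather() call above in place of the run_in_executor() line.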

Python - how to run multiple coroutines concurrently using asyncio?

TL;DR Use asyncio.ensure_future() to run several coroutines concurrently.


Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

No, you don't need any other framework for this. The whole point of an asynchronous application, versus a synchronous one, is that it doesn't block while waiting for a result. It doesn't matter whether that is implemented with coroutines or callbacks.

I mean, because connection_handler is constantly waiting for incoming messages, the server can only take action after it has received a message from the client, right? What am I missing here?

In a synchronous application you would write something like msg = websocket.recv(), which would block the whole application until a message arrives (as you described). In an asynchronous application it works completely differently.

When you write msg = yield from websocket.recv(), you are effectively saying: suspend execution of connection_handler() until websocket.recv() produces something. Using yield from inside a coroutine returns control to the event loop, so other code can run while we wait for the result of websocket.recv(). Please refer to the documentation to better understand how coroutines work.

Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that?

You can use asyncio.async() to schedule as many coroutines as you want before making the blocking call that starts the event loop.

import asyncio
import websockets

# here we'll store all active connections to use for sending periodic messages
connections = []

@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< {}'.format(msg))
            yield from connection.send(msg)
            print('> {}'.format(msg))

@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()

Here is an example client implementation. It asks you to enter a name, receives it back from the echo server, waits for two more messages from the server (our periodic messages), and closes the connection.

import asyncio
import websockets

@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What's your name? ")
    yield from connection.send(name)
    print("> {}".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< {}".format(msg))

    yield from connection.close()

asyncio.get_event_loop().run_until_complete(hello())

Important points:

  1. In Python 3.4.4 asyncio.async() was renamed to asyncio.ensure_future().
  2. There are special methods for scheduling delayed calls (for example, loop.call_later()), but they don't work with coroutines; see the sketch below.
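For completeness, a small sketch of that difference (the callback name tick and the 5-second delay are only for illustration): loop.call_later() schedules a plain synchronous callback, while a coroutine has to be scheduled with ensure_future() and do its own waiting with asyncio.sleep():

import asyncio

loop = asyncio.get_event_loop()

def tick():
    # A plain callback: fine for call_later(), but it cannot use yield from / await.
    print('tick')

loop.call_later(5, tick)  # schedule a synchronous callback to run in 5 seconds

@asyncio.coroutine
def delayed_coro():
    yield from asyncio.sleep(5)  # the coroutine equivalent of a delayed call
    print('tick from a coroutine')

asyncio.ensure_future(delayed_coro())  # coroutines are scheduled like this instead
loop.run_forever()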

