Multithreaded Web Server in Python

Multithreaded web server in Python

Check this post from Doug Hellmann's blog. The example below is Python 2 (BaseHTTPServer and SocketServer were merged into http.server in Python 3):

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
import threading

class Handler(BaseHTTPRequestHandler):

    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        message = threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        return

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

if __name__ == '__main__':
    server = ThreadedHTTPServer(('localhost', 8080), Handler)
    print 'Starting server, use <Ctrl-C> to stop'
    server.serve_forever()
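
If you're on Python 3, the same example carries over almost verbatim: the two modules were merged into http.server, and Python 3.7+ ships a ready-made ThreadingHTTPServer. A minimal sketch of the port (note that wfile expects bytes in Python 3):

from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
import threading

class Handler(BaseHTTPRequestHandler):

    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        # wfile expects bytes, not str, in Python 3
        self.wfile.write(threading.current_thread().name.encode())
        self.wfile.write(b'\n')

if __name__ == '__main__':
    server = ThreadingHTTPServer(('localhost', 8080), Handler)
    print('Starting server, use <Ctrl-C> to stop')
    server.serve_forever()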

Simple Multithreaded Web Server in Python

You have several errors in your code, which I went through in the posted code below. General notes:

  • use SO_REUSEADDR on the server so you don't have to wait before running it a second time
  • shut down a socket before closing it, or the underlying socket may stay open and hang your browser
  • you close client sockets in two places; catch errors so that you can close them all
  • that 'Bad file descriptor' error is a good thing. It happens because you closed the socket, and it lets your server thread know it's time to terminate. You could catch it and exit gracefully if you want
  • you need a better way to deal with background threads. Right now your list just grows forever, and over time most entries will be stale. I marked them as daemon threads so that the program terminates at the end, but you need something else (see the pruning sketch after the code).

Fixing up the errors, I came up with:

from socket import *
import threading
import time

class serverThread(threading.Thread):
    def __init__(self, serverPort):
        threading.Thread.__init__(self)
        self.serverPort = serverPort
        self.serverSocket = socket(AF_INET, SOCK_STREAM)
        # Allow immediate reuse of the port after a restart
        self.serverSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        self.connectionThreads = []

    def run(self):
        self.serverSocket.bind(('', self.serverPort))
        self.serverSocket.listen(1)
        while True:
            # Establish the connection
            print 'Ready to serve...'
            connectionSocket, addr = self.serverSocket.accept()
            message = connectionSocket.recv(1024)  # Get message
            print "Message received, opening new thread"
            self.connectionThreads.append(connectionThread(connectionSocket, message))
            self.connectionThreads[-1].daemon = True
            self.connectionThreads[-1].start()

    def close(self):
        for t in self.connectionThreads:
            try:
                t.connSocket.shutdown(SHUT_RDWR)
                t.connSocket.close()
            except error:  # 'from socket import *' binds socket.error as plain 'error'
                pass
        self.serverSocket.shutdown(SHUT_RDWR)
        self.serverSocket.close()

class connectionThread(threading.Thread):
    def __init__(self, connSocket, message):
        threading.Thread.__init__(self)
        self.connSocket = connSocket
        self.message = message

    def run(self):
        try:
            filename = self.message.split()[1]  # Getting requested HTML page
            f = open(filename[1:])              # Opening data stream from HTML
            outputdata = f.read()               # Reading HTML page
            f.close()                           # Closing data stream from HTML
            self.connSocket.send("HTTP/1.0 200 OK\r\n")  # Send one HTTP header line into socket
            for i in range(0, len(outputdata)):          # Send the content of the requested file to the client
                self.connSocket.send(outputdata[i])
        except IOError:                                  # Triggered if user requests bad link
            self.connSocket.send("404 Not Found")        # Send response message for file not found
        finally:
            try:
                self.connSocket.shutdown(SHUT_RDWR)
                self.connSocket.close()
            except error:  # the socket may already have been closed by serverThread.close()
                pass

def main():
    server = serverThread(8031)
    server.daemon = True
    server.start()
    end = raw_input("Press enter to stop server...")
    server.close()
    print "Program complete"

main()
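
For the last note above, a minimal way to keep the thread list from growing forever (my own sketch, not part of the original answer) is to prune dead handler threads each time a connection is accepted:

    # inside serverThread.run(), just before appending the new thread:
    # drop handler threads that have already finished
    self.connectionThreads = [t for t in self.connectionThreads if t.is_alive()]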

A good multithreaded python webserver?

CherryPy. Features, as listed on the website:

  • A fast, HTTP/1.1-compliant, WSGI thread-pooled webserver. Typically, CherryPy itself takes only 1-2ms per page!
  • Support for any other WSGI-enabled webserver or adapter, including Apache, IIS, lighttpd, mod_python, FastCGI, SCGI, and mod_wsgi
  • Easy to run multiple HTTP servers (e.g. on multiple ports) at once
  • A powerful configuration system for developers and deployers alike
  • A flexible plugin system
  • Built-in tools for caching, encoding, sessions, authorization, static content, and many more
  • A native mod_python adapter
  • A complete test suite
  • Swappable and customizable...everything.
  • Built-in profiling, coverage, and testing support.
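
For context, a minimal CherryPy app looks like the sketch below; the port and thread-pool size are assumptions, tune them for your load:

import cherrypy

class HelloWorld(object):

    @cherrypy.expose
    def index(self):
        return "Hello from CherryPy's thread-pooled server!"

if __name__ == '__main__':
    # 'server.thread_pool' sets how many worker threads serve requests
    cherrypy.config.update({'server.socket_port': 8080,
                            'server.thread_pool': 10})
    cherrypy.quickstart(HelloWorld())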

Multithreaded webserver in Python

The correct netstat usage is:

netstat -tanp

because you need the -a option to display listening sockets. Add grep to locate your program quickly:

netstat -tanp | grep 8080
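
On distributions where netstat is deprecated in favor of iproute2, ss gives the same information with the equivalent flags:

ss -tanp | grep 8080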

Multiprocessing with any popular python webserver

If you need several HTTP servers handling just HTTP requests, you can use Gunicorn, which creates several instances of your app as child processes.

If you have CPU-bound operations, they will eventually block all HTTP operations, so they should be distributed to other processes. On start, each of your HTTP servers creates several child processes which do the heavy tasks.

So the scheme is: Gunicorn -> HTTP servers -> CPU-heavy processes

Example with aiohttp:

from aiohttp import web
import asyncio
import multiprocessing as mp
from random import randint


def cpu_heavy_operation(num):
    """Just some CPU heavy task"""
    if num not in range(1, 10):
        return 0
    return str(num**1000000)[0:10]


def process_worker(q: mp.Queue, name: str):
    """Target function for mp.Process. Better convert it to class"""
    print(f"{name} Started worker process")
    while True:
        i = q.get()
        if i == "STOP":  # poison pill to stop child process gracefully
            break
        else:
            print(f"{name}: {cpu_heavy_operation(i)}")
    print(f"{name} Finished worker process")


async def add_another_worker_process(req: web.Request) -> web.Response:
    """Create one more child process"""
    q = req.app["cpu_bound_q"]
    name = randint(100000, 999999)
    pr = mp.Process(
        daemon=False,
        target=process_worker,
        args=(q, f"CPU-Bound_Pr-New-{name}",),
    )
    pr.start()
    req.app["children_pr"] += 1
    return web.json_response({"New": name, "Children": req.app["children_pr"]})


async def test_endpoint(req: web.Request) -> web.Response:
    """Just an endpoint which feeds child processes with tasks"""
    x = req.match_info.get("num")
    req.app["cpu_bound_q"].put(int(x))
    return web.json_response({"num": x})


async def stop_ops(app: web.Application) -> None:
    """To do graceful shutdowns"""
    for i in range(app["children_pr"]):
        app["cpu_bound_q"].put("STOP")

    # give child processes a chance to stop gracefully;
    # asyncio.sleep instead of time.sleep so the event loop isn't blocked
    await asyncio.sleep(30)


async def init_func_standalone(args=None) -> web.Application:
    """Application factory for standalone run"""
    app = web.Application()
    app.router.add_get(r"/test/{num:\d+}", test_endpoint)
    app.router.add_get("/add", add_another_worker_process)

    # create cpu_bound_ops processes block
    cpu_bound_q = mp.Queue()
    prcs = [
        mp.Process(
            daemon=False,
            target=process_worker,
            args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
        ) for i in range(4)
    ]
    [i.start() for i in prcs]
    app["children_pr"] = 4  # you should know how many child processes you need to stop gracefully
    app["cpu_bound_q"] = cpu_bound_q  # Queue for cpu bound ops - multiprocessing module

    app.on_cleanup.append(stop_ops)

    return app


async def init_func_gunicorn() -> web.Application:
    """is used to run aiohttp with Gunicorn"""
    app = await init_func_standalone()
    return app


if __name__ == '__main__':
    _app = init_func_standalone()  # run_app accepts the coroutine returned by the factory
    web.run_app(_app, host='0.0.0.0', port=9999)
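
To run the Gunicorn factory, something like the following should work (the module name app is my assumption for wherever you saved the file); aiohttp ships its own Gunicorn worker class:

gunicorn app:init_func_gunicorn --worker-class aiohttp.GunicornWebWorker --workers 2 --bind 0.0.0.0:9999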

You see that I use multiprocessing directly; I do it because I like to have more manual control. The other option is to go with concurrent.futures: asyncio has a run_in_executor method, so just create a pool and send the CPU-heavy tasks to run_in_executor, wrapping them with asyncio's create_task beforehand.
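
A minimal sketch of that concurrent.futures alternative (the pool size and function names are my own, not from the answer above):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy_operation(num):
    """Same kind of CPU heavy task as above"""
    return str(num ** 1000000)[0:10]

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as pool:
        # run_in_executor offloads the blocking calls to the process pool
        tasks = [loop.run_in_executor(pool, cpu_heavy_operation, i) for i in range(1, 5)]
        results = await asyncio.gather(*tasks)
        print(results)

if __name__ == '__main__':  # guard is required for multiprocessing on spawn platforms
    asyncio.run(main())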


