How to Prevent BrokenPipeError When Doing a Flush in Python

How to prevent BrokenPipeError when doing a flush in Python?

The BrokenPipeError is normal here, as phantom said, because the reading process (head) terminates and closes its end of the pipe while the writing process (Python) is still trying to write.

It is an abnormal condition, and the Python script receives a BrokenPipeError - more exactly, the Python interpreter receives a system SIGPIPE signal that it catches and translates into a BrokenPipeError to allow the script to process the error.

And you effectively can process the error: in your last example, you only see a message saying that the exception was ignored. Well, that is not strictly true, but it seems related to this open issue in Python: the Python developers think it is important to warn the user of the abnormal condition.

What really happens is that, AFAIK, the Python interpreter always reports this on stderr, even if you catch the exception. But you just have to close stderr before exiting to get rid of the message.

I slightly changed your script to:

  • catch the error as you did in your last example
  • catch either IOError (which I get with Python 3.4 on Windows 64) or BrokenPipeError (with Python 3.3 on FreeBSD 9.0) - and display a message for that
  • display a custom Done message on stderr (stdout is closed due to the broken pipe)
  • close stderr before exiting to get rid of the message

Here is the script I used:

import sys

try:
    for i in range(4000):
        print(i, flush=True)
except (BrokenPipeError, IOError):
    print('BrokenPipeError caught', file=sys.stderr)

print('Done', file=sys.stderr)
sys.stderr.close()

and here is the result of python3.3 pipe.py | head -10:

0
1
2
3
4
5
6
7
8
9
BrokenPipeError caught
Done

If you do not want the extraneous messages, just use:

import sys

try:
    for i in range(4000):
        print(i, flush=True)
except (BrokenPipeError, IOError):
    pass

sys.stderr.close()

BrokenPipeError in Python but not in Perl

What's going on here is that in both cases you have a process writing to a pipe whose read end was closed (by head exiting after a certain number of bytes).

This causes a SIGPIPE signal to be sent to the writing process. By default this kills the process. The process can ignore the signal if it wants to, which just makes the write call fail with an EPIPE error.

Starting with version 3.3, Python raises a BrokenPipeError exception in this case, so it looks like Python 1) ignores SIGPIPE by default and 2) translates EPIPE to a BrokenPipeError exception.
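If you actually want Python to behave like Perl here, one approach (a sketch assuming a POSIX platform; this is not part of the original answer) is to restore the default SIGPIPE disposition, so the process is silently killed by the signal instead of raising an exception:

```python
import signal

# Restore the default SIGPIPE handler (POSIX only). With SIG_DFL in
# place, writing to a closed pipe kills the process silently, as Perl
# does by default, instead of raising BrokenPipeError.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

for i in range(4000):
    print(i, flush=True)
```

With this in place, `python3 pipe.py | head -10` prints ten lines and exits with no traceback and no "exception ignored" message.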

Perl does not ignore or handle signals by default. That means it gets killed by SIGPIPE in your example, but because it is not the last command in a pipeline (that would be head here), the shell just ignores it. You can make it more visible by not using a pipeline:

perl pipe.pl > >(head -n3000 >/dev/null)

This piece of bash trickery makes perl write to a pipe, but not as part of a shell pipeline. I can't test it now, but at minimum this will set $? (the command exit status) to 141 in the shell (128 + signal number, which for SIGPIPE is 13), and it may also report a Broken pipe.
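The 141 status can also be observed from Python itself. The sketch below (my own illustration, assuming a POSIX system with bash and python3 on the PATH) runs a writer that restores the default SIGPIPE disposition, so it dies of the signal like Perl would, and then reads the writer's exit status from bash's PIPESTATUS array:

```python
import subprocess

# The inner writer restores SIG_DFL for SIGPIPE, so when head exits it is
# killed by the signal; bash reports its status as 128 + 13 = 141.
script = (
    "python3 -c 'import signal; "
    "signal.signal(signal.SIGPIPE, signal.SIG_DFL); "
    "[print(i) for i in range(100000)]' "
    "| head -n 3 > /dev/null; "
    "echo ${PIPESTATUS[0]}"
)
result = subprocess.run(["bash", "-c", script],
                        capture_output=True, text=True)
print(result.stdout.strip())
```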

You can deal with it manually in the Perl code, though:

  • Variant 1: Throw an error from the signal handler

    $SIG{PIPE} = sub { die "BrokenPipeError" };
  • Variant 2: Ignore the signal, handle write errors

    $SIG{PIPE} = 'IGNORE';
    ...
    print $i, "\n" or die "Can't print: $!";

    Note that in this case you have to think about buffering, however. If you don't enable autoflush (as in STDOUT->autoflush(1)) and output is going to a pipe or file, Perl will collect the text in an internal buffer first (and the print call will succeed). Only when the buffer gets full (or when the filehandle is closed, whichever happens first) is the text actually written out and the buffer emptied. This is why close can also report write errors.
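The same deferred-error behaviour is easy to reproduce in Python (an illustrative sketch of my own, not from the original answer): on a block-buffered stream the write call appears to succeed, and the broken pipe only surfaces when the buffer is flushed at close:

```python
import os

r, w = os.pipe()
os.close(r)              # close the read end, as head does when it exits
f = os.fdopen(w, "w")    # block-buffered text stream over the pipe
f.write("hello\n")       # "succeeds": the text only reaches the buffer
try:
    f.close()            # the flush inside close hits the broken pipe
except BrokenPipeError:
    print("error surfaced at close, not at write")
```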

How to prevent BrokenPipeError in my server script

There were a bunch of things I had to do to make this work.

First, the broken pipe can occur at two different places in the server code: when a new client connects and the server tries to send the New Client Connected message to all the clients, and when an existing client sends a message. So we need to handle the exception in both places.

So we put a try/except around both blocks, where it says if c != conn.

Now, about how to handle the exception.

As I first thought, simply removing the client c from clients would work, but the loop for c in clients then throws a RuntimeError, as we would be modifying the set clients during iteration.

I tried different methods to get around this problem; this is the most workable one I found.

I changed clients from a set() to an empty list []

Then I changed clients.add to clients.append

Then I changed the for loop to for c in range(len(clients)) and used clients[c] to access clients.

But when I tried this, I saw that the if statement if clients[c] != conn may throw an IndexError (list index out of range) if the loop reaches a no-longer-existent client after a removal. So I put it in a try/except block too and let the program continue on exception.

for c in range(len(clients)):
    try:
        if clients[c] != conn:
            try:
                message = connection_message.encode(FORMAT)
                msg_length = len(message)
                send_length = str(msg_length).encode(FORMAT)
                send_length += b' ' * (HEADER - len(send_length))
                clients[c].sendall(send_length)
                clients[c].sendall(message)
            except:
                clients.remove(clients[c])
    except:
        continue
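For what it's worth, a common alternative to the index-based loop (a sketch of a different technique, not the approach taken above) is to iterate over a snapshot of the list, so removing a dead client needs no index bookkeeping and no outer try/except:

```python
def broadcast(clients, sender, payload):
    # Iterate over a copy so clients.remove() cannot disturb the loop
    # or shift indices mid-iteration.
    for c in clients.copy():
        if c is sender:
            continue
        try:
            c.sendall(payload)
        except OSError:  # BrokenPipeError is a subclass of OSError
            clients.remove(c)
```

Here clients would be the same list of connected sockets, and payload the already-encoded header plus message.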

The last problem was that even after removal of a client, its thread is still alive, so the active thread count is higher than the number of clients connected. So instead of printing the number of active connections as the number of alive threads - 1, I print len(clients) + 1 - the + 1 because, on connection of a new client, this line is printed before the client is appended to the list.

print(f"[ACTIVE CONNECTIONS] {len(clients) + 1}")

So, the entire program now is:

import socket
import threading

HEADER = 64
PORT = 5050
SERVER = socket.gethostbyname(socket.gethostname())
ADDR = (SERVER, PORT)
FORMAT = 'utf-8'
DISCONNECT_MESSAGE = "!DISCONNECT"

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(ADDR)

clients = []
clients_lock = threading.Lock()

def handle_client(conn, addr):
    name = conn.recv(HEADER).decode(FORMAT)
    if name:
        name = int(name)
        msg_name = conn.recv(name).decode(FORMAT)
        print(f"[NEW CONNECTION] {msg_name} connected.")
        connection_message = f"{msg_name} connected."
        with clients_lock:
            for c in range(len(clients)):
                try:
                    if clients[c] != conn:
                        try:
                            message = connection_message.encode(FORMAT)
                            msg_length = len(message)
                            send_length = str(msg_length).encode(FORMAT)
                            send_length += b' ' * (HEADER - len(send_length))
                            clients[c].sendall(send_length)
                            clients[c].sendall(message)
                        except:
                            clients.remove(clients[c])
                except:
                    continue

    with clients_lock:
        clients.append(conn)

    connected = True
    try:
        while connected:
            msg_length = conn.recv(HEADER).decode(FORMAT)
            if msg_length:
                msg_length = int(msg_length)
                msg1 = conn.recv(msg_length).decode(FORMAT)
                msg = f"{msg_name}: {msg1}"
                if msg1 == DISCONNECT_MESSAGE:
                    connected = False
                print(f"{msg}")
                with clients_lock:
                    for c in range(len(clients)):
                        try:
                            if clients[c] != conn:
                                try:
                                    message = msg.encode(FORMAT)
                                    msg_length = len(message)
                                    send_length = str(msg_length).encode(FORMAT)
                                    send_length += b' ' * (HEADER - len(send_length))
                                    clients[c].sendall(send_length)
                                    clients[c].sendall(message)
                                except:
                                    clients.remove(clients[c])
                        except:
                            continue
                msg = f"You: {msg1}"
                message = msg.encode(FORMAT)
                msg_length = len(message)
                send_length = str(msg_length).encode(FORMAT)
                send_length += b' ' * (HEADER - len(send_length))
                conn.send(send_length)
                conn.send(message)
    finally:
        with clients_lock:
            clients.remove(conn)
        conn.close()

def start():
    server.listen()
    print(f"[LISTENING] Server is listening on {SERVER}")
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target=handle_client, args=(conn, addr))
        thread.daemon = True
        thread.start()
        print(f"[ACTIVE CONNECTIONS] {len(clients) + 1}")

print("[STARTING] server is starting...")
start()

How to prevent errno 32 broken pipe?

Your server process has received a SIGPIPE writing to a socket. This usually happens when you write to a socket fully closed on the other (client) side. This might be happening when a client program doesn't wait till all the data from the server is received and simply closes a socket (using close function).

In a C program you would normally try setting SIGPIPE to be ignored, or set a dummy signal handler for it. In that case a simple error is returned when writing to a closed socket. In your case, Python throws an exception that can be handled as a premature disconnect of the client.
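In Python that typically means wrapping the socket writes. The sketch below (a hypothetical helper of my own, not code from the original question) treats a broken pipe or connection reset as a premature client disconnect and closes the socket:

```python
import socket

def safe_send(sock, data):
    """Return True if data was sent, False if the client had already
    disconnected (broken pipe / connection reset)."""
    try:
        sock.sendall(data)
        return True
    except (BrokenPipeError, ConnectionResetError):
        sock.close()
        return False
```

The caller can then drop the client from its bookkeeping whenever safe_send returns False, instead of letting the exception kill the server loop.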

How to prevent BrokenPipeErrors after receiving a SIGINT while using process shared objects in Python?

A solution to this issue is to override the default SIGINT signal handler with a handler that will ignore the signal, for instance the signal.SIG_IGN standard handler. This can be done by calling the signal.signal function at the start of the manager's child process:

import concurrent.futures
import multiprocessing.managers
import signal
import time

def init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

class A:

    def __init__(self):
        manager = multiprocessing.managers.SyncManager()
        manager.start(init)
        self.event = manager.Event()

    def start(self):
        try:
            while True:
                if self.event.is_set():
                    break
                print("processing")
                time.sleep(1)
        except BaseException as e:
            print(type(e).__name__ + " (from pool thread):", e)

    def shutdown(self):
        self.event.set()

if __name__ == "__main__":
    try:
        a = A()
        pool = concurrent.futures.ThreadPoolExecutor(1)
        future = pool.submit(a.start)
        while not future.done():
            concurrent.futures.wait([future], timeout=0.1)
    except BaseException as e:
        print(type(e).__name__ + " (from main thread):", e)
    finally:
        a.shutdown()
        pool.shutdown()

Note. — This program also works with a concurrent.futures.ProcessPoolExecutor.


