How to Properly Write to FIFOs in Python

How do I properly write to FIFOs in Python?

read() doesn't return until it reaches EOF.

You can try specifying the number of bytes you want to read, e.g. read(4). This still blocks until that many bytes have been written, so the producer must write at least that many bytes and then call flush().
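
A minimal sketch of that pattern, assuming a hypothetical FIFO at /tmp/my_fifo (the reader and writer run as separate processes):

FIFO_PATH = '/tmp/my_fifo'  # hypothetical path; assumed to exist already (e.g. created with os.mkfifo)

# Reader process: ask for exactly 4 bytes instead of waiting for EOF.
with open(FIFO_PATH, 'rb') as fifo:
    chunk = fifo.read(4)    # blocks until 4 bytes arrive (or the writer closes)
    print(chunk)

# Writer process (run separately): write at least 4 bytes, then flush.
with open(FIFO_PATH, 'wb') as fifo:
    fifo.write(b'ping')
    fifo.flush()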

using FIFOs for input and output in python

From the man page for mkfifo:

Opening a FIFO for reading normally blocks until some other process opens the same FIFO for writing, and vice versa. See fifo(7) for nonblocking handling of FIFO special files.

So the second time you open the FIFO for reading, the call blocks. This can be seen in the traceback after pressing Ctrl+C:

^CTraceback (most recent call last):
0
Traceback (most recent call last):
  File "shell_fifo.py", line 51, in <module>
  File "shell.py", line 4, in <module>
    test()
  File "shell_fifo.py", line 48, in test
    read()
  File "shell_fifo.py", line 29, in read
    print i, open(FIFO_PATH, 'r').readline()  # read() is blocked here
KeyboardInterrupt
    Input=raw_input()
KeyboardInterrupt

Change your read function so that it only opens the FIFO once:

def read():
    FIFO_PATH = '/tmp/my_fifo'
    i = 0
    with open(FIFO_PATH, 'r') as read_fifo:
        while i < 10:
            i += 1
            print i, read_fifo.readline().rstrip()

You should see output like this:

in_fifo: <open file '/tmp/in_fifo', mode 'rw+' at 0x7f1ba655b5d0>
my_fifo: <open file '/tmp/my_fifo', mode 'rw+' at 0x7f1ba655b540>
1 shell called
2 input test
3 Yeehhaaaaa it works
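
If it helps, a hypothetical writer for the reader above could look like this (the question's shell.py / shell_fifo.py scripts aren't reproduced here, so treat this as a sketch):

FIFO_PATH = '/tmp/my_fifo'  # hypothetical path, matching the reader above

# Open the FIFO once and push one line per message; flush so each line
# reaches the reader immediately.
with open(FIFO_PATH, 'w') as write_fifo:
    for msg in ['shell called', 'input test', 'Yeehhaaaaa it works']:
        write_fifo.write(msg + '\n')
        write_fifo.flush()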

Building FIFO data structure in python

In your example, you make Queue inherit from Node, but it appears that 'nodes' are really the elements in the queue, while the queue manages the whole thing. Also, you named the methods 'pop' and 'push', but for a queue you typically talk about reading and writing; popping and pushing are what you do to a stack.

With that in mind, the code below has various fixes to your code, but the same overall structure and I think it works as you expect:

class Node:
    def __init__(self, item):
        self.item = item
        self.next = None


class Queue:
    def __init__(self):
        self.first = None
        self.last = None
        self.size = 0

    def write(self, item):
        node = Node(item)

        if self.first is None:
            self.first = node

        if self.last is None:
            self.last = node
        else:
            self.last.next = node
            self.last = node

        self.size += 1

    def read(self):
        if self.is_empty():
            raise Exception('Empty queue')

        item = self.first.item

        self.first = self.first.next
        if self.first is None:
            self.last = None

        self.size -= 1
        return item

    def is_empty(self):
        return self.size == 0

A few remarks on key differences:

  • Queue no longer inherits from Node
  • .next is used on instances of Node, so for example self.first.next
  • I've added a .last since you want access to both the start and the end of a queue

Here's some code to test it:

q = Queue()
q.write('a')
q.write('b')
print(q.read())
q.write('c')
print(q.read())
print(q.read())

# read from empty queue
print(q.read())

It works with the code above; it didn't work with some of the other answers, but perhaps I'm missing how they expect their code to be used.

Output:

a
b
c
Traceback (most recent call last):
  File "C:/dev/project/python/sandbox/q.py", line 53, in <module>
    print(q.read())
  File "C:/dev/project/python/sandbox/q.py", line 29, in read
    raise Exception('Empty queue')
Exception: Empty queue

fifo - reading in a loop

A FIFO works (on the reader side) exactly this way: it can be read from, until all writers are gone. Then it signals EOF to the reader.

If you want the reader to continue reading, you'll have to open again and read from there. So your snippet is exactly the way to go.

If you have multiple writers, you'll have to ensure that each data portion they write is smaller than PIPE_BUF in order not to mix up the messages.
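
A minimal sketch of that reopen-and-keep-reading pattern, assuming a hypothetical FIFO at /tmp/my_fifo:

FIFO_PATH = '/tmp/my_fifo'  # hypothetical path; assumed to exist already

while True:
    # open() blocks here until at least one writer opens the FIFO
    with open(FIFO_PATH, 'r') as fifo:
        for line in fifo:        # iterates until EOF, i.e. until all writers are gone
            print(line.rstrip())
    # EOF reached: loop around and reopen to wait for the next writer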

Create a temporary FIFO (named pipe) in Python?

os.mkfifo() will fail with OSError: [Errno 17] File exists if the file already exists, so there is no security issue here. The security issue with tempfile.mktemp() is the race condition that allows an attacker to create a file with the same name before you open it yourself, but since os.mkfifo() fails if the file already exists, this is not a problem.

However, since mktemp() is deprecated you shouldn't use it. You can use tempfile.mkdtemp() instead:

import os, tempfile

tmpdir = tempfile.mkdtemp()
filename = os.path.join(tmpdir, 'myfifo')
print filename
try:
    os.mkfifo(filename)
except OSError, e:
    print "Failed to create FIFO: %s" % e
else:
    fifo = open(filename, 'w')
    # write stuff to fifo
    print >> fifo, "hello"
    fifo.close()
    os.remove(filename)
    os.rmdir(tmpdir)

EDIT: I should make it clear that, although this averts the mktemp() vulnerability, the other usual security issues still need to be considered; e.g. an attacker could create the FIFO (if they had suitable permissions) before your program did, which could cause your program to crash if errors/exceptions are not properly handled.
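
For illustration, a hedged sketch of handling that case explicitly (the helper name and the 0o600 mode are assumptions, not part of the code above):

import errno
import os

def make_private_fifo(path):
    # Hypothetical helper: refuse to reuse a FIFO we did not create ourselves.
    try:
        os.mkfifo(path, 0o600)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise RuntimeError('%s already exists; refusing to use it' % path)
        raise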

python write output to different fifo pipe file?

I think you're looking to create pipes for two separate, and apparently simultaneous, MySQL imports, from the same Python script?

While it's not impossible to do this via shell redirection, it's going to be painful. Your Python script has to somehow pass the file descriptors of its pipes to the shell, so your shell script can redirect those file descriptors to the MySQL commands.

A much easier solution is to do it in Python, with the subprocess module.

I don't know the tool and syntax you hope to use for doing the bulk load; all you've told us is that you want to give it a "pipe". So, I'll just assume that it's the mysqlimport command mentioned in hbristow's answer, and that it handles stdin via the usual Unix convention of giving it - as a filename; since this is just for demonstrating the actual interesting part, it doesn't matter very much anyway.

So:

from subprocess import Popen, PIPE

args = ['mysqlimport', my_db_name, '-']
with Popen(args, stdin=PIPE) as import1, Popen(args, stdin=PIPE) as import2:
    with open('giantfile.txt') as f:
        for line in f:
            data = parse(line)
            if belongs_in_import2(data):
                import2.stdin.write(make_sql(data))
            else:
                import1.stdin.write(make_sql(data))

We've created two separate child processes, each with its own separate stdin pipe, and we can write to them the same way we can to any other files.

You may need to import1.stdin.close() and import2.stdin.close() if the mysqlimport tool expects you to close/EOF the input file before actually waiting on it to exit.

If you're using Python 2.4-2.7, you should install and use the subprocess32 backport. If you can't do that for some reason (or if you're using Python 3.0-3.1 and can't upgrade), you can't use a with statement here; instead, you need to explicitly close the pipes and wait on the processes.
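
For older Pythons, a minimal sketch of the explicit close-and-wait version might look like this (it reuses the same placeholder args, parse(), belongs_in_import2() and make_sql() from the snippet above):

import1 = Popen(args, stdin=PIPE)
import2 = Popen(args, stdin=PIPE)
f = open('giantfile.txt')
try:
    for line in f:
        data = parse(line)
        if belongs_in_import2(data):
            import2.stdin.write(make_sql(data))
        else:
            import1.stdin.write(make_sql(data))
finally:
    f.close()
    import1.stdin.close()   # closing stdin sends EOF to mysqlimport
    import2.stdin.close()
    import1.wait()
    import2.wait()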

Python and FIFOs

As other comments have alluded to, you have a race condition.

I suspect that in the failing case, the server gets suspended after one of these lines:

g.write(str(x**2) + "\n")
g.close()

The client is then able to read the result, print it to the screen, and loop back. It then reopens f - which succeeds, because it's still open on the server side - and writes the message. Meanwhile, the server has managed to close f. Next, the flush on the client side executes a write() syscall on the pipe, which triggers the SIGPIPE because it's now closed on the other side.

If I'm correct, you should be able to fix it by moving the server's f.close() so that it comes before the g.write(...).
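
To illustrate, a hedged sketch of that ordering on the server side (the FIFO paths and the loop structure are assumptions; the point is only that f is closed before g is written):

REQUEST_FIFO = '/tmp/f'    # hypothetical path for f (client -> server)
RESPONSE_FIFO = '/tmp/g'   # hypothetical path for g (server -> client)

while True:
    f = open(REQUEST_FIFO, 'r')     # blocks until the client opens it for writing
    x = int(f.readline())
    f.close()                       # close f *before* replying

    g = open(RESPONSE_FIFO, 'w')
    g.write(str(x**2) + "\n")
    g.close()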


