Multiple Pipes in Subprocess

Python subprocess: how to use pipes thrice?

Just add a third command following the same example:

import subprocess

# fileIn and fileOut are placeholder paths supplied by the caller
p1 = subprocess.Popen(['convert', fileIn, 'bmp:-'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['mkbitmap', '-f', '2', '-s', '2', '-t', '0.48'],
                      stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # let p1 receive SIGPIPE if p2 exits early
p3 = subprocess.Popen(['potrace', '-t', '5', '-s', '-o', fileOut],
                      stdin=p2.stdout, stdout=subprocess.PIPE)
p2.stdout.close()  # let p2 receive SIGPIPE if p3 exits early

output = p3.communicate()[0]

Multiple pipes in subprocess

To emulate bash process substitution:

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')

In Python, you could use named pipes:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)

    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))

    for p in [someprogram] + processes:
        p.wait()

where named_pipes(n) is:

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

Another, preferable way (no named entry on disk is needed) to implement bash process substitution is to use /dev/fd/N filenames, if they are available, as suggested by @Dunes. On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. To test availability, run:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available

If it fails, try to symlink /dev/fd to proc(5), as is done on some Linux systems:

$ ln -s /proc/self/fd /dev/fd

Here's a /dev/fd-based implementation of the someprogram <(someprocess) <(anotherprocess) bash command:

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None:  # still running
        process.kill()

with ExitStack() as stack:  # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1])  # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))

    for p in processes:  # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0:  # errors shouldn't go unnoticed
    raise CalledProcessError(someprogram.returncode, someprogram.args)

Note: on my Ubuntu machine, the subprocess code works only in Python 3.4+, despite pass_fds being available since Python 3.2.

How do I use subprocess.Popen to connect multiple processes by pipes?

You'd be a little happier with the following.

import subprocess

awk_sort = subprocess.Popen("awk -f script.awk | sort > outfile.txt",
                            stdin=subprocess.PIPE, shell=True)
awk_sort.communicate(b"input data\n")

Delegate part of the work to the shell. Let it connect two processes with a pipeline.

You'd be a lot happier rewriting 'script.awk' into Python, eliminating awk and the pipeline.
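
For a sense of what that rewrite looks like: assuming, purely for illustration, that script.awk merely extracts the second whitespace-separated field and that the input lives in a hypothetical infile.txt, the whole awk-plus-sort pipeline collapses to a few lines of Python:

# Hypothetical stand-in for "awk -f script.awk | sort > outfile.txt";
# the field extraction is an assumed example of what script.awk might do.
with open('infile.txt') as src, open('outfile.txt', 'w') as dst:
    rows = [line.split() for line in src]
    fields = [row[1] for row in rows if len(row) > 1]
    dst.write('\n'.join(sorted(fields)) + '\n')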

Edit. Some of the reasons for suggesting that awk isn't helping.

[There are too many reasons to respond via comments.]

  1. Awk is adding a step of no significant value. There's nothing unique about awk's processing that Python doesn't handle.

  2. The pipelining from awk to sort, for large sets of data, may improve elapsed processing time. For short sets of data, it has no significant benefit. A quick measurement of awk >file ; sort file against awk | sort will reveal whether concurrency helps. With sort, it rarely helps, because sort is not a once-through filter.

  3. The simplicity of "Python to sort" processing (instead of "Python to awk to sort") prevents exactly the kind of question being asked here.

  4. Python -- while wordier than awk -- is also explicit where awk has certain implicit rules that are opaque to newbies, and confusing to non-specialists.

  5. Awk (like the shell script itself) adds Yet Another Programming language. If all of this can be done in one language (Python), eliminating the shell and the awk programming eliminates two programming languages, allowing someone to focus on the value-producing parts of the task.

Bottom line: awk can't add significant value. In this case, awk is a net cost; it added enough complexity that it was necessary to ask this question. Removing awk will be a net gain.

Sidebar: Why building a pipeline (a | b) is so hard.

When the shell is confronted with a | b it has to do the following.

  1. Fork a child process of the original shell. This will eventually become b.

  2. Build an OS pipe (not a Python subprocess.PIPE): call os.pipe(), which returns two new file descriptors that are connected via a common buffer. At this point the process has stdin, stdout, and stderr from its parent, plus a file that will become "a's stdout" and "b's stdin".

  3. Fork a child. The child replaces its stdout with the new a's stdout, then execs the a process.

  4. The b child (from step 1) replaces its stdin with the new b's stdin, then execs the b process.

  5. The b child waits for a to complete.

  6. The parent waits for b to complete.

I think that the above can be used recursively to spawn a | b | c, but you have to implicitly parenthesize long pipelines, treating them as if they're a | (b | c).

Since Python has os.pipe(), the os.exec*() family, and os.fork(), and you can replace sys.stdin and sys.stdout, there's a way to do the above in pure Python. Indeed, you may be able to work out some shortcuts using os.pipe() and subprocess.Popen.
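
As an illustration, here is a minimal sketch of that recipe using the raw POSIX primitives; ls -l and wc -l are just stand-ins for a and b:

import os

def pipeline(a_cmd, b_cmd):
    """Run a_cmd | b_cmd using os.pipe(), os.fork(), and os.execvp() (POSIX only)."""
    r, w = os.pipe()            # two fds connected via a common buffer
    a_pid = os.fork()
    if a_pid == 0:              # child that becomes `a`
        os.dup2(w, 1)           # a's stdout -> the pipe's write end
        os.close(r)
        os.close(w)
        os.execvp(a_cmd[0], a_cmd)
    b_pid = os.fork()
    if b_pid == 0:              # child that becomes `b`
        os.dup2(r, 0)           # b's stdin <- the pipe's read end
        os.close(r)
        os.close(w)
        os.execvp(b_cmd[0], b_cmd)
    os.close(r)                 # the parent must drop both ends,
    os.close(w)                 # or b never sees EOF
    os.waitpid(a_pid, 0)
    os.waitpid(b_pid, 0)

pipeline(['ls', '-l'], ['wc', '-l'])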

However, it's easier to delegate that operation to the shell.

Python subprocesses with several pipes

You can't send the same pipe to two different processes. Or rather, if you do, they both end up reading from the same pipe: whatever one process reads is no longer available to the other.

What you need to do is "tee" the data in some way.


If you don't need to stream the data as they come in, you can read all the output from p1, then send it as input to both p2 and p3. This is easy:

from subprocess import check_output, Popen, PIPE

output = check_output(cmd1)
p2 = Popen(cmd2, stdin=PIPE)
p2.communicate(output)
p3 = Popen(cmd3, stdin=PIPE)
p3.communicate(output)

If you just need p2 and p3 to run in parallel, you can just run them each in a thread.
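
For example (a minimal sketch; 'producer', 'consumer-a', and 'consumer-b' are hypothetical placeholder commands):

from threading import Thread
from subprocess import check_output, Popen, PIPE

output = check_output(['producer'])  # read all of p1's output once

def feed(cmd, data):
    # each consumer gets its own copy of the data in its own thread
    p = Popen(cmd, stdin=PIPE)
    p.communicate(data)

threads = [Thread(target=feed, args=(cmd, output))
           for cmd in (['consumer-a'], ['consumer-b'])]
for t in threads:
    t.start()
for t in threads:
    t.join()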

But if you actually need real-time streaming, you have to connect things up more carefully. If you can be sure that p2 and p3 will always consume their input, without blocking, faster than p1 can supply it, you can do this without threads (just loop on p1.stdout.read()), but otherwise, you'll need an output thread for each consumer process, and a Queue or some other way to pass the data around. See the source code to communicate for more ideas on how to synchronize the separate threads.
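
For the streaming case, here is one possible threaded, queue-based sketch (again with hypothetical placeholder commands): a reader thread fans p1's output out to one queue per consumer, and a writer thread per consumer drains its queue, so a slow consumer cannot stall the other.

from queue import Queue
from subprocess import Popen, PIPE
from threading import Thread

p1 = Popen(['producer'], stdout=PIPE)
p2 = Popen(['consumer-a'], stdin=PIPE)
p3 = Popen(['consumer-b'], stdin=PIPE)

queues = [Queue(), Queue()]

def reader():
    # read p1 exactly once, handing each chunk to every consumer's queue
    for chunk in iter(lambda: p1.stdout.read(64 * 1024), b''):
        for q in queues:
            q.put(chunk)
    for q in queues:
        q.put(None)  # sentinel: end of data

def writer(q, proc):
    for chunk in iter(q.get, None):
        proc.stdin.write(chunk)
    proc.stdin.close()

threads = [Thread(target=reader)]
threads += [Thread(target=writer, args=(q, p)) for q, p in zip(queues, (p2, p3))]
for t in threads:
    t.start()
for t in threads:
    t.join()
for p in (p1, p2, p3):
    p.wait()

(The queues here are unbounded; a real implementation might cap their size so a stalled consumer cannot buffer unlimited data in memory.)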

Sending multiple piped commands via subprocess with explicit quotations

This program appears to do what you want. Each of the processes must be run separately. As you build them, the output from one gets piped out to the input of the next. The files are handled independently and used at the beginning and ending of the process.

#! /usr/bin/env python3
import subprocess

def main():
    with open('raw.txt', 'r') as stdin, open('clean.txt', 'w') as stdout:
        step_1 = subprocess.Popen(
            ('tr', '-c', '[:alpha:]', ' '),
            stdin=stdin,
            stdout=subprocess.PIPE
        )
        step_2 = subprocess.Popen(
            ('sed', '-E', 's/ +/ /g'),
            stdin=step_1.stdout,
            stdout=subprocess.PIPE
        )
        step_3 = subprocess.Popen(
            ('tr', '[:upper:]', '[:lower:]'),
            stdin=step_2.stdout,
            stdout=stdout
        )
        # drop the parent's copies of the intermediate pipes so SIGPIPE
        # can propagate if a downstream step exits early
        step_1.stdout.close()
        step_2.stdout.close()
        step_3.wait()

if __name__ == '__main__':
    main()

piping together several subprocesses

Maybe this can help:

import sys
import tempfile
from subprocess import Popen, PIPE

cmd = [sys.executable, '-c', 'print(input())']

# Give the first subprocess its input via a temp file instead of
# stdin.write() to avoid deadlocks.
with tempfile.TemporaryFile() as f:
    f.write(b'foobar\n')
    f.seek(0)  # rewind so that subprocess p1 can read what we wrote
    p1 = Popen(cmd, stdin=f, stdout=PIPE)

p2 = Popen(cmd, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd, stdin=p2.stdout, stdout=PIPE)

# Close the parent's copies of the intermediate pipes (no particular order needed).
p1.stdout.close()
p2.stdout.close()

# Use communicate() instead of stdout.read() to avoid deadlocks.
print(p3.communicate()[0].decode())

Output:

$ python test.py
foobar

Hope this is helpful.

IPython - pipe multiple subprocesses and show result of final one to stdout

The solution I found is to close the stdin of the process:

p1 = subprocess.Popen("rev", stdin=subprocess.PIPE)
p2 = subprocess.Popen("rev", stdout=p1.stdin, stdin=subprocess.PIPE)
subprocess.Popen("my_executable", stdout=p2.stdin)
p2.stdin.close()

After p2's stdin is closed in the parent, end-of-file can propagate through the pipeline, and p1 writes its output to stdout.
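
Here's a runnable variant of the same wiring, with echo standing in for my_executable (an assumption made for the demo); note that the parent must also drop its copy of p1.stdin, or p1 never sees end-of-file:

import subprocess

p1 = subprocess.Popen(['rev'], stdin=subprocess.PIPE)        # writes to the terminal
p2 = subprocess.Popen(['rev'], stdout=p1.stdin, stdin=subprocess.PIPE)
p3 = subprocess.Popen(['echo', 'hello'], stdout=p2.stdin)    # stand-in for my_executable
p2.stdin.close()  # parent's copy; p2 sees EOF once echo exits
p1.stdin.close()  # parent's copy; p1 sees EOF once p2 exits
p1.wait()         # prints "hello" (reversed twice, so unchanged)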

Subprocess PIPE stdout to two different processes

The way that .read() generally works, in most cases that I'm aware of, using it a second time requires calling .seek() to rewind the read position back to where it was. A pipe, unlike a regular file, cannot be rewound (see the short demonstration after the link below).

see:

  • Why can't I call read() twice on an open file?
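
A small illustration of the difference, using an in-memory buffer as a stand-in for a regular file:

import io
import os

buf = io.BytesIO(b'data')   # behaves like a regular file
print(buf.read())           # b'data'
print(buf.read())           # b'' -- the read position is now at EOF
buf.seek(0)                 # rewind
print(buf.read())           # b'data' again

r, w = os.pipe()            # a pipe, by contrast, cannot be rewound
os.write(w, b'data')
os.close(w)
pipe = os.fdopen(r, 'rb')
print(pipe.read())          # b'data'
try:
    pipe.seek(0)
except OSError as e:        # "Illegal seek" (or io.UnsupportedOperation)
    print('cannot rewind a pipe:', e)
pipe.close()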

What you can do is use communicate and manually pass in the stdout data (read once, pass it into both):

out, err = p2.communicate()  # read all of p2's output once (p2 must be created with stdout=PIPE)

p2_output = out

p3 = Popen([...], stdin=PIPE, ...)
p4 = Popen([...], stdin=PIPE, ...)

stdout_data3, err = p3.communicate(input=p2_output)
stdout_data4, err = p4.communicate(input=p2_output)

Also note, this might change the way polling needs to be done, in comparison to what you currently have.

related:

  • How do I write to a Python subprocess' stdin?
  • How to read output from subprocess Popen correctly?

