How to Use Subprocess.Popen to Connect Multiple Processes by Pipes

How do I use subprocess.Popen to connect multiple processes by pipes?

You'd be a little happier with the following.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

Delegate part of the work to the shell. Let it connect two processes with a pipeline.

You'd be a lot happier rewriting 'script.awk' into Python, eliminating awk and the pipeline.
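For illustration, here is a minimal sketch of that "Python to sort" shape. The input file name and the field extraction are placeholders standing in for whatever script.awk actually does:

import subprocess

# Hypothetical stand-in for script.awk: keep the first field of each
# non-blank line.
with open('infile.txt') as f:
    processed = [line.split()[0] + '\n' for line in f if line.strip()]

with open('outfile.txt', 'wb') as out:
    sort = subprocess.Popen(['sort'], stdin=subprocess.PIPE, stdout=out)
    sort.communicate(''.join(processed).encode())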

Edit: some of the reasons for suggesting that awk isn't helping.

[There are too many reasons to respond via comments.]

  1. Awk is adding a step of no significant value. There's nothing unique about awk's processing that Python doesn't handle.

  2. The pipelining from awk to sort may, for large sets of data, improve elapsed processing time. For short sets of data, it has no significant benefit. A quick measurement of awk >file ; sort file against awk | sort will reveal whether concurrency helps (a rough timing sketch follows this list). With sort, it rarely helps, because sort is not a once-through filter.

  3. The simplicity of "Python to sort" processing (instead of "Python to awk to sort") prevents the exact kind of questions being asked here.

  4. Python -- while wordier than awk -- is also explicit where awk has certain implicit rules that are opaque to newbies, and confusing to non-specialists.

  5. Awk (like the shell script itself) adds Yet Another Programming language. If all of this can be done in one language (Python), eliminating the shell and the awk programming eliminates two programming languages, allowing someone to focus on the value-producing parts of the task.
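For the measurement suggested in point 2, a rough timing sketch might look like the following; the awk script and file names are placeholders:

import subprocess
import time

# Sequential: awk writes a temp file, then sort reads it.
t0 = time.perf_counter()
subprocess.call('awk -f script.awk data.txt > tmp.txt && sort tmp.txt > out1.txt',
                shell=True)
# Pipelined: awk and sort run concurrently.
t1 = time.perf_counter()
subprocess.call('awk -f script.awk data.txt | sort > out2.txt', shell=True)
t2 = time.perf_counter()
print('sequential: %.3fs  pipelined: %.3fs' % (t1 - t0, t2 - t1))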

Bottom line: awk can't add significant value. In this case, awk is a net cost; it added enough complexity that it was necessary to ask this question. Removing awk will be a net gain.

Sidebar: why building a pipeline (a | b) is so hard.

When the shell is confronted with a | b it has to do the following.

  1. Fork a child process of the original shell. This will eventually become b.

  2. Build an OS pipe (not a Python subprocess.PIPE): call os.pipe(), which returns two new file descriptors connected via a common buffer. At this point, the process has stdin, stdout and stderr from its parent, plus a file that will be "a's stdout" and "b's stdin".

  3. Fork a child. The child replaces its stdout with the new a's stdout. Exec the a process.

  4. The b child replaces its stdin with the new b's stdin. Exec the b process.

  5. The b child waits for a to complete.

  6. The parent is waiting for b to complete.

I think that the above can be used recursively to spawn a | b | c, but you have to implicitly parenthesize long pipelines, treating them as if they're a | (b | c).

Since Python has os.pipe(), os.fork() and the os.exec*() family, and you can replace sys.stdin and sys.stdout, there's a way to do the above in pure Python. Indeed, you may be able to work out some shortcuts using os.pipe() and subprocess.Popen.
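As a concrete illustration, here is a minimal pure-Python sketch of those steps, using ls and sort as stand-ins for a and b (it forks a before b, which doesn't change the outcome, and omits error handling):

import os

read_fd, write_fd = os.pipe()       # step 2: two fds sharing one buffer

if os.fork() == 0:                  # child for a
    os.dup2(write_fd, 1)            # a's stdout becomes the pipe's write end
    os.close(read_fd)
    os.close(write_fd)
    os.execvp('ls', ['ls'])         # exec the a process

if os.fork() == 0:                  # child for b
    os.dup2(read_fd, 0)             # b's stdin becomes the pipe's read end
    os.close(read_fd)
    os.close(write_fd)
    os.execvp('sort', ['sort'])     # exec the b process

os.close(read_fd)                   # the parent keeps neither pipe end,
os.close(write_fd)                  # so sort sees EOF when ls exits
for _ in range(2):                  # wait for both children
    os.wait()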

However, it's easier to delegate that operation to the shell.
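That said, subprocess.Popen can wire the pipe for you without a full shell. A sketch mirroring the awk | sort example at the top (not a drop-in replacement):

import subprocess

awk = subprocess.Popen(['awk', '-f', 'script.awk'],
                       stdin=subprocess.PIPE, stdout=subprocess.PIPE)
with open('outfile.txt', 'wb') as out:
    sort = subprocess.Popen(['sort'], stdin=awk.stdout, stdout=out)
    awk.stdout.close()                # let awk get SIGPIPE if sort exits first
    awk.stdin.write(b'input data\n')  # feed awk, as communicate() did above
    awk.stdin.close()                 # EOF: awk finishes; sort then sees EOF
    awk.wait()
    sort.wait()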

Python subprocess: how to use pipes thrice?

Just add a third command following the same example:

import subprocess

# fileIn and fileOut are assumed to be defined by the caller
p1 = subprocess.Popen(['convert', fileIn, 'bmp:-'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['mkbitmap', '-f', '2', '-s', '2', '-t', '0.48'],
                      stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # let p1 receive SIGPIPE if p2 exits first
p3 = subprocess.Popen(['potrace', '-t', '5', '-s', '-o', fileOut],
                      stdin=p2.stdout, stdout=subprocess.PIPE)
p2.stdout.close()  # likewise for p2 and p3

output = p3.communicate()[0]

How to use `subprocess` command with pipes

To use a pipe with the subprocess module, you have to pass shell=True.

However, this isn't really advisable for various reasons, not least of which is security. Instead, create the ps and grep processes separately, and pipe the output from one into the other, like so:

import subprocess

ps = subprocess.Popen(('ps', '-A'), stdout=subprocess.PIPE)
output = subprocess.check_output(('grep', 'process_name'), stdin=ps.stdout)
ps.wait()

In your particular case, however, the simple solution is to call subprocess.check_output(('ps', '-A')) and then str.find on the output.
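That simpler approach, sketched out (process_name is a placeholder for the name you're actually looking for):

import subprocess

output = subprocess.check_output(('ps', '-A'))
if output.find(b'process_name') != -1:  # check_output returns bytes on Python 3
    print('process_name is running')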

How do subprocess.Popen pipes work in Python?

Q1. Whether you write subprocess.PIPE or just PIPE, you are referencing the same symbol: PIPE from the subprocess module. The following are identical:

# Version 1
import subprocess
proc = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
output = proc.communicate()[0]

# Version 2
from subprocess import PIPE, Popen
proc = Popen(['ls'], stdout=PIPE)
output = proc.communicate()[0]

Q2. What communicate() is actually doing is sending input into p1's STDIN stream. Although the subprocesses are indeed alive and running when the Popen constructors are called, in your particular case the convert utility seems like it won't do anything until it actually receives content via STDIN. If it were a less interactive command (like ls for example) then it wouldn't wait until communicate() to do anything.

Update: in your case, instead of using communicate() to send input into p1, try the following:

p1.stdin.write(img.content)
p1.stdin.close()
p1.stdout.close()  # guard against a broken pipe if p2 finishes before p1
output = p2.communicate()[0]
print(output)

See if you have better luck with that.

Multiple pipes in subprocess

To emulate the bash process substitution:

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')

In Python, you could use named pipes:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()

where named_pipes(n) is:

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

Another, preferable way to implement bash process substitution, which needs no named entry on disk, is to use /dev/fd/N filenames (if they are available), as suggested by @Dunes. On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. To test availability, run:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available
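If you'd rather probe from Python, a rough equivalent of that shell test (an assumption, not part of the original answer) is:

import os

# Open any file and see whether its descriptor shows up under /dev/fd.
with open(os.devnull) as f:
    if os.path.exists('/dev/fd/%d' % f.fileno()):
        print('/dev/fd is available')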

If it fails, try symlinking /dev/fd to proc(5), as is done on some Linux systems:

$ ln -s /proc/self/fd /dev/fd

Here's a /dev/fd-based implementation of the someprogram <(someprocess) <(anotherprocess) bash command:

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None:  # still running
        process.kill()

with ExitStack() as stack:  # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1])  # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes:  # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0:  # errors shouldn't go unnoticed
    raise CalledProcessError(someprogram.returncode, someprogram.args)

Note: on my Ubuntu machine, the subprocess code works only in Python 3.4+, despite pass_fds being available since Python 3.2.

Python process pipes with subprocess.Popen

To run the grep 3 command you need the output from the previous commands, so there is no way to run all of this as a single subprocess.Popen call.

If you always want to run grep 3 for all the files, you could just join the results of all the gunzip -c file_x.gz and then run the grep command only once on the entire list.

p1 = subprocess.Popen('gunzip -c file_1.gz'.split(), stdout=subprocess.PIPE)
p2 = subprocess.Popen('gunzip -c file_2.gz'.split(), stdout=subprocess.PIPE)
...
grep = subprocess.Popen('grep 3'.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
for p in (p1, p2):  # concatenate every gunzip's output into grep's stdin
    grep.stdin.write(p.stdout.read())
grep.stdin.close()

Python subprocess.call and pipes

This is a class that will run a command with an arbitrary number of pipes:

pipeline.py

import shlex
import subprocess

class Pipeline(object):
    def __init__(self, command):
        self.command = command
        self.command_list = self.command.split('|')
        self.output = None
        self.errors = None
        self.status = None
        self.result = None

    def run(self):
        process_list = list()
        previous_process = None
        for command in self.command_list:
            args = shlex.split(command)
            if previous_process is None:
                process = subprocess.Popen(args, stdout=subprocess.PIPE)
            else:
                process = subprocess.Popen(args,
                                           stdin=previous_process.stdout,
                                           stdout=subprocess.PIPE)
            process_list.append(process)
            previous_process = process
        last_process = process_list[-1]
        self.output, self.errors = last_process.communicate()
        self.status = last_process.returncode
        self.result = (0 == self.status)
        return self.result

This example shows how to use the class:

harness.py

from pipeline import Pipeline

if __name__ == '__main__':
    command = '|'.join([
        "sort %s",
        "uniq",
        "sed -e 's/bigstring of word/ smaller /'",
        "column -t -s '=>'"
    ])
    command = command % 'sample.txt'
    pipeline = Pipeline(command)
    if not pipeline.run():
        print("ERROR: Pipeline failed")
    else:
        print(pipeline.output)

I created this sample file for testing:

sample.txt

word1>word2=word3
list1>list2=list3
a>bigstring of word=b
blah1>blah2=blah3

Output

a       smaller   b
blah1   blah2     blah3
list1   list2     list3
word1   word2     word3

