Link Several Popen Commands with Pipes

How do I use subprocess.Popen to connect multiple processes by pipes?

You'd be a little happier with the following.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

Delegate part of the work to the shell. Let it connect two processes with a pipeline.

You'd be a lot happier rewriting 'script.awk' into Python, eliminating awk and the pipeline.
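As a sketch of that idea (the contents of script.awk aren't shown here, so the transform below is a hypothetical placeholder, as is the input file name), the awk step can be done in Python and only sort left as a subprocess:

import subprocess

def transform(line):
    # Hypothetical stand-in for whatever script.awk does; here it just
    # picks the second whitespace-separated field from each line.
    fields = line.split()
    return fields[1] if len(fields) > 1 else line.rstrip('\n')

with open('outfile.txt', 'w') as outfile:
    sort = subprocess.Popen(['sort'], stdin=subprocess.PIPE, stdout=outfile)
    with open('infile.txt') as infile:  # hypothetical input file
        for line in infile:
            sort.stdin.write((transform(line) + '\n').encode())
    sort.stdin.close()
    sort.wait()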

Edit. Some of the reasons for suggesting that awk isn't helping.

[There are too many reasons to respond via comments.]

  1. Awk is adding a step of no significant value. There's nothing unique about awk's processing that Python doesn't handle.

  2. The pipelining from awk to sort may improve elapsed processing time for large sets of data. For short sets of data, it has no significant benefit. A quick measurement of awk >file ; sort file against awk | sort will reveal whether concurrency helps (a quick timing sketch follows this list). With sort, it rarely helps, because sort is not a once-through filter.

  3. The simplicity of "Python to sort" processing (instead of "Python to awk to sort") prevents the exact kind of questions being asked here.

  4. Python -- while wordier than awk -- is also explicit where awk has certain implicit rules that are opaque to newbies, and confusing to non-specialists.

  5. Awk (like the shell script itself) adds Yet Another Programming language. If all of this can be done in one language (Python), eliminating the shell and the awk programming eliminates two programming languages, allowing someone to focus on the value-producing parts of the task.
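Regarding point 2, a rough way to make that measurement from Python (a sketch; script.awk and infile.txt are hypothetical names, and each variant runs only once, so treat the numbers as indicative at best):

import subprocess
import time

def timed(shell_command):
    # Time one run of a shell pipeline; a rough, single-run measurement.
    start = time.perf_counter()
    subprocess.run(shell_command, shell=True, check=True)
    return time.perf_counter() - start

# Sequential: awk writes a temporary file, then sort reads it.
sequential = timed("awk -f script.awk infile.txt > tmp.txt ; sort tmp.txt > out1.txt")

# Pipelined: awk and sort run concurrently.
pipelined = timed("awk -f script.awk infile.txt | sort > out2.txt")

print("sequential: %.3fs  pipelined: %.3fs" % (sequential, pipelined))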

Bottom line: awk can't add significant value. In this case, awk is a net cost; it added enough complexity that it was necessary to ask this question. Removing awk will be a net gain.

Sidebar: why building a pipeline (a | b) is so hard.

When the shell is confronted with a | b it has to do the following.

  1. Fork a child process of the original shell. This will eventually become b.

  2. Build an OS pipe. This is not a Python subprocess.PIPE, but a call to os.pipe(), which returns two new file descriptors connected through a common buffer. At this point the process has stdin, stdout and stderr from its parent, plus file descriptors that will become "a's stdout" and "b's stdin".

  3. Fork a child. The child replaces its stdout with the new a's stdout. Exec the a process.

  4. The b child replaces its stdin with the new "b's stdin". Exec the b process.

  5. The b child waits for a to complete.

  6. The parent is waiting for b to complete.

I think that the above can be used recursively to spawn a | b | c, but you have to implicitly parenthesize long pipelines, treating them as if they're a | (b | c).

Since Python has os.pipe(), os.fork() and the os.exec*() family, and you can replace sys.stdin and sys.stdout, there's a way to do the above in pure Python. Indeed, you may be able to work out some shortcuts using os.pipe() and subprocess.Popen.
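For illustration, here is a minimal sketch of those steps in pure Python, using two hypothetical commands (ls as a and sort -r as b). It replaces the children's stdin/stdout with os.dup2() rather than touching sys.stdin/sys.stdout, and it simplifies the bookkeeping: the parent forks both children and waits for both, instead of having the b child fork and wait for a.

import os

read_end, write_end = os.pipe()        # step 2: the OS-level pipe

pid_a = os.fork()                      # step 3: child that becomes a
if pid_a == 0:
    os.dup2(write_end, 1)              # a's stdout becomes the pipe's write end
    os.close(read_end)
    os.close(write_end)
    os.execvp('ls', ['ls'])            # exec the a process

pid_b = os.fork()                      # step 4 (simplified): child that becomes b
if pid_b == 0:
    os.dup2(read_end, 0)               # b's stdin becomes the pipe's read end
    os.close(read_end)
    os.close(write_end)
    os.execvp('sort', ['sort', '-r'])  # exec the b process

# Parent: close both pipe ends and wait (steps 5-6, simplified so the
# parent waits for both children instead of b waiting for a).
os.close(read_end)
os.close(write_end)
os.waitpid(pid_a, 0)
os.waitpid(pid_b, 0)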

However, it's easier to delegate that operation to the shell.

Sending multiple piped commands via subprocess with explicit quotations

This program appears to do what you want. Each of the processes must be run separately; as you build them, the output of one is piped into the input of the next. The files are handled independently and are used only at the beginning and end of the pipeline.

#! /usr/bin/env python3
import subprocess

def main():
    with open('raw.txt', 'r') as stdin, open('clean.txt', 'w') as stdout:
        step_1 = subprocess.Popen(
            ('tr', '-c', '[:alpha:]', ' '),
            stdin=stdin,
            stdout=subprocess.PIPE
        )
        step_2 = subprocess.Popen(
            ('sed', '-E', 's/ +/ /g'),
            stdin=step_1.stdout,
            stdout=subprocess.PIPE
        )
        step_3 = subprocess.Popen(
            ('tr', '[:upper:]', '[:lower:]'),
            stdin=step_2.stdout,
            stdout=stdout
        )
        step_3.wait()

if __name__ == '__main__':
    main()

Multiple pipes in subprocess

To emulate the bash process substitution:

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')

In Python, you could use named pipes:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()

where named_pipes(n) is:

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

Another, preferable way (no need to create a named entry on disk) to implement the bash process substitution is to use /dev/fd/N filenames (if they are available), as suggested by @Dunes. On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. To test availability, run:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available
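The same check can be approximated from Python (a sketch: it opens a throwaway descriptor and probes the corresponding /dev/fd entry):

import os

# Open any file to get a descriptor, then see whether its /dev/fd entry
# is readable -- roughly the Python equivalent of the shell test above.
with open(os.devnull) as f:
    if os.access('/dev/fd/%d' % f.fileno(), os.R_OK):
        print('/dev/fd is available')
    else:
        print('/dev/fd is not available')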

If it fails, try to symlink /dev/fd to proc(5), as is done on some Linux systems:

$ ln -s /proc/self/fd /dev/fd

Here's a /dev/fd-based implementation of the someprogram <(someprocess) <(anotherprocess) bash command:

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None:  # still running
        process.kill()

with ExitStack() as stack:  # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1])  # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes:  # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0:  # errors shouldn't go unnoticed
    raise CalledProcessError(someprogram.returncode, someprogram.args)

Note: on my Ubuntu machine, the subprocess code works only in Python 3.4+, despite pass_fds being available since Python 3.2.

Python subprocess: how to use pipes thrice?

Just add a third command following the same example:

p1 = subprocess.Popen(['convert', fileIn, 'bmp:-'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['mkbitmap', '-f', '2', '-s', '2', '-t', '0.48'],
                      stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
p3 = subprocess.Popen(['potrace', '-t', '5', '-s', '-o', fileOut],
                      stdin=p2.stdout, stdout=subprocess.PIPE)
p2.stdout.close()

output = p3.communicate()[0]

How to use `subprocess` command with pipes

To use a pipe with the subprocess module, you have to pass shell=True.

However, this isn't really advisable for various reasons, not least of which is security. Instead, create the ps and grep processes separately, and pipe the output from one into the other, like so:

ps = subprocess.Popen(('ps', '-A'), stdout=subprocess.PIPE)
output = subprocess.check_output(('grep', 'process_name'), stdin=ps.stdout)
ps.wait()

In your particular case, however, the simple solution is to call subprocess.check_output(('ps', '-A')) and then str.find on the output.
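For example (a sketch; process_name is a placeholder for the name you are looking for):

import subprocess

output = subprocess.check_output(('ps', '-A')).decode()
if output.find('process_name') != -1:  # str.find returns -1 when the substring is absent
    print('process_name is running')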

running multiple bash commands with subprocess

You have to use shell=True in subprocess and must not use shlex.split:

import subprocess

command = "echo a; echo b"

ret = subprocess.run(command, capture_output=True, shell=True)

# before Python 3.7:
# ret = subprocess.run(command, stdout=subprocess.PIPE, shell=True)

print(ret.stdout.decode())

returns:

a
b

