Sharing Stdout Among Multiple Threads/Processes

Sharing stdout among multiple threads/processes

Sorry for answering myself...

The definitive solution was to use the GNU parallel utility.

It is meant as a replacement for the well-known xargs utility, but it runs the commands in parallel and separates their output into groups.

So I just left my simple one-process, one-thread utility as-is and piped its invocation through parallel like this:

generate-argument-list | parallel <options> my-utility

This, depending on parallel's options, can produce nicely grouped output for the multiple invocations of my-utility.

python multiple threads redirecting stdout

You need to create a thread function of your own:


import csv
import subprocess as sp
from threading import Thread

def stream_runner(stream, id):
    # open a stream-specific log file to write to
    with open(f'stream_{id}.log', 'wt') as f:
        # block until ffmpeg is done (ffmpeg writes its log to stderr)
        sp.run(stream.compile(), stderr=f)

# `data` holds the stations CSV and `run()` builds the ffmpeg stream for a
# station; both come from the question and are not defined here
for i, station in enumerate(csv.DictReader(data)):
    stream = run(station['name'], station['mount'], station['url'])
    thread = Thread(target=stream_runner, args=(stream, i))
    thread.start()

Something like this should work.

Safely running code in a process, redirecting stdout in multiprocessing.Process

Communicating with a running process is not straightforward in Python. For some reason you can only do it once in the subprocess's life cycle (communicate() may only be called once). From my experience, it is best to run a thread that starts the process and, after a timeout, collects its output and terminates the subprocess.

Something like:

import subprocess
import threading

def subprocess_with_timeout(cmd, timeout_sec, stdin_data=None):
    """Execute `cmd` in a subprocess and enforce timeout `timeout_sec` seconds.

    Send `stdin_data` to the subprocess.

    Return the subprocess exit code and outputs on natural completion of the subprocess.
    Raise an exception if the timeout expires before the subprocess completes."""
    proc = subprocess.Popen(cmd,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # this timer will terminate the subprocess once the timeout expires
    timer = threading.Timer(timeout_sec, proc.kill)
    timer.start()

    # you will be blocked here until the process terminates (by itself or by the timeout death switch)
    stdoutdata, stderrdata = proc.communicate(stdin_data)

    if timer.is_alive():
        # process completed naturally - cancel the timer and return the exit code
        timer.cancel()
        return proc.returncode, stdoutdata, stderrdata
    # process was killed by the timer - raise an exception
    raise TimeoutError('Process #%d killed after %f seconds' % (proc.pid, timeout_sec))

So, run an executor thread that calls subprocess_with_timeout. It should handle the inputs and save the results.
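Something like this could serve as that executor thread, building on subprocess_with_timeout above (a rough sketch; the worker function, the results dict, and the sample sleep commands are illustrative assumptions, not part of the answer):

results = {}

def worker(task_id, cmd, timeout_sec, stdin_data=None):
    # store either the (returncode, stdout, stderr) tuple or the timeout error
    try:
        results[task_id] = subprocess_with_timeout(cmd, timeout_sec, stdin_data)
    except TimeoutError as exc:
        results[task_id] = exc

threads = [threading.Thread(target=worker, args=(i, ['sleep', str(i)], 2.0))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()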

Another idea is to use a webserver to do the IPC. See this link

stdout order with multiple threads

Nope.

Even if you are running two threads asynchronously (I don't know whether you are in this case), within each thread the statements are executed in order.

So, your thread() would print "thread finished processing" first and then put the data in the result queue. Only then will your main() get the result and print "main got result".
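For illustration, a minimal sketch of that ordering (the function name and message strings here are just examples, assuming a standard queue.Queue):

import queue
import threading

result_queue = queue.Queue()

def thread_func():
    print("thread finished processing")   # executed first, inside the worker thread
    result_queue.put("some result")       # the result becomes visible to main only afterwards

t = threading.Thread(target=thread_func)
t.start()

result = result_queue.get()               # blocks until the worker has put its result
print("main got result:", result)         # therefore always printed after the worker's message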

In a multi-threaded application, how can I redirect stderr & stdout to a separate file per thread?

If you really must do this...

First you need to create 2 pthread_key_ts, one for stdout and one for stderr. These can be created using pthread_key_create, and they must be accessible from all threads. Let's call them stdout_key and stderr_key.

When a thread is being created:

FILE *err = ..., *out = ...;
pthread_setspecific(stdout_key, out);
pthread_setspecific(stderr_key, err);

and then in your header file:

#define stdout (FILE*)pthread_getspecific(stdout_key)
#define stderr (FILE*)pthread_getspecific(stderr_key)
#define printf(...) fprintf(stdout, ##__VA_ARGS__)

then just use:

fprintf(stderr, "hello\n");
fprintf(stdout, "hello\n");
printf("hello\n");

I don't recommend this approach though.

Can I redirect stdout for two sub processes?

Think of a file descriptor as a ref-counted/smart indirect pointer to a kernel-maintained file object.
dup (dup2, dup3) duplicates the smart "pointer" by incrementing the kernel-maintained file object's refcount.
close decrements the refcount and destructs the kernel-maintained file object if the refcount becomes 0.

These file objects may be shared by multiple processes. This usually happens when a process forks: a fork increments the refcounts of the file objects pointed to by the inherited file descriptors (and so does sending a file descriptor via a UNIX socket).

(Keeping this model in mind is particularly important when you deal with pipe file descriptors duplicated by forking, because a pipe's read end will only get an EOF when all of its write ends (in all processes that have a fd referring to that write end) have been closed.)
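As a rough illustration of this refcounting and the pipe-EOF behaviour (Python is used here for brevity; the echo commands are just placeholders):

import os
import subprocess

r, w = os.pipe()    # one kernel pipe object; r and w are descriptors for its two ends

# each child's stdout is pointed at (a duplicate of) the same write end,
# so the write end's refcount increases for every child that inherits it
p1 = subprocess.Popen(['echo', 'from child 1'], stdout=w)
p2 = subprocess.Popen(['echo', 'from child 2'], stdout=w)

os.close(w)    # drop the parent's own reference to the write end

with os.fdopen(r, 'rb') as reader:
    # read() only returns EOF once both children have exited and thereby
    # released their references to the write end
    print(reader.read().decode(), end='')

p1.wait()
p2.wait()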


