Question about zombie processes and threads
First, don't kill or cancel threads; ask them to kill themselves. If you kill a thread from outside, you never know what side effects (variables, the state of synchronization primitives, etc.) you leave behind. If you find it necessary for one thread to terminate another, then have the problematic thread check a switch, catch a signal, whatever, and clean up its state before exiting itself.
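That "check a switch" pattern can be sketched in Python (a hedged sketch, with threading.Event standing in for the switch; a pthread version would poll a flag or check for a signal the same way):

```python
import threading
import time

stop = threading.Event()      # the "switch" the thread checks
cleanup_done = []             # records that the thread tidied up after itself

def worker():
    while not stop.is_set():
        # ... do one bounded unit of work, then re-check the switch ...
        time.sleep(0.01)
    # the thread cleans up its own state before exiting itself
    cleanup_done.append(True)

t = threading.Thread(target=worker)
t.start()
stop.set()     # ask the thread to terminate, rather than killing it
t.join()       # reap it, knowing its state is consistent
print(cleanup_done)  # [True]
```

Because the thread exits on its own terms, the joiner knows no locks are held and no shared state is half-updated.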
1) If by uncancelable you mean detached, the answer is the same as for a joinable thread: you don't know what mess you are leaving behind if you blindly kill it.
2) From an application-level viewpoint, the primary thing is that if the main thread calls exit() or returns from main(), it takes down all the other threads with it. If the main thread terminates itself with pthread_exit(), the remaining threads continue on.
3) Much like a process, the thread will retain some resources until it is reaped (joined) or the program ends, unless it was created detached.
RE Note: The threads don't share a stack; each has its own. See clone() for some info on thread creation.
Zombie process even though threads are still running
This is a known issue. A fix proposed a while ago by Kaz wasn't accepted by Ulrich Drepper. His comment on this was:
I haven't looked at the patch nor tried it.
If the patch changes the behavior that the main thread, after calling
sys_exit, still react to signals sent to this thread or to the process
as a whole, then the patch is wrong. The userlevel context of the
thread is not usable anymore. It will have run all kinds of
destructors. The current behavior is AFAIK that the main thread won't
react to any signal anymore. That is absolutely required.
Read the mail chain for more discussion on this here:
http://lkml.iu.edu/hypermail/linux/kernel/0902.0/00153.html
Zombie Threads on POSIX systems
A zombie thread is a joinable thread which has terminated, but which
hasn't been joined. Normally, either a thread should be joined at some
time, or it should be detached. Otherwise, the OS maintains its state
for some possible future join, which takes resources.
Avoid zombie processes by regularly calling `join()`
As @Giannis suggested in a comment, you're reinventing a process manager from scratch. Sticking to what comes with Python, do you have some objection to using multiprocessing.Pool? If so, what?
The usual way to do this is to pick a maximum number of worker processes you want to run simultaneously. Say,
NUM_WORKERS = 4
Then drop this in as a replacement for your receive() function:
def receive(q: mp.Queue):
    pool = mp.Pool(NUM_WORKERS)
    while True:
        request = q.get()
        if request == "EOF":
            break
        pool.apply_async(worker, args=(request,))
    pool.close()
    pool.join()
The NUM_WORKERS processes are created once, and reused across tasks. If for some reason you need (or want) a brand new process for each task, you only need to add maxtasksperchild=1 to the Pool constructor.
And if for some reason you need to know when each task finishes, you could, e.g., add a callback= argument to the apply_async() call and write a little function that will be called when the task ends (and it will receive, as argument, whatever your worker() function returns).
The devil's in the daemons
So it turns out your worker processes in your real app want to (for whatever reasons) create processes of their own, and processes created by Pool can't do that. They're created as "daemon" processes. From the docs:
When a process exits, it attempts to terminate all of its daemonic child processes.
Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits.
Pretty much clear as mud ;-) Here's an elaborate way to roll your own Pool workalike that creates non-daemon processes, but it's too elaborate for my tastes:
Python Process Pool non-daemonic?
Going back to your original design, which you already know works, I'd just change it to separate the logic of periodically joining worker processes from the logic of manipulating the queue. Logically, they really have nothing to do with each other. Specifically, creating a "background thread" to join makes good sense to me:
def reap(workers, quit):
    from time import sleep
    while not quit.is_set():
        to_join = [w for w in workers if not w.is_alive()]
        for p_worker in to_join:
            print(f"Join {p_worker.name}")
            p_worker.join()
            workers.remove(p_worker)
        sleep(2)  # whatever you like

    for p_worker in workers:
        print(f"Join {p_worker.name}")
        p_worker.join()

def receive(q: mp.Queue):
    import threading
    workers = []  # type: List[mp.Process]
    quit = threading.Event()
    reaper = threading.Thread(target=reap, args=(workers, quit))
    reaper.start()

    while True:
        request = q.get()
        if request == "EOF":
            break
        p_worker = mp.Process(target=worker, args=(request,), name=request)
        p_worker.start()
        workers.append(p_worker)

    quit.set()
    reaper.join()
I happen to know that list.append() and list.remove() are thread-safe in CPython, so there's no need to protect those operations with a lock. But it wouldn't hurt if you added one.
And one more to try
While processes created by Pool are daemonic, it seems that processes created by the similar concurrent.futures.ProcessPoolExecutor are not. So this simple variation of my first suggestion may work for you (or may not ;-) ):
NUM_WORKERS = 4

def receive(q: mp.Queue):
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(NUM_WORKERS) as e:
        while True:
            request = q.get()
            if request == "EOF":
                break
            e.submit(worker, request)
If that does work for you, it's hard to imagine anything materially simpler.
What are Zombies and what causes them? Are there Zombie processes and Zombie objects?
Zombie processes and zombie objects are totally unrelated. A zombie process arises when a parent starts a child process and the child ends, but the parent doesn't pick up the child's exit code. The process object has to stay around until this happens: it consumes almost no resources and is dead, but it still exists; hence, 'zombie'.
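That life cycle can be demonstrated in a few lines of Python on a POSIX system (a minimal sketch of the fork/wait dance described above):

```python
import os

pid = os.fork()
if pid == 0:
    # child: finishes immediately and becomes a zombie until reaped
    os._exit(3)

# Parent: until waitpid() is called, `ps` would show the child in
# state Z (zombie). Collecting the status removes the zombie.
reaped_pid, status = os.waitpid(pid, 0)
print(reaped_pid == pid, os.WEXITSTATUS(status))  # True 3
```

Once waitpid() returns, the kernel can discard the child's process-table entry; the exit code was the only thing it was keeping alive.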
Zombie objects are a debugging feature of Cocoa / CoreFoundation to help you catch memory errors - normally when an object's refcount drops to zero it's freed immediately, but that makes debugging difficult. Instead, if zombie objects are enabled, the object's memory isn't instantly freed, it's just marked as a zombie, and any further attempts to use it will be logged and you can track down where in the code the object was used past its lifetime.
EXC_BAD_ACCESS is your run-of-the-mill "you used a bad pointer" exception, like if I did:
*(int *)0x42 = 5;
C - threads and processes - prevent zombies
First, switch from calling wait to calling waitpid. Otherwise, if you have more than one thread waiting, they'll steal each other's notifications.
Second, break the call to waitpid into its own function that takes the PID to wait for as a parameter. Cast it through void *, since that's what's used for thread parameters.
Third, change the call to the function to a call to pthread_create, casting the PID to wait for to a void * to pass to the newly-created thread.
Lastly, have the thread detach itself, since there won't be anything waiting for the thread to terminate.
Zombie process is generated when calling system() in Perl threads
Threads are listed in the perldoc as "discouraged". Personally, I find they work fine; they're just somewhat counterintuitive, since they aren't the lightweight constructs you might assume based on other threading models.
I will note - the generic solution to self-reaping zombies is to set $SIG{CHLD} = 'IGNORE' (see e.g. http://perldoc.perl.org/perlipc.html), but that's probably not a good idea if you're capturing the return code. You could probably do an open and a waitpid instead though.
So I wouldn't normally suggest their use, unless you've a scenario where you need to do a lot of inter-thread communication. Parallel::ForkManager is generally much more efficient.
If you do have to use them, I wouldn't do what you're doing and spawn a thread per 'job'; instead, use a worker-threads model with Thread::Queue.
I can't say for certain, but I suspect one of your problems is this line:
$cmd = "$HOME/worker.sh "."$arg";
Because perl will be interpolating $HOME - and you don't define it, therefore it's empty.
You really should be turning on strict and warnings and cleaning up any errors as a result - your code has quite a few.
But that said, unless I'm missing something, your code is much more complicated than it needs to be; it looks like all you're doing here is running parallel ssh commands.
So I'd suggest what you'd be better off with is something like this:
#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use Thread::Queue;

my @servers = qw/hostA hostB/;
my $cmd = '$HOME/worker.sh --node';
my $threadcount = 2;

my $hostq  = Thread::Queue->new();
my $errorq = Thread::Queue->new();

sub worker {
    while ( my $hostname = $hostq->dequeue ) {
        my $output =
            qx( ssh -o StrictHostKeyChecking=no $hostname \"source /etc/profile; $cmd\" );
        if ($?) {
            $errorq->enqueue("$hostname: $output");
        }
    }
}

$hostq->enqueue(@servers);

for ( 1 .. $threadcount ) {
    my $thr = threads->create( \&worker );
}
$hostq->end();

foreach my $thr ( threads->list ) {
    $thr->join;
}
$errorq->end();

while ( my $error = $errorq->dequeue ) {
    print "ERROR: $error\n";
}
Alternatively, with Parallel::ForkManager:
#!/usr/bin/env perl
use strict;
use warnings;

use Parallel::ForkManager;

my @servers = qw/hostA hostB/;
my $cmd = '$HOME/worker.sh --node';

my $manager = Parallel::ForkManager->new(5);    # fork limit

foreach my $hostname (@servers) {
    $manager->start and next;
    my $output =
        qx( ssh -o StrictHostKeyChecking=no $hostname \"source /etc/profile; $cmd\" );
    if ($?) {
        print "ERROR: $hostname $output\n";
    }
    $manager->finish;
}
$manager->wait_all_children();