Python threading with queue: how to avoid using join?
This is likely because your Barra library does not release the global interpreter lock (GIL) when Barra.ricevi is called. You may want to check this, though.
The GIL ensures that only one thread can run at any one time (limiting the usefulness of threads in a multi-processor system). In Python 2, the interpreter considers switching threads every 100 "ticks", where a tick loosely maps to a bytecode instruction. See here for more details.
In your producer thread, not much happens outside of the C-library call. This means the producer thread will get to call Barra.ricevi a great many times before the GIL switches to another thread.
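On Python 3.2 and later, the tick counter was replaced by a time-based switch interval, and sys.setcheckinterval() was eventually removed in favour of sys.setswitchinterval(). A minimal sketch of temporarily changing that interval, assuming Python 3 (the helper name with_switch_interval is made up for illustration):

```python
import sys


def with_switch_interval(seconds, func):
    """Run func() with a temporary GIL switch interval, then restore the old one.

    Hypothetical helper for illustration; only sys.getswitchinterval() and
    sys.setswitchinterval() are real standard-library calls.
    """
    old = sys.getswitchinterval()
    sys.setswitchinterval(seconds)
    try:
        return func()
    finally:
        # Always restore the previous interval, even if func() raises
        sys.setswitchinterval(old)
```

A shorter interval makes the interpreter offer the GIL to other threads more often, at the cost of extra switching overhead. It does not help if a C extension holds the GIL for the whole duration of a call.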
Solutions to this are, in order of increasing complexity:
- Call time.sleep(0) after adding an item to the queue. This yields the thread so that another thread can run.
- Use sys.setcheckinterval() to lower the number of "ticks" executed before switching threads. This will come at the cost of making the program much more computationally expensive.
- Use multiprocessing rather than threading. This includes using multiprocessing.Queue instead of Queue.Queue.
- Modify Barra so that it does release the GIL when its functions are called.
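The first option could look roughly like the sketch below. It assumes Python 3 (where the module is named queue), and fake_receive is a made-up stand-in for Barra.ricevi("can0"), since the real library is not available here. The None sentinel is also an addition so that the demo terminates.

```python
import queue
import threading
import time


def fake_receive(n):
    """Hypothetical stand-in for Barra.ricevi("can0"); returns a dummy message."""
    return ("OK", n)


def producer(out_q, count):
    for n in range(count):
        out_q.put(fake_receive(n))
        # Release the GIL briefly so the consumer thread gets a chance to run
        time.sleep(0)
    out_q.put(None)  # sentinel: no more messages (added for the demo)


def consume_all(in_q):
    received = []
    while True:
        msg = in_q.get()  # blocks until the producer has put something
        if msg is None:
            break
        received.append(msg)
    return received


def run_demo(count=5):
    q = queue.Queue()
    t = threading.Thread(target=producer, args=(q, count), daemon=True)
    t.start()
    received = consume_all(q)
    t.join()
    return received
```

Calling run_demo(3) returns the three dummy messages in the order they were produced. With the real library, whether time.sleep(0) is enough depends on how long each Barra.ricevi call holds the GIL.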
Example using multiprocessing. Be aware that when using multiprocessing, your processes no longer have an implied shared state. You will need to have a look at multiprocessing to see how to pass information between processes.
import Barra
import multiprocessing

def threadCAN(posQu):
    # Runs in a separate process, so it does not compete for the parent's GIL
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print(canMsg)
        else:
            print("Enqueued message", canMsg)
            posQu.put(canMsg)

if __name__ == "__main__":
    posQu = multiprocessing.Queue(maxsize=0)
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,))
    procCan.daemon = True
    procCan.start()
    while True:
        posMsg = posQu.get()
        print("Message from the queue", posMsg)
What does a call on queue.join() in main thread do to non-main threads?
The short answer to your main question is nothing.
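For a longer answer, here are two concurrency graphs, one without the join() call:
[Figure: concurrency graph without dish_queue.join()]
And one with:
[Figure: concurrency graph with dish_queue.join()]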
As you can see, at the beginning both dryer threads are blocked, which, as you correctly understood, is the get() call blocking. In the first case, the main thread finishes as soon as the washer function returns. When you add dish_queue.join(), the main thread waits until every task in dish_queue has been marked as done. So when you say that join() unblocks the main thread, it means that it removes its own block. As you can see, the other threads are totally unaffected by it and remain blocked.
As for what blocking is: it is when a thread or a process waits for input from outside itself, in this case for an element to arrive in the queue. If you want to stop the other threads, you'll need to either add a timeout to get() (which raises an exception that, if left uncaught, ends the thread), or shut them down after dish_queue.join() returns.
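One way to shut the dryers down after dish_queue.join() is to send each of them a sentinel value. The sketch below assumes Python 3 and a simplified washer/dryer setup; the STOP sentinel, the dried list, and run_demo are illustrative names, not part of the original code.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel telling a dryer to exit


def dryer(dish_queue, dried):
    while True:
        dish = dish_queue.get()  # blocks until an item is available
        if dish is STOP:
            dish_queue.task_done()
            break
        dried.append(dish)
        dish_queue.task_done()


def washer(dishes, dish_queue):
    for dish in dishes:
        dish_queue.put(dish)


def run_demo(dishes, n_dryers=2):
    dish_queue = queue.Queue()
    dried = []
    threads = [
        threading.Thread(target=dryer, args=(dish_queue, dried))
        for _ in range(n_dryers)
    ]
    for t in threads:
        t.start()
    washer(dishes, dish_queue)
    # Only the main thread waits here; the dryers go back to blocking in get()
    dish_queue.join()
    # Wake each dryer with one sentinel so it can leave its loop
    for _ in threads:
        dish_queue.put(STOP)
    for t in threads:
        t.join()
    return sorted(dried), any(t.is_alive() for t in threads)
```

Without the sentinels, the final t.join() calls would hang, because the dryers would still be blocked in get().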
Can you join a Python queue without blocking?
The Python Queue itself does not support this, so you could try the following:
from threading import Thread

class QueueChecker(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.q = q

    def run(self):
        # Blocks this helper thread (not the main thread) until all tasks are done
        self.q.join()

q_manager_thread = QueueChecker(my_q)
q_manager_thread.start()
while q_manager_thread.is_alive():
    pass  # do other things
# When the loop exits the tasks are done, because the thread will have
# returned from blocking on q.join() and exited its run method.
q_manager_thread.join()  # to clean up the thread
A while loop on thread.is_alive() might not be exactly what you want, but at least you can see how to asynchronously check on the status of q.join() now.
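A variation on the same idea, as a rough sketch assuming Python 3: a helper thread waits on q.join() and sets a threading.Event, which the main thread can poll without blocking. The names watch_join, worker, and run_demo are made up for this example.

```python
import queue
import threading
import time


def worker(q, results):
    while True:
        item = q.get()
        results.append(item * 2)
        q.task_done()


def watch_join(q):
    """Return an Event that is set once q.join() has returned."""
    done = threading.Event()

    def _wait():
        q.join()    # blocks only this helper thread
        done.set()

    threading.Thread(target=_wait, daemon=True).start()
    return done


def run_demo(items):
    q = queue.Queue()
    results = []
    threading.Thread(target=worker, args=(q, results), daemon=True).start()
    for item in items:
        q.put(item)
    done = watch_join(q)
    while not done.is_set():
        # The main thread is free to do other work here
        time.sleep(0.01)
    return sorted(results), done.is_set()
```

The Event can also be passed to other parts of the program, or waited on with a timeout via done.wait(timeout), which may fit better than a polling loop.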
Why is Queue.join() necessary here?
You have a race condition in your code. Imagine that you have only one item left in the Queue and that you are using only two threads instead of 8. Then the following sequence of events can happen:
1. Thread A calls q.empty to check whether the queue is empty. Since there is one item in the queue, the result is False and the loop body is executed.
2. Before thread A calls q.get, there's a context switch and thread B gets to run.
3. Thread B calls q.empty; there's still one item in the queue, so the result is False and the loop body is executed.
4. Thread B calls q.get without parameters and it immediately returns with the last item from the queue. Then thread B processes the item and exits, since q.empty returns True.
5. Thread A gets to run. Since it already called q.empty in step 1, it will call q.get next, but this will block forever, so your program won't terminate.
You can simulate the above behavior by importing time and changing the loop a bit:
while not q.empty():
    time.sleep(0.1)  # Force context switch
    x = q.get()
Note that the behavior is the same whether or not task_done is called.
So why did adding task_done help? By default, Python 2 does a context switch every 100 interpreter instructions, so adding code might have changed the place where the context switch occurs. See another question and the linked PDF for a better explanation. On my machine the program didn't hang whether task_done was there or not, so this is just speculation about what caused it to happen for you.
If you want to fix the behavior, you could use an infinite loop and pass a parameter to get instructing it not to block. This causes get to eventually raise the Queue.Empty exception, which you can catch and then break out of the loop:
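import threading
from Queue import Queue, Empty  # Python 2; on Python 3 use: from queue import Queue, Empty

def func():
    global q, count
    while True:
        try:
            x = q.get(False)  # non-blocking: raises Empty instead of waiting
        except Empty:
            break
        io_process(x)
        with lock:  # acquires the lock and always releases it on exit
            shared_resource_process(x)
            print('%s is processing %r' % (threading.current_thread().name, x))
            count += 1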
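For reference, a self-contained Python 3 sketch of the same pattern. The functions io_process and shared_resource_process from the question are not available here, so this version just records each item. The names drain, run_demo, and the state dictionary are assumptions made for the example.

```python
import queue
import threading


def drain(q, lock, state):
    while True:
        try:
            x = q.get(block=False)  # never waits; raises queue.Empty if nothing is left
        except queue.Empty:
            break
        with lock:
            # Shared state is only touched while holding the lock
            state["count"] += 1
            state["seen"].append(x)


def run_demo(n_items=100, n_threads=8):
    q = queue.Queue()
    for i in range(n_items):
        q.put(i)
    lock = threading.Lock()
    state = {"count": 0, "seen": []}
    threads = [
        threading.Thread(target=drain, args=(q, lock, state))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # every worker exits on its own, so this cannot hang
    return state
```

Because no thread ever calls q.empty() followed by a blocking q.get(), the race described above cannot occur. This only works as a termination condition when all items are queued before the workers start.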