Python Threading with Queue: How to Avoid Using Join

Python threading with queue: how to avoid using join?

This is likely because your Barra library does not release the global interpreter lock (GIL) while Barra.ricevi is running. You may want to verify this, though.

The GIL ensures that only one thread runs Python bytecode at any one time (limiting the usefulness of threads on a multi-processor system). In Python 2, the interpreter considers switching threads every 100 "ticks", where a tick loosely maps to a bytecode instruction.

In your producer thread, not much happens outside of the C-library call. This means the producer thread will get to call Barra.ricevi a great many times before the GIL switches to another thread.

Possible solutions, in order of increasing complexity:

  • Call time.sleep(0) after adding an item to the queue. This yields the thread so that another thread can run.
  • Use sys.setcheckinterval() to lower the number of "ticks" executed before switching threads. This comes at the cost of making the program much more computationally expensive. (Python 3.2 and later replace this function with sys.setswitchinterval(), which takes a time in seconds rather than a tick count.)
  • Use multiprocessing rather than threading. This includes using multiprocessing.Queue instead of Queue.Queue.
  • Modify Barra so that it does release the GIL when its functions are called.
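The first option in the list above can be sketched as follows. This is only an illustration in Python 3 (where the module is named queue): fake_receive is a hypothetical stand-in for Barra.ricevi, and the None sentinel used to end the loop is an assumption added so that the example terminates.

```python
import queue
import threading
import time


def fake_receive(n):
    """Hypothetical stand-in for Barra.ricevi("can0"); returns a dummy message."""
    return ("MSG", n)


def producer(out_q, count):
    for n in range(count):
        out_q.put(fake_receive(n))
        time.sleep(0)  # yield so that another thread gets a chance to run
    out_q.put(None)  # sentinel (an assumption of this sketch): no more messages


def run_demo(count=5):
    q = queue.Queue()
    t = threading.Thread(target=producer, args=(q, count), daemon=True)
    t.start()
    received = []
    while True:
        msg = q.get()  # blocks until the producer has queued something
        if msg is None:
            break
        received.append(msg)
    t.join()
    return received


if __name__ == "__main__":
    print(run_demo())
```

Whether the yield is enough in practice depends on how long the real C call holds the GIL, so treat this as a starting point rather than a guaranteed fix.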

Here is an example using multiprocessing. Be aware that with multiprocessing your processes no longer share state implicitly. See the multiprocessing documentation for how to pass information between processes.

import Barra
import multiprocessing

def threadCAN(posQu):
    # Producer: read CAN messages and pass the valid ones to the consumer.
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print(canMsg)
        else:
            print("Enqueued message", canMsg)
            posQu.put(canMsg)

if __name__ == "__main__":
    posQu = multiprocessing.Queue(maxsize=0)  # no explicit size limit
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,))
    procCan.daemon = True  # the child process is terminated when the main process exits
    procCan.start()

    # Consumer: block until a message arrives, then handle it.
    while True:
        posMsg = posQu.get()
        print("Message from the queue", posMsg)

What does a call on queue.join() in main thread do to non-main threads?

The short answer to your main question is nothing.

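For a longer answer, here are two concurrency graphs, one without the wait on dish_queue.join():
[Image: concurrency graph without dish_queue.join()]

And one with it:
[Image: concurrency graph with dish_queue.join()]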

As you can see, at the beginning both dryer threads are blocked, and, as you correctly understood, they are blocked inside get(). In the first case the main thread finishes as soon as the washer function finishes. When you add dish_queue.join(), the main thread waits until every task in dish_queue has been marked as done. So when you say that join() unblocks the main thread, it means that join() releases its own block on the main thread. As you can notice, the other threads are totally unaffected by it and remain blocked.

As for what blocking means: a thread or process is blocked when it waits for input from outside itself, in this case for an element to appear in the queue. If you want to stop the other threads, you'll need to either add a timeout to get() (which will raise an exception and, if it is not caught, kill the thread), or stop them after dish_queue.join().
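The timeout approach might look like the sketch below. The names dish_queue, washer and dryer follow the question; the function signatures, the 0.5 second timeout, the dried list and the run helper are assumptions made for illustration, and the code targets Python 3.

```python
import queue
import threading


def dryer(dish_queue, dried, timeout=0.5):
    while True:
        try:
            dish = dish_queue.get(timeout=timeout)  # blocks for at most `timeout` seconds
        except queue.Empty:
            return  # nothing arrived in time: let this thread finish
        dried.append(dish)  # list.append is thread-safe in CPython
        dish_queue.task_done()


def washer(dishes, dish_queue):
    for dish in dishes:
        dish_queue.put(dish)


def run(dishes):
    dish_queue = queue.Queue()
    dried = []
    threads = [threading.Thread(target=dryer, args=(dish_queue, dried))
               for _ in range(2)]
    for t in threads:
        t.start()
    washer(dishes, dish_queue)
    dish_queue.join()  # blocks only the calling (main) thread
    for t in threads:
        t.join()  # returns once each dryer has timed out in get()
    return dried, [t.is_alive() for t in threads]


if __name__ == "__main__":
    print(run(["salad", "bread", "entree", "dessert"]))
```

Catching queue.Empty lets each dryer exit cleanly instead of dying with a traceback; the trade-off is that a dryer also gives up if the washer is merely slow, so the timeout has to be longer than the longest expected gap between dishes.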

Can you join a Python queue without blocking?

The Python Queue itself does not support this, so you could try the following:

from threading import Thread

class QueueChecker(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.q = q

    def run(self):
        # Blocks this helper thread, not the main thread.
        self.q.join()

# my_q is the queue whose tasks you are waiting on
q_manager_thread = QueueChecker(my_q)
q_manager_thread.start()

while q_manager_thread.is_alive():
    pass  # do other things

# when the loop exits the tasks are done
# because the thread will have returned
# from blocking on the q.join and exited
# its run method

q_manager_thread.join()  # to clean up the thread

A while loop on thread.is_alive() might not be exactly what you want, but at least you can see how to check on the status of q.join() asynchronously.
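A more compact variant of the same idea, as a hedged sketch in Python 3: instead of subclassing Thread, pass q.join directly as the thread target. The worker function, the None sentinel and the doubling of items are assumptions added only to make the example self-contained.

```python
import queue
import threading


def worker(q, results):
    while True:
        item = q.get()
        if item is None:  # sentinel (an assumption of this sketch): stop the worker
            q.task_done()
            return
        results.append(item * 2)
        q.task_done()


def run(items):
    q = queue.Queue()
    results = []
    threading.Thread(target=worker, args=(q, results), daemon=True).start()
    for item in items:
        q.put(item)
    q.put(None)

    waiter = threading.Thread(target=q.join)  # q.join() blocks only this helper thread
    waiter.start()
    polls = 0
    while waiter.is_alive():
        polls += 1  # placeholder for "other things"
        waiter.join(timeout=0.01)  # wait briefly instead of spinning
    return results, polls


if __name__ == "__main__":
    print(run([1, 2, 3]))
```

Calling waiter.join(timeout=...) inside the loop keeps the main thread from busy-waiting while still letting it do other work between checks.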

Why is Queue.join() necessary here?

You have a race condition in your code. Imagine that only one item is left in the Queue and that you are using two threads instead of 8. Then the following sequence of events can happen:

  1. Thread A calls q.empty to check whether the queue is empty. Since there is one item in the queue, the result is False and the loop body is executed.
  2. Before thread A calls q.get, there's a context switch and thread B gets to run.
  3. Thread B calls q.empty. There's still one item in the queue, so the result is False and the loop body is executed.
  4. Thread B calls q.get without parameters, and it immediately returns with the last item from the queue. Then thread B processes the item and exits, since q.empty now returns True.
  5. Thread A gets to run. Since it already called q.empty in step 1, it calls q.get next, but this blocks forever, so your program won't terminate.

You can simulate the above behavior by importing time and changing the loop a bit:

while not q.empty():
    time.sleep(0.1)  # Force context switch
    x = q.get()

Note that the behavior is the same whether or not task_done is called.
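The interleaving from steps 1 to 5 can also be forced deterministically. The following Python 3 sketch is an illustration under stated assumptions: the threading.Barrier makes both threads finish their q.empty check before either calls get, and get uses a short timeout only so that the demonstration terminates instead of hanging. The names racy_worker, run and outcomes are hypothetical.

```python
import queue
import threading


def racy_worker(q, barrier, outcomes):
    if not q.empty():  # steps 1 and 3: both threads see the single item
        barrier.wait()  # force both checks to happen before any get()
        try:
            item = q.get(timeout=0.2)  # a plain q.get() would block forever here
            outcomes.append(("got", item))
        except queue.Empty:
            outcomes.append(("would block", None))


def run():
    q = queue.Queue()
    q.put("last item")
    barrier = threading.Barrier(2)
    outcomes = []
    threads = [threading.Thread(target=racy_worker, args=(q, barrier, outcomes))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(outcomes)


if __name__ == "__main__":
    print(run())
```

One thread takes the item and the other times out, which is exactly the thread that would have hung with an unbounded q.get().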

So why did adding task_done help? By default, Python 2 performs a context switch every 100 interpreter instructions, so adding code might have changed the place where the context switch occurs. On my machine the program didn't hang whether or not task_done was there, so this is only speculation about what caused it to happen for you.

If you want to fix the behavior, you could use an infinite loop and pass a parameter to get that tells it not to block. Once the queue is empty, get then raises the Queue.Empty exception, which you can catch to break out of the loop:

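import threading
from Queue import Queue, Empty  # Python 2; in Python 3 the module is named queue

def func():
    global q, count
    while True:
        try:
            x = q.get(False)  # non-blocking: raises Empty instead of waiting
        except Empty:
            break
        io_process(x)
        with lock:  # acquires the lock and always releases it
            shared_resource_process(x)
            print '%s is processing %r' % (threading.currentThread().getName(), x)
            count += 1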
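For reference, a self-contained Python 3 sketch of the same pattern is shown below. It assumes, as in the question, that every item is already in the queue before the workers start; the names drain, run and processed are hypothetical, and the actual processing is reduced to recording which thread handled which item.

```python
import queue
import threading


def drain(q, lock, processed):
    while True:
        try:
            x = q.get_nowait()  # equivalent to q.get(False)
        except queue.Empty:
            break  # queue drained: leave the loop instead of blocking
        with lock:  # protect the shared list
            processed.append((threading.current_thread().name, x))


def run(items, n_threads=8):
    q = queue.Queue()
    for item in items:
        q.put(item)  # fill the queue before any worker starts
    lock = threading.Lock()
    processed = []
    threads = [threading.Thread(target=drain, args=(q, lock, processed))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # every worker exits on its own once the queue is empty
    return processed


if __name__ == "__main__":
    for name, x in run(range(5), n_threads=2):
        print("%s processed %r" % (name, x))
```

Because the workers terminate by themselves, joining the threads is enough here and neither task_done nor Queue.join is needed. If items can still arrive while workers are running, an empty queue no longer means the work is finished, and a sentinel value or Queue.join is the safer choice.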

