Creating Threads in Python

Creating Threads in Python

You don't need to subclass Thread to make this work; take a look at the simple example below to see how:

from threading import Thread
from time import sleep

def threaded_function(arg):
    for i in range(arg):
        print("running")
        sleep(1)

if __name__ == "__main__":
    thread = Thread(target=threaded_function, args=(10,))
    thread.start()
    thread.join()
    print("thread finished...exiting")

Here I show how to use the threading module to create a thread which invokes a normal function as its target. You can see how I can pass whatever arguments I need to it in the thread constructor.
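Keyword arguments can be passed the same way through the kwargs parameter; here is a minimal sketch (the greet function is just an illustration, not from the original question):

```python
from threading import Thread

def greet(name, greeting="Hello"):
    print(f"{greeting}, {name}!")

# Positional arguments go in `args`, keyword arguments in `kwargs`.
t = Thread(target=greet, args=("world",), kwargs={"greeting": "Hi"})
t.start()
t.join()  # prints "Hi, world!"
```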

Can threads create sub-threads in Python?

So regarding your questions:

  • Q1: It is not a problem to start "subthreads" from a thread
  • Q2: It is actually an interesting question, my instinct would say "no", but getting a proof sounds better to me

So I created a quick test as below (I would use a gist but I can't access such things from where I am):

from threading import Thread
import time

def sub_worker(id):
    print("SubWorker started from thread", id)
    while True:
        print("Subworking...")
        time.sleep(5)

def worker(id):
    print("Worker started from thread", id)
    count = 1
    while count < 5:
        print("Working...")
        tmp_thread = Thread(target=sub_worker, args=[count])
        tmp_thread.start()
        count += 1
        time.sleep(1)
    raise EnvironmentError("Tired of working")

main = Thread(target=worker, args=[0])
main.start()

Which gives us the following output (as expected, an error in the parent thread does not stop the "children"):

Worker started from thread 0
Working...
SubWorker started from thread 1
Subworking...
Working...
SubWorker started from thread 2
Subworking...
Working...
SubWorker started from thread 3
Subworking...
Working...
SubWorker started from thread 4
Subworking...
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Temp\tt\Tools\Anaconda3.4.3.1\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "C:\Temp\tt\Tools\Anaconda3.4.3.1\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Temp/tt/Tools/PyCharmWorkspace/xml_parse/test.py", line 18, in worker
    raise EnvironmentError("Tired of working")
OSError: Tired of working

Subworking...
Subworking...
Subworking...
Subworking...
Subworking...
Subworking...
Subworking...

I think htop shows this hierarchy because the Linux kernel treats threads as lightweight processes: each thread is created with a clone()-style call and gets its own task entry, so a parent/child relationship is visible. Conceptually, though, a hierarchy does not make much sense for threads, since they all share the same resources (memory, file descriptors, etc.).
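That shared-resource point is easy to demonstrate. In the minimal sketch below, several threads append to one and the same list object (with a lock to keep the appends safe), something separate processes could not do without explicit inter-process communication:

```python
import threading

shared = []
lock = threading.Lock()

def worker(n):
    # Every thread sees and mutates the very same list object.
    with lock:
        shared.append(n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))  # [0, 1, 2, 3, 4]
```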

Creating Threads within a Thread in Python

You got close! :-)

The problem in the latest version of the code is that, while the global results is passed to creator(), creator() never uses it: it creates its own local results list. Of course modifying the latter has no effect on the global results, so that one remains empty. So here's a variation to repair that, but also with minor local changes to make the code more "Pythonic":

import threading

def adder(x, res, i):
    res[i] += x * i

def creator(a, threads, results):
    for i in range(a):
        results.append(0)
        t = threading.Thread(target=adder, args=(a, results, i))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

threads = []
results = []

mainThread = threading.Thread(target=creator, args=(5, threads, results))
mainThread.start()
mainThread.join()
for i in range(len(results)):
    print(results[i])
    print(threads[i])

How to create and start any number of threads in Python 3

Use a simple for loop over the number of threads you want. You should also save the thread objects somewhere, so you can join them at the end:

from threading import Thread

def threads(queue):
    num_of_threads = 10
    print('Start Downloading ...')
    threads = []
    for i in range(1, num_of_threads + 1):
        th = Thread(target=downloading, args=(queue, "Thread {}".format(i)))
        threads.append(th)
        th.start()
    return threads

As suggested, a thread pool is helpful in this use case since it sizes the work to your system:

import urllib.request
from multiprocessing.dummy import Pool as ThreadPool

# `path` and `count` are assumed to be defined elsewhere in your code.

def downloading(img):
    urllib.request.urlretrieve(img, path + '/' + img.split('/')[-1])
    global count
    count += 1
    print('Done: ' + img.split('/')[-1])

def threads(queue):
    pool = ThreadPool()
    pool.map(downloading, queue)
    pool.close()
    pool.join()

Note that with this approach you must change the downloading function to take a single argument: one image. The map function sends each item of an iterable (second argument) to a function (first argument). This is also why the done set is no longer necessary, since each image will be processed exactly once.
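Here is a self-contained sketch of the same map pattern, with a dummy `work` function standing in for `downloading`:

```python
from multiprocessing.dummy import Pool as ThreadPool

def work(item):
    # Stand-in for `downloading`: each item is handled exactly once.
    return item * 2

# The pool distributes the items across its worker threads; map
# returns the results in the original order.
with ThreadPool(4) as pool:
    results = pool.map(work, [1, 2, 3])

print(results)  # [2, 4, 6]
```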

How long does it take to create a thread in python

So there are two ways to interpret your question:

  1. Whether the existence of other threads (that have not been started) affects creation time for new threads
  2. Whether other threads running in the background (threads already started) affect creation time for new threads.

Checking the first one

In this case, you simply don't start the threads:

import threading
import time

def fun1(a, b):
    c = a + b
    print(c)
    time.sleep(100)

times = []

for i in range(10):
    start = time.time()
    threading.Thread(target=fun1, args=(55, 155))  # don't start
    end = time.time()
    times.append(end - start)

print(times)

output for 10 runs:

[4.696846008300781e-05, 2.8848648071289062e-05, 2.6941299438476562e-05, 2.5987625122070312e-05, 2.5987625122070312e-05, 2.5987625122070312e-05, 2.5987625122070312e-05, 2.5987625122070312e-05, 2.5033950805664062e-05, 2.6941299438476562e-05]

As you can see, the times are about the same (as you would expect).

Checking the second one

In this case, we want the previously created threads to keep running as we create more threads. So we give each thread a task that never finishes:

import threading
import time

def fun1(a, b):
    while True:
        pass  # never ends

times = []

for i in range(100):
    start = time.time()
    threading.Thread(target=fun1, args=(55, 155)).start()
    end = time.time()
    times.append(end - start)

print(times)

output:

Over 100 runs, the first creation took 0.0003440380096435547 s whereas the last took 0.3017098903656006 s, nearly three orders of magnitude slower. The busy-wait threads all compete for the GIL, so the main thread has to wait longer and longer for its turn to create (and time) each new thread.

Dynamically creating functions and threads in Python

If I understand your problem correctly, it should be like this:

import threading

...
command_array = ...
number_of_commands = len(command_array)
...

def run_the_command(index):
    exec(command_array[index])  # Python 3: exec is a function

threads = []
for i in range(number_of_commands):
    t = threading.Thread(target=run_the_command, args=(i,))
    t.start()
    threads.append(t)

Note that:

  1. Pass run_it_boy instead of run_it_boy(), because you don't want to call it now, but let the threading module do it.
  2. It is encouraged to use snake_case for function/method name and variable name, CamelCase is for class name.
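Point 1 is easy to trip over, so here is a minimal sketch of the difference:

```python
from threading import Thread

def run_it_boy():
    print("running in a thread")

# Correct: pass the function object itself; the Thread calls it later.
t = Thread(target=run_it_boy)
t.start()
t.join()

# Wrong: `target=run_it_boy()` would call the function immediately in
# the main thread and hand its return value (None) to Thread, so the
# started thread would then do nothing.
```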

Alternative

In my opinion it is better to use a thread pool.
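For example, with `concurrent.futures.ThreadPoolExecutor` from the standard library; the `command_array` strings below are placeholders, not the asker's actual commands:

```python
from concurrent.futures import ThreadPoolExecutor

# Placeholder commands standing in for the real command_array.
command_array = ["result = 1 + 1", "result = 2 * 3"]

def run_the_command(index):
    scope = {}
    exec(command_array[index], scope)  # run each command in its own namespace
    return scope.get("result")

# The executor manages the threads and joins them on exit;
# map returns results in submission order.
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(run_the_command, range(len(command_array))))

print(results)  # [2, 6]
```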

Creating threads in python-multithreading

Here's an example of a runner for a graph like yours.

The idea is that you define a function that runs each task (do_task here), and build a graph of the (immediate) dependencies each task requires. The example task_deps below mirrors your graph from above.

The run_graph function will then call do_task with each task ID; the function is supposed to do whatever it needs to compute your result (it can read the results of any previous computation if it needs to).

The run_graph function will eventually return a dict of {task_id: result}.

The code below outputs

Scheduling {1}
Scheduling {2, 3, 4}
Scheduling {5, 6, 7}
Scheduling {8, 9, 10}
Scheduling {11, 12, 13}
Scheduling {14}
Scheduling {15}

which, as expected, corresponds exactly to the structure of your graph from top to bottom,
and

{1: 'Task 1 completed with result 42',
2: 'Task 2 completed with result 84',
3: 'Task 3 completed with result 126',
4: 'Task 4 completed with result 168',
5: 'Task 5 completed with result 210',
6: 'Task 6 completed with result 252',
7: 'Task 7 completed with result 294',
8: 'Task 8 completed with result 336',
9: 'Task 9 completed with result 378',
10: 'Task 10 completed with result 420',
11: 'Task 11 completed with result 462',
12: 'Task 12 completed with result 504',
13: 'Task 13 completed with result 546',
14: 'Task 14 completed with result 588',
15: 'Task 15 completed with result 630'}


import concurrent.futures

def do_task(task_id, results, dependencies):
    # sanity check - this function could use `dependencies` and `results` too
    assert all(dep in results for dep in dependencies)
    return f"Task {task_id} completed with result {task_id * 42}"

def run_graph(task_dependencies, runner):
    # Dict for results for each task.
    results = {}
    # Set of tasks yet to be completed.
    todo = set(task_dependencies)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # While there are items in the to-do set...
        while todo:
            # ... figure out what we can immediately execute by
            # comparing the dependency set to the result keys we already have
            # (i.e. the complement of the to-do set)
            next_tasks = {
                task_id
                for (task_id, deps) in task_dependencies.items()
                if task_id in todo and set(deps) <= set(results)
            }
            # If there are no next tasks we could schedule, it means the
            # dependency graph is incorrect (or at the very least incompleteable).
            if not next_tasks:
                raise RuntimeError(
                    f"Unable to schedule tasks, bad dependencies? Todo: {todo}"
                )

            print("Scheduling", next_tasks)
            # Submit tasks for execution in parallel. `futures` will be a list
            # of 2-tuples (task_id, future).
            futures = [
                (
                    task_id,
                    executor.submit(
                        runner, task_id, results, task_dependencies[task_id]
                    ),
                )
                for task_id in next_tasks
            ]

            # Loop over the futures, waiting for their results; when a future
            # finishes, save the result value and remove that task from the
            # to-do set.
            for (task_id, future) in futures:
                results[task_id] = future.result()
                todo.remove(task_id)
    # Once the while loop finishes, we have our results.
    return results

if __name__ == "__main__":
    task_deps = {
        1: (),
        2: (1,),
        3: (1,),
        4: (1,),
        5: (2,),
        6: (3,),
        7: (4,),
        8: (5,),
        9: (6,),
        10: (7,),
        11: (8,),
        12: (9,),
        13: (10,),
        14: (11, 12),
        15: (14, 13),
    }

    result = run_graph(task_deps, do_task)
    print(result)

