multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

multiprocessing passes tasks (which include check_one and data) to the worker processes through an mp.SimpleQueue. Unlike a Queue.Queue, everything put into an mp.SimpleQueue must be picklable, and a Queue.Queue is itself not picklable:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool = mp.Pool()
q = Queue.Queue()

pool.map(foo, (q,))

yields this exception:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

Your data includes packages, which is a Queue.Queue. That might be the source of the problem.
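As an aside, if a queue really must travel through pool.map, a manager queue is one picklable alternative, since the manager hands out a proxy object. A minimal sketch:

import multiprocessing as mp

def foo(queue):
    queue.put('hello from worker')

if __name__ == '__main__':
    manager = mp.Manager()
    q = manager.Queue()    # a proxy object, so it pickles fine
    pool = mp.Pool()
    pool.map(foo, (q,))    # no PicklingError this time
    pool.close()
    pool.join()
    print(q.get())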


Here is a possible workaround: The Queue is being used for two purposes:

  1. to find out the approximate size (by calling qsize)
  2. to store results for later retrieval.

Instead of calling qsize, we could use an mp.Value to share a counter between multiple processes.

Instead of storing results in a queue, we can (and should) simply return values from calls to check_one. pool.map collects the results in a queue of its own making and returns them as its return value.

For example:

import multiprocessing as mp
import random
import logging

# logger = mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)

qsize = mp.Value('i', 1)  # shared counter, inherited by the forked workers

def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0, 100)
    qsize.value += 1  # note: += on a Value is not atomic; fine for a rough progress count
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():
    logger.info('Searching for updates')
    set_len = 10
    data = ((set_len, 'project-{0}'.format(i), random.randrange(0, 100))
            for i in range(set_len))
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None:
            continue
        package, version, new_version, json = result
        txt = 'A new release is available for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()

Python multiprocessing PicklingError: Can't pickle <type 'function'>

Here is a list of what can be pickled. In particular, functions are only picklable if they are defined at the top-level of a module.

This piece of code:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

yields an error almost identical to the one you posted:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

The problem is that the pool methods all use an mp.SimpleQueue to pass tasks to the worker processes. Everything that goes through the mp.SimpleQueue must be picklable, and foo.work is not picklable since it is not defined at the top level of the module.

It can be fixed by defining a function at the top level, which calls foo.work():

def work(foo):
    foo.work()

pool.apply_async(work, args=(foo,))

Notice that foo is picklable, since Foo is defined at the top level and foo.__dict__ is picklable.
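A quick way to see the difference (Python 2; a small sketch reusing the Foo class above):

import pickle

foo = Foo()
pickle.dumps(foo)          # works: Foo lives at the top level of a module

try:
    pickle.dumps(foo.work)  # fails: work is not reachable as a top-level name
except pickle.PicklingError as exc:
    print(exc)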

TypeError: can't pickle _thread.lock objects


Move the queue to self instead of passing it as an argument to your functions package and send.
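Here is a minimal sketch of that idea; the package and send methods are illustrative stand-ins. Because the queue lives on the object, it travels to the child at start() time via inheritance and is never pickled as a task argument:

import multiprocessing

class Sender(multiprocessing.Process):
    def __init__(self):
        super(Sender, self).__init__()
        # Own the queue as an attribute rather than a function argument.
        self.queue = multiprocessing.Queue()

    def package(self, item):
        return repr(item)          # placeholder packaging step

    def send(self, item):
        self.queue.put(self.package(item))

    def run(self):
        self.send('hello')

if __name__ == '__main__':
    p = Sender()
    p.start()
    print(p.queue.get())
    p.join()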

Trouble using a lock with multiprocessing.Pool: pickling error

Your problem is that lock objects are not picklable. I can see two possible solutions in that case.

  • To avoid this, you can make your lock variable a global variable. You will then be able to reference it within your pool process function directly as a global, and will not have to pass it as an argument. This works because Python uses the OS fork mechanism when creating the pool processes, and hence copies the entire contents of the process that creates the pool into them. This is the simplest way of passing a lock to Python processes created with the multiprocessing package (an initializer-based variant is sketched after this list). Incidentally, it is not necessary to use the Manager class just for this lock. With this change your code would look like this:

    import multiprocessing
    from functools import partial

    lock = None # Global definition of lock
    pool = None # Global definition of pool

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    def get_more_tags(tags, max_tags):
        global lock  # take lock around the database call
        pass

    def make_network(initial_tag, max_tags=2, max_iter=3):
        global lock
        global pool
        lock = multiprocessing.Lock()
        pool = multiprocessing.Pool(8)

        # partial of a top-level function, so it is picklable
        f = partial(get_more_tags, max_tags=max_tags)

        def _recursively_find_more_tags(tags, level):
            global pool
            if level >= max_iter:
                raise StopIteration
            new_tags = pool.map(f, tags)
            to_search = []
            for i, s in zip(tags, new_tags):
                for t in s:
                    joined = ' '.join(t)
                    print(i + "|" + joined)
                    to_search.append(joined)
            try:
                return _recursively_find_more_tags(to_search, level + 1)
            except StopIteration:
                return None

        _recursively_find_more_tags([initial_tag], 0)

In your real code, it is possible that the lock and pool variables might be class instance variables.

  • A second solution, which avoids the use of locks altogether but which might have slightly higher overhead, is to create another process with multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process is responsible for running your database query; the pool processes use the queue to send it their query parameters. Since all the pool processes share the same queue, access to the database is automatically serialized. The additional overhead comes from pickling and unpickling the query arguments and the query response. Note that a multiprocessing.Queue can be handed to worker processes at creation time (for a Pool, via its initializer), although not through the task arguments of map or apply_async. Note also that the multiprocessing.Lock-based solution would not work on Windows, where processes are not created with fork semantics. A sketch of this arrangement follows below.
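First, the initializer-based variant of the global-lock approach. This is a minimal sketch, not taken from the original answers; _init_worker and query are illustrative names. Pool's initializer/initargs hook installs the lock as a module global in every worker at startup, and it also works on spawn-based platforms such as Windows:

import multiprocessing

lock = None  # populated in each worker by _init_worker

def _init_worker(shared_lock):
    # Runs once in every pool worker; stores the lock in a module global.
    global lock
    lock = shared_lock

def query(tag):
    with lock:                # serializes access to the shared resource
        return tag.upper()    # placeholder for the real database call

if __name__ == '__main__':
    shared_lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(4, initializer=_init_worker,
                                initargs=(shared_lock,))
    print(pool.map(query, ['a', 'b', 'c']))
    pool.close()
    pool.join()

Second, a minimal sketch of the dedicated database process. The names are again illustrative, and plain Process workers stand in for the pool to keep the queue wiring obvious:

import multiprocessing

def db_server(request_q, response_qs):
    # Sole owner of the database connection. All requests funnel through
    # one queue, so queries are serialized without any lock.
    while True:
        item = request_q.get()
        if item is None:                    # sentinel: shut down
            break
        worker_id, args = item
        result = 'result for %r' % (args,)  # placeholder for the real query
        response_qs[worker_id].put(result)

def worker(worker_id, request_q, response_q):
    request_q.put((worker_id, 'params-%d' % worker_id))
    print(worker_id, response_q.get())

if __name__ == '__main__':
    request_q = multiprocessing.Queue()
    response_qs = [multiprocessing.Queue() for _ in range(3)]
    server = multiprocessing.Process(target=db_server,
                                     args=(request_q, response_qs))
    server.start()
    workers = [multiprocessing.Process(target=worker,
                                       args=(i, request_q, response_qs[i]))
               for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    request_q.put(None)                     # stop the database process
    server.join()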

Can't pickle type 'instancemethod' when using multiprocessing Pool.map()

The problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable. The workaround (whether you consider it "easy" or not ;-) is to add the infrastructure to your program to allow such methods to be pickled, registering it with the copy_reg standard library module.

For example, Steven Bethard's contribution to this thread (towards the end of the thread) shows one perfectly workable approach to allow method pickling/unpickling via copy_reg.
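For reference, a sketch along the lines of that recipe (Python 2, using the im_func/im_self/im_class attributes of bound methods). Once registered, bound methods pickle like any other object and can therefore be passed to the pool methods:

import copy_reg
import types

def _pickle_method(method):
    # Reduce a bound method to (function name, instance, class).
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Walk the MRO in case the method is defined on a base class.
    for klass in cls.mro():
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)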

Python - can't pickle thread.lock error when creating a thread under a multiprocess in Windows

It looks like there is no simple answer; this appears to be a restriction on Windows (Win 7, Python 3.6 in my case): you need to start the process before you can start the worker thread inside the owned object.

There appears to be no such restriction on Unix (CentOS 7, python 2.7.5).

As an experiment I modified your code as follows; this version checks the OS and starts either the process first, or the thread first:

import multiprocessing
import threading
import time
import os

class MyProcess(multiprocessing.Process):

def __init__(self, **kwargs):
super(MyProcess, self).__init__(**kwargs)
self.dostuff = DoStuff(self)

def run(self):
print("MyProcess.run()")
print("MyProcess.ident = " + repr(self.ident))
if os.name == 'nt':
self.dostuff.start_thread()

class DoStuff(object):
def __init__(self, owner, **kwargs):
super(DoStuff, self).__init__(**kwargs)
self.owner = owner
if os.name != 'nt':
self.start_thread()

def start_thread(self):
print("DoStuff.start_thread()")
self.my_thread_instance = MyThread(self)
self.my_thread_instance.start()
time.sleep(0.1)

class MyThread(threading.Thread):
def __init__(self, owner):
super(MyThread, self).__init__()
self.owner = owner

def run(self):
print("MyThread.run()")
print("MyThread.ident = " + repr(self.ident))
print("MyThread.owner.owner.ident = " + repr(self.owner.owner.ident))

if __name__ == '__main__':
mp_target = MyProcess() # Also pass the pipe to transfer data
mp_target.daemon = True
mp_target.start()
time.sleep(0.1)

... and got the following on Windows, where the process starts first:

MyProcess.run()
MyProcess.ident = 14700
DoStuff.start_thread()
MyThread.run()
MyThread.ident = 14220
MyThread.owner.owner.ident = 14700

... and the following on Linux, where the thread is started first:

DoStuff.start_thread()
MyThread.run()
MyThread.ident = 140316342347520
MyThread.owner.owner.ident = None
MyProcess.run()
MyProcess.ident = 4358

If it were my code I'd be tempted to always start the process first, then create the thread within that process; the following version works fine for me across both platforms:

import multiprocessing
import threading
import time

class MyProcess(multiprocessing.Process):

    def __init__(self, **kwargs):
        super(MyProcess, self).__init__(**kwargs)
        self.dostuff = DoStuff()

    def run(self):
        print("MyProcess.run()")
        self.dostuff.start_thread()

class DoStuff(object):
    def __init__(self, **kwargs):
        super(DoStuff, self).__init__(**kwargs)

    def start_thread(self):
        self.my_thread_instance = MyThread()
        self.my_thread_instance.start()
        time.sleep(0.1)

class MyThread(threading.Thread):
    def __init__(self):
        super(MyThread, self).__init__()

    def run(self):
        print("MyThread.run()")

if __name__ == '__main__':
    mp_target = MyProcess()  # Also pass the pipe to transfer data
    mp_target.daemon = True
    mp_target.start()
    time.sleep(0.1)

