Sharing a Complex Object Between Processes

Sharing a complex object between processes?

You can do this using Python's multiprocessing "Manager" classes and a proxy class that you define. See Proxy Objects in the Python docs.

What you want to do is define a proxy class for your custom object, and then share the object using a "Remote Manager" -- look at the examples in the same linked doc page in the "Using a remote manager" section where the docs show how to share a remote queue. You're going to be doing the same thing, but your call to your_manager_instance.register() will include your custom proxy class in its argument list.

In this manner, you're setting up a server to share the custom object with a custom proxy. Your clients need access to the server (again, see the excellent documentation examples of how to setup client/server access to a remote queue, but instead of sharing a Queue, you are sharing access to your specific class).
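
A minimal sketch of that pattern, compressed into a single script (the class name, address, and authkey below are placeholders of my own, not anything from the docs; a real deployment would run the server and the clients as separate programs):

from multiprocessing import Process
from multiprocessing.managers import BaseManager
import time

class MyComplexObject(object):
    """Hypothetical custom class; replace with your own."""
    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

class MyManager(BaseManager):
    pass

def run_server():
    shared = MyComplexObject()
    # Every client that asks for 'get_shared' receives a proxy to this
    # single instance; its methods run here, in the server process.
    MyManager.register('get_shared', callable=lambda: shared)
    m = MyManager(address=('127.0.0.1', 50000), authkey=b'secret')
    m.get_server().serve_forever()

if __name__ == '__main__':
    Process(target=run_server, daemon=True).start()
    time.sleep(1)                       # crude wait for the server to come up

    # Client side: register the typeid only, then connect.
    MyManager.register('get_shared')
    m = MyManager(address=('127.0.0.1', 50000), authkey=b'secret')
    m.connect()
    obj = m.get_shared()                # proxy to the server-side object
    obj.set('answer', 42)               # method call forwarded over the connection
    print(obj.get('answer'))            # 42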

share python object between multiprocess in python3

I was in your place five months ago. I looked around a few times, but my conclusion is that multiprocessing with Python has exactly the problem you describe:

  • Pipes and Queues are fine, but in my experience not for big objects
  • Manager() proxy objects are slow, except for arrays, and even those are limited. If you want to share a complex data structure, use a Namespace as it is done here: multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes (see the sketch after this list)
  • Manager() has the shared list you are looking for: https://docs.python.org/3.6/library/multiprocessing.html
  • There are no pointers or real memory management in Python, so you can't share selected memory cells
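
For the Namespace route mentioned above, here is a minimal sketch (the attribute names are arbitrary placeholders):

from multiprocessing import Manager, Process

def worker(ns):
    # Reading ns.config pulls a pickled copy from the manager process.
    print(ns.config['batch_size'])
    # Rebinding an attribute pushes the new value back to the manager;
    # mutating ns.config in place would NOT propagate.
    ns.status = 'done'

if __name__ == '__main__':
    mgr = Manager()
    ns = mgr.Namespace()
    ns.config = {'batch_size': 32, 'lr': 0.01}
    ns.status = 'starting'

    p = Process(target=worker, args=(ns,))
    p.start()
    p.join()
    print(ns.status)  # 'done'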

I solved this kind of problem by learning C++, but it's probably not what you want to read...

Share object between processes in python

Just found a solution... main:

from multiprocessing.managers import BaseManager

class ShareManager(BaseManager):
    pass


ShareManager.register('SharedData', SharedData)

if __name__ == "__main__":
    manager = ShareManager()
    manager.start()
    shared = manager.SharedData()
    FirstClass(shared).start()

where SharedData is my shared class... and this is the subclass:

import multiprocessing

class FirstClass(multiprocessing.Process):

    def __init__(self, shared):
        super(FirstClass, self).__init__()
        self.shared = shared

PS: make sure the main process doesn't die, or you will lose the manager.
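
SharedData itself isn't shown above; here is a hypothetical minimal version, plus a run() method for FirstClass so the snippet actually does something (both are placeholders for your own code):

import multiprocessing

class SharedData(object):
    """Hypothetical stand-in for the shared class."""
    def __init__(self):
        self._items = []

    def add(self, item):
        self._items.append(item)

    def items(self):
        return list(self._items)

class FirstClass(multiprocessing.Process):
    def __init__(self, shared):
        super(FirstClass, self).__init__()
        self.shared = shared

    def run(self):
        # Every call on self.shared goes through the proxy to the
        # manager process, where the real SharedData instance lives.
        self.shared.add('hello from FirstClass')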

Sharing a complex python object in memory between separate processes

For complex objects there isn't a readily available method to directly share memory between processes. If you have simple ctypes data you can use C-style shared memory, but it won't map directly to Python objects.
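
To illustrate that ctypes route, a minimal sketch; it only works for flat numeric data, not arbitrary Python objects:

import multiprocessing

def fill(shared_arr):
    # Writes land directly in shared memory; no pickling involved.
    for i in range(len(shared_arr)):
        shared_arr[i] = i * i

if __name__ == '__main__':
    arr = multiprocessing.Array('d', 5)       # 5 C doubles in shared memory
    p = multiprocessing.Process(target=fill, args=(arr,))
    p.start()
    p.join()
    print(list(arr))                          # [0.0, 1.0, 4.0, 9.0, 16.0]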

There is a simple solution that works well if you only need a portion of your data at any one time, not the entire 36GB. For this you can use a SyncManager from multiprocessing.managers. With it, you set up a server that serves a proxy class for your data (your data isn't stored in the class; the proxy only provides access to it). Your client then attaches to the server using a BaseManager and calls methods on the proxy class to retrieve the data.

Behind the scenes the Manager classes take care of pickling the data you ask for and sending it through the open port from server to client. Because you're pickling data with every call, this isn't efficient if you need your entire dataset. In the case where you only need a small portion of the data in the client, the method saves a lot of time since the data only needs to be loaded once, by the server.

The solution is comparable to a database solution speed-wise but it can save you a lot of complexity and DB-learning if you'd prefer to keep to a purely pythonic solution.

Here's some example code that is meant to work with GloVe word vectors.

Server

#!/usr/bin/python
import sys
from multiprocessing.managers import SyncManager
import numpy

# Global for storing the data to be served
gVectors = {}

# Proxy class to be shared with different processes.
# Don't put the big vector data in here, since that would force it to
# be piped to the other process when instantiated there; instead just
# return the global vector data, from this process, when requested.
class GloVeProxy(object):
    def __init__(self):
        pass

    def getNVectors(self):
        global gVectors
        return len(gVectors)

    def getEmpty(self):
        global gVectors
        return numpy.zeros_like(next(iter(gVectors.values())))

    def getVector(self, word, default=None):
        global gVectors
        return gVectors.get(word, default)

# Class to encapsulate the server functionality
class GloVeServer(object):
    def __init__(self, port, fname):
        self.port = port
        self.load(fname)

    # Load the vectors into gVectors (global)
    @staticmethod
    def load(filename):
        global gVectors
        with open(filename, 'r') as f:
            for line in f:
                vals = line.rstrip().split(' ')
                gVectors[vals[0]] = numpy.array(vals[1:]).astype('float32')

    # Run the server
    def run(self):
        class myManager(SyncManager): pass
        myManager.register('GloVeProxy', GloVeProxy)
        mgr = myManager(address=('', self.port), authkey=b'GloVeProxy01')
        server = mgr.get_server()
        server.serve_forever()

if __name__ == '__main__':
    port = 5010
    fname = '/mnt/raid/Data/Misc/GloVe/WikiGiga/glove.6B.50d.txt'

    print('Loading vector data')
    gs = GloVeServer(port, fname)

    print('Serving data. Press <ctrl>-c to stop.')
    gs.run()

Client

from multiprocessing.managers import BaseManager
import psutil  # 3rd-party module for process info (not strictly required)

# Grab the shared proxy class. All methods in that class will be available here.
class GloVeClient(object):
    def __init__(self, port):
        assert self._checkForProcess('GloVeServer.py'), 'Must have GloVeServer running'
        class myManager(BaseManager): pass
        myManager.register('GloVeProxy')
        self.mgr = myManager(address=('localhost', port), authkey=b'GloVeProxy01')
        self.mgr.connect()
        self.glove = self.mgr.GloVeProxy()

    # Return the instance of the proxy class
    @staticmethod
    def getGloVe(port):
        return GloVeClient(port).glove

    # Verify the server is running
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            if proc.name() == name:
                return True
        return False

if __name__ == '__main__':
    port = 5010
    glove = GloVeClient.getGloVe(port)

    for word in ['test', 'cat', '123456']:
        print('%s = %s' % (word, glove.getVector(word)))

Note that the psutil library is just used to check whether you have the server running; it's not required. Be sure to name the server GloVeServer.py, or change the psutil check in the code so it looks for the correct name.

multiprocessing - sharing a complex object

I'm afraid virtually nothing here works the way you hope it works :-(

First note that identical id() values produced by different processes tell you nothing about whether the objects are really the same object. Each process has its own virtual address space, assigned by the operating system. The same virtual address in two processes can refer to entirely different physical memory locations. Whether your code produces the same id() output or not is pretty much purely accidental. Across multiple runs, sometimes I see different id() output in your Process section and repeated id() output in your Pool section, or vice versa, or both.

Second, a Manager supplies semantic sharing but not physical sharing. The data for your numeri instance lives only in the manager process. All your worker processes see (copies of) proxy objects. Those are thin wrappers that forward all operations to be performed by the manager process. This involves lots of inter-process communication, and serialization inside the manager process. This is a great way to write really slow code ;-) Yes, there is only one copy of the numeri data, but all work on it is done by a single process (the manager process).

To see this more clearly, make the changes @martineau suggested, and also change get_list_id() to this:

def get_list_id(self):  # added method
    import os
    print("get_list_id() running in process", os.getpid())
    return id(self.nl)

Here's sample output:

41543664
------------ Process
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 46268496
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 44153904
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
--------------- Pool
producing 41639248
get_list_id() running in process 5856
with list_id 44544608
producing 41777200
get_list_id() running in process 5856
with list_id 44544608
producing 41776816
get_list_id() running in process 5856
with list_id 44544608
producing 41777168
get_list_id() running in process 5856
with list_id 44544608
producing 41777136
get_list_id() running in process 5856
with list_id 44544608

Clear? The reason you get the same list id each time is not because each worker process has the same self.nl member; it's because all numeri methods run in a single process (the manager process). That's why the list id is always the same.

If you're running on a Linux-y system (an OS that supports fork()), a much better idea is to forget all this Manager stuff and create your complex object at module level before starting any worker processes. Then the workers will inherit (address-space copies of) your complex object. The usual copy-on-write fork() semantics will make that about as memory-efficient as possible. That's sufficient if mutations don't need to be folded back into the main program's copy of the complex object. If mutations do need to be folded back in, then you're back to needing lots of inter-process communication, and multiprocessing becomes correspondingly less attractive.
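
A minimal sketch of that fork-based approach (assuming a Linux-style start method; the big object here is just a placeholder dict):

import multiprocessing

# Create the complex object at module level, before any workers start.
# On fork-based systems each worker inherits a copy-on-write view of it.
BIG_OBJECT = {i: i * i for i in range(1_000_000)}

def worker(key):
    # Read-only access: no pickling, no manager round-trips.
    return BIG_OBJECT[key]

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')   # the default on Linux
    with multiprocessing.Pool(4) as pool:
        print(pool.map(worker, [10, 20, 30]))  # [100, 400, 900]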

There are no easy answers here. Don't shoot the messenger ;-)

How to share data between two processes?

I know there have been a couple of close votes against this question, but the supposed duplicate question's answer does not really explain why the OP's program does not work as is, and the offered solution is not what I would propose. Hence:

Let's analyze what is happening. The creation of obj = exp() is done by the main process. The execution of exp.func1 occurs in a different process/address space, and therefore the obj object must be serialized/de-serialized to the address space of that process. In that new address space self.var1 comes across with the initial value of 0 and is then set to 5, but only the copy of the obj object that is in the address space of process p1 is modified; the copy of that object that exists in the main process has not been modified. Then when you start process p2, another copy of obj from the main process is sent to the new process, but still with self.var1 having a value of 0.

The solution is for self.var1 to be an instance of multiprocessing.Value, which is a special variable that exists in shared memory accessible to all processes. See the docs.

from multiprocessing import Process, Value

class exp:
    def __init__(self):
        self.var1 = Value('i', 0, lock=False)

    def func1(self):
        self.var1.value = 5
        print(self.var1.value)

    def func2(self):
        print(self.var1.value)


if __name__ == "__main__":

    # multiprocessing
    obj = exp()
    p1 = Process(target=obj.func1)
    p2 = Process(target=obj.func2)

    print("multiprocessing")
    p1.start()
    # No need to sleep, just wait for p1 to complete
    # before starting p2:
    # time.sleep(2)
    p1.join()
    p2.start()
    p2.join()

Prints:

multiprocessing
5
5

Note

Using shared memory for this particular problem is much more efficient than using a managed class, which is the approach referenced by the "close" comment.

The assignment of 5 to self.var1.value is an atomic operation and does not need to be a serialized operation. But if:

  1. We were performing a non-atomic operation (requires multiple steps) such as self.var1.value += 1 and:
  2. Multiple processes were performing this non-atomic operation in parallel, then:
  3. We should create the value with a lock: self.var1 = Value('i', 0, lock=True) and:
  4. Update the value under control of the lock: with self.var1.get_lock(): self.var1.value += 1 (see the sketch below)
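
A minimal sketch of that locked-increment pattern (the process count and iteration count are arbitrary):

from multiprocessing import Process, Value

def increment(counter, n):
    for _ in range(n):
        # The read-modify-write is not atomic, so guard it with the lock.
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0, lock=True)   # lock=True is also the default
    procs = [Process(target=increment, args=(counter, 10_000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)   # 40000, with no lost updates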

How can I share a class between processes?

multiprocessing.Value isn't designed to be used with custom classes; it's supposed to be similar to multiprocessing.sharedctypes.Value. Instead, you need to create a custom manager and register your class with it. Your life will also be easier if you don't access the value directly but modify/access it via methods, which get exported by the default proxy created for your class. Regular attributes (like Counter.value) aren't exported, so they aren't accessible without additional customization. Here's a working example:

import multiprocessing
from multiprocessing.managers import BaseManager

class MyManager(BaseManager): pass

def Manager():
    m = MyManager()
    m.start()
    return m

class Counter(object):
    def __init__(self):
        self._value = 0

    def update(self, value):
        self._value += value

    def get_value(self):
        return self._value

MyManager.register('Counter', Counter)

def update(counter_proxy, thread_id):
    counter_proxy.update(1)
    print(counter_proxy.get_value(), 't%s' % thread_id,
          multiprocessing.current_process().name)
    return counter_proxy

def main():
    manager = Manager()
    counter = manager.Counter()
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for i in range(10):
        pool.apply(func=update, args=(counter, i))
    pool.close()
    pool.join()

    print('Should be 10 but is %s.' % counter.get_value())

if __name__ == '__main__':
    main()

Output:

1 t0 PoolWorker-2
2 t1 PoolWorker-8
3 t2 PoolWorker-4
4 t3 PoolWorker-5
5 t4 PoolWorker-6
6 t5 PoolWorker-7
7 t6 PoolWorker-3
8 t7 PoolWorker-9
9 t8 PoolWorker-2
10 t9 PoolWorker-8
Should be 10 but is 10.

