Sharing a complex object between processes?
You can do this using Python's multiprocessing "Manager" classes and a proxy class that you define. See Proxy Objects in the Python docs.
What you want to do is define a proxy class for your custom object, and then share the object using a "Remote Manager" -- look at the examples on the same linked doc page, in the "Using a remote manager" section, where the docs show how to share a remote queue. You're going to be doing the same thing, but your call to your_manager_instance.register() will include your custom proxy class in its argument list.
In this manner, you're setting up a server to share the custom object with a custom proxy. Your clients need access to the server (again, see the excellent documentation examples of how to set up client/server access to a remote queue, but instead of sharing a Queue, you are sharing access to your specific class).
share python object between multiprocess in python3
I was in your position five months ago. I looked around a few times, but my conclusion is that multiprocessing with Python has exactly the problem you describe:
- Pipes and Queues are good, but in my experience not for big objects
- Manager() proxy objects are slow, except for arrays, and those are limited. If you want to share a complex data structure, use a Namespace as is done here: multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes
- Manager() has the shared list you are looking for: https://docs.python.org/3.6/library/multiprocessing.html
- There are no pointers or real memory management in Python, so you can't share selected memory cells
I solved this kind of problem by learning C++, but that's probably not what you want to read...
Share object between processes in python
Just found a solution... main:
from multiprocessing.managers import BaseManager

class ShareManager(BaseManager):
    pass

ShareManager.register('SharedData', SharedData)

if __name__ == "__main__":
    manager = ShareManager()
    manager.start()
    shared = manager.SharedData()
    FirstClass(shared).start()
where SharedData is my shared class... and this is the subclass:
class FirstClass(multiprocessing.Process):
    def __init__(self, shared):
        super(FirstClass, self).__init__()
        self.shared = shared
PS. Make sure the main process doesn't die, or you will lose the manager.
Sharing a complex python object in memory between separate processes
For complex objects there isn't a readily available method to directly share memory between processes. If you have simple ctypes values, you can use C-style shared memory, but it won't map directly to Python objects.
There is a simple solution that works well if you only need a portion of your data at any one time, not the entire 36GB. For this you can use a SyncManager from multiprocessing.managers. Using this, you set up a server that serves a proxy class for your data (your data isn't stored in the class; the proxy only provides access to it). Your client then attaches to the server using a BaseManager and calls methods on the proxy class to retrieve the data.
Behind the scenes, the Manager classes take care of pickling the data you ask for and sending it through the open port from server to client. Because you're pickling data with every call, this isn't efficient if you need your entire dataset. In the case where you only need a small portion of the data in the client, the method saves a lot of time since the data only needs to be loaded once by the server.
The solution is comparable to a database solution speed-wise, but it can save you a lot of complexity and DB-learning if you'd prefer to keep to a purely Pythonic solution.
Here's some example code that is meant to work with GloVe word vectors.
Server
#!/usr/bin/python
import sys
from multiprocessing.managers import SyncManager
import numpy

# Global for storing the data to be served
gVectors = {}

# Proxy class to be shared with different processes.
# Don't put the big vector data in here, since that would force it to
# be piped to the other process when instantiated there; instead just
# return the global vector data, from this process, when requested.
class GloVeProxy(object):
    def __init__(self):
        pass

    def getNVectors(self):
        global gVectors
        return len(gVectors)

    def getEmpty(self):
        global gVectors
        return numpy.zeros_like(next(iter(gVectors.values())))

    def getVector(self, word, default=None):
        global gVectors
        return gVectors.get(word, default)

# Class to encapsulate the server functionality
class GloVeServer(object):
    def __init__(self, port, fname):
        self.port = port
        self.load(fname)

    # Load the vectors into gVectors (global)
    @staticmethod
    def load(filename):
        global gVectors
        with open(filename, 'r') as f:
            for line in f:
                vals = line.rstrip().split(' ')
                gVectors[vals[0]] = numpy.array(vals[1:]).astype('float32')

    # Run the server
    def run(self):
        class myManager(SyncManager):
            pass
        myManager.register('GloVeProxy', GloVeProxy)
        mgr = myManager(address=('', self.port), authkey=b'GloVeProxy01')
        server = mgr.get_server()
        server.serve_forever()

if __name__ == '__main__':
    port = 5010
    fname = '/mnt/raid/Data/Misc/GloVe/WikiGiga/glove.6B.50d.txt'
    print('Loading vector data')
    gs = GloVeServer(port, fname)
    print('Serving data. Press <ctrl>-c to stop.')
    gs.run()
Client
from multiprocessing.managers import BaseManager
import psutil  # 3rd-party module for process info (not strictly required)

# Grab the shared proxy class. All methods in that class will be available here.
class GloVeClient(object):
    def __init__(self, port):
        assert self._checkForProcess('GloVeServer.py'), 'Must have GloVeServer running'
        class myManager(BaseManager):
            pass
        myManager.register('GloVeProxy')
        self.mgr = myManager(address=('localhost', port), authkey=b'GloVeProxy01')
        self.mgr.connect()
        self.glove = self.mgr.GloVeProxy()

    # Return the instance of the proxy class
    @staticmethod
    def getGloVe(port):
        return GloVeClient(port).glove

    # Verify the server is running
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            if proc.name() == name:
                return True
        return False

if __name__ == '__main__':
    port = 5010
    glove = GloVeClient.getGloVe(port)
    for word in ['test', 'cat', '123456']:
        print('%s = %s' % (word, glove.getVector(word)))
Note that the psutil library is only used to check whether you have the server running; it's not required. Be sure to name the server GloVeServer.py, or change the psutil check in the code so it looks for the correct name.
multiprocessing - sharing a complex object
I'm afraid virtually nothing here works the way you hope it works :-(
First note that identical id() values produced by different processes tell you nothing about whether the objects are really the same object. Each process has its own virtual address space, assigned by the operating system. The same virtual address in two processes can refer to entirely different physical memory locations. Whether your code produces the same id() output or not is pretty much purely accidental. Across multiple runs, sometimes I see different id() output in your Process section and repeated id() output in your Pool section, or vice versa, or both.
Second, a Manager supplies semantic sharing but not physical sharing. The data for your numeri instance lives only in the manager process. All your worker processes see (copies of) proxy objects. Those are thin wrappers that forward all operations to be performed by the manager process. This involves lots of inter-process communication, and serialization inside the manager process. This is a great way to write really slow code ;-) Yes, there is only one copy of the numeri data, but all work on it is done by a single process (the manager process).
To see this more clearly, make the changes @martineau suggested, and also change get_list_id() to this:
def get_list_id(self):  # added method
    import os
    print("get_list_id() running in process", os.getpid())
    return id(self.nl)
Here's sample output:
41543664
------------ Process
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 46268496
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 44153904
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
--------------- Pool
producing 41639248
get_list_id() running in process 5856
with list_id 44544608
producing 41777200
get_list_id() running in process 5856
with list_id 44544608
producing 41776816
get_list_id() running in process 5856
with list_id 44544608
producing 41777168
get_list_id() running in process 5856
with list_id 44544608
producing 41777136
get_list_id() running in process 5856
with list_id 44544608
Clear? The reason you get the same list id each time is not because each worker process has the same self.nl member; it's because all numeri methods run in a single process (the manager process). That's why the list id is always the same.
If you're running on a Linux-y system (an OS that supports fork()), a much better idea is to forget all this Manager stuff and create your complex object at module level before starting any worker processes. Then the workers will inherit (address-space copies of) your complex object. The usual copy-on-write fork() semantics will make that about as memory-efficient as possible. That's sufficient if mutations don't need to be folded back into the main program's copy of the complex object. If mutations do need to be folded back in, then you're back to needing lots of inter-process communication, and multiprocessing becomes correspondingly less attractive.
There are no easy answers here. Don't shoot the messenger ;-)
How to share data between two processes?
I know there has been a couple of close votes against this question, but the supposed duplicate question's answer does not really explain why the OP's program does not work as is and the offered solution is not what I would propose. Hence:
Let's analyze what is happening. The creation of obj = exp() is done by the main process. The execution of exp.func1 occurs in a different process/address space, and therefore the obj object must be serialized/de-serialized to the address space of that process. In that new address space, self.var1 comes across with the initial value of 0 and is then set to 5, but only the copy of the obj object that is in the address space of process p1 is being modified; the copy of that object that exists in the main process has not been modified. Then when you start process p2, another copy of obj from the main process is sent to the new process, but still with self.var1 having a value of 0.
The solution is for self.var1 to be an instance of multiprocessing.Value, which is a special variable that exists in shared memory accessible to all processes. See the docs.
from multiprocessing import Process, Value

class exp:
    def __init__(self):
        self.var1 = Value('i', 0, lock=False)

    def func1(self):
        self.var1.value = 5
        print(self.var1.value)

    def func2(self):
        print(self.var1.value)

if __name__ == "__main__":
    # multiprocessing
    obj = exp()
    p1 = Process(target=obj.func1)
    p2 = Process(target=obj.func2)
    print("multiprocessing")
    p1.start()
    # No need to sleep, just wait for p1 to complete
    # before starting p2:
    #time.sleep(2)
    p1.join()
    p2.start()
    p2.join()
Prints:
multiprocessing
5
5
Note
Using shared memory for this particular problem is much more efficient than using a managed class, which is what the "close" comment's linked answer proposes.
The assignment of 5 to self.var1.value is an atomic operation and does not need to be serialized. But if:
- we were performing a non-atomic operation (one requiring multiple steps), such as self.var1.value += 1, and
- multiple processes were performing this non-atomic operation in parallel, then:
- we should create the value with a lock: self.var1 = Value('i', 0, lock=True), and
- update the value under control of the lock:
    with self.var1.get_lock():
        self.var1.value += 1
How can I share a class between processes?
multiprocessing.Value isn't designed to be used with custom classes; it's supposed to be similar to multiprocessing.sharedctypes.Value. Instead, you need to create a custom manager and register your class with it. Your life will also be easier if you don't access value directly, but modify/access it via methods, which get exported by the default Proxy created for your class. Regular attributes (like Counter.value) aren't exported, so they aren't accessible without additional customization. Here's a working example:
import multiprocessing
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

def Manager():
    m = MyManager()
    m.start()
    return m

class Counter(object):
    def __init__(self):
        self._value = 0

    def update(self, value):
        self._value += value

    def get_value(self):
        return self._value

MyManager.register('Counter', Counter)

def update(counter_proxy, thread_id):
    counter_proxy.update(1)
    print(counter_proxy.get_value(), 't%s' % thread_id,
          multiprocessing.current_process().name)
    return counter_proxy

def main():
    manager = Manager()
    counter = manager.Counter()
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for i in range(10):
        pool.apply(func=update, args=(counter, i))
    pool.close()
    pool.join()
    print('Should be 10 but is %s.' % counter.get_value())

if __name__ == '__main__':
    main()
Output:
1 t0 PoolWorker-2
2 t1 PoolWorker-8
3 t2 PoolWorker-4
4 t3 PoolWorker-5
5 t4 PoolWorker-6
6 t5 PoolWorker-7
7 t6 PoolWorker-3
8 t7 PoolWorker-9
9 t8 PoolWorker-2
10 t9 PoolWorker-8
Should be 10 but is 10.