Multiprocessing in Python - Sharing Large Objects (e.g. Pandas DataFrame) Between Multiple Processes

Sharing python objects (e.g. Pandas Dataframe) between independently running python scripts

You already have the right idea of how to work with your data.

The best solution depends on your actual needs,
so I will try to cover the possibilities with working examples.

What you want

If I understand your need correctly, you want to

  • continuously update a DataFrame (from a websocket)
  • run some computations on the same DataFrame at the same time
  • keep the DataFrame up to date on the computation workers
  • where one computation is CPU intensive
  • and another is not.

What you need

As you said, you will need a way to run different threads or processes in order to keep the computation running.

How about threads?

The easiest way to achieve what you want would be to use the threading library.
Since threads share memory, and you only have one worker actually updating the DataFrame, it is quite easy to keep the data up to date for every worker:

import time
from dataclasses import dataclass, field
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.
    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need to replace the whole object.
    """
    # default_factory gives every holder its own empty DataFrame
    dataframe: pandas.DataFrame = field(
        default_factory=lambda: pandas.DataFrame(columns=['A', 'B'])
    )

    def update(self, data):
        # DataFrame.append was removed in pandas 2.0; concat returns a new
        # DataFrame, so we rebind the attribute on the holder.
        self.dataframe = pandas.concat(
            [self.dataframe, pandas.DataFrame(data)], ignore_index=True
        )


class StreamLoader:
    """This class is our worker communicating with the websocket"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder

    def update_df(self):
        # read from websocket and update your DF.
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit loop for the showcase
        for _ in range(5):
            self.update_df()
            print("[1] Updated DF %s" % str(self.df_holder.dataframe))
            time.sleep(3)


class LightComputation:
    """This class is a random computation worker"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder

    def compute(self):
        print("[2] Current DF %s" % str(self.df_holder.dataframe))

    def run(self):
        # limit loop for the showcase
        for _ in range(5):
            self.compute()
            time.sleep(5)


def main():
    # We create a DataFrameHolder to keep our DataFrame available.
    df_holder = DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Thread(target=compute.run)
    compute_process.start()

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()

Note that we use a class to hold a reference to the current DataFrame because some operations, such as appending rows, do not modify the DataFrame in place but return a new object.
If we passed the DataFrame object itself to the worker, the worker would keep pointing at the old object and never see the update.
The DataFrameHolder object, on the other hand, stays at the same location in memory, so the worker can always reach the latest DataFrame through it.
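The point about holding a reference can be shown in isolation. The following is only a minimal sketch: it uses a tuple as a stand-in for a DataFrame (both are replaced rather than modified when rows are added), and the names Holder, rebind_local and demo are made up for this illustration.

```python
import threading
from dataclasses import dataclass


@dataclass
class Holder:
    """Stable container; only its `rows` attribute is rebound."""
    rows: tuple = ()

    def update(self, new_rows):
        # Build a new tuple (like a non in-place DataFrame operation)
        # and rebind the attribute on the shared holder.
        self.rows = self.rows + tuple(new_rows)


def rebind_local(rows, new_rows):
    # This only rebinds the local name; the caller's object is unchanged.
    rows = rows + tuple(new_rows)


def demo():
    bare_rows = ()
    holder = Holder()

    t1 = threading.Thread(target=rebind_local, args=(bare_rows, [1, 2, 3]))
    t2 = threading.Thread(target=holder.update, args=([1, 2, 3],))
    for t in (t1, t2):
        t.start()
    for t in (t1, t2):
        t.join()

    # bare_rows is still empty, holder.rows carries the update
    return bare_rows, holder.rows


if __name__ == "__main__":
    print(demo())
```

With several writers instead of one, the read-modify-write inside update would additionally need a threading.Lock.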

Processes may be more powerful

Now if you need more computation power, processes may be more useful since they let you run your worker on a different core.
To start a Process instead of a Thread in Python, you can use the multiprocessing library.
The API of both objects is the same, and you only have to change the constructor as follows:

from threading import Thread
# I create a thread
compute_process = Thread(target=compute.run)

from multiprocessing import Process
# I create a process that I can use the same way
compute_process = Process(target=compute.run)

Now if you make this change in the script above, you will see that the DataFrame is no longer updated in the computation worker.

Fixing this takes more work, since the two processes don't share memory. There are multiple ways of communicating between them (https://en.wikipedia.org/wiki/Inter-process_communication).
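As one illustration of inter-process communication, here is a minimal sketch that sends batches of rows to a worker process through a multiprocessing.Pipe and receives a summary back. It is not the approach used in the rest of this answer; the function names and the (a, b) row format are assumptions made for the example.

```python
from multiprocessing import Pipe, Process


def summarize(rows):
    """Column sums for a list of (a, b) rows."""
    return {"A": sum(a for a, _ in rows), "B": sum(b for _, b in rows)}


def compute_worker(conn):
    """Receive batches until None arrives; send back a summary each time."""
    rows = []
    while True:
        batch = conn.recv()
        if batch is None:  # sentinel: no more data
            break
        rows.extend(batch)
        conn.send(summarize(rows))
    conn.close()


def main():
    parent_conn, child_conn = Pipe()
    worker = Process(target=compute_worker, args=(child_conn,))
    worker.start()

    # Every batch is pickled, sent to the child, and the summary sent back.
    for _ in range(2):
        parent_conn.send([(1, 4), (2, 5), (3, 6)])
        print(parent_conn.recv())

    parent_conn.send(None)
    worker.join()


if __name__ == "__main__":
    main()
```

The worker keeps its own copy of the rows, so the cost of this design is that all data crosses the process boundary by value.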

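The reference given by @SimonCrane is quite interesting on this matter; it showcases sharing an object between two processes using multiprocessing.managers. A manager keeps the object in a server process and hands out proxies to it, so every method call on the proxy goes through that server.

Here is a version in which the computation worker runs in a separate process and accesses the DataFrame through a manager: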

import logging
import multiprocessing
import time
from dataclasses import dataclass, field
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.
    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need to replace the whole object.
    """
    # default_factory gives every holder its own empty DataFrame
    dataframe: pandas.DataFrame = field(
        default_factory=lambda: pandas.DataFrame(columns=['A', 'B'])
    )

    def update(self, data):
        # DataFrame.append was removed in pandas 2.0; concat returns a new
        # DataFrame, so we rebind the attribute on the holder.
        self.dataframe = pandas.concat(
            [self.dataframe, pandas.DataFrame(data)], ignore_index=True
        )

    def retrieve(self):
        # Proxies only expose methods, so we read the DataFrame through one.
        return self.dataframe


class DataFrameManager(BaseManager):
    """This manager handles the shared DataFrameHolder.
    See https://docs.python.org/3/library/multiprocessing.html#examples
    """
    # You can also use a socket file '/tmp/shared_df'
    MANAGER_ADDRESS = ('localhost', 6000)
    MANAGER_AUTH = b"auth"

    def __init__(self) -> None:
        super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)

    @classmethod
    def register_dataframe(cls):
        cls.register("DataFrameHolder", DataFrameHolder)


class DFWorker:
    """Abstract class initializing a worker depending on a DataFrameHolder."""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder


class StreamLoader(DFWorker):
    """This class is our worker communicating with the websocket"""

    def update_df(self):
        # read from websocket and update your DF.
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit loop for the showcase
        for _ in range(4):
            self.update_df()
            print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
            time.sleep(3)


class LightComputation(DFWorker):
    """This class is a random computation worker"""

    def compute(self):
        print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))

    def run(self):
        # limit loop for the showcase
        for _ in range(4):
            self.compute()
            time.sleep(5)


def main():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(logging.INFO)

    # Register our DataFrameHolder type in the DataFrameManager.
    DataFrameManager.register_dataframe()
    manager = DataFrameManager()
    manager.start()
    # We create a managed DataFrameHolder to keep our DataFrame available.
    df_holder = manager.DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Process(target=compute.run)
    compute_process.start()

    # The managed dataframe is updated in every Thread/Process
    time.sleep(5)
    print("[0] Main process DF\n%s" % df_holder.retrieve())

    # We join our Thread and Process, i.e. we wait for them to finish
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()

As you can see, the differences between the threading and the multiprocessing versions are quite small.

With a few tweaks, you can start from there and connect to the same manager from a different file if you want to handle your CPU-intensive processing in a separate script.

Python: Sharing large dataframes between multiprocessing process

You are using multiple processes mainly because you want to read the data in parallel. Once it has been read in there doesn't seem to be much reason why you should continue with two processes. I.e. your processes should read the data, then terminate and let the main process continue.

I would however recommend using multi-threading instead of multi-processing. For I/O-bound work such as reading files the difference in speed is not always noticeable, but multi-threading makes it simpler to share global variables between your main thread and the subthreads (I'll explain this below). An unhandled exception in one thread also ends only that thread rather than the whole application; note that a failed child process is likewise isolated from its parent, so this is not unique to threads. See more here: http://net-informations.com/python/iq/multi.htm

It takes some time to get used to how parallelism works in Python, and one of the main considerations you have to pay attention to is how to ensure what you are doing is thread-safe. Usually the mechanism you use for passing data to and from a thread is a queue. This ensures that only one thread is accessing the same object at any given time.
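A queue-based hand-off between two threads might look like the sketch below. The producer, consumer and run functions, the batch contents and the None sentinel are assumptions chosen for illustration; queue.Queue itself is the standard-library thread-safe queue.

```python
import queue
import threading


def producer(q, n_batches):
    """Put a few batches on the queue, then a sentinel to signal the end."""
    for i in range(n_batches):
        q.put([i, i + 1, i + 2])
    q.put(None)


def consumer(q, results):
    """Take batches off the queue until the sentinel arrives."""
    while True:
        batch = q.get()  # blocks until an item is available
        if batch is None:
            break
        results.append(sum(batch))


def run(n_batches=3):
    q = queue.Queue()
    results = []
    threads = [
        threading.Thread(target=producer, args=(q, n_batches)),
        threading.Thread(target=consumer, args=(q, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run())
```

Each batch is owned by exactly one thread at a time: the producer lets go of it when it calls put, and the consumer picks it up with get.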

That being said, in your simple example you could simply define two global variables and start two threads that each read data into one of these global variables (i.e. no sharing of variables across threads). You also have to tell your main thread to wait until both your threads have completed before continuing as otherwise the main thread could try to access the data while the subthreads are still working on them. (Again, normally you would adopt a queue-based strategy to avoid this problem, but you don't necessarily need that here).

Here is some example code:

import threading
import pandas as pd
import time


def get_df_1():
    # set the scope of the variable to "global", meaning editing it here, it is edited globally
    global df_1

    # read in your xml file here (for the example I simply create some dummy data)
    data = [['tom', 10], ['nick', 15], ['juli', 14]]
    df_1 = pd.DataFrame(data, columns=['Name', 'Age'])

    # wait five seconds (for illustration purposes to simulate working time)
    time.sleep(5)
    print("df_1 fetched")


def get_df_2():
    global df_2
    data = [['tom', 176], ['nick', 182], ['juli', 167]]
    df_2 = pd.DataFrame(data, columns=['Name', 'Height'])
    time.sleep(5)
    print("df_2 fetched")


df_1 = None
df_2 = None

# define threads
t1 = threading.Thread(target=get_df_1)
t2 = threading.Thread(target=get_df_2)

# start threads
t1.start()
t2.start()

# this will print immediately
print("Threads have been started")

# wait until threads finish
t1.join()
t2.join()

# this will only print after the threads are done
print("Threads have finished")

print(df_1)
print(df_2)

How can multiple child processes write in the same shared memory dataframe in python?

I would not add the rows to ns.df inside each child process individually but collect them after each child process has terminated. Look at this example:

from concurrent.futures import ProcessPoolExecutor

import pandas as pd


def child_process(child_id):
    return pd.DataFrame({"column": [f"child_{child_id}"]})


if __name__ == "__main__":
    # The guard is needed on platforms that spawn worker processes,
    # because they re-import this module in every child.
    df_main = pd.DataFrame({"column": ["parent"]})

    with ProcessPoolExecutor(max_workers=4) as pool:
        child_dfs = list(pool.map(child_process, range(5)))

    df_all = pd.concat([df_main, *child_dfs])
    print(df_all)

Output

    column
0   parent
0  child_0
0  child_1
0  child_2
0  child_3
0  child_4

If you changed ns.df inside each child process, it would actually be a shared memory object.

Caveat: if the returned dataframes of the child processes are very big, then using multiprocessing might add significant overhead since the dataframes have to be pickled before reloading them in the main process. Depending on what the actual child process does (maybe a lot of I/O or it uses C functions which release the GIL), it might be better to use multithreading instead.
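For comparison, the same collect-after-completion pattern with threads could be sketched as follows. To keep the sketch free of third-party imports it uses lists of row dicts as a stand-in for DataFrames; child_task and collect are hypothetical names, and with real DataFrames the results would be combined with pd.concat as above.

```python
from concurrent.futures import ThreadPoolExecutor


def child_task(child_id):
    # Stand-in for I/O-bound work that returns a small table of rows.
    return [{"column": f"child_{child_id}"}]


def collect(n_children=5, max_workers=4):
    rows = [{"column": "parent"}]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order; nothing is pickled because
        # threads share the memory of the main process.
        for child_rows in pool.map(child_task, range(n_children)):
            rows.extend(child_rows)
    return rows


if __name__ == "__main__":
    for row in collect():
        print(row["column"])
```

Whether this is faster than the process pool depends on the work itself: threads help when the tasks wait on I/O or release the GIL, and do not help for pure-Python CPU-bound code.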

Share a dictionary of pandas dataframe across multiprocessing python

The best solution I've found (and it only works for some types of problems) is to use a client/server setup using Python's BaseManager and SyncManager classes. To do this you first set up a server that serves up a proxy class for the data.

DataServer.py

#!/usr/bin/env python3
from multiprocessing.managers import SyncManager
import numpy

# Global for storing the data to be served
gData = {}


# Proxy class to be shared with different processes
# Don't put big data in here since that will force it to be piped to the
# other process when instantiated there, instead just return a portion of
# the global data when requested.
class DataProxy(object):
    def __init__(self):
        pass

    def getData(self, key, default=None):
        return gData.get(key, default)


if __name__ == '__main__':
    port = 5000

    print('Simulate loading some data')
    for i in range(1000):
        gData[i] = numpy.random.rand(1000)

    # Start the server on address(host,port)
    print('Serving data. Press <ctrl>-c to stop.')
    class myManager(SyncManager): pass
    myManager.register('DataProxy', DataProxy)
    # authkey must be bytes in Python 3
    mgr = myManager(address=('', port), authkey=b'DataProxy01')
    server = mgr.get_server()
    server.serve_forever()

Run the above once and leave it running. Below is the client class you use to access the data.

DataClient.py

from multiprocessing.managers import BaseManager
import psutil  # 3rd party module for process info (not strictly required)


# Grab the shared proxy class. All methods in that class will be available here
class DataClient(object):
    def __init__(self, port):
        assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'
        class myManager(BaseManager): pass
        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey=b'DataProxy01')
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    # Verify the server is running (not required)
    # Note: proc.name() is the process name, which may be the interpreter
    # (e.g. 'python') rather than the script name, depending on how the
    # server was started.
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            if proc.name() == name:
                return True
        return False

Below is the test code to try this with multiprocessing.

TestMP.py

#!/usr/bin/env python3
import time
import multiprocessing as mp
import numpy
from DataClient import DataClient

# Confusing, but the "proxy" will be global to each subprocess,
# it's not shared across all processes.
gProxy = None
gMode = None
gDummy = None


def init(port, mode):
    global gProxy, gMode, gDummy
    gProxy = DataClient(port).proxy
    gMode = mode
    gDummy = numpy.random.rand(1000)  # Same as the dummy in the server
    # print('Init proxy ', id(gProxy), 'in ', mp.current_process())


def worker(key):
    if 0 == gMode:  # get from proxy
        array = gProxy.getData(key)
    elif 1 == gMode:  # bypass retrieve to test difference
        array = gDummy
    else:
        raise ValueError('unknown mode: %s' % gMode)
    for i in range(1000):
        x = sum(array)
    return x


if __name__ == '__main__':
    port = 5000
    maxkey = 1000
    numpts = 100

    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if 0 == mode: print('Using client/server and %d processes' % nprocs)
            if 1 == mode: print('Using local data and %d processes' % nprocs)
            # plain ints keep the keys cheap to send to the server
            keys = [int(numpy.random.randint(0, maxkey)) for k in range(numpts)]
            pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
            start = time.time()
            ret_data = pool.map(worker, keys, chunksize=1)
            print(' took %4.3f seconds' % (time.time() - start))
            pool.close()
            pool.join()
When I run this on my machine I get...

Using local data and 16 processes
took 0.695 seconds
Using local data and 1 processes
took 5.849 seconds
Using client/server and 16 processes
took 0.811 seconds
Using client/server and 1 processes
took 5.956 seconds

Whether this works for you in your multiprocessing system depends on how often you have to grab the data. There's a small overhead associated with each transfer. You can see this if you turn down the number of iterations in the x=sum(array) loop. At some point you'll spend more time getting data than working on it.
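As a rough worked example, the single-process timings above can be turned into an estimate of the per-transfer overhead and of the loop count at which fetching costs as much as computing. This is only back-of-the-envelope arithmetic on one run from one machine, so treat the result as an order of magnitude; the variable names are made up for this sketch.

```python
NUM_FETCHES = 100    # numpts in TestMP.py: one fetch per key
ITERATIONS = 1000    # loop count around x = sum(array) in worker()

local_1proc = 5.849   # seconds, "Using local data and 1 processes"
server_1proc = 5.956  # seconds, "Using client/server and 1 processes"

# Extra time per fetch that is attributable to the client/server transfer
overhead_per_fetch = (server_1proc - local_1proc) / NUM_FETCHES

# Time spent on a single sum(array) call
work_per_iteration = local_1proc / (NUM_FETCHES * ITERATIONS)

# Number of sum(array) calls that take as long as one fetch
break_even_iterations = overhead_per_fetch / work_per_iteration

print(f"overhead per fetch: {overhead_per_fetch * 1e3:.2f} ms")
print(f"work per iteration: {work_per_iteration * 1e6:.1f} us")
print(f"break-even at about {break_even_iterations:.0f} iterations")
```

With these numbers a fetch costs roughly one millisecond, so the transfer only starts to dominate once the loop is cut from 1000 iterations to a couple of dozen.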

Besides multiprocessing, I also like this pattern because I only have to load my big array data once in the server program and it stays loaded until I kill the server. That means I can run a bunch of separate scripts against the data and they execute quickly; no waiting for data to load.

While the approach here is somewhat similar to using a database, it has the advantage of working on any type of Python object, not just simple DB tables of strings and ints, etc. I've found that using a DB is a bit faster for those simple types, but for me it tends to be more work programmatically, and my data doesn't always port over easily to a database.
