Multiprocessing Module Showing Memory for Each Child Process Same as Main Process

Python multiprocess: run several instances of a class, keep all child processes in memory

What you are looking for is sharing state between processes. As per the documentation, you can either create shared memory, which is restrictive about the data it can store and is not thread-safe, but offers better speed and performance; or you can use server processes through managers. The latter is what we are going to use, since you want to share whole objects of user-defined types. Keep in mind that using managers will affect the speed of your code, depending on the complexity of the arguments you pass to and receive from the managed objects.

Managers, proxies and pickling

As mentioned, managers create server processes to store objects and allow access to them through proxies. I have answered a question with more detail on how they work, and how to create a suitable proxy, here. We are going to use the same proxy defined in the linked answer, with some variations. Namely, I have replaced the factory functions inside __getattr__ with something that can be pickled using pickle. This means that you can run instance methods of managed objects created with this proxy without resorting to using multiprocess. The result is this modified proxy:

from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np

class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)

class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            # Return a picklable callable that forwards the call to the manager
            # process instead of returning a bound method of the proxy itself.
            return A(name, self._callmethod).get
        return result

Solution

Now we only need to make sure that when we create objects of monte_carlo, we do so using managers and the above proxy. For that, we add a class method called create; all monte_carlo objects should be created through this function. With that, the final code looks like this:

from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np

class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)

class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result

class monte_carlo:

    def __init__(self):
        self.x = np.ones((1000, 3))
        self.E = np.mean(self.x)
        self.Elist = []
        self.T = None

    def simulation(self, temperature):
        self.T = temperature
        for i in range(3000):
            self.MC_step()
            if i % 10 == 0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x = self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1, 1, 3))
        temp_E = np.mean(x)  # energy of the proposed configuration
        if np.random.random() < np.exp((self.E - temp_E) / self.T):
            self.E = temp_E
            self.x = x
        return

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = getattr(manager, class_str)(*args, **kwargs)
        return inst

def proba(dE, pT):
    return np.exp(-dE / pT)

if __name__ == "__main__":
    Tlist = [1.1, 1.2, 1.3]
    N = len(Tlist)
    G = []

    # Create our managed instances
    for _ in range(N):
        G.append(monte_carlo.create())

    for _ in range(5):

        # Run simulations in the manager server
        results = []
        with Pool(8) as pool:

            for i in range(N):  # this loop should be run in multiprocess
                results.append(pool.apply_async(G[i].simulation, (Tlist[i], )))

            # Wait for the simulations to complete
            for result in results:
                result.get()

        for i in range(N // 2):
            dE = G[i].E - G[i + 1].E
            pT = G[i].T + G[i + 1].T
            p = proba(dE, pT)  # (proba is a function, giving a probability depending on dE)
            if np.random.random() < p:
                T_temp = Tlist[i]
                Tlist[i] = Tlist[i + 1]
                Tlist[i + 1] = T_temp

    print(Tlist)

This meets the criteria you wanted. It does not create any copies at all; rather, all arguments to the simulation method call are serialized inside the pool and sent to the manager server, where the object is actually stored. The method is executed there, and the results (if any) are serialized and returned to the main process. All of this using only the standard library!

Output

[1.2, 1.1, 1.3]

Edit

Since you are using Linux, I encourage you to use multiprocessing.set_start_method inside the if __name__ ... clause to set the start method to "spawn". Doing this will ensure that the child processes do not have access to variables defined inside the clause.
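
For illustration, here is roughly where that call would go (a minimal sketch; the rest of the main block is the one from the solution above):

from multiprocessing import set_start_method

if __name__ == "__main__":
    # Must be called once, before any manager or pool is started.
    set_start_method("spawn")

    Tlist = [1.1, 1.2, 1.3]
    # ... rest of the main block from the solution above ...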

How to solve memory issues while multiprocessing using Pool.map()?

Prerequisite

  1. In Python (in the following I use a 64-bit build of Python 3.6.5) everything is an object. This has its overhead, and with getsizeof we can see exactly the size of an object in bytes:

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
  2. When the fork system call is used to create a child process (the default on *nix, see multiprocessing.get_start_method()), the parent's physical memory is not copied; the copy-on-write technique is used instead.
  3. A forked child process will still report the full RSS (resident set size) of the parent process. Because of this, PSS (proportional set size) is a more appropriate metric to estimate the memory usage of a forking application. Here's an example from the page:
  • Process A has 50 KiB of unshared memory
  • Process B has 300 KiB of unshared memory
  • Both process A and process B have 100 KiB of the same shared memory region

Since the PSS is defined as the sum of the unshared memory of a process and the proportion of memory shared with other processes, the PSS for these two processes is as follows:

  • PSS of process A = 50 KiB + (100 KiB / 2) = 100 KiB
  • PSS of process B = 300 KiB + (100 KiB / 2) = 350 KiB
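
If you want to check PSS on a live process yourself, it can be read from /proc/<pid>/smaps_rollup on reasonably recent Linux kernels (the helper below is only a sketch under that assumption):

def pss_kib(pid='self'):
    # The kernel sums PSS over all mappings for us; the value is in KiB.
    with open('/proc/{}/smaps_rollup'.format(pid)) as f:
        for line in f:
            if line.startswith('Pss:'):
                return int(line.split()[1])

print(pss_kib())  # PSS of the current process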

The data frame

Now let's look at your DataFrame alone. memory_profiler will help us.

justpd.py

#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

Now let's use the profiler:

mprof run justpd.py
mprof plot

We can see the plot:

memory_profile

and line-by-line trace:

Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

We can see that the data frame takes ~2 GiB with peak at ~3 GiB while it's being built. What's more interesting is the output of info.

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

But info(memory_usage='deep') ("deep" means introspection of the data deeply by interrogating object dtypes, see below) gives:

memory usage: 7.9 GB

Huh?! Looking at the process from the outside, we can make sure that memory_profiler's figures are correct. sys.getsizeof also shows the same value for the frame (most probably because of a custom __sizeof__), and so do other tools that use it to estimate the size of objects allocated in gc.get_objects(), e.g. pympler.

# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()

Gives:

                                             types |   # objects |   total size
================================================== | =========== | ============
                <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
 <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

So where do these 7.93 GiB come from? Let's try to explain this. We have 4M rows and 34 columns, which gives us 136M values. They are either int64 or object (which is a 64-bit pointer; see using pandas with large data for a detailed explanation). Thus we have 136 * 10 ** 6 * 8 / 2 ** 20 ≈ 1038 MiB only for the values in the data frame. What about the remaining ~6.9 GiB?

String interning

To understand this behaviour it's necessary to know that Python does string interning. There are two good articles (one, two) about string interning in Python 2. With the Unicode change in Python 3 and PEP 393 in Python 3.3 the C structures have changed, but the idea is the same. Basically, every short string that looks like an identifier is cached by Python in an internal dictionary, and references point to the same Python objects. In other words, we can say it behaves like a singleton. The articles mentioned above explain the significant memory footprint and performance improvements this gives. We can check whether a string is interned using the interned field of PyASCIIObject (extracted below from the state member):

import ctypes

class PyASCIIObject(ctypes.Structure):
    _fields_ = [
        ('ob_refcnt', ctypes.c_size_t),
        ('ob_type', ctypes.py_object),
        ('length', ctypes.c_ssize_t),
        ('hash', ctypes.c_int64),
        ('state', ctypes.c_int32),
        ('wstr', ctypes.c_wchar_p)
    ]

Then:

>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

With two strings we can also do an identity comparison (a comparison of memory addresses, in the case of CPython).

>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

Because of that fact, with regard to the object dtype, the data frame allocates at most 20 strings (one per amino acid). It's worth noting, though, that Pandas recommends categorical types for enumerations.
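
As a quick illustration of that recommendation (the column content here is synthetic, not your actual genome matrix), converting an object column with few distinct values to category replaces the per-cell pointers to Python strings with small integer codes:

import pandas as pd

df = pd.DataFrame({'REF': ['A', 'T', 'G', 'C'] * 1000000})
print(df['REF'].memory_usage(deep=True))   # object dtype: pointer plus counted string size per cell

df['REF'] = df['REF'].astype('category')
print(df['REF'].memory_usage(deep=True))   # category: roughly one small integer code per cell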

Pandas memory

Thus we can explain the naive estimate of 7.93 GiB as follows:

>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

Note that str_size is 58 bytes, not 50 as we've seen above for a 1-character literal. That's because PEP 393 defines compact and non-compact strings. You can check it with sys.getsizeof(gen_matrix_df.REF[0]).

Actual memory consumption should be ~1 GiB, as reported by gen_matrix_df.info(), yet it's twice as much. We can assume it has something to do with memory (pre)allocation done by Pandas or NumPy. The following experiment shows that this assumption is not without reason (multiple runs show the same picture):

Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

I want to finish this section with a quote from a fresh article about design issues and the future Pandas2, written by the original author of Pandas.

pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset

Process tree

Let's come to the pool, finally, and see if it can make use of copy-on-write. We'll use smemstat (available from the Ubuntu repositories) to estimate process-group memory sharing, and glances to write down system-wide free memory. Both can write JSON.

We'll run the original script with Pool(2). We'll need 3 terminal windows.

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

Then mprof plot produces:

3 processes

The sum chart (mprof run --nopython --include-children ./script.py) looks like:

Sample Image

Note that the two charts above show RSS. The hypothesis is that, because of copy-on-write, RSS doesn't reflect actual memory usage. Now we have two JSON files, from smemstat and glances. I'll use the following script to convert the JSON files to CSV.

#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
    with open('smemstat.json') as f:
        smem = json.load(f)

    rows = []
    fieldnames = set()
    for s in smem['smemstat']['periodic-samples']:
        row = {}
        for ps in s['smem-per-process']:
            if 'script.py' in ps['command']:
                for k in ('uss', 'pss', 'rss'):
                    row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

        # smemstat produces empty samples, backfill from previous
        if rows:
            for k, v in rows[-1].items():
                row.setdefault(k, v)

        rows.append(row)
        fieldnames.update(row.keys())

    with open('smemstat.csv', 'w') as out:
        dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
        dw.writeheader()
        list(map(dw.writerow, rows))

def glances():
    rows = []
    fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
                  'free', 'mem_critical', 'inactive', 'shared', 'history_size',
                  'mem_warning', 'total', 'active', 'buffers']
    with open('glances.csv', 'w') as out:
        dw = csv.DictWriter(out, fieldnames=fieldnames)
        dw.writeheader()
        with open('glances.json') as f:
            for l in f:
                d = json.loads(l)
                dw.writerow(d['mem'])

if __name__ == '__main__':
    globals()[sys.argv[1]]()

First let's look at free memory.

Sample Image

The difference between the first and the minimum value is ~4.15 GiB. And here is how the PSS figures look:

Sample Image

And the sum:

Sample Image

Thus we can see that because of copy-on-write actual memory consumption is ~4.15 GiB. But we're still serialising data to send it to worker processes via Pool.map. Can we leverage copy-on-write here as well?

Shared data

To use copy-on-write we need the list(gen_matrix_df_list.values()) to be accessible globally, so that the worker can still read it after the fork (a self-contained sketch of the pattern follows the list below).

  1. Let's modify the code after del gen_matrix_df in main like the following:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list

    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
  2. Remove the del gen_matrix_df_list that comes later.
  3. And modify the first lines of matrix_to_vcf like:

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]

Now let's re-run it. Free memory:

free

Process tree:

process tree

And its sum:

sum

Thus we're at a maximum of ~2.9 GiB of actual memory usage (the peak the main process has while building the data frame), and copy-on-write has helped!

As a side note, there's so-called copy-on-read, the behaviour of Python's reference-cycle garbage collector, described by Instagram Engineering (which led to gc.freeze in issue31558). But gc.disable() doesn't have an impact in this particular case.

Update

An alternative to copy-on-write copy-less data sharing can be delegating it to the kernel from the beginning by using numpy.memmap. Here's an example implementation from the High Performance Data Processing in Python talk. The tricky part is then to make Pandas use the mmapped NumPy array.
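
A rough sketch of the memmap idea (the file name and shape below are placeholders): the data is written to disk once, and every process then maps the same file, so the operating system's page cache is the only copy:

import numpy as np

# Writer: dump the matrix to a binary file once.
data = np.random.rand(1000000, 2)
fp = np.memmap('matrix.dat', dtype='float64', mode='w+', shape=data.shape)
fp[:] = data
fp.flush()

# Reader (e.g. in a worker process): map the same file without copying it.
view = np.memmap('matrix.dat', dtype='float64', mode='r', shape=(1000000, 2))
print(view[:5].mean())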

Shared-memory and multiprocessing

After a fork(), both parent and child "see" the same address space. The first time either changes the memory at a common address, the copy-on-write (COW) mechanism has to clone the page containing that address. So, for purposes of creating COW pages, it doesn't matter whether the mutations occur in the child or in the parent.

In your second code snippet, you left out the most important part: exactly where big_list was created. Since you said you can get away with del big_list in the child, big_list probably existed before you forked the worker process. If so, then - as above - it doesn't really matter to your symptom whether big_list is modified in the parent or the child.

To avoid this, create big_list after creating your child process. Then the address space it lives in won't be shared. Or, in Python 3.4 or later, use multiprocessing.set_start_method('spawn'). Then fork() won't be used to create child processes, and no address space is shared at all (which is always the case on Windows, which doesn't have fork()).
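
A minimal illustration of the first suggestion (names are made up): the worker is started before the big allocation, so its address space never contains the list and there is nothing to copy on write:

from multiprocessing import Process
import time

def worker():
    # Does its work without ever sharing pages with big_list.
    time.sleep(5)

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()                         # fork happens here, before the allocation
    big_list = list(range(10 ** 7))   # lives only in the parent
    p.join()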

Python multiprocessing, read input from child process

First of all, you should modify Child_Process_02 so it doesn't burn CPU cycles on unproductive calls to c1_c2_q.get_nowait(): use the blocking call c1_c2_q.get(), which does not return until a message is received (and you can get rid of the try/except scaffolding around this call).

Second, and this is optional: since the messages you pass from Child_Process_01 to Child_Process_02 have one producer and one consumer, you can gain efficiency by replacing the multiprocessing.Queue with a call to multiprocessing.Pipe(duplex=False), which returns two unidirectional multiprocessing.connection.Connection instances, the second of which is suitable for sending arbitrary objects such as strings and the first for receiving them (by the way, multiprocessing.Queue is implemented on top of a Pipe). But a full-duplex Pipe, where each connection can be used for both sending and receiving, is exactly what you want for the following program, where a thread started by the main process "listens" for requests to perform input calls by calling multiprocessing.connection.Connection.recv() in a loop. The message returned by this call is the prompt string to be used for the call to input. The value returned by the input function is then sent back on the connection:

from multiprocessing import Process, Pipe
from threading import Thread

def inputter(input_conn):
    """ get requests to do input calls """
    while True:
        input_msg = input_conn.recv()
        value = input(input_msg)
        input_conn.send(value)  # send inputted value

def worker(msg_conn, input_conn):
    while True:
        message = msg_conn.recv()
        if message is None:
            break
        if message == 'do input':
            # send inputter our prompt message:
            input_conn.send('Enter x: ')
            # get back the result of the input:
            x = int(input_conn.recv())
            print('The value entered was', x)
        else:
            print('Got message:', message)

if __name__ == '__main__':
    import time

    # create the connections for sending messages from one process to another:
    recv_conn, send_conn = Pipe(duplex=False)

    # create the connections for doing the input requests:
    input_conn1, input_conn2 = Pipe(duplex=True)  # each connection is bidirectional

    # start the inputter thread with one of the inputter duplex connections:
    t = Thread(target=inputter, args=(input_conn1,), daemon=True)
    t.start()

    # start a child process with the message connection in lieu of a Queue
    # and the other inputter connection:
    p = Process(target=worker, args=(recv_conn, input_conn2))
    p.start()

    # send messages to worker process:
    send_conn.send('a')
    send_conn.send('do input')
    send_conn.send('b')
    # signal the child process to terminate:
    send_conn.send(None)
    p.join()

Prints:

Got message: a
Enter x: 8
The value entered was 8
Got message: b

NOTE

It should be noted that a multiprocessing.Queue starts a feeder thread on top of the underlying multiprocessing.Pipe to prevent the "putter" from blocking prematurely before maxsize calls to put have been made (maxsize being the value used to instantiate the Queue). This is also why multiprocessing.Queue does not perform as well as multiprocessing.Pipe. It also means that calling send repeatedly on a connection without corresponding recv calls on the other end will eventually block. But given that you specified maxsize=1 on your queue, this is hardly an issue: you would be blocking under the same circumstances with your queue.

Giving access to shared memory after child processes have already started

Your problem sounds like a perfect fit for the posix_ipc or sysv_ipc modules, which expose either the POSIX or SysV APIs for shared memory, semaphores, and message queues. The feature matrix there includes excellent advice for picking amongst the modules he provides.

The problem with anonymous mmap(2) areas is that you cannot easily share them with other processes -- if they were file-backed, it'd be easy, but if you don't actually need the file for anything else, it feels silly. You could use the CLONE_VM flag to the clone(2) system call if this were in C, but I wouldn't want to try using it with a language interpreter that probably makes assumptions about memory safety. (It'd be a little dangerous even in C, as maintenance programmers five years from now might also be shocked by the CLONE_VM behavior.)

But the SysV and newer POSIX shared memory mappings allow even unrelated processes to attach and detach from shared memory by identifier, so all you need to do is share the identifier from the processes that create the mappings with the processes that consume the mappings, and then when you manipulate data within the mappings, they are available to all processes simultaneously without any additional parsing overhead. The shm_open(3) function returns an int that is used as a file descriptor in later calls to ftruncate(2) and then mmap(2), so other processes can use the shared memory segment without a file being created in the filesystem -- and this memory will persist even if all processes using it have exited. (A little strange for Unix, perhaps, but it is flexible.)
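
If a third-party module is not an option and you are on Python 3.8 or later, the standard library's multiprocessing.shared_memory offers the same attach-by-name workflow; a minimal sketch (the segment name 'demo_shm' is arbitrary):

from multiprocessing import shared_memory
import numpy as np

# Creating process: allocate a named segment and lay an array over it.
shm = shared_memory.SharedMemory(name='demo_shm', create=True, size=8 * 1024)
a = np.ndarray((1024,), dtype=np.float64, buffer=shm.buf)
a[:] = 42.0

# Any other process can attach later, knowing only the name.
other = shared_memory.SharedMemory(name='demo_shm')
b = np.ndarray((1024,), dtype=np.float64, buffer=other.buf)
print(b[0])          # 42.0

other.close()
shm.close()
shm.unlink()         # remove the segment once everyone is done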

multiprocessing fork() vs spawn()

  1. is it that the fork is much quicker 'cuz it does not try to identify which resources to copy?

Yes, it's much quicker. The kernel can clone the whole process and only copies memory pages, as a whole, when they are modified. Piping resources to a new process and booting the interpreter from scratch are not necessary.


  2. is it that, since fork duplicates everything, it would "waste" much more resources comparing to spawn()?

Fork on modern kernels does only "copy-on-write", and it only affects memory pages which actually change. The caveat is that "write" already encompasses merely iterating over an object in CPython, because that increments the object's reference count.

If you have long-running processes with lots of small objects in use, this can mean you waste more memory than with spawn. Anecdotally, I recall Facebook claiming that memory usage was reduced considerably by switching from "fork" to "spawn" for their Python processes.
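
If you want to measure the difference on your own workload, you can select the start method per pool through a context object instead of globally (a small sketch; 'fork' is only available on *nix and the timings will vary):

import multiprocessing as mp
import time

def noop(_):
    return None

if __name__ == '__main__':
    for method in ('fork', 'spawn'):
        ctx = mp.get_context(method)
        start = time.perf_counter()
        with ctx.Pool(4) as pool:
            pool.map(noop, range(4))
        print(method, time.perf_counter() - start)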


