Multiprocessing: How to Share a Dict Among Multiple Processes

Share Python dict across many processes

Unfortunately, shared memory in Ray must be immutable. The usual recommendation is to use actors for mutable state.

You can do a couple of tricks with actors. For example, you can store object references in your dict if the values are immutable. Then the dict itself won't live in shared memory, but all of the values it points to will.

import ray
import numpy as np

ray.init()

@ray.remote
class DictActor:
    def __init__(self):
        self._dict = {}

    def put(self, key, value):
        # ray.put stores the value in the shared object store;
        # the dict only holds the resulting ObjectRef
        self._dict[key] = ray.put(value)

    def get(self, key):
        return self._dict[key]

d = DictActor.remote()
ray.get(d.put.remote("a", np.zeros(100)))
ray.get(d.get.remote("a"))  # This result is in shared memory.
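One caveat worth flagging, as a hedged usage sketch (the exact behavior depends on Ray's nested-ObjectRef semantics, since refs returned from remote calls are not automatically resolved): the actor's get hands back the stored ObjectRef, so getting the actual array may take a second ray.get.

# ref is the ObjectRef the actor stored via ray.put
ref = ray.get(d.get.remote("a"))
# resolving the inner ref yields the numpy array itself,
# read from the shared object store
arr = ray.get(ref)
print(arr.shape)  # (100,)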

python multiprocessing - Sharing a dictionary of classes between processes with subsequent writes from the process reflected to the shared memory

This is a "gotcha" that holds a lot of surprises for the uninitiated. The problem is that when you have a managed dictionary, to see updates propagated you need to change a key or a value of a key. Here technically you have not changed the value, that is, you are still referencing the same object instance (type ExampleClass) and are only changing something within that reference. Bizarre, I know. This is the modified method f that you need:

def f(self, dict):
    # generate a random index to add the class to
    index = str(random.randint(0, 100))

    # create a new class at that index
    dict[index] = ExampleClass(str(random.randint(100, 200)))

    # mutating the retrieved object in place is what failed to
    # propagate between processes; reassigning the key is the fix
    ec = dict[index]
    ec.count += 1
    ec.stringVar = "yeAH"
    dict[index] = ec  # show new reference

    # print what's inside
    for x in dict.values():
        print(x.count, x.stringVar)

Note:

Had you used the following code to set a key/value pair, it would actually print False:

ec = ExampleClass(str(random.randint(100, 200)))
dict[index] = ec
print(dict[index] is ec)

This is why, in the modified method f, the line dict[index] = ec  # show new reference works: retrieving dict[index] gives you a copy, so assigning ec back appears to the manager as a new reference being set as the value, which is what triggers the update to propagate.

Also, you should avoid using dict, the name of a built-in type, as a variable name.
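For readers who want to run the whole thing end to end, here is a minimal, self-contained sketch of the pattern (ExampleClass and the worker here are stand-ins assumed for illustration; the asker's actual class isn't shown):

import random
from multiprocessing import Manager, Process

class ExampleClass:
    # stand-in: a counter plus a string field
    def __init__(self, stringVar):
        self.count = 0
        self.stringVar = stringVar

def worker(shared):
    index = str(random.randint(0, 100))
    shared[index] = ExampleClass(str(random.randint(100, 200)))

    ec = shared[index]   # this is a copy, not the stored object
    ec.count += 1
    ec.stringVar = "yeAH"
    shared[index] = ec   # reassign the key so the manager propagates it

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        p = Process(target=worker, args=(shared,))
        p.start()
        p.join()
        for x in shared.values():
            print(x.count, x.stringVar)  # prints: 1 yeAH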

How to write to a shared dictionary faster in Python?

Since no process needs to read keys or values from the dictionary, I don't see why parallel_process_for_shortq doesn't simply return the key and its computed value as a tuple to the main process, which can then assemble the dictionary from these tuples.

As an aside, I don't get what the self arguments are supposed to be for, since self is never defined and you don't seem to be using classes. See How to create a Minimal, Reproducible Example.

import multiprocessing as mp
import os

def pooler(payload):
    # flatten the payload dict into [key, value] pairs
    lis_docs = []
    for key in payload.keys():
        lis_docs.append([key, payload[key]])

    # parallel process on this
    pool = mp.Pool(os.cpu_count())  # use all processors
    # pool.map takes an iterable, not an args= keyword; each item of
    # lis_docs is passed to parallel_process_for_shortq as doc
    results = pool.map(parallel_process_for_shortq, lis_docs)
    pool.close()
    pool.join()
    # assemble the dictionary from the returned (key, value) tuples
    return {k: v for k, v in results}

def parallel_process_for_shortq(doc):
    # tokenize_sentences, get_keywords, get_sentences_for_keyword, and
    # the nlp/s2v/fdist/normalized_levenshtein objects come from the
    # asker's code and are assumed to be defined at module level
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)

    keywords = get_keywords(nlp, modified_text, 4, s2v, fdist,
                            normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)
    # return key and value
    return doc[0], [keywords, keyword_sentence_mapping]
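To show the suggested pattern in isolation, here is a hedged, self-contained sketch (process_doc is a stand-in worker, not the asker's pipeline): each worker returns a (key, value) tuple, and the parent assembles the dict, so no shared dictionary or manager is needed at all.

import multiprocessing as mp
import os

def process_doc(item):
    # stand-in for the real per-document work:
    # receives one (key, text) pair, returns a (key, result) tuple
    key, text = item
    return key, text.upper()

def build_dict(payload):
    with mp.Pool(os.cpu_count()) as pool:
        results = pool.map(process_doc, list(payload.items()))
    # assemble the dictionary in the parent process
    return dict(results)

if __name__ == "__main__":
    out = build_dict({"doc1": "hello", "doc2": "world"})
    print(out)  # {'doc1': 'HELLO', 'doc2': 'WORLD'}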

