Python Multiprocessing Safely Writing to a File

Python multiprocessing safely writing to a file

@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access. This eliminates collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Python: Writing to a single file with queue while using multiprocessing Pool

Multiprocessing pools implement a queue for you. Just use a pool method that returns the workers' return values to the caller. imap works well:

import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__ == '__main__':
    mp_handler()

python multiprocessing writing to shared file

Why would this happen?

There are several processes which possibly try to call

open_file.write(data)
open_file.flush()

at the same time. Which behavior would be fitting, in your eyes, if something like

  • a.write
  • b.write
  • a.flush
  • c.write
  • b.flush

happens?

Can you not have many multiprocessing units writing to the same file? Do you need to use a Lock? A Queue?

Python multiprocessing safely writing to a file recommends having one queue, which is read by a single process that writes to the file. So do Writing to a file with multiprocessing and Processing single file from multiple processes in python.
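
For completeness, here is a minimal sketch of the Lock approach the question also asks about; the write_line helper, file name, and pool sizes are illustrative assumptions, and the single-writer queue pattern shown above is usually the better choice:

import multiprocessing as mp

def init_worker(shared_lock):
    # make the shared lock visible inside each worker process
    global lock
    lock = shared_lock

def write_line(text, path='shared_output.txt'):
    # only one process at a time may hold the lock, so lines never interleave
    with lock:
        with open(path, 'a') as f:
            f.write(text + '\n')
            f.flush()

def worker(i):
    write_line('result from task %d' % i)

if __name__ == '__main__':
    shared_lock = mp.Lock()
    # the lock must be handed over at process creation time, hence the initializer
    with mp.Pool(4, initializer=init_worker, initargs=(shared_lock,)) as pool:
        pool.map(worker, range(8))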

Writing to file in Pool multiprocessing (Python 2.7)

You shouldn't be letting all the workers/processes write to a single file. They can all read from one file (which may cause slowdowns while workers wait for one of them to finish reading), but writing to the same file will cause conflicts and potential corruption.

As said in the comments, write to separate files instead and then combine them into one in a single process. This small program illustrates it, based on the program in your post:

from multiprocessing import Pool

def f(args):
    ''' Perform computation and write
    to separate file for each '''
    x = args[0]
    fname = args[1]
    with open(fname, 'w') as fout:
        fout.write(str(x) + "\n")

def fcombine(orig, dest):
    ''' Combine files with names in
    orig into one file named dest '''
    with open(dest, 'w') as fout:
        for o in orig:
            with open(o, 'r') as fin:
                for line in fin:
                    fout.write(line)

if __name__ == '__main__':
    # Each sublist is a combination
    # of arguments - number and temporary output
    # file name
    x = range(1, 4)
    names = ['temp_' + str(y) + '.txt' for y in x]
    args = list(zip(x, names))

    p = Pool(3)
    p.map(f, args)

    p.close()
    p.join()

    fcombine(names, 'final.txt')

It runs f for each argument combination, which in this case is a value of x and a temporary file name. It uses a list of argument tuples since pool.map does not accept more than one argument. There are other ways to work around this, especially on newer Python versions; one is sketched below.
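
For instance (assuming Python 3.3+), Pool.starmap unpacks each argument tuple for you, so f can take two ordinary parameters. A minimal sketch based on the same example:

from multiprocessing import Pool

def f(x, fname):
    # same computation as above, but with two ordinary parameters
    with open(fname, 'w') as fout:
        fout.write(str(x) + "\n")

if __name__ == '__main__':
    x = range(1, 4)
    names = ['temp_' + str(y) + '.txt' for y in x]

    with Pool(3) as p:
        # starmap unpacks each (value, name) tuple into f's arguments
        p.starmap(f, zip(x, names))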

For each argument combination, a pool worker creates a separate file and writes the output to it. In practice your output will be longer; you can simply add another function that computes it and call it from f. Also, there is no need to use Pool(5) for 3 arguments (though I assume only three workers were active anyway).

The reasons for calling close() and join() are explained well in this post. It turns out (in the comments on the linked post) that map is blocking, so here you don't need them for the original reason (waiting until all workers finish before writing the combined output file from a single process). I would still use them in case other parallel features are added later.

In the last step, fcombine gathers and copies all the temporary files into one. It is a bit deeply nested; if you decide, for instance, to remove each temporary file after copying it, you may want to pull the body under the with open(dest, ...) block, or the for loop underneath, into a separate function, both for readability and functionality. A sketch of that refactor follows.
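
A minimal sketch of that refactor, with a hypothetical copy_and_remove helper (the deletion step is an assumption, not part of the original answer):

import os

def copy_and_remove(src, fout):
    # copy one temporary file into the already-open destination, then delete it
    with open(src, 'r') as fin:
        for line in fin:
            fout.write(line)
    os.remove(src)

def fcombine(orig, dest):
    ''' Combine files with names in orig into one file named dest '''
    with open(dest, 'w') as fout:
        for o in orig:
            copy_and_remove(o, fout)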

Writing to a file with multiprocessing

You really should use two queues and three separate kinds of processing.

  1. Put stuff into Queue #1.

  2. Get stuff out of Queue #1 and do calculations, putting stuff in Queue #2. You can have many of these, since they get from one queue and put into another queue safely.

  3. Get stuff out of Queue #2 and write it to a file. You must have exactly one of these and no more. It "owns" the file, guarantees atomic access, and ensures that the file is written cleanly and consistently. (A minimal sketch of this layout follows the list.)
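
A minimal sketch of that layout using multiprocessing.Process directly; the stage functions, the None sentinel handling, and the output file name are illustrative assumptions rather than code from the original question:

import multiprocessing as mp

def producer(q_in, items):
    # stage 1: put stuff into Queue #1
    for item in items:
        q_in.put(item)
    q_in.put(None)                          # sentinel: no more work

def worker(q_in, q_out):
    # stage 2: get from Queue #1, calculate, put results into Queue #2
    while True:
        item = q_in.get()
        if item is None:
            q_in.put(None)                  # pass the sentinel on to the other workers
            q_out.put(None)                 # tell the writer this worker is finished
            break
        q_out.put('result for %s' % item)

def writer(q_out, n_workers, path):
    # stage 3: the only process that touches the file
    finished = 0
    with open(path, 'w') as f:
        while finished < n_workers:
            res = q_out.get()
            if res is None:
                finished += 1
            else:
                f.write(res + '\n')

if __name__ == '__main__':
    q_in, q_out = mp.Queue(), mp.Queue()
    n_workers = 3
    procs = [mp.Process(target=producer, args=(q_in, range(10)))]
    procs += [mp.Process(target=worker, args=(q_in, q_out)) for _ in range(n_workers)]
    procs.append(mp.Process(target=writer, args=(q_out, n_workers, 'out.txt')))
    for p in procs:
        p.start()
    for p in procs:
        p.join()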

Trouble writing python output to file with multiprocessing pool

I think you're well into undefined territory here, but just flushing the file works for me. For example:

from multiprocessing import Pool

log_output = open('log', 'w')

def sum_square(number):
    print(number, file=log_output)
    log_output.flush()

if __name__ == "__main__":
    with Pool() as p:
        p.map(sum_square, range(5))

This works for me in Python 3.8 under Linux. I've also shortened your code to clarify the issue and make it slightly more idiomatic.

Multiprocessing write to large file out of memory

Since imap_unordered doesn't apply any back pressure to the worker processes, I suspect the results are backing up in the internal results queue of IMapUnorderedIterator. If that's the case, you have three options:

  • Write the results faster in the main process. Try returning the string f"\x1e{json.dumps(obj)}\n" from your workers rather than dumping in the main process. If that doesn't work:
  • Write temporary files in the workers and concatenate them in a second pass in the main process. Workers will interfere with each other's writes if you try to have them all append the final file simultaneously. You should be able to do this using minimal extra disk space. Note that you can do json.dump directly into a file object. Alternatively you could guard worker writes to the same file with a multiprocessing.Lock. If the extra writes are too time consuming:
  • Manage back pressure yourself. Use Pool.apply_async or ProcessPoolExecutor.submit to start cpu_count jobs and only submit additional work after writing a result to disk. It's less automatic than Pool.imap_unordered, but that's the kind of thing you have to deal with when your data starts getting big! (A sketch of this approach follows the list.)
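
A rough sketch of that last option using concurrent.futures; process_one, the record-separator framing, and the max_in_flight cap are illustrative assumptions rather than code from the question:

import json
import os
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait

def process_one(obj):
    # placeholder for the real per-item work; returns a pre-serialised line
    return "\x1e" + json.dumps(obj) + "\n"

def run(items, out_path, max_in_flight=None):
    max_in_flight = max_in_flight or os.cpu_count()
    pending = set()
    with ProcessPoolExecutor() as ex, open(out_path, 'w') as out:
        for item in items:
            pending.add(ex.submit(process_one, item))
            if len(pending) >= max_in_flight:
                # block until at least one result is ready, write it, then submit more
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    out.write(fut.result())
        # drain whatever is still in flight
        for fut in pending:
            out.write(fut.result())

if __name__ == '__main__':
    run(({"i": i} for i in range(100)), 'results.jsonl')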

Writing to same file(s) with multiprocessing (avoid lock)

Make the filename unique for each subprocess:

import glob
import multiprocessing
import os

# `regex` is the compiled pattern from the question (its definition is not shown here)

def process_files(file, id):
    res_path = r'd:\results'
    with open(file) as fin:          # `file` is a path yielded by the glob below
        for line in fin:
            matches = set(regex.findall(line))
            for match in matches:
                filename = "{}_{}.csv".format(match, id)
                res_file = os.path.join(res_path, filename)
                with open(res_file, 'a') as rf:
                    rf.write(line)

def main():
    p = multiprocessing.Pool()
    for id, file in enumerate(glob.iglob(r'D:\csv_files\**\*.csv', recursive=True)):
        p.apply_async(process_files, [file, id])
    p.close()
    p.join()                         # wait for all the async jobs to finish

Then you will have to add some code to consolidate the different "<match>_<id>.csv" files into single "<match>.csv" files, as sketched below.
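
A minimal sketch of that consolidation step; the consolidate helper and the grouping-by-prefix logic are assumptions based on the "{}_{}.csv" naming above:

import glob
import os
from collections import defaultdict

def consolidate(res_path=r'd:\results'):
    # group the per-process files by their match prefix ("<match>_<id>.csv")
    groups = defaultdict(list)
    for path in glob.glob(os.path.join(res_path, '*_*.csv')):
        match = os.path.basename(path).rsplit('_', 1)[0]
        groups[match].append(path)
    # concatenate each group into a single "<match>.csv" and remove the parts
    for match, parts in groups.items():
        with open(os.path.join(res_path, match + '.csv'), 'w') as out:
            for part in sorted(parts):
                with open(part) as fin:
                    out.write(fin.read())
                os.remove(part)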

Concurrent writes to the same file are something you want to avoid: either you don't have file locks and you end up with corrupted data, or you do have file locks and they slow down the process, which defeats the whole point of parallelizing it.


