Processing Single File from Multiple Processes

What you are looking for is a producer/consumer pattern.

Basic threading example

Here is a basic example using the threading module (instead of multiprocessing):

import queue
import sys
import threading

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()
    total = 20

    # start four workers
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in range(total):
        work.put(i)

    # block until every item has been marked done
    work.join()

    # get the results
    for i in range(total):
        print(results.get())

    sys.exit()

You wouldn't share the file object with the threads. Instead, you would produce work for them by putting lines of data on the queue. Each thread would then pick up a line, process it, and put the result on the output queue.
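To make that concrete, here is a minimal sketch of feeding lines to worker threads through a queue. The names `SENTINEL`, `worker` and `process_lines` are made up for this illustration, and the upper-casing is only a stand-in for real processing.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more work" marker, one per worker


def worker(in_queue, out_queue):
    """Take (line_no, line) pairs off the queue until the sentinel arrives."""
    while True:
        item = in_queue.get()
        if item is SENTINEL:
            break
        line_no, line = item
        # stand-in for real processing
        out_queue.put((line_no, line.strip().upper()))


def process_lines(lines, num_workers=4):
    """Feed lines to worker threads and return the results in input order."""
    in_queue = queue.Queue(maxsize=num_workers * 2)  # bounded: producer waits
    out_queue = queue.Queue()                        # unbounded: workers never block
    threads = [threading.Thread(target=worker, args=(in_queue, out_queue))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    count = 0
    for item in enumerate(lines):
        in_queue.put(item)
        count += 1
    for _ in threads:
        in_queue.put(SENTINEL)
    for t in threads:
        t.join()
    results = [out_queue.get() for _ in range(count)]
    return [text for _, text in sorted(results)]
```

Only the producer touches the file, for example `with open("file.txt") as f: process_lines(f)`; the workers only ever see individual lines.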

There are some more advanced facilities built into the multiprocessing module for sharing data, such as lists and a special kind of Queue. There are trade-offs between multiprocessing and threads, and the right choice depends on whether your work is CPU-bound or I/O-bound.
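As a rough illustration of that trade-off, the sketch below picks a process pool for CPU-bound work and a thread pool otherwise, using the standard concurrent.futures module. The helper names `square` and `run_tasks` and the `cpu_bound` flag are assumptions made up for this example.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    """Trivial stand-in for real work; defined at module level so it can be pickled."""
    return n * n


def run_tasks(func, items, cpu_bound=False, workers=4):
    """Run func over items with processes (CPU-bound) or threads (I/O-bound)."""
    executor_cls = ProcessPoolExecutor if cpu_bound else ThreadPoolExecutor
    with executor_cls(max_workers=workers) as executor:
        # executor.map returns results in input order
        return list(executor.map(func, items))
```

When you pass `cpu_bound=True`, call `run_tasks` from under an `if __name__ == "__main__":` guard so that worker processes can import the module safely.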

Basic multiprocessing.Pool example

Here is a really basic example of a multiprocessing Pool:

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print(results)

A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. map() blocks and returns the entire result when it's done. Be aware that this is an overly simplified example, and that pool.map() reads your entire file into memory all at once before handing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
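One lighter-weight option, sketched here under my own assumptions, is pool.imap(): it does not first turn the input into a list the way map() does, and it yields results one at a time, so the output can be written out as it arrives. The pool may still read ahead of the workers, so this is not a hard memory bound. The function names and file names are hypothetical.

```python
from multiprocessing import Pool


def process_line(line):
    return "FOO: %s" % line.rstrip("\n")


def process_file(src_path, dst_path, pool, chunksize=64):
    """Stream lines through pool.imap and write results in input order."""
    count = 0
    with open(src_path) as src, open(dst_path, "w") as dst:
        # imap yields each result as soon as it (and all earlier ones) is ready
        for result in pool.imap(process_line, src, chunksize):
            dst.write(result + "\n")
            count += 1
    return count


def main():
    # hypothetical file names; call main() under `if __name__ == "__main__":`
    with Pool(4) as pool:
        print(process_file("file.txt", "out.txt", pool))
```

Because the pool is passed in, the same function also works with a thread pool (`multiprocessing.pool.ThreadPool`) if the work turns out to be I/O-bound.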

Manual "pool" with limit and line re-sorting

This is a manual version of Pool.map, but instead of consuming the entire iterable in one go, you set a queue size so that you feed it piece by piece, only as fast as the workers can process. I also added line numbers so that you can track them and refer to them later if you want.

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)

if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    # bounded queue: put() blocks while the workers are busy
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    for i in range(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data, followed by one exit signal (None) per worker
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example: [(1, "foo"), (10, "bar"), (0, "start")]
    print(sorted(results))

Safe to have multiple processes writing to the same file at the same time? [CentOs 6, ext4]

What you're doing seems perfectly OK, provided you're using the POSIX "raw" IO syscalls such as read(), write(), lseek() and so forth.
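For illustration only, this is roughly what such a raw, unbuffered write at a fixed offset looks like from Python on a POSIX system, using os.pwrite(). The helper name `write_region` is my own.

```python
import os


def write_region(path, offset, data):
    """Write data at an absolute offset using the unbuffered pwrite() call."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o644)
    try:
        written = 0
        # pwrite may write fewer bytes than requested, so loop until done
        while written < len(data):
            written += os.pwrite(fd, data[written:], offset + written)
    finally:
        os.close(fd)
```

Since pwrite() takes the offset as an argument, there is no shared file position to fight over, and each process can write its own region independently.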

If you use C stdio (fread(), fwrite() and friends) or some other language runtime library which has its own userspace buffering, then the answer by "Tilo" is relevant, in that due to the buffering, which is to some extent outside your control, the different processes might overwrite each other's data.

Regarding OS-level locking: while POSIX states that writes of no more than PIPE_BUF bytes are atomic for some special files (pipes and FIFOs), there is no such guarantee for regular files. In practice, I think it's likely that I/O within a single page is atomic, but again nothing guarantees it. The OS only does locking internally to the extent necessary to protect its own data structures. You can use file locks, or some other interprocess communication mechanism, to serialize access to files. But all of this is relevant only if you have several processes doing I/O to the same region of a file. In your case, as your processes are doing I/O to disjoint sections of the file, none of this matters, and you should be fine.
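If you ever do need several writers on the same region (appending to one log file, say), a file lock is one way to serialize them. Below is a minimal sketch using fcntl.flock(), which is available on Unix only. The helper name `locked_append` is hypothetical, and the lock is advisory, so it only protects against other writers that take the same lock.

```python
import fcntl


def locked_append(path, text):
    """Append text while holding an exclusive advisory lock on the file."""
    with open(path, "a") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # blocks until nobody else holds the lock
        try:
            f.write(text)
            f.flush()  # push buffered data out before the lock is released
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
```

The explicit flush() matters because of the userspace buffering mentioned above: without it, the data could still be sitting in the buffer when the lock is dropped.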

Python 3: How to write to the same file from multiple processes without messing it up?

If this is a continuation of your project from yesterday, you already have your download list in memory. Just remove entries from the loaded list as their processes finish downloading, and write the whole list back over the input file only once, when you're exiting the 'downloader'. There is no reason to write out every change as it happens.

If you want to know (say, from an external process) when a URL gets downloaded even while your 'downloader' is running, append a new line to a downloaded.dat file each time a process reports that a download was successful.

Of course, in both cases, write from within your main process/thread so you don't have to worry about a mutex.

UPDATE - Here's how to do it with an additional file, using the same code base as yesterday:

from itertools import cycle
from multiprocessing import Pool

# NOTE: Downloader is the class from yesterday's code base; it is not defined here.

def init_downloader(params):  # our downloader initializer
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return params  # job finished, return the same params for identification

if __name__ == "__main__":  # important protection for cross-platform use

    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]
    downloader_cycle = cycle(downloader_params)  # use a cycle for round-robin distribution

    with open("downloaded_links.dat", "a+") as diff_file:  # open your diff file
        diff_file.seek(0)  # rewind the diff file to the beginning to capture all lines
        diff_links = {row.strip() for row in diff_file}  # load downloaded links into a set
        with open("input_links.dat", "r+") as input_file:  # open your input file
            available_links = []
            download_jobs = []  # store our downloader parameters + a link here
            # read our file line by line and filter out downloaded links
            for row in input_file:  # loop through our file
                link = row.strip()  # remove the extra whitespace to get the link
                if link not in diff_links:  # make sure link is not already downloaded
                    available_links.append(row)
                    download_jobs.append([next(downloader_cycle), link])
            input_file.seek(0)  # rewind our input file
            input_file.truncate()  # clear out the input file
            input_file.writelines(available_links)  # store back the available links
        diff_file.seek(0)  # rewind the diff file
        diff_file.truncate()  # blank out the diff file now that the input is updated
        # and now let's get to business...
        if download_jobs:
            download_pool = Pool(processes=5)  # make our pool use 5 processes
            # run asynchronously so we can capture results as soon as they are available
            for response in download_pool.imap_unordered(init_downloader, download_jobs):
                # since it returns the same parameters, the second item is a link
                # add the link to our `diff` file so it doesn't get downloaded again
                diff_file.write(response[1] + "\n")
        else:
            print("Nothing left to download...")

The whole idea is, as I wrote in the comment, to use a file to store downloaded links as they get downloaded, and then on the next run to filter out the downloaded links and update the input file. That way even if you forcibly kill it, it will always resume where it left off (except for the partial downloads).

Can multiple processes write to the same folder?

This has absolutely nothing to do with Python, as file operations in Python use OS level system calls (unless run as root, your Python program would not have permissions to do raw device writes anyway and doing them as root would be incredibly stupid).

A little bit of file system theory if anyone cares to read:

Yes, if you study file system architecture and how data is actually stored on drives, there are similarities between files and directories, but only at the data storage level. The reason is that there is no need to separate the two. For example, the ext4 file system stores information about a file (its metadata) in small units called inodes, separately from the actual file contents. An inode contains a pointer to the actual disk space where the file data can be found.

File systems generally are rather agnostic to directories. A file system is basically just this: it contains information about free disk space, information about files with pointers to their data, and the actual data. Part of a file's metadata is the directory where the file resides. In modern file systems (ancient FAT is the exception that is still in use), data storage on disk is not related to directories. Directories exist so that both humans and the computer implementing the file system can locate files and folders quickly instead of walking sequentially through the list of inodes until the correct file is found.

You may have read that directories are just files. Yes, they are "files" that contain a list of the files in the directory (or actually a tree, but please do not confuse this with a directory tree: it is just a mechanism for storing information about large directories so that files do not need to be searched for sequentially within the directory entry). The reason a directory is a file is that files are the mechanism file systems use to store data. There is no need for a dedicated storage mechanism, as a directory only contains a list of file names and pointers to their inodes. You could think of it as a database or, even simpler, a text file. But in the end it is just a file that contains pointers, not something that is allocated on the disk surface to contain the actual files stored in the directory.
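You can see this "list of names with pointers to inodes" view from Python. The small sketch below (the helper name `directory_entries` is made up) reads one directory with os.scandir() and returns each name together with the inode number its entry points to.

```python
import os


def directory_entries(path):
    """Return {file name: inode number} for the entries of one directory."""
    with os.scandir(path) as entries:
        return {entry.name: entry.inode() for entry in entries}
```

On a POSIX file system, a hard link created with os.link() shows up here as a second name carrying the same inode number, which is exactly the "directory entry is only a pointer" idea described above.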

That was the background.

The file system implementation on your computer is just a piece of software that knows how to deal with all this. When you open a file in a certain directory for writing, something like this usually happens:

  1. A free inode is located and an entry created there
  2. Free clusters / blocks database is queried to find storage space for the file contents
  3. File data is stored and blocks/clusters are marked "in use" in that database
  4. Inode is updated to contain file metadata and a pointer to this disk space
  5. "File" containing the directory data of the target directory is located
  6. This file is modified so that one record is added. This record has a pointer to the inode just created, and the file name as well
  7. Inode of the file is updated to contain a link to the directory, too.

It is the job of operating system and file system driver within it to ensure all this happens consistently. In practice it means the file system driver queues operations. Writing several files into the same directory simultaneously is a routine operation - for example web browser cache directories get updated this way when you browse the internet. Under the hood the file system driver queues these operations and completes steps 1-7 for each new file before it starts processing the following operation.
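A quick way to convince yourself is to create several files in one directory concurrently and check that they all arrive intact. This sketch uses threads for brevity, but the same holds for separate processes, since each open() and write() ends up as its own system call. The function names and the file naming scheme are my own.

```python
import os
import threading


def write_file(directory, name, payload):
    """Create one file in the shared directory."""
    with open(os.path.join(directory, name), "w") as f:
        f.write(payload)


def write_many(directory, count):
    """Create `count` files in the same directory concurrently."""
    threads = [
        threading.Thread(target=write_file,
                         args=(directory, "file_%d.txt" % i, "data %d\n" % i))
        for i in range(count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(os.listdir(directory))
```

Note that every writer uses a distinct file name; nothing here coordinates the writers beyond that.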

To make it a bit more complex, there is a journal acting as an intermediate buffer. Your transactions are written to the journal, and when the file system is idle, the file system driver commits the journal transactions to the actual storage space, but the theory remains the same. The journal exists for performance and reliability reasons.

You do not need to worry about this on application level, as it is the job of the operating system to do all that.

In contrast, if you create a lot of randomly named files in the same directory, in theory there could be a conflict at some point if your random name generator produced two identical file names. There are ways to mitigate this, and this would be the part you need to worry about in your application. But anything deeper than that is the task of the operating system.
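One common mitigation, sketched here with hypothetical names, is to let the operating system detect the collision: create the file with O_EXCL, which fails if the name already exists, and simply retry with a new random name.

```python
import os
import uuid


def create_unique_file(directory, suffix=".dat", attempts=10):
    """Create a new, empty, randomly named file and return its path."""
    for _ in range(attempts):
        path = os.path.join(directory, uuid.uuid4().hex + suffix)
        try:
            # O_EXCL makes the create fail if the name is already taken
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
        except FileExistsError:
            continue  # name collision: try another random name
        os.close(fd)
        return path
    raise RuntimeError("could not find a free file name")
```

The standard library's tempfile.mkstemp() follows the same exclusive-create approach, so in many cases you can use that instead of rolling your own.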

How can create a parallel processing to make multiple processes downloading and writing?

You can probably achieve some improvement by downloading segments of the file concurrently, and for this multithreading should be sufficient. You will need to experiment with the number of threads to see how it affects not only network performance but also the concurrent writing of the disk output. For the latter, the suggestion of @showkey to use a memory-mapped file is a good one, but to use multiprocessing with a memory-mapped file you need to be on a Linux-like platform, which I am guessing you might be running on (you are supposed to tag your question with the platform whenever you tag a question with multiprocessing, just so I don't have to guess). But since on Windows the memory map cannot be shared across processes and you are forced to use threading, that is what I will use. And since each thread retrieves its segment in chunks, it may very well release the GIL periodically, allowing the other threads to run. Of course, if you are running under Linux, switching to multiprocessing is a matter of modifying the import statements (it should be self-explanatory), although again it is not clear whether the most efficient pool size would be the number of cores you have.

By using a memory mapped file, there is no need for any final merging of files. In method __init__, you should set self.fn to the final filename you want.
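The "no final merge" idea can be sketched without any network access: pre-size the output file, map it, and let each thread copy its segment straight into its own slice of the map. The function names are made up for this illustration, and the segments are assumed to be non-empty byte strings.

```python
import mmap
import threading


def fill_segment(mm, start, data):
    """Copy one segment into its own, non-overlapping slice of the map."""
    mm[start:start + len(data)] = data


def assemble(path, segments):
    """Write the given byte segments back to back into path, one thread each."""
    total = sum(len(s) for s in segments)
    with open(path, "wb") as f:
        f.truncate(total)  # pre-size the file so the whole range can be mapped
    with open(path, "r+b") as f, mmap.mmap(f.fileno(), total) as mm:
        threads = []
        offset = 0
        for data in segments:
            threads.append(threading.Thread(target=fill_segment,
                                            args=(mm, offset, data)))
            offset += len(data)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        mm.flush()  # make sure the mapped pages are written to the file
```

Because every segment lands at its final offset, the file is complete as soon as the last thread finishes.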

import mmap
import os
import sys
import time

import requests

# This uses multithreading:
from multiprocessing.dummy import Pool
# This uses multiprocessing:
#from multiprocessing.pool import Pool

#NUM_PROCESSES = os.cpu_count() # You must experiment with this
NUM_PROCESSES = 16

def init_pool(the_mm):
    # make the memory map available to every worker as a global
    global mm
    mm = the_mm

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = NUM_PROCESSES
        self.fn = url.split('/')[-1] # Or whatever final filename you want
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = self.size // self.process_num
        for i in range(self.process_num):
            if i == self.process_num - 1:
                # the last segment runs to the end of the file
                ranges.append((download_num*i, ''))
            else:
                # HTTP byte ranges are inclusive, so stop one byte short of the next start
                ranges.append((download_num*i, download_num*(i+1) - 1))
        print('ranges:', ranges)
        return ranges

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        headers = {'Range': 'bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        print(headers, flush=True)
        r = requests.get(self.url, headers=headers, stream=True)
        offset = self.ranges[i][0]
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                size = len(chunk)
                mm[offset:offset+size] = chunk
                offset += size
        print('process {} end'.format(str(i)))

    def run(self):
        with open(self.fn, 'wb') as f:
            if sys.platform != 'win32':
                # If not Windows then memory-mapped file size cannot be greater than disk file size:
                f.seek(self.size - 1)
            f.write(b'\0') # But for Windows a 1-byte file will suffice
        # Re-open:
        with open(self.fn, 'rb+') as f:
            # Create memory-mapped file on top of disk file:
            with mmap.mmap(f.fileno(), self.size) as mm:
                pool = Pool(processes=self.process_num, initializer=init_pool, initargs=(mm,))
                results = [pool.apply_async(self.run_task, args=(i,))
                           for i in range(self.process_num)]
                pool.close()
                pool.join()
                for result in results:
                    result.get() # re-raise any exception from a worker

if __name__ == "__main__":
    start = time.time()
    url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
    downloader = my_download(url)
    downloader.run()
    print(time.time() - start)

The following program that uses a memory-mapped file but downloads the file in a single request took 157 seconds compared with the above program that took 85 seconds on my desktop:

import mmap
import requests
import sys
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
url_headers = requests.head(url)
size = int(url_headers.headers['Content-Length'])
with open(fn, 'wb') as f:
    if sys.platform != 'win32':
        # If not Windows then memory-mapped file size cannot be greater than disk file size:
        f.seek(size - 1)
    f.write(b'\0') # But for Windows a 1-byte file will suffice
# Reopen:
with open(fn, 'rb+') as f:
    # memory-map the whole file by passing its full size as the length
    with mmap.mmap(f.fileno(), length=size) as mm:
        r = requests.get(url, stream=True)
        offset = 0
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                chunk_len = len(chunk)
                mm[offset:offset+chunk_len] = chunk
                offset += chunk_len
print(time.time() - start)

The following program downloads the file as a single request but does not use a memory-mapped file for writing the output and took 239 seconds.

import requests
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
with open(fn, 'wb') as f:
    r = requests.get(url, stream=True)
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)
print(time.time() - start)

Summary:

Download file in 16 segments using memory-mapped file: 85 seconds
Download file in 1 segment using memory-mapped file: 157 seconds
Download file in 1 segment without using memory-mapped file: 239 seconds.

In addition to trying different values for NUM_PROCESSES, you might want to try to increase the size of the chunk_size argument used with the iter_content method, although it may have no effect.


