How to Process Lines of a File in Parallel

Read file line by line with GNU parallel

If you have 800M lines, I think you need something faster than running a job for each line.

So how about:

sort --parallel=100 -k4 input.tsv |
parallel --pipe --group-by 4 --colsep '\s+' -kN1 'cat > num{#}.bed'

newname() {
  head -n1 "$1" | parallel --colsep '\s+' mv "$1" {4}.bed
}
export -f newname
ls num*bed | parallel newname

On my system this does 100M lines in 15 minutes.
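For readers who would rather stay in one language, the grouping step of that pipeline can be sketched in Python. This is only a rough single-process equivalent, not a replacement for the GNU parallel version: it assumes lines that share a column-4 value are already adjacent (as they are after the sort above), splits on whitespace, and writes each group to `<key>.bed`. The names `fourth_column`, `split_by_column` and `out_dir` are made up for illustration.

```python
import itertools
import os


def fourth_column(line):
    """Return the 4th whitespace-separated field of a line."""
    return line.split()[3]


def split_by_column(lines, out_dir, key=fourth_column):
    """Write each run of adjacent lines sharing a key to <out_dir>/<key>.bed.

    Assumes `lines` is already sorted so that equal keys are adjacent.
    Returns the keys in the order they were written.
    """
    os.makedirs(out_dir, exist_ok=True)
    written = []
    for value, group in itertools.groupby(lines, key=key):
        path = os.path.join(out_dir, f"{value}.bed")
        # append mode, so a key that reappears later is not overwritten
        with open(path, "a") as out:
            out.writelines(group)
        written.append(value)
    return written
```

With a sorted file it could be called as `split_by_column(open("sorted.tsv"), "out")`, where both names are placeholders.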

Python, process a large text file in parallel

The simplest approach would probably be to run your existing code on all 30 files at once. Each file would still take all day, but all of them would finish together (i.e., "9 babies in 9 months" is easy, "1 baby in 1 month" is hard).
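A minimal sketch of that "all files at once" approach, assuming the existing per-file code can be wrapped in a function. Here `process_file` is a hypothetical stand-in that merely counts lines, and the file names come from the command line.

```python
import multiprocessing
import sys


def process_file(path):
    """Hypothetical stand-in for the existing per-file code: count lines."""
    with open(path) as f:
        return path, sum(1 for _ in f)


def process_all(paths, workers=None):
    """Run process_file on every path, one file per worker process at a time."""
    with multiprocessing.Pool(workers) as pool:
        # map() keeps the results in the same order as `paths`
        return pool.map(process_file, paths)


if __name__ == "__main__":
    # usage: python this_script.py file1.txt file2.txt ...
    for path, n_lines in process_all(sys.argv[1:]):
        print(path, n_lines)
```

No single file finishes sooner this way; the gain is only that the files no longer wait for each other.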

If you really want to get a single file done faster, it will depend on how your counters actually update. If almost all the work is just in analysing value you can offload that using the multiprocessing module:

import time
import multiprocessing

def slowfunc(value):
    # stand-in for the expensive per-value analysis
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    # cheap bookkeeping, runs in the parent process only
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

# the guard keeps worker processes from re-running this part on import
if __name__ == "__main__":
    pool = multiprocessing.Pool(50)
    results = []

    for value in range(100000):
        r = pool.apply_async(slowfunc, [value])
        results.append(r)

        # don't let the queue grow too long
        if len(results) == 1000:
            results[0].wait()

        while results and results[0].ready():
            r = results.pop(0)
            add_to_counter(r.get())

    for r in results:
        r.wait()
        add_to_counter(r.get())

    pool.close()
    pool.join()

    print(counter_a, counter_b, counter_c)

That will allow 50 slowfuncs to run in parallel, so instead of taking 1000s (= 100k * 0.01s), it takes about 20s (= (100k/50) * 0.01s) to complete. If you can restructure your function into "slowfunc" and "add_to_counter" like the above, you should be able to get a factor of 24 speedup.
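The same split can also be written with `Pool.imap`, which hands values to the workers in batches and yields the results in input order, so the cheap reduction still runs in the parent. This is a sketch only: `analyse_line`, `summarise` and `analyse_lines` are illustrative names, and the default chunksize of 1000 is an arbitrary guess rather than a tuned value.

```python
import multiprocessing


def analyse_line(line):
    """Hypothetical expensive per-line work; here it just measures the line."""
    return len(line.rstrip("\n"))


def summarise(results):
    """Cheap reduction, run in the parent process on the ordered results."""
    total = longest = 0
    for n in results:
        total += n
        longest = max(longest, n)
    return total, longest


def analyse_lines(lines, workers=4, chunksize=1000):
    with multiprocessing.Pool(workers) as pool:
        # imap yields results lazily and in input order
        return summarise(pool.imap(analyse_line, lines, chunksize))


if __name__ == "__main__":
    demo = ["alpha\n", "be\n", "gamma\n"]
    print(analyse_lines(demo, workers=2, chunksize=1))
```

Unlike the explicit queue-length check above, this sketch does nothing to limit how far ahead of the workers the input is consumed.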

How to parallelize a shell script that reads lines of a text file as input?

With bash (for the $'\n' syntax) and GNU xargs (for the -d and -P arguments):

# runs one python process per line, with whole line passed as an argument
<"$file" xargs -d $'\n' -P10 -n1 python some_script.py

One caveat is that this will pass each line to your Python script as a single argument. If you need the shell to perform string-splitting first:

# string-splits and glob-expands each line into a list of arguments
# reuses each "sh" instance for as many arguments as possible
# the trailing _ fills $0, so the first line of each batch is not swallowed
<"$file" xargs -d $'\n' -P10 sh -c 'for arg; do python some_script.py $arg; done' _
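If GNU xargs is not available, roughly the same "one process per line, at most 10 at a time" behaviour can be sketched in Python. Threads are enough here because each one only waits for a child process. The helper name `run_per_line` is an assumption, and the command is whatever you would otherwise hand to xargs.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_per_line(lines, command, max_procs=10):
    """Run `command + [line]` once per line, at most max_procs at a time.

    Each line is passed as one single argument, without shell word-splitting.
    Returns the exit codes in input order.
    """
    def run_one(line):
        return subprocess.run(command + [line.rstrip("\n")]).returncode

    with ThreadPoolExecutor(max_workers=max_procs) as pool:
        return list(pool.map(run_one, lines))


if __name__ == "__main__":
    # usage: python this_script.py input.txt
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            codes = run_per_line(f, [sys.executable, "some_script.py"])
        print("non-zero exits:", sum(1 for c in codes if c != 0))
```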

Writing to a file parallely while processing in a loop in python

Q : " Writing to a file parallely while processing in a loop in python ... "

A :
Frankly speaking, the file-I/O is not your performance-related enemy.

"With all due respect to the colleagues, Python (since ever) has used the GIL-lock to avoid any level of concurrent execution of Python bytecode ( actually re-SERIAL-ising the code-execution flow into dancing among any amount of threads, lending a short time-slice ( 5 [ms] by default in CPython 3 ) of code-interpretation time to one-AFTER-another-AFTER-another, thus only increasing the interpreter's overhead times ( and devastating all pre-fetches into CPU-core caches on each turn ... paying the full mem-I/O costs on each next re-fetch(es) ) ). So threading is an ANTI-pattern in Python ( except, I may accept, for network-(long)-transport latency masking )" – user3666197

Given that about 65k files, listed in a CSV, ought to get processed ASAP, performance-tuned orchestration is the goal. File-I/O is just a negligible ( and by-design well latency-maskable ) part thereof ( which does not mean we can't make it worse by organising it in another performance-devastating ANTI-pattern ).


Tip #1 : resist using any low-hanging-fruit SLOCs if The Performance is the goal


If the code starts with a cheapest-ever iterator clause,
be it a mock-up for aRow in aCsvDataSET: ...
or the real code for i in range( len( queries ) ): ...
then beware. Such loops have been known for ages to be an awfully slow part of Python's code-interpretation capabilities ( in Py3 the second one iterates over a lazy range() object, while in Py2 range() builds the whole list in RAM, a silent RAM-killer for any larger-sized range ). They look nice in "structured-programming" evangelisation, as they form a syntax-compliant separation of a deeper-level part of the code, yet they do so at an awfully high cost, because the per-iteration overhead is paid again and again and accumulates. The finally injected need to "coordinate" unordered concurrent file-I/O operations, which is not necessary in principle at all if done smart, is one example of the adverse performance impact of such trivial SLOCs ( and of similarly poor design decisions ).

Better way?

  • a ) avoid the top-level (slow & overhead-expensive) looping
  • b ) "split" the 65k-parameter space into not many more blocks than there are memory-I/O channels on your physical device ( the scoring process, I can guess from the posted text, is memory-I/O intensive, as some model has to go through all the texts for scoring to happen )
  • c ) spawn n_jobs-many worker processes via joblib.Parallel( n_jobs = ... )( delayed( <_scoring_fun_> )( block_start, block_end, ...<_params_>... ) for block_start, block_end in <_blocks_> ), so that each one runs the scoring_fun(...) on its own block-part of the 65k-long parameter space
  • d ) having computed the scores and related outputs, each worker process can and shall write its own results into its private, exclusively owned, conflict-free output file
  • e ) once all partial block-parts have been processed, the main Python process can just join the already stored outputs ( created just-[CONCURRENTLY], smoothly & non-blockingly O/S-buffered, interleaved and deposited on the real hardware ), if such a need exists,

    and

    finito - we are done ( knowing there is no faster way to compute the same block of tasks, which are in principle embarrassingly independent, apart from the need to orchestrate them collision-free with minimised add-on costs ).
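Steps b) to e) could be sketched roughly as follows. To stay dependency-free, this sketch swaps joblib for the standard library's `concurrent.futures.ProcessPoolExecutor`. The names `split_blocks`, `score_block` and `run`, the part-file naming and the dummy "score" are all placeholders, not the asker's real scoring code.

```python
import os
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor


def split_blocks(n_items, n_blocks):
    """Return (start, end) pairs covering range(n_items) in contiguous blocks."""
    bounds = [n_items * i // n_blocks for i in range(n_blocks + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


def score_block(start, end, out_dir):
    """Placeholder worker: 'score' items start..end-1 into a private file."""
    path = os.path.join(out_dir, f"part_{start:08d}.txt")
    with open(path, "w") as out:
        for i in range(start, end):
            out.write(f"{i}\t{i * i}\n")  # dummy score
    return path


def run(n_items, n_jobs, out_dir, merged_path):
    blocks = split_blocks(n_items, n_jobs)
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(score_block, s, e, out_dir) for s, e in blocks]
        parts = [f.result() for f in futures]  # kept in block order
    # step e: the parent joins the per-worker files
    with open(merged_path, "wb") as merged:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, merged)
    return merged_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        merged = run(1000, 4, tmp, os.path.join(tmp, "merged.txt"))
        with open(merged) as f:
            print(sum(1 for _ in f), "lines merged")
```

Because every worker owns its output file, no locking or write coordination is needed until the final join.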

If interested in tweaking a real-system End-to-End processing performance,
start with an lstopo map,
next verify the number of physical memory-I/O channels,
and
experiment a bit with the Python joblib.Parallel() process instantiation, under-subscribing or over-subscribing n_jobs a bit below or a bit above the number of physical memory-I/O channels. If the actual processing has some maskable latencies that are hidden to us, there might be a chance to spawn more n_jobs workers for as long as the End-to-End processing performance keeps steadily growing, until system noise hides any such further performance-tweaking effects.
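One way to run that experiment is to time the same workload under a few worker counts around a chosen centre. This is a sketch under assumptions: the standard library's process pool stands in for joblib, `busy_work` is a synthetic CPU-bound placeholder for the real scoring function, and the centre value has to be supplied by hand (for example, read off the lstopo output), because the sketch makes no attempt to detect memory channels.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def candidate_worker_counts(centre, spread=2):
    """Worker counts from centre-spread to centre+spread, never below 1."""
    return [n for n in range(centre - spread, centre + spread + 1) if n >= 1]


def busy_work(n):
    """Synthetic placeholder task: sum of squares below n."""
    return sum(i * i for i in range(n))


def time_run(n_jobs, tasks):
    """Wall-clock seconds to finish all tasks with n_jobs worker processes."""
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        list(pool.map(busy_work, tasks))
    return time.perf_counter() - start


if __name__ == "__main__":
    tasks = [50_000] * 64
    for n_jobs in candidate_worker_counts(centre=4):
        print(n_jobs, round(time_run(n_jobs, tasks), 3))
```

Single timings are noisy, so repeating each setting a few times before comparing is advisable.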

A Bonus part - why un-managed sources of latency kill The Performance

Parallel Computing for Large Text Files

This applies a text-processing function (currently with the re.sub from your question) to NUM_CORES equally sized chunks of your input text file, then writes them out (preserving the order from your original text input file).

from multiprocessing import Pool, cpu_count
import numpy as np
import re

NUM_CORES = cpu_count()

def process_text(input_textlines):
    # apply the substitution to every line of one chunk
    clean_text = []
    for line in input_textlines:
        cleaned = re.sub("^(\\|)([0-9])(\\s)([A-Z][a-z]+[a-z])\\,", "1\\2\t\\3\\4,", line)
        clean_text.append(cleaned)
    return "".join(clean_text)

# the guard keeps worker processes from re-running this part on import
if __name__ == "__main__":
    # read in data and convert to sequence of equally-sized chunks
    with open('data/text.txt', 'r') as f:
        lines = f.readlines()

    text_chunks = np.array_split(lines, NUM_CORES)

    # process each chunk in parallel; pool.map returns results in input order
    with Pool(NUM_CORES) as pool:
        results = pool.map(process_text, text_chunks)

    # write out results
    with open("new_text.txt", "w", newline="\n") as f:
        for text_chunk in results:
            f.write(text_chunk)

