Read file line by line with GNU parallel
If you have 800M lines, I think you need something faster than running a job for each line.
So how about:
sort --parallel=100 -k4 input.tsv |
parallel --pipe --group-by 4 --colsep '\s+' -kN1 'cat > num{#}.bed'
newname() {
head -n1 "$1" | parallel --colsep '\s+' mv "$1" {4}.bed
}
export -f newname
ls num*bed | parallel newname
On my system this does 100M lines in 15 minutes.
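For comparison, the group-by-column-4 step that the pipeline above performs can be sketched in plain Python (my own illustration, not part of the original answer; a single process will of course be far slower at 800M lines):

```python
# Minimal sketch: group a whitespace-separated file by its 4th column,
# assuming the input is already sorted on that column (as after `sort -k4`).
import io
from itertools import groupby

def split_by_column(lines, col=3):
    """Yield (key, rows) groups; requires lines pre-sorted on `col`."""
    rows = (line.split() for line in lines if line.strip())
    return groupby(rows, key=lambda r: r[col])

# Toy stand-in for the sorted input.tsv
data = io.StringIO("chr1\t1\t2\tA\nchr1\t3\t4\tA\nchr2\t5\t6\tB\n")
groups = {key: list(rows) for key, rows in split_by_column(data)}
```

Each group would then be written to its own `<key>.bed` file, which is what the `parallel --group-by 4` stage and the `newname` rename step accomplish together.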
Python, process a large text file in parallel
The simplest approach would probably be to run all 30 files at once with your existing code -- it would still take all day, but you'd have all the files done together. (I.e., "9 babies in 9 months" is easy; "1 baby in 1 month" is hard.)
If you really want to get a single file done faster, it depends on how your counters actually update. If almost all the work is in analysing each value, you can offload that using the multiprocessing module:
import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0

def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    # drain any results that are already finished
    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print(counter_a, counter_b, counter_c)
That will allow 50 slowfuncs to run in parallel, so instead of taking 1000 s (= 100k × 0.01 s) it takes about 20 s ((100k/50) × 0.01 s) to complete. If you can restructure your function into a "slowfunc" and an "add_to_counter" like the above, you should be able to get close to a 50× speedup.
How to parallelize a shell script that reads lines of a text file as input?
With bash (for the $'\n' syntax) and GNU xargs (for the -d and -P arguments):
# runs one python process per line, with whole line passed as an argument
<"$file" xargs -d $'\n' -P10 -n1 python some_script.py
One caveat is that this will pass each line to your Python script as a single argument. If you need the shell to perform string-splitting first:
# string-splits and glob-expands each line into a list of arguments
# reuses each "sh" instance for as many arguments as possible
<"$file" xargs -d $'\n' -P10 sh -c 'for arg; do python some_script.py $arg; done'
Writing to a file parallely while processing in a loop in python
Q: "Writing to a file parallely while processing in a loop in python ..."
A:
Frankly speaking, the file-I/O is not your performance-related enemy.
"With all due respect to the colleagues: Python has always used the GIL lock to avoid any level of truly concurrent execution, actually re-SERIAL-ising the code-execution flow among any number of threads, lending slices of code-interpretation time to one thread after another after another. This only increases the interpreter's overhead (and devastates all pre-fetches into CPU-core caches on each turn, paying the full memory-I/O cost on every re-fetch). So threading is an ANTI-pattern in Python, except, I may accept, for masking long network-transport latencies. – user3666197"
Given that the ~65k files listed in the CSV ought to get processed ASAP, performance-tuned orchestration is the goal; file-I/O is a negligible (and by design easily latency-masked) part of it. Which does not mean we cannot make it worse, by organising it into yet another performance-devastating anti-pattern, can we?
Tip #1: avoid and resist any low-hanging-fruit SLOCs if The Performance is the goal
If the code starts with a cheapest-ever iterator clause,
be it the mock-up for aRow in aCsvDataSET: ...
or the real code for i in range( len( queries ) ): ...
- these constructs look nice in "structured-programming" evangelisation, as they form a syntax-compliant separation of a deeper level of the code, yet they have long been among the slower parts of Python code interpretation (the second being an iterator over a range()-iterator in Py3, and a silent RAM-killer in the Py2 ecosystem for any larger range, since it materialises the whole list), and their repeatedly paid overhead costs accumulate. Worse, a top-level loop creates a need to "coordinate" unordered concurrent file-I/O operations that, done smartly, is not necessary at all. Such trivial SLOCs, and similarly casual design decisions, are one example of where the performance is lost.
Better way?
- a) avoid the top-level (slow and overhead-expensive) looping
- b) "split" the 65k-parameter space into not many more blocks than there are memory-I/O channels on your physical device (the scoring process, I can guess from the posted text, is memory-I/O intensive, as some model has to go through all the texts for scoring to happen)
- c) spawn n_jobs-many worker processes, e.g. joblib.Parallel( n_jobs = ... )( delayed( <_scoring_fun_> )( block_start, block_end, ...<_params_>... ) ), and run the scoring_fun(...) on each such distributed block-part of the 65k-long parameter space
- d) having computed the scores and related outputs, each worker process can and shall file-I/O its own results into its private, exclusively owned, conflict-free output file
- e) having finished all partial block-parts' processing, the main Python process can simply join the already stored outputs (just-[CONCURRENTLY] created, smoothly and non-blockingly O/S-buffered, real-hardware-deposited), if such a need exists at all,
and finito, we are done (knowing there is no faster way to compute the same block of tasks, which are in principle embarrassingly independent, beyond orchestrating them collision-free with minimised add-on costs).
If interested in tweaking real-system end-to-end processing performance,
start with an lstopo map,
next verify the number of physical memory-I/O channels,
and then experiment a bit with Python joblib.Parallel() process instantiation, under-subscribing or over-subscribing n_jobs a bit below or above the number of physical memory-I/O channels. If the actual processing has some maskable latencies hidden from us, there may be a chance to spawn more n_jobs workers, until end-to-end processing performance stops growing steadily and system noise hides any further performance-tweaking effects.
Parallel Computing for Large Text Files
This applies a text-processing function (currently the re.sub from your question) to NUM_CORES equally sized chunks of your input text file, then writes them out (preserving the order of your original input file).
from multiprocessing import Pool, cpu_count
import numpy as np
import re

NUM_CORES = cpu_count()

def process_text(input_textlines):
    clean_text = []
    for line in input_textlines:
        cleaned = re.sub("^(\\|)([0-9])(\\s)([A-Z][a-z]+[a-z])\\,", "\\1\\2\t\\3\\4,", line)
        clean_text.append(cleaned)
    return "".join(clean_text)

# read in data and convert to a sequence of equally-sized chunks
with open('data/text.txt', 'r') as f:
    lines = f.readlines()

num_lines = len(lines)
text_chunks = np.array_split(lines, NUM_CORES)

# process each chunk in parallel
with Pool(NUM_CORES) as pool:
    results = pool.map(process_text, text_chunks)

# write out results, in the original chunk order
with open("new_text.txt", "w", newline="\n") as f:
    for text_chunk in results:
        f.write(text_chunk)