Is it possible to speed up Python I/O?
You can't get any faster than the maximum disk read speed.
In order to reach the maximum disk speed you can use the following two tips:
- Read the file in with a big buffer. This can either be coded "manually" or simply by using io.BufferedReader (available in Python 2.6+).
- Do the newline counting in another thread, in parallel.
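The first tip can be sketched as follows: read the file in large binary chunks and count the newlines in each chunk. The function name and the 1 MiB buffer size are illustrative choices, not from the original answer:

```python
def count_lines(path, buf_size=1024 * 1024):
    # Read the file in large binary chunks and count b"\n" occurrences;
    # this avoids per-line Python overhead and keeps the disk streaming.
    count = 0
    with open(path, "rb", buffering=buf_size) as f:
        while True:
            chunk = f.read(buf_size)
            if not chunk:
                break
            count += chunk.count(b"\n")
    return count
```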
How to achieve faster file I/O in Python?
From comment:
Why do you think that 8 writes result in 8 physical writes to your hard disk? The file object itself buffers what to write; when it decides to hand data to your OS, the OS may still wait a little before physically writing, and even then your hard drive has its own buffers that might hold the file's content for a while before it really starts writing. See How often does python flush to a file?
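A minimal sketch of those buffering layers, using the buffering parameter of open() and an explicit flush/fsync (the file name is illustrative):

```python
import os

# Writes land in Python's user-space buffer first; the OS page cache
# and the drive's own cache sit below that.
with open("out.txt", "w", buffering=1024 * 1024) as f:  # 1 MiB buffer
    for i in range(8):
        f.write(f"line {i}\n")  # 8 write() calls, usually 0 physical writes so far
    f.flush()                   # push Python's buffer down to the OS
    os.fsync(f.fileno())        # ask the OS to push its cache to the drive
```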
You should not use exceptions as control flow, nor recurse where it is not needed. Each recursion prepares a new call stack frame for the function call, which takes resources and time, and all of it has to be unwound as well.
The best thing to do would be to clean up your data before feeding it into json.loads(); the next best thing is to avoid recursing. Try something along the lines of:
def read_json_line(line=None):
    result = None
    while result is None and line:  # empty line is falsy, avoid endless loop
        try:
            result = json.loads(line)
        except Exception as e:
            result = None
            # Find the offending character index:
            idx_to_replace = int(str(e).split(' ')[-1].replace(')', ''))
            # slice away the offending character:
            line = line[:idx_to_replace] + line[idx_to_replace + 1:]
    return result
Speed up file I/O
Think about your file_len function. What does it have to do? It reads the entire file each time you call it.
Think about your main loop. It also reads the entire file. And then what does it do for each line in the file? It calls file_len.
How many times are you reading the entire file if the file has 5 lines? 10 lines? 100 lines? 15000 lines?
There are tools to help you find the hot spots in your program - for example the cProfile stdlib module. However, just thinking about your program for a minute is faster than generating and analyzing a profile and can yield results just as good.
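For example, a single-pass version (the helper name is hypothetical) gets both the line count and the line numbers from one read, instead of re-reading the file per line:

```python
def number_lines(path):
    # One pass over the file: read it once, then enumerate() supplies
    # the line number, so no separate file-length pass is needed.
    with open(path, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    total = len(lines)
    return [f"{i}/{total}: {line}" for i, line in enumerate(lines, start=1)]
```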
Using multiprocessing for speeding up I/O in Python
Depending on the structure of the data in df (the result of astore.getAllData(TESTSPEC)), you could try to use sharedctypes to store the collected data in shared memory. Certainly this method is useful mainly for 'POD's: data-only structures without any code or complex objects within.
Also, I would move the entire data processing to the children and make sure that astore is actually capable of working in parallel without synchronizing (or at least minimizing sync time) between clients (different processes).
But certainly all these suggestions are based on 'common sense': without precise knowledge of your app internals and accurate profiling, it would be hard to say exactly what the best solution for you would be.
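A minimal sketch of the sharedctypes idea, with made-up numeric data standing in for astore's output: each child writes its slice of results straight into a shared array of C doubles, so nothing is pickled back to the parent. The helper names are illustrative:

```python
import multiprocessing as mp
from multiprocessing import sharedctypes

def worker(shared, start, values):
    # Child writes its slice directly into shared memory;
    # no results are pickled back to the parent.
    for i, v in enumerate(values):
        shared[start + i] = v * 2.0

def run_demo(data):
    # Plain array of C doubles ('d'): POD only. No lock is needed here
    # because the two workers write to disjoint slices.
    shared = sharedctypes.RawArray('d', len(data))
    mid = len(data) // 2
    procs = [mp.Process(target=worker, args=(shared, 0, data[:mid])),
             mp.Process(target=worker, args=(shared, mid, data[mid:]))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return list(shared)

if __name__ == "__main__":
    print(run_demo([1.0, 2.0, 3.0, 4.0]))
```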
How to speed up async requests in Python
Bottleneck: number of simultaneous connections
First, the bottleneck is the total number of simultaneous connections in the TCP connector. The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)
The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: you can run ulimit -n to check, and ulimit -n 1024 to increase the limit to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)
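You can also inspect and raise the descriptor limit from within Python via the stdlib resource module (Unix only), instead of relying on ulimit in the shell. A sketch:

```python
import resource

# Current soft/hard limits on open file descriptors for this process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")

# Raise the soft limit to 1024 (the hard limit permitting) so a larger
# TCPConnector limit= can actually be honored.
if soft != resource.RLIM_INFINITY and soft < 1024:
    if hard == resource.RLIM_INFINITY or hard >= 1024:
        resource.setrlimit(resource.RLIMIT_NOFILE, (1024, hard))
```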
Supporting 50 million requests: async generators
Next, the reason why 50 million requests appears to hang is simply because of its sheer number.
Just creating 10 million coroutines in post_tasks takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.
We can defer the creation of coroutines using an async generator:
async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)
We need a counterpart to asyncio.as_completed() to handle async_gen and concurrency:
from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:  # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):  # -
    #     yield _wait_for_one()  # -
    for _ in range(concurrency):              # +
        loop.run_until_complete(_add_next())  # +
    while todo:                               # +
        yield _wait_for_one()                 # +
Then, we update fetch():
from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []  # -
        # # prepare the coroutines that post  # -
        # async for x in make_numbers(n, q):  # -
        #     post_tasks.append(do_get(session, url, x))  # -
        # Prepare the coroutines generator  # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once  # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]  # -
        # Now execute them with a specified concurrency  # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +
Other limitations
With the above, the program can start processing 50 million requests but:
1. it will still take 8 hours or so with CONCURRENCY = 1000, based on the estimate from tqdm.
2. your program may run out of memory for responses and crash.
For point 2, you should probably do:
# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f

    # Do something with response, such as writing to a local file
    # ...
An error in the code
do_get() should return data:
async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data)  # -
        return data    # +
How can I make file parsing and I/O faster in Python when working with huge files (20 GB+)?
Function calls incur significant overhead in Python. Don't call a function on every line of the file, but inline the definition. Also, don't repeatedly open the same output file; open it once and leave it open.
with open("file.txt", "r", encoding="utf-8") as f, \
     open("parsed.txt", "a", encoding="utf-8") as outh:
    for line in f:
        data = line.split("-|-")
        try:
            print(f"{data[2]} some text here {data[3]}", file=outh)
        except Exception:
            pass
C vs Python I/O performance for lots of small files
It boils down to a few things:
- Most importantly, the Python version is using text mode (i.e. r and w), which implies handling str (UTF-8) objects instead of bytes.
- There are many small files and we do so little with them that Python's own overhead (e.g. setting up the file objects in open) becomes important.
- Python has to dynamically allocate memory for most things.
Also note that I/O in this test is not that relevant if you use local files and do multiple runs, since they will already be cached in memory. The only real I/O will be the final write (and even then, you would have to make sure you are flushing/syncing to disk).
Now, if you take care of the text mode (i.e. using rb and wb) and also reduce the allocations (less important in this case, but still noticeable), you get something like this:
def combine():
    flagHeader = True
    with open('results-python-new.txt', 'wb') as fout:
        for filename in glob.glob('runs/Hs*.txt'):
            with open(filename, 'rb') as fin:
                header = fin.readline()
                values = fin.readline()
                if flagHeader:
                    flagHeader = False
                    fout.write(header)
                fout.write(values)
Then Python already finishes the tasks in half the time -- actually faster than the C version:
Old C: 0.234
Old Python: 0.389
New Python: 0.213
Possibly you can still improve the time a bit, e.g. by avoiding the glob.
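For instance, the glob can be replaced with os.scandir, which skips pattern compilation and extra stat calls; the filtering is done with plain string checks instead. The helper name is illustrative, the 'runs' directory and Hs*.txt pattern come from the code above:

```python
import os

def find_run_files(dirpath="runs"):
    # os.scandir yields DirEntry objects whose is_file() usually needs
    # no extra stat call; filter the names in plain Python.
    return sorted(
        entry.path
        for entry in os.scandir(dirpath)
        if entry.is_file()
        and entry.name.startswith("Hs")
        and entry.name.endswith(".txt")
    )
```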
However, if you also apply a couple of similar modifications to the C version, then you will get a much better time -- a third of the time of Python's:
New C: 0.068
Take a look:
#define LINE_SIZE 300

void combine(void) {
    DIR *d;
    FILE *fin;
    FILE *fout;
    struct dirent *dir;
    char headers[LINE_SIZE];
    char values[LINE_SIZE];
    short flagHeader = 1;

    fout = fopen("results-c-new.txt", "wb");
    chdir("runs");
    d = opendir(".");
    if (d) {
        while ((dir = readdir(d)) != NULL) {
            if ((strstr(dir->d_name, "Hs")) && (strstr(dir->d_name, ".txt"))) {
                fin = fopen(dir->d_name, "rb");
                fgets(headers, LINE_SIZE, fin);
                fgets(values, LINE_SIZE, fin);
                if (flagHeader) {
                    flagHeader = 0;
                    fputs(headers, fout);
                }
                fputs(values, fout);
                fclose(fin);
            }
        }
        closedir(d);
        fclose(fout);
    }
}