How to Speed Up Python IO

Is it possible to speed up Python IO?

You can't get any faster than the maximum disk read speed.

In order to reach the maximum disk speed you can use the following two tips:

  1. Read the file in with a big buffer. This can either be coded "manually" or done simply by using io.BufferedReader (available in Python 2.6+).
  2. Do the newline counting in another thread, in parallel.
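
As a rough illustration of the first tip, here is a minimal sketch of the "manual" variant: it reads the file in large binary chunks and counts newline bytes in each chunk. The function name count_newlines and the 1 MiB default buffer size are assumptions chosen for the example, not values from the answer above.

```python
def count_newlines(path, buffer_size=1024 * 1024):
    """Count newline bytes by reading the file in large binary chunks."""
    count = 0
    # buffering=0 opens a raw, unbuffered file; the chunk size below
    # acts as our own "big buffer".
    with open(path, "rb", buffering=0) as raw:
        while True:
            chunk = raw.read(buffer_size)
            if not chunk:  # an empty bytes object means end of file
                break
            count += chunk.count(b"\n")
    return count
```

The best buffer size depends on the disk and the OS, so it is worth timing a few values on the real data.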

How to achieve Faster File I/O In Python?

From comment:

Why do you think that 8 writes result in 8 physical writes to your hard disk? The file object itself buffers what to write. When it does decide to write to your OS, your OS might wait a little before it physically writes, and even then your hard drive has buffers that might hold the file's content for a while before it really starts to write. See "How often does python flush to a file?"
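
To make the buffering layers concrete, here is a small sketch. The helper name write_chunks and its durable flag are made up for this example. Several write() calls normally end up in Python's own buffer first; flush() hands that buffer to the OS, and os.fsync() asks the OS to commit it to the device.

```python
import os


def write_chunks(path, chunks, durable=False):
    """Write several chunks; they are buffered, not written one by one."""
    with open(path, "w", encoding="utf-8") as f:
        for chunk in chunks:
            f.write(chunk)  # usually lands in Python's buffer only
        if durable:
            f.flush()             # hand Python's buffer over to the OS
            os.fsync(f.fileno())  # ask the OS to commit it to the device
```

Forcing a sync after every small write would defeat the buffering, so durable=True is only worth using where the data really must survive a crash.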


You should not use exceptions as control flow, nor recurse where it is not needed. Each recursion sets up a new stack frame for the function call, which takes resources and time, and all of it has to be unwound again.

The best thing to do would be to clean up your data before feeding it into json.loads(). The next best thing would be to avoid recursing. Try something along the lines of:

import json

def read_json_line(line=None):
    result = None

    while result is None and line:  # empty line is falsy, avoid endless loop
        try:
            result = json.loads(line)
        except json.JSONDecodeError as e:
            # e.pos is the index of the offending character:
            idx_to_replace = e.pos
            if idx_to_replace >= len(line):
                break  # error at end of input: nothing left to remove
            # slice away the offending character:
            line = line[:idx_to_replace] + line[idx_to_replace + 1:]

    return result

Speed up file I/O

Think about your file_len function. What does it have to do? It reads the entire file each time you call it.

Think about your main loop. It also reads the entire file. And then what does it do for each line in the file? It calls file_len.

How many times are you reading the entire file if the file has 5 lines? 10 lines? 100 lines? 15000 lines?

There are tools to help you find the hot spots in your program - for example the cProfile stdlib module. However, just thinking about your program for a minute is faster than generating and analyzing a profile and can yield results just as good.
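
The original program is not shown here, so the following is only a sketch of the pattern described above. file_len is written the way the answer describes it (it reads the whole file on every call), while label_lines_slow and label_lines_fast are hypothetical stand-ins for the main loop.

```python
def file_len(path):
    """Count lines by reading the whole file once."""
    with open(path, "r", encoding="utf-8") as f:
        return sum(1 for _ in f)


def label_lines_slow(path):
    """Calls file_len() per line: the file is read N + 1 times."""
    with open(path, "r", encoding="utf-8") as f:
        return [f"{i}/{file_len(path)}" for i, _ in enumerate(f, start=1)]


def label_lines_fast(path):
    """Calls file_len() once, before the loop: the file is read twice."""
    total = file_len(path)
    with open(path, "r", encoding="utf-8") as f:
        return [f"{i}/{total}" for i, _ in enumerate(f, start=1)]
```

Both functions return the same list, but the slow one does work proportional to the square of the number of lines.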

Using multiprocessing for speeding up IO in python

Depending on the structure of the data in df (the result of astore.getAllData(TESTSPEC)), you could try using sharedctypes to store the collected data in shared memory. This method is useful mainly for 'POD's: data-only structures without any code or complex objects inside.

I would also move the entire data processing to the children and make sure that astore can actually work in parallel without synchronizing (or at least with minimal sync time) between clients (different processes).

Of course, all these suggestions are based on common sense. Without precise knowledge of your app's internals and accurate profiling, it would be hard to say exactly what the best solution for you would be.

How to speed up async requests in Python

Bottleneck: number of simultaneous connections

First, the bottleneck is the total number of simultaneous connections in the TCP connector.

The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:

# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:

The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)

The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: You can run ulimit -n to check, and ulimit -n 1024 to increase to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)
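
The same check can be done from inside Python on Unix-like systems. This is a sketch under assumptions: the helper name suggested_connection_limit and its reserve parameter are invented here. The default reserve of 24 simply mirrors the gap between ulimit -n 1024 and limit=1000 mentioned above.

```python
import resource  # Unix-only standard-library module


def suggested_connection_limit(reserve=24, default=100):
    """Derive a connector limit from the soft open-file limit.

    `reserve` (hypothetical) leaves descriptors for stdio, logs, etc.
    """
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return default  # no numeric limit to derive a value from
    return max(1, soft - reserve)
```

The result could then be passed as aiohttp.TCPConnector(limit=suggested_connection_limit()). The remote server may still throttle long before the local descriptor limit is reached.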

Supporting 50 million requests: async generators

Next, 50 million requests appear to hang simply because of the sheer number of them.

Just creating 10 million coroutines in post_tasks takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.

We can defer the creation of coroutines using an async generator:

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)

We need a counterpart to asyncio.as_completed() to handle async_gen and concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)} # -
    todo = set() # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next()) # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next(): # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo: # -
    #     f.add_done_callback(_on_completion) # -
    # for _ in range(len(todo)): # -
    #     yield _wait_for_one() # -
    for _ in range(concurrency): # +
        loop.run_until_complete(_add_next()) # +
    while todo: # +
        yield _wait_for_one() # +

Then, we update fetch():

from functools import partial

CONCURRENCY = 200 # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = [] # -
        # # prepare the coroutines that post # -
        # async for x in make_numbers(n, q): # -
        #     post_tasks.append(do_get(session, url, x)) # -
        # Prepare the coroutines generator # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q) # +

        # now execute them all at once # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))] # -
        # Now execute them with a specified concurrency # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)] # +

Other limitations

With the above, the program can start processing 50 million requests but:

  1. it will still take 8 hours or so with CONCURRENCY = 1000, based on the estimate from tqdm.
  2. your program may run out of memory for responses and crash.

For point 2, you should probably do:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f

    # Do something with response, such as writing to a local file
    # ...
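
For comparison, the two ideas in this answer (create coroutines lazily, and handle each response as it arrives instead of collecting them all) can also be sketched with a different technique: a fixed pool of worker tasks that pull from a shared iterator. This is not the as_completed_for_async_gen approach above, and it uses only asyncio. fake_get is a hypothetical stand-in for do_get(), and run_bounded and handle are names invented for this sketch.

```python
import asyncio


async def fake_get(x):
    """Hypothetical stand-in for do_get(): pretends to fetch item x."""
    await asyncio.sleep(0)  # yield to the event loop like real I/O would
    return f"response {x}"


async def run_bounded(n, q, concurrency, handle):
    """Process items n..q-1 with at most `concurrency` requests in flight."""
    numbers = iter(range(n, q))  # lazy: no coroutine exists until needed

    async def worker():
        # Each worker pulls the next number only when it is free, so at
        # most `concurrency` request coroutines are alive at any moment.
        for x in numbers:
            handle(await fake_get(x))

    await asyncio.gather(*(worker() for _ in range(concurrency)))
```

Calling asyncio.run(run_bounded(0, 1000, 50, print)) would print each simulated response as soon as it is ready. In a real program, handle might write to a local file so that responses never accumulate in memory.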


An error in the code

do_get() should return data:

async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data) # -
        return data # +

How can I make file parsing and I/O faster in python when working with huge files (20GB+)

Function calls incur significant overhead in Python. Don't call a function on every line of the file, but inline the definition. Also, don't repeatedly open the same output file; open it once and leave it open.

with open("file.txt", "r", encoding="utf-8") as f, \
        open("parsed.txt", "a", encoding="utf-8") as outh:
    for line in f:
        data = line.split("-|-")
        try:
            print(f"{data[2]} some text here {data[3]}", file=outh)
        except Exception:
            pass

C vs Python I/O performance for lots of small files

It boils down to a few things:

  1. Most importantly, the Python version uses text mode (i.e. r and w), which means decoding the file's bytes into str objects and encoding them again on output, instead of handling bytes directly.

  2. There are many small files and we do so little with them -- Python's own overhead (e.g. setting up the file objects in open) becomes important.

  3. Python has to dynamically allocate memory for most things.

Also note that I/O in this test is not that relevant if you use local files and do multiple runs, since they will be already cached in memory. The only real I/O will be the final write (and even then, you would have to make sure you are flushing/syncing to disk).

Now, if you take care of the text mode (i.e. using rb and wb) and also you reduce the allocations (less important in this case, but also noticeable), you get something like this:

import glob

def combine():
    flagHeader = True
    with open('results-python-new.txt', 'wb') as fout:
        for filename in glob.glob('runs/Hs*.txt'):
            with open(filename, 'rb') as fin:
                header = fin.readline()
                values = fin.readline()
                if flagHeader:
                    flagHeader = False
                    fout.write(header)
                fout.write(values)

With these changes, Python finishes the task in roughly half the time -- actually faster than the C version:

Old C:      0.234
Old Python: 0.389
New Python: 0.213

Possibly you can still improve the time a bit, e.g. by avoiding the glob.
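
One way to avoid the glob is to list the directory directly with os.scandir and filter the names by hand. The sketch below is a variant of combine() under that assumption. The function name combine_scandir and its two parameters are made up for the example, and whether it is actually faster on a given machine would need to be measured.

```python
import os


def combine_scandir(run_dir, out_path):
    """Variant of combine() that lists files with os.scandir, not glob."""
    wrote_header = False
    with open(out_path, "wb") as fout:
        with os.scandir(run_dir) as entries:
            for entry in entries:
                name = entry.name
                # Same selection as the 'Hs*.txt' pattern
                if not (name.startswith("Hs") and name.endswith(".txt")):
                    continue
                with open(entry.path, "rb") as fin:
                    header = fin.readline()
                    values = fin.readline()
                if not wrote_header:
                    wrote_header = True
                    fout.write(header)
                fout.write(values)
```

Note that os.scandir yields entries in arbitrary order, so the order of the value lines in the output is not guaranteed.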

However, if you also apply a couple of similar modifications to the C version, then you will get a much better time -- a third of the time of Python's:

New C:      0.068

Take a look:

#include <dirent.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define LINE_SIZE 300

void combine(void) {
    DIR *d;
    FILE *fin;
    FILE *fout;
    struct dirent *dir;
    char headers[LINE_SIZE];
    char values[LINE_SIZE];
    short flagHeader = 1;

    fout = fopen("results-c-new.txt", "wb");
    chdir("runs");
    d = opendir(".");
    if (d) {
        while ((dir = readdir(d)) != NULL) {
            if ((strstr(dir->d_name, "Hs")) && (strstr(dir->d_name, ".txt")) ) {
                fin = fopen(dir->d_name, "rb");
                fgets(headers, LINE_SIZE, fin);
                fgets(values, LINE_SIZE, fin);
                if (flagHeader) {
                    flagHeader = 0;
                    fputs(headers, fout);
                }
                fputs(values, fout);
                fclose(fin);
            }
        }
        closedir(d);
        fclose(fout);
    }
}

