Read Large Txt File Multithreaded

Reading and processing large text files with multiple threads

There are multiple approaches to it:

1.) You can create threads explicitly, e.g. Thread t = new Thread(), but creating and managing threads yourself is expensive.

2.) You can use the .NET ThreadPool and pass your worker method to the static QueueUserWorkItem method of the ThreadPool class. This approach requires some manual coordination and synchronization primitives.

3.) You can create an array of System.Threading.Tasks.Task, each processing a list; the tasks are executed in parallel using all the available processors on the machine, and you pass that array to Task.WaitAll(Task[]) to wait for their completion. This approach falls under Task Parallelism, and you can find detailed information on MSDN.

Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
    // automatically create an async task and execute it using a ThreadPool thread
    tasks[i] = Task.Factory.StartNew([address of function/lambda expression]);
}

try
{
    // Wait for all tasks to complete
    Task.WaitAll(tasks);
}
catch (AggregateException ae)
{
    // Handle the aggregate exception here.
    // It is thrown if one or more tasks fault; the exceptions from all
    // faulting tasks are collected in this single exception object.
}

//continue your processing further

Process large text file concurrently

As was also proposed by Alexei, you can create an OrderedTask class:

class OrderedTask implements Comparable<OrderedTask> {

    private final Integer index;
    private final String line;

    public OrderedTask(Integer index, String line) {
        this.index = index;
        this.line = line;
    }

    @Override
    public int compareTo(OrderedTask o) {
        // compare by value, not by Integer reference
        return Integer.compare(index, o.getIndex());
    }

    public Integer getIndex() {
        return index;
    }

    public String getLine() {
        return line;
    }
}

As an output queue, you can use your own implementation backed by a priority queue:

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OrderedTaskQueue {

    private final ReentrantLock lock;
    private final Condition waitForOrderedItem;
    private final int maxQueueSize;
    private final PriorityQueue<OrderedTask> backedQueue;

    private int expectedIndex;

    public OrderedTaskQueue(int maxQueueSize, int startIndex) {
        this.maxQueueSize = maxQueueSize;
        this.expectedIndex = startIndex;
        this.backedQueue = new PriorityQueue<>(2 * this.maxQueueSize);

        this.lock = new ReentrantLock();
        this.waitForOrderedItem = this.lock.newCondition();
    }

    public boolean put(OrderedTask item) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Block while the queue is full, unless this item is the one the consumer
            // is currently waiting for (letting it through avoids a deadlock).
            while (this.backedQueue.size() >= maxQueueSize && item.getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }

            boolean result = this.backedQueue.add(item);
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    public OrderedTask take() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Wait until the head of the priority queue is the next expected index.
            while (this.backedQueue.peek() == null || this.backedQueue.peek().getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }
            OrderedTask result = this.backedQueue.poll();
            expectedIndex++;
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

startIndex is the index of the first ordered task, and
maxQueueSize is used to stop processing further tasks (so memory doesn't fill up) while we wait for some earlier task to finish. It should be double or triple the number of processing threads, so that processing isn't stopped immediately and the setup can scale.

Then you should create your tasks:

int indexOrder = 0;
String line;
while ((line = reader.readLine()) != null) {
    inputQueue.put(new OrderedTask(indexOrder++, line));
}
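
For completeness, a rough sketch of how the other side might be wired up, assuming inputQueue is a plain BlockingQueue<OrderedTask>, outputQueue is the OrderedTaskQueue above, N_WORKERS is your thread count, and process()/writeLine() are placeholders for the real work (shutdown handling is omitted):

// Workers: take tasks in any order, process them, hand them to the ordered output queue.
Runnable worker = () -> {
    try {
        while (true) {
            OrderedTask task = inputQueue.take();
            String processed = process(task.getLine());                    // placeholder
            outputQueue.put(new OrderedTask(task.getIndex(), processed));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

// Single writer: OrderedTaskQueue.take() only returns the next expected index,
// so the output comes out in the original line order.
Runnable writer = () -> {
    while (true) {
        writeLine(outputQueue.take().getLine());                           // placeholder
    }
};

for (int i = 0; i < N_WORKERS; i++) {
    new Thread(worker).start();
}
new Thread(writer).start();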

Line-by-line tasks are used here only because of your example; you should change OrderedTask to support a batch of lines, as in the sketch below.
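
A hedged sketch of what that batch variant might look like (the class name OrderedBatchTask, the batch size, and the assumption that inputQueue now holds these batch tasks are all illustrative, not from the original answer):

import java.util.ArrayList;
import java.util.List;

// Batch-oriented task: the index now orders whole batches of lines.
class OrderedBatchTask implements Comparable<OrderedBatchTask> {

    private final int index;
    private final List<String> lines;

    public OrderedBatchTask(int index, List<String> lines) {
        this.index = index;
        this.lines = lines;
    }

    @Override
    public int compareTo(OrderedBatchTask o) {
        return Integer.compare(index, o.index);
    }

    public int getIndex() {
        return index;
    }

    public List<String> getLines() {
        return lines;
    }
}

// Producer side: accumulate e.g. 1000 lines per task instead of one (batch size is illustrative).
int batchSize = 1000;
int indexOrder = 0;
List<String> batch = new ArrayList<>(batchSize);
String line;
while ((line = reader.readLine()) != null) {
    batch.add(line);
    if (batch.size() == batchSize) {
        inputQueue.put(new OrderedBatchTask(indexOrder++, batch));
        batch = new ArrayList<>(batchSize);
    }
}
if (!batch.isEmpty()) {
    inputQueue.put(new OrderedBatchTask(indexOrder++, batch));
}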

Multithreading while reading and processing a huge file (too big for memory)

First, the (IMO) simplest solution

If, as it seems, the lines are completely independent, just split your file into N chunks, pass the filename to open as a program argument, and run multiple instances of your current script, starting them manually from multiple command lines.

Pros:

  • No need to delve into multiprocessing, inter-process communication, etc.
  • Doesn't require altering the code much

Cons:

  • You need to preprocess the big file by splitting it into chunks (although this will be much faster than your current execution time, since you won't have an open-close-per-line scenario)
  • You need to start the processes yourself, passing the appropriate filename for each of them

This would be implemented as:

Preprocessing:

APPROX_CHUNK_SIZE = 1e9  # 1GB per file, adjust as needed
with open('big_file.txt') as fp:
    chunk_id = 0
    next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
    while next_chunk:
        with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
            ofp.writelines(next_chunk)
        chunk_id += 1
        next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

From the readlines docs:

If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.

Doing it this way won't ensure an even number of lines in all chunks, but it will make preprocessing much faster, since you're reading in blocks and not line by line. Adapt the chunk size as needed.
Also, note that by using readlines we can be sure no lines are broken between chunks. Since the function returns a list of lines, we use writelines to write it to our output file (which is equivalent to looping over the list and calling ofp.write(line)). For the sake of completeness, let me note that you could also concatenate all the strings in memory and call write just once (i.e., do ofp.write(''.join(next_chunk))), which might bring a (minor) performance benefit, paid for in (much) higher RAM usage.

Main script:

The only modifications you need are at the very top:

import sys
file=sys.argv[1]
... # rest of your script here

By using argv you can pass command-line arguments to your program (in this case, the file to open). Then, just run your script as:

python process_the_file.py big_file_0.txt

This will run one process. Open multiple terminals and run the same command with big_file_N.txt for each and they'll be independent from each other.

Note: I use argv[1] because for all programs the first value of argv (i.e., argv[0]) is always the program name.



Then, the multiprocessing solution

Although effective, the first solution is not quite elegant, especially since you'll end up with 80 files if you start from an 80 GB file.

A cleaner solution is to make use of python's multiprocessing module (important: NOT threading! If you don't know the difference, look up "global interpreter lock" and why multithreading in python doesn't work the way you think it would).

The idea is to have one "producer" process that opens the big file and continuously puts lines from it in a queue. Then, a pool of "consumer" processes that extract from the queue the lines and do the processing.

Pros:

  • One script does everything
  • No need to open multiple terminals and type commands by hand

Cons:

  • Complexity
  • Uses inter-process communication, which has some overhead

This would be implemented as follows:

# Libraries
import os
import multiprocessing

outputdirectory = "sorted"
depth = 4  # This is the tree depth

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped = ''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed = line_stripped[::-1]
    file = outputdirectory
    # Create path location in folder-based tree
    for i in range(min(depth, len(line_stripped))):
        file = os.path.join(file, line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file = file + "-file"
    # This is the operation that slows everything down.
    # It opens, writes and closes a lot of small files.
    # I cannot keep them open because, in the worst case, about half a million
    # different files (n = 26^4) would be open at once.
    f = open(file, "a")
    f.write(line)
    f.close()

if __name__ == '__main__':
    # Variables
    file = "80_gig_file.txt"

    # Preparations
    os.makedirs(outputdirectory)
    pool = multiprocessing.Pool()  # by default, 1 process per CPU
    LINES_PER_PROCESS = 1000  # adapt as needed. Higher is better, but consumes more RAM

    with open(file) as infile:
        # Consume the whole iterator so every line is actually processed
        for _ in pool.imap(pipeline, infile, LINES_PER_PROCESS):
            pass
        pool.close()
        pool.join()

The if __name__ == '__main__' line is a barrier that separates code that runs in every process from code that runs only in the parent. Every process defines pipeline, but only the parent actually spawns a pool of workers and applies the function. You can find more details about multiprocessing.map here

Edit:

Added closing and joining of the pool to prevent the main process from exiting and killing the child processes along the way.

Threading when reading a large txt file in java?

It sounds like you need a configurable pool of threads, and each file-read operation is a job to be submitted into that pool.

When the user specifies how many threads to use, you'd configure the pool appropriately, submit the set of file-read jobs, and let the pool sort out the executions.

In the Java world, you'd use the Executors.newFixedThreadPool factory method, and submit each job as a Callable. Here's an article from IBM on Java thread pooling.
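
A minimal sketch of that setup, assuming a list of input files and a placeholder processFile job (the file names, class name, and line-count result type are illustrative, not from the question):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

public class FileReadPool {

    public static void main(String[] args) throws Exception {
        int threadCount = Integer.parseInt(args[0]);                   // user-specified pool size
        List<String> files = Arrays.asList("a.txt", "b.txt", "c.txt"); // placeholder file names

        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        List<Future<Long>> results = new ArrayList<>();

        for (String file : files) {
            // each file-read operation is one Callable job submitted to the pool
            Callable<Long> job = () -> processFile(file);
            results.add(pool.submit(job));
        }

        for (Future<Long> result : results) {
            System.out.println("lines processed: " + result.get());   // blocks until that job finishes
        }
        pool.shutdown();
    }

    // placeholder job: count lines as a stand-in for whatever per-file processing you need
    private static long processFile(String file) throws IOException {
        try (Stream<String> lines = Files.lines(Paths.get(file))) {
            return lines.count();
        }
    }
}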

Python Read huge file line per line and send it to multiprocessing or thread

The problem seems a bit unrealistic, though: firing 737,022,387 requests! Calculate how many months that would take from a single computer!

Still, a better way to do this task is to read the file line by line in a separate thread that inserts the lines into a queue, and then process the queue with multiple processes.

Solution 1:

from multiprocessing import Queue, Process
from threading import Thread
from time import sleep

urls_queue = Queue()
max_process = 4

def read_urls():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            urls_queue.put(url.strip())
            print('put url: {}'.format(url.strip()))

    # put DONE to tell send_request_processor to exit
    for i in range(max_process):
        urls_queue.put("DONE")

def send_request(url):
    print('send request: {}'.format(url))
    sleep(1)
    print('recv response: {}'.format(url))

def send_request_processor():
    print('start send request processor')
    while True:
        url = urls_queue.get()
        if url == "DONE":
            break
        else:
            send_request(url)

def main():
    file_reader_thread = Thread(target=read_urls)
    file_reader_thread.start()

    procs = []
    for i in range(max_process):
        p = Process(target=send_request_processor)
        procs.append(p)
        p.start()

    for p in procs:
        p.join()

    print('all done')
    # wait for all tasks in the queue
    file_reader_thread.join()

if __name__ == '__main__':
    main()

Demo: https://onlinegdb.com/Elfo5bGFz

Solution 2:

You can use the Tornado asynchronous networking library:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            await q.put(url)
            print('Put %s' % url)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()  # Wait for producer to put all tasks.
    await q.join()    # Wait for consumer to finish all tasks.
    print('Done')

# producer and consumer can run in parallel
IOLoop.current().run_sync(main)

How to process contents from large text file using multiple threads?

Answering one part here - why is the BlockingQueue option slower.

It is important to understand that threads don't come for "free". There is always certain overhead required to get them up and "manage" them.

And of course, when you are actually using more threads than your hardware can support "natively" then context switching is added to the bill.

Beyond that, also the BlockingQueue doesn't come free either. You see, in order to preserve order, that ArrayBlockingQueue probably has to synchronize somewhere. Worst case, that means locking and waiting. Yes, the JVM and JIT are usually pretty good about such things; but again, a certain "percentage" gets added to the bill.

But just for the record, that shouldn't matter. From the javadoc:

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

As you are not setting "fairness"

BlockingQueue queue = new ArrayBlockingQueue<>(100);

that shouldn't affect you. On the other hand: I am pretty sure you expected those lines to be processed in order, so you would actually want to go for

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100, true);

and thereby slow the whole thing down even further.

Finally: I agree with the comments given so far. Benchmarking such things is a complex undertaking, and many aspects influence the results. The most important question is definitely: where is your bottleneck? Is it IO performance (then more threads won't help much!) or is it really the overall processing time (in which case the "correct" number of threads for processing should definitely speed things up)?

And regarding "how to do this the correct way": I suggest checking out this question on softwareengineering.SE.
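
For reference, a rough sketch of the producer/consumer shape under discussion, with the fairness flag left at its default (the file name, queue capacity, worker count, and poison-pill shutdown are illustrative assumptions, not the original poster's code):

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueuePipeline {

    private static final String POISON = new String("EOF"); // sentinel, compared by identity

    public static void main(String[] args) throws Exception {
        // Default (non-fair) queue, as in the snippet discussed above.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        int workers = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    String line;
                    while ((line = queue.take()) != POISON) {
                        process(line);                      // placeholder for the real per-line work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return null;
            });
        }

        try (BufferedReader reader = Files.newBufferedReader(Paths.get("big_file.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line);                            // blocks when the queue is full
            }
        }
        for (int i = 0; i < workers; i++) {
            queue.put(POISON);                              // one sentinel per worker
        }
        pool.shutdown();
    }

    private static void process(String line) {
        // placeholder
    }
}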


