Memory-Constrained External Sorting of Strings, with Duplicates Combined&Counted, on a Critical Server (Billions of Filenames)


IDK if external sorting with count-merging of duplicates has been studied. I did find a 1983 paper (see below). Usually, sorting algorithms are designed and studied with the assumption of sorting objects by keys, so duplicate keys have different objects. There might be some existing literature on this, but it's a very interesting problem. Probably it's just considered an application of compact dictionaries combined with external merge-sorting.

Space-efficient dictionaries for storing large numbers of strings are a very well studied problem. Most of the useful data structures can include auxiliary data for each word (in our case, a dup count).


TL;DR summary of useful ideas, since I rambled on in way too much detail about many things in the main body of this answer:

  • Set batch boundaries when your dictionary size hits a threshold, not after a fixed number of input strings. If there were a lot of duplicates in a group of 5000 strings, you still won't be using very much memory. You can find way more duplicates in the first pass this way.

  • Sorted batches make merging much faster. You can and should merge many->one instead of binary merging. Use a PriorityQueue to figure out which input file has the line you should take next.

  • To avoid a burst of memory usage when sorting the keys in a hash table, use a dictionary that can do an in-order traversal of keys. (i.e. sort on the fly.) There's SortedDictionary<TKey, TValue> (binary tree-based). This also interleaves the CPU usage of sorting with the I/O waiting to get the input strings.

  • Radix-sort each batch into outputs by first-character (a-z, non-alphabetic that sorts before A, and non-alphabetic that sorts after z). Or some other bucketing choice that distributes your keys well. Use separate dictionaries for each radix bucket, and empty only the biggest into a batch when you hit your memory ceiling. (fancier eviction heuristics than "biggest" may be worth it.)

  • Throttle I/O (esp. when merging), and check system CPU load and memory pressure. Adapt behaviour accordingly to make sure you don't cause an impact when the server is busiest.

  • For smaller temp files at the cost of CPU time, use a common-prefix encoding, or maybe lz4.

  • A space-efficient dictionary will allow larger batch sizes (and thus a larger duplicate-finding window) for the same upper memory bound. A Trie (or better, Radix Trie) might be ideal, because it stores the characters within the tree nodes, with common prefixes only stored once. Directed Acyclic Word Graphs are even more compact (finding redundancy between common substrings that aren't prefixes). Using one as a Dictionary is tricky but probably possible (see below).

  • Take advantage of the fact that you don't need to delete any tree nodes or strings until you're going to empty the whole dictionary. Use a growable array of nodes, and another growable char array that packs strings head to tail. (Useful for a Radix Trie (multi-char nodes), but not a regular Trie where each node is a single char.)

  • Depending on how the duplicates are distributed, you might or might not be able to find very many on the first pass. This has some implications, but doesn't really change how you end up merging.


I'm assuming you have some directory traversal idea in mind, which can efficiently supply your code with a stream of strings to be uniquified and counted. So I'll just say "strings" or "keys", to talk about the inputs.

Trim off as many unnecessary characters as possible (e.g. lose the .xml if they're all .xml).


It might be useful to do the CPU/memory intensive work on a separate machine, depending on what other hardware you have with a fast network connection to your critical production server.

You could run a simple program on the server that sends filenames over a TCP connection to a program running on another machine, where it's safe to use much more memory. The program on the server could still do small dictionary batches, and just store them on a remote filesystem.


And now, since none of the other answers really put all the pieces together, here's my actual answer:

An upper bound on memory usage is easy. Write your program to use a constant memory ceiling, regardless of input size. Bigger inputs will lead to more merging phases, not more memory usage at any point.

The best estimate of temporary file storage space you can make without looking at the input is a very conservative upper bound that assumes every input string is unique. You need some way to estimate how many input strings there will be. (Most filesystems know how many separate files they contain, without having to walk the directory tree and count them.)

You can make some assumptions about the distribution of duplicates to make a better guess.

If number, rather than size, of scratch files is an issue, you can store multiple batches in the same output file, one after another. Either put length-headers at the start of each to allow skipping forward by batch, or write byte offsets to a separate data stream. If size is also important, see my paragraph about using frcode-style common-prefix compression.
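Here's a rough, untested sketch of the length-header idea in C# (the method name and the key/count line format are made up for illustration); the point is just that an 8-byte length header in front of each batch lets a later pass seek from batch to batch without parsing every line:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Text;

    static class BatchPacking
    {
        // Appends one sorted batch ("key<TAB>hexcount" lines) to a shared scratch
        // file, prefixed by an 8-byte length header, and returns its start offset
        // (which you could also record in a separate index stream).
        public static long WriteBatchWithHeader(string path, IEnumerable<KeyValuePair<string, long>> batch)
        {
            using var fs = new FileStream(path, FileMode.Append, FileAccess.Write);
            long startOffset = fs.Position;

            // Serialize the batch body first so we know its length.
            using var body = new MemoryStream();
            using (var w = new StreamWriter(body, new UTF8Encoding(false), 1 << 16, leaveOpen: true))
                foreach (var kv in batch)
                    w.WriteLine($"{kv.Key}\t{kv.Value:x}");   // hex count, as suggested later

            Span<byte> header = stackalloc byte[8];
            BitConverter.TryWriteBytes(header, body.Length);  // 8-byte little-endian length header
            fs.Write(header);
            body.Position = 0;
            body.CopyTo(fs);
            return startOffset;
        }
    }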


As Ian Mercer points out in his answer, sorting your batches will make merging them much more efficient. If you don't, you either risk hitting a wall where your algorithm can't make forward progress, or you need to do something like load one batch, scan another batch for entries that are in the first, and rewrite the 2nd batch with just the potentially-few matching entries removed.

Not sorting your batches makes the time complexity of the first pass O(N), but either you have to sort at some point later, or your later stages have a worst-case bound that's dramatically worse. You want your output globally sorted, so other than RadixSort approaches, there's no avoiding an O(N log N) somewhere.

With limited batch size, O(log N) merge steps are expected, so your original analysis missed the O(N log N) complexity of your approach by ignoring what needs to happen after the phase1 batches are written.


The appropriate design choices change a lot depending on whether our memory ceiling is big enough to find many duplicates within one batch. If even a complex compact data structure like a Trie doesn't help much, putting the data into a Trie and getting it out again to write a batch is a waste of CPU time.

If you can't do much duplicate-elimination within each batch anyway, then you need to optimize for putting possibly-matching keys together for the next stage. Your first stage could group input strings by first byte, into up to 252 or so output files (not all 256 byte values are legal filename characters), or into 27 or so output files (alphabet + misc), or 26+26+1 for upper/lower case + non-alphabetic. Temp files can omit the common prefix from each string.

Then most of these first stage batches should have a much higher duplicate density. Actually, this Radix distribution of inputs into output buckets is useful in any case, see below.

You should still sort your first-stage outputs in chunks, to give the next pass a much wider dup-find window for the same RAM.


I'm going to spend more time on the domain where you can find a useful amount of duplicates in the initial stream, before using up ~100MiB of RAM, or whatever you choose as an upper limit.

Obviously we add strings to some sort of dictionary to find and count duplicates on the fly, while only requiring enough storage for the set of unique strings. Just storing strings and then sorting them would be significantly less efficient, because we'd hit our RAM limit much sooner without on-the-fly duplicate detection.

To minimize the phase2 work, phase1 should find and count as many duplicates as possible, reducing the total size of the p2 data. Reducing the amount of merging work for phase2 is good, too. Bigger batches help with both factors, so it's very useful to come as close to your memory ceiling as you safely can in phase1. Instead of writing a batch after a constant number of input strings, do it when your memory consumption nears your chosen ceiling. Duplicates are counted and thrown away, and don't take any extra storage.

An alternative to accurate memory accounting is tracking the number of unique strings in your dictionary, which is easy (and done for you by the library implementation). Accumulating the lengths of the strings you add can give you a good estimate of the memory used for storing them, too. Or just make an assumption about the string length distribution. Size your hash table right initially so it doesn't have to grow while you add elements, and stop when it's 60% full (load factor) or something.
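A minimal sketch of that kind of accounting (the 100MiB ceiling and the 48-byte per-entry overhead are made-up placeholder numbers, not measured .NET figures):

    using System.Collections.Generic;

    class BatchAccounting
    {
        const long MemoryCeiling = 100L << 20;   // ~100 MiB; pick your own ceiling
        const int PerEntryOverheadGuess = 48;    // object header + tree-node guess

        long estimatedBytes;

        // Returns true when it's time to write the current batch out.
        public bool AddKey(SortedDictionary<string, long> dict, string key)
        {
            if (dict.TryGetValue(key, out long count))
            {
                dict[key] = count + 1;           // duplicate: counted, no extra storage
            }
            else
            {
                dict[key] = 1;
                estimatedBytes += 2L * key.Length + PerEntryOverheadGuess; // UTF-16 chars
            }
            return estimatedBytes >= MemoryCeiling;
        }

        public void Reset() => estimatedBytes = 0;
    }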


A space-efficient data structure for the dictionary increases our dup-finding window for a given memory limit. Hash tables get badly inefficient when their load factor is too high, but the hash table itself only has to store pointers to the strings. It's the most familiar dictionary and has library implementations.

We know we're going to want our batch sorted once we've seen enough unique keys, so it might make sense to use a dictionary that can be traversed in sorted order. Sorting on the fly makes sense because keys will come in slowly, limited by disk IO since we're reading from filesystem metadata. One downside is if most of the keys we see are duplicates, then we're doing a lot of O(log batchsize) lookups, rather than a lot of O(1) lookups. And it's more likely that a key will be a duplicate when the dictionary is big, so most of those O(log batchsize) queries will be with a batch size near max, not uniformly distributed between 0 and max. A tree pays the O(log n) overhead of sorting for every lookup, whether the key turned out to be unique or not. A hash table only pays the sorting cost at the end after removing duplicates. So for a tree it's O(total_keys * log unique_keys), hash table is O(unique_keys * log unique_keys) to sort a batch.

A hash table with max load factor set to 0.75 or something might be pretty dense, but having to sort the KeyValuePairs before writing out a batch probably puts a damper on using standard Dictionary. You don't need copies of the strings, but you'll probably end up copying all the pointers (refs) to scratch space for a non-in-place sort, and maybe also when getting them out of the hash table before sorting. (Or instead of just pointers, KeyValuePair, to avoid having to go back and look up each string in the hash table). If short spikes of big memory consumption are tolerable, and don't cause you to swap / page to disk, you could be fine. This is avoidable if you can do an in-place sort in the buffer used by the hash table, but I doubt that can happen with standard-library containers.

A constant trickle of CPU usage to maintain the sorted dictionary at the speed keys are available is probably better than infrequent bursts of CPU usage to sort all of a batch's keys, besides the burst of memory consumption.

The .NET standard library has SortedDictionary<TKey, TValue>, which the docs say is implemented with a binary tree. I didn't check if it has a rebalance function, or uses a red-black tree, to guarantee O(log n) worst case performance. I'm not sure how much memory overhead it would have. If this is a one-off task, then I'd absolutely recommend using this to implement it quickly and easily. And also for a first version of a more optimized design for repeated use. You'll probably find it's good enough, unless you can find a nice library implementation of Tries.
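A bare-bones phase1 sketch along those lines, using SortedDictionary with an ordinal comparer so the in-order traversal gives ordinal-sorted batches (the input source, file naming, and flush threshold are placeholders):

    using System;
    using System.Collections.Generic;
    using System.IO;

    class Phase1
    {
        public static void Run(IEnumerable<string> keys, string tempDir, int maxUniquePerBatch)
        {
            var dict = new SortedDictionary<string, long>(StringComparer.Ordinal);
            int batchNum = 0;

            foreach (string key in keys)
            {
                dict[key] = dict.TryGetValue(key, out long c) ? c + 1 : 1;

                if (dict.Count >= maxUniquePerBatch)       // or a byte-size estimate, see above
                {
                    WriteBatch(dict, Path.Combine(tempDir, $"batch{batchNum++:D5}.txt"));
                    dict.Clear();
                }
            }
            if (dict.Count > 0)
                WriteBatch(dict, Path.Combine(tempDir, $"batch{batchNum++:D5}.txt"));
        }

        static void WriteBatch(SortedDictionary<string, long> dict, string path)
        {
            using var w = new StreamWriter(path);
            foreach (var kv in dict)                       // in-order traversal: already sorted
                w.WriteLine($"{kv.Key}/{kv.Value:x}");     // '/' separator and hex count, as discussed further down
        }
    }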


Data structures for memory-efficient sorted dictionaries

The more memory-efficient our dictionary is, the more dups we can find before having to write out a batch and delete the dictionary. And if it's a sorted dictionary, our batches can be larger even when it isn't finding many duplicates.

A secondary impact of data structure choice is how much memory traffic we generate while running on the critical server. A sorted array (with O(log n) lookup time (binary search), and O(n) insert time (shuffle elements to make room)) would be compact. However, it wouldn't just be slow, it would saturate memory bandwidth with memmove a lot of the time. 100% CPU usage doing this would have a bigger impact on the server's performance than 100% CPU usage searching a binary tree. It doesn't know where to load the next node from until it's loaded the current node, so it can't pipeline memory requests. The branch mispredicts of comparisons in the tree search also help moderate consumption of the memory bandwidth that's shared by all cores. (That's right, some 100%-CPU-usage programs are worse than others!)

It's nice if emptying our dictionary doesn't leave memory fragmented when we empty it. Tree nodes will be constant size, though, so a bunch of scattered holes will be usable for future tree node allocations. However, if we have separate dictionaries for multiple radix buckets (see below), key strings associated with other dictionaries might be mixed in with tree nodes. This could lead to malloc having a hard time reusing all the freed memory, potentially increasing actual OS-visible memory usage by some small factor. (Unless C# runtime garbage collection does compaction, in which case fragmentation is taken care of.)

Since you never need to delete nodes until you want to empty the dictionary and delete them all, you could store your Tree nodes in a growable array. So memory management only has to keep track of one big allocation, reducing bookkeeping overhead compared to malloc of each node separately. Instead of real pointers, the left / right child pointers could be array indices. This lets you use only 16 or 24 bits for them. (A Heap is another kind of binary tree stored in an array, but it can't be used efficiently as a dictionary. It's a tree, but not a search tree).

Storing the string keys for a dictionary would normally be done with each String as a separately-allocated object, with pointers to them in an array. Since again, you never need to delete, grow, or even modify one until you're ready to delete them all, you can pack them head to tail in a char array, with a terminating zero-byte at the end of each one. This again saves a lot of book-keeping, and also makes it easy to keep track of how much memory is in use for the key strings, letting you safely come closer to your chosen memory upper bound.
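A sketch of that layout (using plain int indices rather than the 16- or 24-bit packed ones mentioned above, to keep it readable; field names are illustrative):

    using System.Collections.Generic;
    using System.Text;

    class PackedTreeStorage
    {
        public struct Node
        {
            public int Left, Right;     // indices into Nodes; -1 = no child (no real pointers)
            public int KeyOffset;       // start of this key in Chars
            public int KeyLength;
            public long Count;          // duplicate count
        }

        public readonly List<Node> Nodes = new List<Node>();
        public readonly StringBuilder Chars = new StringBuilder();  // keys packed head to tail

        // Append a key's characters and a node referring to them; no per-string
        // allocation, and emptying the dictionary later is just clearing two buffers.
        public int AddNode(string key)
        {
            int offset = Chars.Length;
            Chars.Append(key);
            Nodes.Add(new Node { Left = -1, Right = -1, KeyOffset = offset, KeyLength = key.Length, Count = 1 });
            return Nodes.Count - 1;     // the index is the "pointer" to this node
        }

        public string KeyOf(int nodeIndex)
        {
            Node n = Nodes[nodeIndex];
            return Chars.ToString(n.KeyOffset, n.KeyLength);
        }

        public void Clear() { Nodes.Clear(); Chars.Clear(); }
    }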

Trie / DAWG for even more compact storage

For even denser storage of a set of strings, we can eliminate the redundancy of storing all the characters of every string, since there are probably a lot of common prefixes.

A Trie stores the strings in the tree structure, giving you common-prefix compression. It can be traversed in sorted order, so it sorts on the fly. Each node has as many children as there are different next-characters in the set, so it's not a binary tree. A C# Trie partial implementation (delete not written) can be found in this SO answer, to a question similar to this but not requiring batching / external sorting.

Trie nodes need to store potentially many child pointers, so each node can be large. Or each node could be variable-size, holding the list of nextchar:ref pairs inside the node, if C# makes that possible. Or as the Wikipedia article says, a node can actually be a linked-list or binary search tree, to avoid wasting space in nodes with few children. (The lower levels of a tree will have a lot of that.) End-of-word markers / nodes are needed to distinguish between substrings that aren't separate dictionary entries, and ones that are. Our count field can serve that purpose: count = 0 means the substring ending here isn't in the dictionary, count >= 1 means it is.
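For reference, a bare-bones counting Trie along those lines (one char per node, children kept in a SortedDictionary so the in-order traversal comes out sorted; a real implementation would use something more compact for the child lists, and a Radix Trie would store multiple chars per node):

    using System.Collections.Generic;

    class CountingTrie
    {
        class Node
        {
            public readonly SortedDictionary<char, Node> Children = new SortedDictionary<char, Node>();
            public long Count;                       // 0 => prefix only, >= 1 => a stored key
        }

        readonly Node root = new Node();

        public void Add(string key)
        {
            Node n = root;
            foreach (char ch in key)
            {
                if (!n.Children.TryGetValue(ch, out Node child))
                    n.Children[ch] = child = new Node();
                n = child;
            }
            n.Count++;                               // duplicate keys just bump the count
        }

        // In-order traversal: emits (key, count) pairs in sorted order for a batch write.
        public IEnumerable<KeyValuePair<string, long>> InOrder() => Walk(root, "");

        static IEnumerable<KeyValuePair<string, long>> Walk(Node n, string prefix)
        {
            if (n.Count > 0)
                yield return new KeyValuePair<string, long>(prefix, n.Count);
            foreach (var kv in n.Children)
                foreach (var item in Walk(kv.Value, prefix + kv.Key))
                    yield return item;
        }
    }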

A more compact Trie is the Radix Tree, or PATRICIA Tree, which stores multiple characters per node.

Another extension of this idea is the Deterministic acyclic finite state automaton (DAFSA), sometimes called a Directed Acyclic Word Graph (DAWG), but note that the DAWG wikipedia article is about a different thing with the same name. I'm not sure a DAWG can be traversed in sorted order to get all the keys out at the end, and as wikipedia points out, storing associated data (like a duplicate count) requires a modification. I'm also not sure they can be built incrementally, but I think you can do lookups without having compacted. The newly added entries will be stored like a Trie, until a compaction step every 128 new keys merges them into the DAWG. (Or run the compaction less frequently for bigger DAWGs, so you aren't doing it too much, like doubling the size of a hash table when it has to grow, instead of growing linearly, to amortize the expensive op.)

You can make a DAWG more compact by storing multiple characters in a single node when there isn't any branching / converging. This page also mentions a Huffman-coding approach to compact DAWGs, and has some other links and article citations.

JohnPaul Adamovsky's DAWG implementation (in C) looks good, and describes some optimizations it uses. I haven't looked carefully to see if it can map strings to counts. It's optimized to store all the nodes in an array.

This answer to the dup-count words in 1TB of text question suggests DAWGs, and has a couple links, but I'm not sure how useful it is.


Writing batches: Radix on first character

You could get your RadixSort on, and keep separate dictionaries for every starting character (or for a-z, non-alphabetic that sorts before a, non-alphabetic that sorts after z). Each dictionary writes out to a different temp file. If you have multiple compute nodes available for a MapReduce approach, this would be the way to distribute merging work to the compute nodes.

This allows an interesting modification: instead of writing all radix buckets at once, only write the largest dictionary as a batch. This prevents tiny batches from going into some buckets every time you hit the memory ceiling. This will reduce the width of the merging within each bucket, speeding up phase2.

With a binary tree, this reduces the depth of each tree by about log2(num_buckets), speeding up lookups. With a Trie, this is redundant (each node uses the next character as a radix to order the child trees). With a DAWG, this actually hurts your space-efficiency because you lose out on finding the redundancy between strings with different starts but later shared parts.

This has the potential to behave poorly if there are a few infrequently-touched buckets that keep growing, but don't usually end up being the largest. They could use up a big fraction of your total memory, making for small batches from the commonly-used buckets. You could implement a smarter eviction algorithm that records when a bucket (dictionary) was last emptied. The NeedsEmptying score for a bucket would be something like a product of size and age. Or maybe some function of age, like sqrt(age). Some way to record how many duplicates each bucket has found since last emptied would be useful, too. If you're in a spot in your input stream where there are a lot of repeats for one of the buckets, the last thing you want to do is empty it frequently. Maybe every time you find a duplicate in a bucket, increment a counter. Look at the ratio of age vs. dups-found. Low-use buckets sitting there taking RAM away from other buckets will be easy to find that way, when their size starts to creep up. Really-valuable buckets might be kept even when they're the current biggest, if they're finding a lot of duplicates.
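One possible scoring function, just to make the idea concrete; the exact formula (sqrt of age, the +1 fudge terms) is a guess to be tuned against your data, not something with theory behind it:

    using System;

    class BucketEviction
    {
        // Prefer big buckets that have gone a long time without being emptied and
        // haven't been finding many duplicates. "Age" here is measured in total
        // input strings processed, which only approximates keys seen per bucket.
        public static int PickBucketToEvict(
            long[] sizeBytes, long[] lastEmptiedPos, long[] dupsSinceEmptied, long currentPos)
        {
            int best = 0;
            double bestScore = double.NegativeInfinity;
            for (int b = 0; b < sizeBytes.Length; b++)
            {
                double age = currentPos - lastEmptiedPos[b];
                double dupRate = dupsSinceEmptied[b] / (age + 1.0);   // rough dups found per key seen
                double score = sizeBytes[b] * Math.Sqrt(age + 1.0) / (1.0 + dupRate);
                if (score > bestScore) { bestScore = score; best = b; }
            }
            return best;   // empty this bucket's dictionary into a batch
        }
    }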

If your data structure for tracking age and dups found is a struct-of-arrays, the (current_pos - last_emptied[bucket]) / (float)dups_found[bucket] division can be done efficiently with vector floating point. One integer division is slower than one FP division, and one SIMD FP division does 4 at once for about the cost of a scalar one, so compilers can hopefully auto-vectorize if you make it easy for them like this.

There's a lot of work to do between buckets filling up, so division would be a tiny hiccup unless you use a lot of buckets.

choosing how to bucket

With a good eviction algorithm, an ideal choice of bucketing will put keys that rarely have duplicates together in some buckets, and buckets that have many duplicates together in other buckets. If you're aware of any patterns in your data, this would be a way to exploit it. Having some buckets that are mostly low-dup means that all those unique keys don't wash away the valuable keys into an output batch. An eviction algorithm that looks at how valuable a bucket has been in terms of dups found per unique key will automatically figure out which buckets are valuable and worth keeping, even though their size is creeping up.

There are many ways to radix your strings into buckets. Some will ensure that every element in a bucket compares less than every element in every later bucket, so producing fully-sorted output is easy. Some won't, but have other advantages. There are going to be tradeoffs between bucketing choices, all of which are data-dependent:

  • good at finding a lot of duplicates in the first pass (e.g. by separating high-dup patterns from low-dup patterns)
  • distributes the number of batches uniformly between buckets (so no bucket has a huge number of batches requiring a multi-stage merge in phase2), and maybe other factors.
  • risk of pathological behaviour when combined with your eviction algorithm on your data set.
  • amount of between-bucket merging needed to produce globally-sorted output. The importance of this scales with the total number of unique strings, not the number of input strings.

I'm sure clever people have thought about good ways to bucket strings before me, so this is probably worth searching on if the obvious approach of by-first-character isn't ideal. This special use-case (of sorting while eliminating/counting duplicates) is not typical. I think most work on sorting only considers sorts that preserve duplicates. So you might not find much that helps choose a good bucketing algorithm for a dup-counting external sort. In any case, it will be data-dependent.

Some concrete options for bucketing are: Radix = first two bytes together (still combining upper/lowercase, and combining non-alphabetic characters). Or Radix = the first byte of the hash code. (Requires a global merge to produce sorted output.) Or Radix = ((str[0]>>2) << 6) + (str[1]>>2), i.e. ignore the low 2 bits of the first 2 chars, to put [abcd][abcd].* together, [abcd][efgh].* together, etc. This would also require some merging of the sorted results between some sets of buckets. e.g. daxxx would be in the first bucket, but aexxx would be in the 2nd. But only buckets with the same first-char high-bits need to be merged with each other to produce the sorted final output.
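Sketches of those bucketing functions (case folding and the "non-alphabetic" handling are simplified, and note that string.GetHashCode is randomized per process in .NET Core, so the hash option is only safe if every pass runs in the same process or you substitute a stable hash):

    static class RadixBuckets
    {
        // 28 buckets: 0 = sorts before 'a' after case folding, 1..26 = a..z, 27 = sorts after 'z'.
        public static int FirstCharBucket(string s)
        {
            char c = char.ToLowerInvariant(s[0]);
            if (c < 'a') return 0;
            if (c > 'z') return 27;
            return 1 + (c - 'a');
        }

        // ((str[0]>>2) << 6) + (str[1]>>2): drops the low 2 bits of the first two chars,
        // so strings matching in those high bits land in the same bucket. Needs some
        // between-bucket merging later to produce globally sorted output.
        public static int TwoCharCoarseBucket(string s)
        {
            int c0 = s.Length > 0 ? s[0] : 0;
            int c1 = s.Length > 1 ? s[1] : 0;
            return ((c0 >> 2) << 6) + (c1 >> 2);
        }

        // Hash-based bucketing (the "first byte of the hash code" idea, generalized to
        // numBuckets): spreads keys evenly, but the final output then needs a global merge.
        public static int HashBucket(string s, int numBuckets)
            => (int)((uint)s.GetHashCode() % (uint)numBuckets);
    }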

An idea for handling a bucketing choice that gives great dup-finding but needs merge-sorting between buckets: When writing the phase2 output, bucket it with the first character as the radix to produce the sort order you want. Each phase1 bucket scatters output into phase2 buckets as part of the global sort. Once all the phase1 batches that can include strings starting with a have been processed, do the merge of the a phase2-bucket into the final output and delete those temp files.

Radix = first 2 bytes (combining non-alphabetic) would make for 28² = 784 buckets. With 200MiB of RAM, that's an average output file size of only ~256k. Emptying just one bucket at a time would make that the minimum, and you'd usually get larger batches, so this could work. (Your eviction algorithm could hit a pathological case that made it keep a lot of big buckets, and write a series of tiny batches for new buckets. There are dangers to clever heuristics if you don't test carefully.)

Multiple batches packed into the same output file is probably most useful with many small buckets. You'll have e.g. 784 output files, each containing a series of batches. Hopefully your filesystem has enough contiguous free space, and is smart enough, to do a good job of not fragmenting too badly when scattering small-ish writes to many files.


Merging:

In the merging stages, with sorted batches we don't need a dictionary. Just take the next line from whichever batch has the lowest key, combining duplicates as you find them.

MergeSort typically merges pairs, but when doing external sorting (i.e. disk -> disk), a much wider input is common to avoid reading and re-writing the output a lot of times. Having 25 input files open to merge into one output file should be fine. Use the library implementation of PriorityQueue (typically implemented as a Heap) to choose the next input element from many sorted lists. Maybe add input lines with the string as the priority, and the count and input file number as payload.
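A sketch of that merge using .NET's PriorityQueue (available since .NET 6), assuming the batch files hold "key/hexcount" lines in ascending key order like the phase1 sketch above writes:

    using System;
    using System.Collections.Generic;
    using System.IO;

    class KWayMerge
    {
        public static void Merge(string[] batchPaths, string outPath)
        {
            var readers = Array.ConvertAll(batchPaths, p => new StreamReader(p));
            // Element = which reader a line came from; Priority = (key, that line's count).
            var pq = new PriorityQueue<int, (string Key, long Count)>(
                Comparer<(string Key, long Count)>.Create((a, b) => string.CompareOrdinal(a.Key, b.Key)));

            for (int i = 0; i < readers.Length; i++)
                EnqueueNext(pq, readers, i);

            using var w = new StreamWriter(outPath);
            string pendingKey = null;
            long pendingCount = 0;

            while (pq.TryDequeue(out int src, out var entry))
            {
                if (entry.Key == pendingKey)
                    pendingCount += entry.Count;            // same key from another batch: combine counts
                else
                {
                    if (pendingKey != null)
                        w.WriteLine($"{pendingKey}/{pendingCount:x}");
                    pendingKey = entry.Key;
                    pendingCount = entry.Count;
                }
                EnqueueNext(pq, readers, src);              // refill from the file we just consumed
            }
            if (pendingKey != null)
                w.WriteLine($"{pendingKey}/{pendingCount:x}");

            foreach (var r in readers) r.Dispose();
        }

        static void EnqueueNext(PriorityQueue<int, (string, long)> pq, StreamReader[] readers, int i)
        {
            string line = readers[i].ReadLine();
            if (line == null) return;                       // this batch is exhausted
            int sep = line.LastIndexOf('/');                // '/' can't appear in a filename key
            pq.Enqueue(i, (line.Substring(0, sep), Convert.ToInt64(line.Substring(sep + 1), 16)));
        }
    }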

If you used radix distribute-by-first-character in the first pass, then merge all the a batches into the final output file (even if this process takes multiple merging stages), then all the b batches, etc. You don't need to check any of the batches from the starts-with-a bucket against batches from any other bucket, so this saves a lot of merging work, esp. if your keys are well distributed by first character.


Minimizing impact on the production server:

Throttle disk I/O during merging, to avoid bringing your server to its knees if disk prefetch generates a huge I/O queue depth of reads. Throttling your I/O, rather than a narrower merge, is probably a better choice. If the server is busy with its normal job, it prob. won't be doing many big sequential reads even if you're only reading a couple files.

Check the system load occasionally while running. If it's high, sleep for 1 sec before doing some more work and checking again. If it's really high, don't do any more work until the load average drops (sleeping 30sec between checks).

Check the system memory usage, too, and reduce your batch threshold if memory is tight on the production server. (Or if really tight, flush your partial batch and sleep until memory pressure reduces.)
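A Linux-only sketch of that kind of throttling, reading the 1-minute load average from /proc/loadavg between chunks of work; the thresholds and sleep times are arbitrary placeholders, and a Windows version would need performance counters instead:

    using System.Globalization;
    using System.IO;
    using System.Threading;

    static class LoadThrottle
    {
        // Call between batches or merge chunks: returns when the box is quiet enough.
        public static void WaitUntilCalm(double busyLoad = 4.0, double veryBusyLoad = 8.0)
        {
            while (true)
            {
                double load = ReadLoadAvg1Min();
                if (load < busyLoad) return;                         // carry on working
                Thread.Sleep(load >= veryBusyLoad ? 30_000 : 1_000); // back off harder when really busy
            }
        }

        static double ReadLoadAvg1Min()
        {
            string text = File.ReadAllText("/proc/loadavg");         // e.g. "0.42 0.35 0.30 1/123 4567"
            return double.Parse(text.Split(' ')[0], CultureInfo.InvariantCulture);
        }
    }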

If temp-file size is an issue, you could do common-prefix compression like frcode from updatedb/locate to significantly reduce the file size for sorted lists of strings. Probably use case-sensitive sorting within a batch, but case-insensitive radixing. So each batch in the a bucket will have all the As, then all the as. Or even LZ4 compress / decompress them on the fly. Use hex for the counts, not decimal. It's shorter, and faster to encode/decode.
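A sketch of the common-prefix idea (note that real frcode stores the change in shared-prefix length as a signed byte; this stores the absolute count in hex to keep it simple, and leaves the counts out entirely):

    using System;
    using System.Collections.Generic;
    using System.IO;

    static class PrefixCodec
    {
        // Each line: shared-prefix length (hex) + ':' + the suffix that differs from the previous key.
        public static void Encode(IEnumerable<string> sortedKeys, TextWriter w)
        {
            string prev = "";
            foreach (string key in sortedKeys)
            {
                int shared = 0;
                int max = Math.Min(prev.Length, key.Length);
                while (shared < max && prev[shared] == key[shared])
                    shared++;
                w.WriteLine($"{shared:x}:{key.Substring(shared)}");
                prev = key;
            }
        }

        public static IEnumerable<string> Decode(TextReader r)
        {
            string prev = "";
            string line;
            while ((line = r.ReadLine()) != null)
            {
                int sep = line.IndexOf(':');                 // hex digits never contain ':'
                int shared = Convert.ToInt32(line.Substring(0, sep), 16);
                prev = prev.Substring(0, shared) + line.Substring(sep + 1);
                yield return prev;
            }
        }
    }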

Use a separator that's not a legal filename character, like /, between key and count. String parsing might well take up a lot of the CPU time in the merge stage, so it's worth considering. If you can leave strings in per-file input buffers, and just point your PQueue at them, that might be good. (And tell you which input file a string came from, without storing that separately.)


performance tuning:

If the initial unsorted strings were available extremely fast, then a hash table with small batches that fit the dictionary in the CPU L3 cache might be a win, unless a larger window can include a much larger fraction of keys, and find more dups. It depends on how many repeats are typical in say 100k files. Build small sorted batches in RAM as you read, then merge them to a disk batch. This may be more efficient than doing a big in-memory quicksort, since you don't have random access to the input until you've initially read it.

Since I/O will probably be the limit, large batches that don't fit in the CPU's data cache are probably a win, to find more duplicates and (greatly?) reduce the amount of merging work to be done.

It might be convenient to check the hash table size / memory consumption after every chunk of filenames you get from the OS, or after every subdirectory or whatever. As long as you choose a conservative size bound, and you make sure you can't go for too long without checking, you don't need to go nuts checking every iteration.


This paper from 1983 examines external merge-sorting eliminating duplicates as they're encountered, and also suggests duplicate elimination with a hash function and a bitmap. With long input strings, storing MD5 or SHA1 hashes for duplicate-elimination saves a lot of space.

I'm not sure what they had in mind with their bitmap idea. Being collision-resistant enough to be usable without going back to check the original string would require a hash code of too many bits to index a reasonable-size bitmap. (e.g. MD5 is a 128bit hash).
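If you did want to go the hashing route for long keys, and only needed duplicate counting rather than sorted string output for some stage, a sketch would be keying a dictionary on the 128-bit hash itself (collisions are astronomically unlikely but not impossible; this is a space trade-off sketch, not what the 1983 paper's bitmap does):

    using System;
    using System.Collections.Generic;
    using System.Security.Cryptography;
    using System.Text;

    static class HashedCounting
    {
        public static Dictionary<Guid, long> CountByHash(IEnumerable<string> keys)
        {
            var counts = new Dictionary<Guid, long>();
            using var md5 = MD5.Create();
            foreach (string key in keys)
            {
                // MD5 produces 16 bytes, which packs neatly into a Guid dictionary key.
                var h = new Guid(md5.ComputeHash(Encoding.UTF8.GetBytes(key)));
                counts[h] = counts.TryGetValue(h, out long c) ? c + 1 : 1;
            }
            return counts;
        }
    }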

Find the value that is present in the maximum number of entries

You could sort the file and then do one pass that only requires O(1) memory.

Find duplicates in large file

The key is that your data will not fit into memory. You can use external merge sort for this:

Partition your file into multiple smaller chunks that fit into memory. Sort each chunk, eliminate the duplicates (now neighboring elements).

Merge the chunks and again eliminate the duplicates when merging. Since you will have an n-way merge here, you can keep the next k elements from each chunk in memory; once the items for a chunk are depleted (they have all been merged already), grab more from disk.

Count number of killed processes

Well, if we are allowed to manipulate the process list that was initially given to us, we can simply sort the list of processes in descending order of memory usage. Then the problem boils down to iterating over the list and summing the memory allocation of each process we encounter. When the sum reaches x (i.e. the allocation required for the new process), we just return the current index + 1, which is the number of processes we have encountered (and killed) so far.

Below is a sample Python code implementing it.

    def solve(processes, x):
        processes.sort(reverse=True)
        sum_of_freed_memory = 0
        for i in range(len(processes)):
            sum_of_freed_memory += processes[i]
            if sum_of_freed_memory >= x:
                return (i + 1)
        return -1  # to not crash if/when total memory is smaller than x

You may observe this code at work here, producing the output you desire.

Note that even if we could not manipulate the original list given as input, if we are allowed to use O(n) space in our solution, we may copy the initial list and use the same algorithm on that copy. In terms of code, replacing the first line of the function solve() with the line given below would do it.

    processes = sorted(processes, reverse=True)

The approach this code takes is greedy. At each step, it reasons in the following manner: if I have to kill one more process, then among the ones still running, let's kill the one using the largest memory, so that the chances of allocating sufficient memory are the highest possible. In other words, if there is a way of allocating enough space by killing one more process, killing the one using largest memory will do it, whereas killing some other process may be insufficient. Although it is not that formal a proof, I believe this reasoning explains why the algorithm works.


