The Intersection of Two Sorted Arrays

The intersection of two sorted arrays

Use std::set_intersection. The usual implementation works much like the merge step of the merge-sort algorithm.
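For reference, that merge-style loop looks roughly like this (a minimal sketch in Python rather than C++; intersect_sorted is just an illustrative name):

def intersect_sorted(a, b):
    # Merge-style intersection of two sorted sequences,
    # analogous to the merge step of merge sort.
    i = j = 0
    result = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            result.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return result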

Finding intersection of two sorted arrays in Java

If your professor wants you to use arrays, you can use the following method:

public static int[] resize(int[] arr)
{
    int len = arr.length;
    int[] copy = new int[len + 1];
    for (int i = 0; i < len; i++)
    {
        copy[i] = arr[i];
    }
    return copy;
}

This will increase the size of the array by 1, and you can use it instead. By the way, you're not using the fact that the arrays are sorted in your find() method. What you should do is this:

public static void find(int[] a, int[] b, int[] acc)
{
    int a_index = 0, b_index = 0, acc_index = -1;
    int a_element, b_element;
    while (a_index < a.length && b_index < b.length)
    {
        a_element = a[a_index];
        b_element = b[b_index];
        if (a_element == b_element)
        {
            acc = resize(acc);
            acc[++acc_index] = a_element;
            a_index++;
            b_index++;
        } else if (b_element < a_element) {
            b_index++;
        } else {
            a_index++;
        }
    }
    System.out.println(java.util.Arrays.toString(acc));
}

This method is more efficient now, since it takes advantage of the fact that both arrays are sorted. Working example.

What is the fastest algorithm for intersection of two sorted lists?

Assume that A has m elements and B has n elements, with m ≥ n. Information theoretically, the best we can do is

   lg((m + n)! / (m! n!)) = n lg(m/n) + O(n)

comparisons, since in order to verify an empty intersection, we essentially have to perform a sorted merge. We can get within a constant factor of this bound by iterating through B and keeping a "cursor" in A indicating the position at which the most recent element of B should be inserted to maintain sorted order. We use exponential search to advance the cursor, for a total cost that is on the order of

lg x_1 + lg x_2 + ... + lg x_n,

where x_1 + x_2 + ... + x_n = m + n is some partition of m + n into n positive parts. This sum is O(n lg(m/n)) by the concavity of lg.
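The cursor-plus-exponential-search idea can be sketched as follows (a rough Python illustration, assuming a is the longer sorted list and b the shorter one; gallop_intersect is just an illustrative name):

from bisect import bisect_left

def gallop_intersect(a, b):
    # Intersect sorted lists a (length m) and b (length n), m >= n,
    # by advancing a cursor through a with exponential (galloping) search.
    result = []
    lo = 0                                   # cursor into a
    for x in b:
        # grow the step until a[lo + step] >= x or we run off the end of a
        step = 1
        while lo + step < len(a) and a[lo + step] < x:
            step *= 2
        hi = min(lo + step + 1, len(a))
        # binary search for x inside the bracketed window [lo, hi)
        i = bisect_left(a, x, lo, hi)
        if i < len(a) and a[i] == x:
            result.append(x)
        lo = i                               # b is sorted, so the cursor never moves back
    return result

Each element of b costs roughly lg of the distance the cursor advances, which gives the sum above.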

Efficient algorithm to produce the n-way intersection of sorted arrays in C

I assume that all your arrays are sorted. Let's say we have arrays A_1 to A_n. Keep a counter for each array (so we have n counters i_1 to i_n, just as you did for two arrays).

Now we introduce a min-heap that contains the arrays, ordered so that the minimum is the array whose counter currently points to the lowest number. This means we can, at any moment, retrieve the array whose current number is lowest.

Now, we extract the minimum array from the heap and remember it. We keep extracting the minimum array as long as the pointed-to number stays the same. If we extract all arrays (i.e. if all arrays have the same currently lowest pointed-to number), we know that this number is in the intersection. If not (i.e. if not all arrays contain that number at their current positions), we know that the number we are currently examining cannot be in the intersection. In either case, we increment the counters of the arrays we extracted and put them back into the heap.

We do this until one array's counter reaches the end of that array. I'm sorry for the sketchy description, but I do not have time to work it out in more detail; a rough sketch follows.
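Something along these lines (a rough Python sketch of the scheme described above, using heapq on plain lists; intersect_n is an illustrative name):

import heapq

def intersect_n(arrays):
    # One counter per sorted array; heap entries are (current value, array index).
    n = len(arrays)
    if any(len(arr) == 0 for arr in arrays):
        return []
    counters = [0] * n
    heap = [(arr[0], idx) for idx, arr in enumerate(arrays)]
    heapq.heapify(heap)
    result = []
    while True:
        # extract every array whose current value equals the minimum
        value, idx = heapq.heappop(heap)
        extracted = [idx]
        while heap and heap[0][0] == value:
            extracted.append(heapq.heappop(heap)[1])
        if len(extracted) == n:
            result.append(value)          # all arrays agree: value is in the intersection
        # advance the extracted arrays and put them back, stopping when one is exhausted
        for idx in extracted:
            counters[idx] += 1
            if counters[idx] == len(arrays[idx]):
                return result
            heapq.heappush(heap, (arrays[idx][counters[idx]], idx))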

Edit.

If one of the arrays has very few elements, it might be better to just binary-search the other arrays for those numbers, or to check them using a hash table.
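For instance, the binary-search variant could look like this (a hypothetical Python sketch using bisect; intersect_with_small and contains are illustrative names):

from bisect import bisect_left

def intersect_with_small(small, others):
    # small is the short sorted array; others are the remaining sorted arrays
    def contains(arr, x):
        i = bisect_left(arr, x)
        return i < len(arr) and arr[i] == x
    return [x for x in small if all(contains(arr, x) for arr in others)]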

Intersection of sorted numpy arrays

Since intersect1d sorts the arrays each time, it is effectively inefficient here.

Here you have to sweep the current intersection and each sample together to build the new intersection, which can be done in linear time while maintaining order.

Such a task often has to be tuned by hand with low-level routines.

Here is a way to do that with numba:

from numba import njit
import numpy as np

@njit
def drop_missing(intersect, sample):
    i = j = k = 0
    new_intersect = np.empty_like(intersect)
    while i < intersect.size and j < sample.size:
        if intersect[i] == sample[j]:    # the 99% case
            new_intersect[k] = intersect[i]
            k += 1
            i += 1
            j += 1
        elif intersect[i] < sample[j]:
            i += 1
        else:
            j += 1
    return new_intersect[:k]

Now the samples:

n = 10**7
ref = np.random.randint(0, n, n)
ref.sort()

def perturbation(sample, k):
    rands = np.random.randint(0, n, k - 1)
    rands.sort()
    l = np.split(sample, rands)
    return np.concatenate([a[:-1] for a in l])

samples = [perturbation(ref, 100) for _ in range(10)]  # similar samples

And a run for 10 samples:

def find_intersect(samples):
    intersect = samples[0]
    for sample in samples[1:]:
        intersect = drop_missing(intersect, sample)
    return intersect

In [18]: %time u=find_intersect(samples)
Wall time: 307 ms

In [19]: len(u)
Out[19]: 9999009

This way it seems that the job can be done in about 5 minutes, beyond loading time.

What is the most efficient way of getting the intersection of k sorted arrays?

Yes, it is possible! I've modified your example code to do this.

My answer assumes that your question is about the algorithm - if you want the fastest-running code using sets, see other answers.

This maintains the O(n log(k)) time complexity: all the code between if lowest != elem or ary != times_seen: and unbench_all = False is O(log(k)). There is a nested loop inside the main loop (for unbenched in range(times_seen):), but it only runs times_seen times; times_seen starts at 0, is reset to 0 every time the inner loop runs, and can only be incremented once per main-loop iteration, so the inner loop cannot do more iterations in total than the main loop does. Thus, since the code inside the inner loop is O(log(k)) and runs at most as many times as the outer loop, and the outer loop is O(log(k)) and runs n times, the algorithm is O(n log(k)).

This algorithm relies upon how tuples are compared in Python. It compares the first items of the tuples, and if they are equal, it compares the second items (i.e. (x, a) < (x, b) is true if and only if a < b).
In this algorithm, unlike in the example code in the question, when an item is popped from the heap, it is not necessarily pushed again in the same iteration. Since we need to check if all sub-lists contain the same number, after a number is popped from the heap, its sub-list is what I call "benched", meaning that it is not added back to the heap. This is because we need to check whether the other sub-lists contain the same item, so adding this sub-list's next item is not needed right now.

If a number is indeed in all sub-lists, then the heap will look something like [(2, 0), (2, 1), (2, 2), (2, 3)], with all the first elements of the tuples the same, so heappop will select the one with the lowest sub-list index. This means that index 0 will be popped first and times_seen will be incremented to 1, then index 1 will be popped and times_seen will be incremented to 2 - if ary is not equal to times_seen, then the number is not in the intersection of all sub-lists. This leads to the condition if lowest != elem or ary != times_seen:, which decides when a number shouldn't be in the result. The else branch of this if statement is for when it still might be in the result.

The unbench_all boolean is for when all sub-lists need to be removed from the bench - this could be because:

  1. The current number is known to not be in the intersection of the sub-lists
  2. It is known to be in the intersection of the sub-lists

When unbench_all is True, all the sub-lists that were removed from the heap are re-added. It is known that these are the ones with indices in range(times_seen), since the algorithm removes items from the heap only if they have the same number, so they must have been removed in order of index, contiguously and starting from index 0, and there must be times_seen of them. This means that we don't need to store the indices of the benched sub-lists, only the number of sub-lists that have been benched.

import heapq

def mergeArys(srtd_arys):
    heap = []
    srtd_iters = [iter(x) for x in srtd_arys]

    # put the first element from each sorted array onto the heap
    for idx, it in enumerate(srtd_iters):
        elem = next(it, None)
        if elem is not None:
            heapq.heappush(heap, (elem, idx))

    res = []

    # the number of times that the current number has been seen
    times_seen = 0

    # the lowest number from the heap - currently checking if the first numbers in all sub-lists are equal to this
    lowest = heap[0][0] if heap else None

    # collect results in nlogK time
    while heap:
        elem, ary = heap[0]
        unbench_all = True

        if lowest != elem or ary != times_seen:
            if lowest == elem:
                heapq.heappop(heap)
                it = srtd_iters[ary]
                nxt = next(it, None)
                if nxt is not None:
                    heapq.heappush(heap, (nxt, ary))
        else:
            heapq.heappop(heap)
            times_seen += 1

            if times_seen == len(srtd_arys):
                res.append(elem)
            else:
                unbench_all = False

        if unbench_all:
            for unbenched in range(times_seen):
                unbenched_it = srtd_iters[unbenched]
                nxt = next(unbenched_it, None)
                if nxt is not None:
                    heapq.heappush(heap, (nxt, unbenched))
            times_seen = 0
            if heap:
                lowest = heap[0][0]

    return res

if __name__ == '__main__':
    a1 = [[1, 3, 5, 7], [1, 1, 3, 5, 7], [1, 4, 7, 9]]
    a2 = [[1, 1], [1, 1, 2, 2, 3]]
    for arys in [a1, a2]:
        print(mergeArys(arys))

An equivalent algorithm can be written like this, if you prefer:

def mergeArys(srtd_arys):
    heap = []
    srtd_iters = [iter(x) for x in srtd_arys]

    # put the first element from each sorted array onto the heap
    for idx, it in enumerate(srtd_iters):
        elem = next(it, None)
        if elem is not None:
            heapq.heappush(heap, (elem, idx))

    res = []

    # collect results in nlogK time
    while heap:
        elem, ary = heap[0]
        lowest = elem
        keep_elem = True
        for i in range(len(srtd_arys)):
            elem, ary = heap[0]
            if lowest != elem or ary != i:
                if ary != i:
                    heapq.heappop(heap)
                    it = srtd_iters[ary]
                    nxt = next(it, None)
                    if nxt is not None:
                        heapq.heappush(heap, (nxt, ary))

                keep_elem = False
                i -= 1
                break
            heapq.heappop(heap)

        if keep_elem:
            res.append(elem)

        for unbenched in range(i + 1):
            unbenched_it = srtd_iters[unbenched]
            nxt = next(unbenched_it, None)
            if nxt is not None:
                heapq.heappush(heap, (nxt, unbenched))

        if len(heap) < len(srtd_arys):
            heap = []

    return res

Fast intersection of two sorted integer arrays

UPDATE

The fastest I got was about 200 ms with arrays of size 10 million, using the unsafe version (last piece of code below).

The test I did:

var arr1 = new int[10000000];
var arr2 = new int[10000000];

for (var i = 0; i < 10000000; i++)
{
    arr1[i] = i;
    arr2[i] = i * 2;
}

var sw = Stopwatch.StartNew();

var result = arr1.IntersectSorted(arr2);

sw.Stop();

Console.WriteLine(sw.Elapsed); // 00:00:00.1926156

Full Post:

I've tested various ways to do it and found this to be very good:

public static List<int> IntersectSorted(this int[] source, int[] target)
{
    // Set initial capacity to a "full-intersection" size
    // This prevents multiple re-allocations
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                ints.Add(source[i++]);
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }

    // Free unused memory (sets capacity to actual count)
    ints.TrimExcess();

    return ints;
}

For a further improvement you can remove the ints.TrimExcess() call, which also makes a nice difference, but you should consider whether you're going to need that memory back.

Also, if you know that you might break out of loops that consume the intersection, and you don't have to have the results as an array/list, you should change the implementation to an iterator:

public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                yield return source[i++];
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }
}

Another improvement is to use unsafe code:

public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    fixed (int* ptSrc = source)
    {
        var maxSrcAdr = ptSrc + source.Length;

        fixed (int* ptTar = target)
        {
            var maxTarAdr = ptTar + target.Length;

            var currSrc = ptSrc;
            var currTar = ptTar;

            while (currSrc < maxSrcAdr && currTar < maxTarAdr)
            {
                switch ((*currSrc).CompareTo(*currTar))
                {
                    case -1:
                        currSrc++;
                        continue;
                    case 1:
                        currTar++;
                        continue;
                    default:
                        ints.Add(*currSrc);
                        currSrc++;
                        currTar++;
                        continue;
                }
            }
        }
    }

    ints.TrimExcess();
    return ints;
}

In summary, the biggest performance hit was in the if/else chain.
Turning it into a switch-case made a huge difference (about 2 times faster).


