Which Concurrent Queue Implementation Should I Use in Java

Which concurrent Queue implementation should I use in Java?

Basically the difference between them are performance characteristics and blocking behavior.

Taking the easiest first, ArrayBlockingQueue is a queue of a fixed size. So if you set the size at 10, and attempt to insert an 11th element, the insert statement will block until another thread removes an element. The fairness issue is what happens if multiple threads try to insert and remove at the same time (in other words during the period when the Queue was blocked). A fairness algorithm ensures that the first thread that asks is the first thread that gets. Otherwise, a given thread may wait longer than other threads, causing unpredictable behavior (sometimes one thread will just take several seconds because other threads that started later got processed first). The trade-off is that it takes overhead to manage the fairness, slowing down the throughput.

The most important difference between LinkedBlockingQueue and ConcurrentLinkedQueue is that if you request an element from a LinkedBlockingQueue and the queue is empty, your thread will wait until there is something there. A ConcurrentLinkedQueue will return right away with the behavior of an empty queue.

Which one depends on if you need the blocking. Where you have many producers and one consumer, it sounds like it. On the other hand, where you have many consumers and only one producer, you may not need the blocking behavior, and may be happy to just have the consumers check if the queue is empty and move on if it is.

Concurrent queue with only one consumer and producer threads for Java

The consumer is slower while the producer is producing because each time it reads, it experiences a cache miss, since a new element will always be present.
If all elements are already present, they can be fetched together, which improves throughput.

When busy-waiting consider using Thread.onSpinWait(): while it adds latency, it also enables certain performance optimizations.

// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}

The JDK does not have queues optimized for SPSC (Single-Producer Single-Consumer) scenarios. There are libraries for that. You can use Agrona or JCTools. Implementing these is not easy.

// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);

[Java]: Which kind of queue to use for my scenario?

Of the three you listed, the only which is actually a class is ArrayBlockingQueue. A blocking queue is different from a normal queue in that, if an object attempts to remove the front item, it will pause execution until there is an available item to remove.

"BlockingQueue" and "Queue" are just a interfaces; you can't instantiate them. Types of BlockingQueue that you can instantiate are ArrayBlockingQueue, LinkedBlockingQueue, etc.

Personally, I would use a LinkedBlockingQueue for this application - the advantage of using a linked list is that there's no set max capacity, and the memory usage decreases as the queue shrinks.

Is there a concurrent circular linked queue collection in java?

Given your revised question, this is what I would recommend:

For the initializing phase, use a concurrent Queue or Deque to accumulate the elements. Any implementation will do.

Once that phase is complete, create an instance of the following RoundRobin class from the queue object, and use its get method to the cycle through its elements.

public class RoundRobin <E> {
private final AtomicInteger next = new AtomicInteger(0);
private final E[] elements;

public RoundRobin(Collection<E> queue, Class<E> clazz) {
this.elements = queue.toArray(Array.newInstance(clazz, 0));
}

public E get() {
return elements[next.getAndIncrement() % elements.length];
}
}

The RoundRobin class is thread-safe and get is concurrent.

If the elements collection is mutated while you are constructing the RoundRobin, then the resulting RoundRobin state may not be the same as the final state of queue. From my reading of your stated requirements, that is OK.

Topic-like concurrent queue in plain Java

Basically you are talking about multiplexing, and no there isn't something in the standard lib but it is pretty simple to create one. Presuming that your clients aren't interested in messages published before they subscribe then you need a pool of queues for each consumer and publication simply offers the item to each queue:

public class Multiplexer<M> {
private final List<BlockingQueue<M>> consumers
= new CopyOnWriteArrayList<BlockingQueue<M>>();

public void publish(M msg) {
for (BlockingQueue<M> q : consumers) {
q.offer(msg);
}
}

public void addConsumer(BlockingQueue<M> consumer) {
consumers.add(consumer);
}
}

This version allows consumers to use whatever blocking queue implementation they might want. You could obviously provide a standard implementation and a nice interface for the client if you want.



Related Topics



Leave a reply



Submit