Which concurrent Queue implementation should I use in Java?
Basically the difference between them are performance characteristics and blocking behavior.
Taking the easiest first, ArrayBlockingQueue
is a queue of a fixed size. So if you set the size at 10, and attempt to insert an 11th element, the insert statement will block until another thread removes an element. The fairness issue is what happens if multiple threads try to insert and remove at the same time (in other words during the period when the Queue was blocked). A fairness algorithm ensures that the first thread that asks is the first thread that gets. Otherwise, a given thread may wait longer than other threads, causing unpredictable behavior (sometimes one thread will just take several seconds because other threads that started later got processed first). The trade-off is that it takes overhead to manage the fairness, slowing down the throughput.
The most important difference between LinkedBlockingQueue
and ConcurrentLinkedQueue
is that if you request an element from a LinkedBlockingQueue
and the queue is empty, your thread will wait until there is something there. A ConcurrentLinkedQueue
will return right away with the behavior of an empty queue.
Which one depends on if you need the blocking. Where you have many producers and one consumer, it sounds like it. On the other hand, where you have many consumers and only one producer, you may not need the blocking behavior, and may be happy to just have the consumers check if the queue is empty and move on if it is.
Concurrent queue with only one consumer and producer threads for Java
The consumer is slower while the producer is producing because each time it reads, it experiences a cache miss, since a new element will always be present.
If all elements are already present, they can be fetched together, which improves throughput.
When busy-waiting consider using Thread.onSpinWait()
: while it adds latency, it also enables certain performance optimizations.
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
The JDK does not have queues optimized for SPSC (Single-Producer Single-Consumer) scenarios. There are libraries for that. You can use Agrona or JCTools. Implementing these is not easy.
// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);
[Java]: Which kind of queue to use for my scenario?
Of the three you listed, the only which is actually a class is ArrayBlockingQueue. A blocking queue is different from a normal queue in that, if an object attempts to remove the front item, it will pause execution until there is an available item to remove.
"BlockingQueue" and "Queue" are just a interfaces; you can't instantiate them. Types of BlockingQueue that you can instantiate are ArrayBlockingQueue, LinkedBlockingQueue, etc.
Personally, I would use a LinkedBlockingQueue for this application - the advantage of using a linked list is that there's no set max capacity, and the memory usage decreases as the queue shrinks.
Is there a concurrent circular linked queue collection in java?
Given your revised question, this is what I would recommend:
For the initializing phase, use a concurrent Queue
or Deque
to accumulate the elements. Any implementation will do.
Once that phase is complete, create an instance of the following RoundRobin
class from the queue object, and use its get
method to the cycle through its elements.
public class RoundRobin <E> {
private final AtomicInteger next = new AtomicInteger(0);
private final E[] elements;
public RoundRobin(Collection<E> queue, Class<E> clazz) {
this.elements = queue.toArray(Array.newInstance(clazz, 0));
}
public E get() {
return elements[next.getAndIncrement() % elements.length];
}
}
The RoundRobin
class is thread-safe and get
is concurrent.
If the elements
collection is mutated while you are constructing the RoundRobin
, then the resulting RoundRobin
state may not be the same as the final state of queue
. From my reading of your stated requirements, that is OK.
Topic-like concurrent queue in plain Java
Basically you are talking about multiplexing, and no there isn't something in the standard lib but it is pretty simple to create one. Presuming that your clients aren't interested in messages published before they subscribe then you need a pool of queues for each consumer and publication simply offers the item to each queue:
public class Multiplexer<M> {
private final List<BlockingQueue<M>> consumers
= new CopyOnWriteArrayList<BlockingQueue<M>>();
public void publish(M msg) {
for (BlockingQueue<M> q : consumers) {
q.offer(msg);
}
}
public void addConsumer(BlockingQueue<M> consumer) {
consumers.add(consumer);
}
}
This version allows consumers to use whatever blocking queue implementation they might want. You could obviously provide a standard implementation and a nice interface for the client if you want.
Related Topics
Differencebetween Local and Instance Variables in Java
Giving 'Java.Library.Path' in Netbeans for .Dll/.So Files
Why Do We Have to Override the Equals() Method in Java
How to Set Hard Limit on a Jcomponent When Setmaximumsize() and Setprefferedsize() Don't Work
How to Handle Simultaneous Key Presses in Java
Is It Not Possible to Have a Code in the Background Who Will Be Called Every 24H
How to Call Custom Database Functions with Hibernate
How to Sort Two Arrays in Relation to Each Other
Cannot Create Jdbc Driver of Class ' ' for Connect Url 'Null':I Do Not Understand This Exception
Where Is Java Installed on MAC Os X
Problems with Newline in Graphics2D.Drawstring
How Does the Bitwise & (And) Work in Java
Can Anyone Recommend a Simple Java Web-App Framework
How to Create a .Jar File or Export Jar in Intellij Idea (Like Eclipse Java Archive Export)
Convert Arraylist into 2D Array Containing Varying Lengths of Arrays