How to Use Concurrentlinkedqueue

How to use ConcurrentLinkedQueue?

No, the methods don't need to be synchronized, and you don't need to define any methods; they are already in ConcurrentLinkedQueue, just use them. ConcurrentLinkedQueue does all the locking and other operations you need internally; your producer(s) adds data into the queue, and your consumers poll for it.

First, create your queue:

Queue<YourObject> queue = new ConcurrentLinkedQueue<YourObject>();

Now, wherever you are creating your producer/consumer objects, pass in the queue so they have somewhere to put their objects (you could use a setter for this, instead, but I prefer to do this kind of thing in a constructor):

YourProducer producer = new YourProducer(queue);

and:

YourConsumer consumer = new YourConsumer(queue);

and add stuff to it in your producer:

queue.offer(myObject);

and take stuff out in your consumer (if the queue is empty, poll() will return null, so check it):

YourObject myObject = queue.poll();

For more info see the Javadoc

EDIT:

If you need to block waiting for the queue to not be empty, you probably want to use a LinkedBlockingQueue, and use the take() method. However, LinkedBlockingQueue has a maximum capacity (defaults to Integer.MAX_VALUE, which is over two billion) and thus may or may not be appropriate depending on your circumstances.

If you only have one thread putting stuff into the queue, and another thread taking stuff out of the queue, ConcurrentLinkedQueue is probably overkill. It's more for when you may have hundreds or even thousands of threads accessing the queue at the same time. Your needs will probably be met by using:

Queue<YourObject> queue = Collections.synchronizedList(new LinkedList<YourObject>());

A plus of this is that it locks on the instance (queue), so you can synchronize on queue to ensure atomicity of composite operations (as explained by Jared). You CANNOT do this with a ConcurrentLinkedQueue, as all operations are done WITHOUT locking on the instance (using java.util.concurrent.atomic variables). You will NOT need to do this if you want to block while the queue is empty, because poll() will simply return null while the queue is empty, and poll() is atomic. Check to see if poll() returns null. If it does, wait(), then try again. No need to lock.

Finally:

Honestly, I'd just use a LinkedBlockingQueue. It is still overkill for your application, but odds are it will work fine. If it isn't performant enough (PROFILE!), you can always try something else, and it means you don't have to deal with ANY synchronized stuff:

BlockingQueue<YourObject> queue = new LinkedBlockingQueue<YourObject>();

queue.put(myObject); // Blocks until queue isn't full.

YourObject myObject = queue.take(); // Blocks until queue isn't empty.

Everything else is the same. Put probably won't block, because you aren't likely to put two billion objects into the queue.

java.util.ConcurrentLinkedQueue

You're essentially asking three different questions (two of them explicitly and one implicitly.) Here they are, with my answers:

1. Do I need to do my own synchronization if I use java.util.ConcurrentLinkedQueue?

Atomic operations on the concurrent collections are synchronized for you. In other words, each individual call to the queue is guaranteed thread-safe without any action on your part. What is not guaranteed thread-safe are any operations you perform on the collection that are non-atomic.

For example, this is threadsafe without any action on your part:

queue.add(obj);

or

queue.poll(obj);

However; non-atomic calls to the queue are not automatically thread-safe. For example, the following operations are not automatically threadsafe:

if(!queue.isEmpty()) {
queue.poll(obj);
}

That last one is not threadsafe, as it is very possible that between the time isEmpty is called and the time poll is called, other threads will have added or removed items from the queue. The threadsafe way to perform this is like this:

synchronized(queue) {
if(!queue.isEmpty()) {
queue.poll(obj);
}
}

Again...atomic calls to the queue are automatically thread-safe. Non-atomic calls are not.

2. Am I guaranteed not to lose calls to java.util.ConcurrentLinkedQueue if there are 1000 simultaneous requests?

Because this is an unbounded implementation you are guaranteed that no matter how many simultaneous requests to make, the queue will not lose those requests (because of the queue's concurrency...you might run out of memory or some such...but the queue implementation itself will not be your limiting factor.) In a web application, there are other opportunities to "lose" requests, but the synchronization (or lack thereof) of the queue won't be your cause.

3. Will the java.util.ConcurrentLinkedQueue perform well enough?

Typically, we talk about "correctness" when we talk about concurrency. What I mean, is that Concurrent classes guarantee that they are thread-safe (or robust against dead-lock, starvation, etc.) When we talk about that, we aren't making any guarantees about performance (how fast calls to the collection are) - we are only guaranteeing that they are "correct."

However; the ConcurrentLinkedQueue is a "wait-free" implementation, so this is probably as performant as you can get. The only way to guarantee load performance of your servlet (including the use of the concurrent classes) is to test it under load.

How to use ConcurrentLinkedQueue in Scala?

You may just use ConcurrentLinkedQueue instead of Buffer as it's also mutable:

scala> import java.util.concurrent._
import java.util.concurrent._

scala> val nodes = Array.fill(10){new ConcurrentLinkedQueue[Int]()}
nodes: Array[java.util.concurrent.ConcurrentLinkedQueue[Int]] = Array([], [], [], [], [], [], [], [], [], [])

scala> def addMutualEdge(i: Int)(j: Int) {nodes(i).add(j); nodes(j).add(i)}
addMutualEdge: (i: Int)(j: Int)Unit

It's fastest option as this queue is based on CAS-operations, so no blocking there (in comparision with SynchronizedBuffer). Another option is to synchronize operations directly:

scala> val nodes = Array.fill[mutable.Buffer[Int]](10){new ArrayBuffer[Int]()}
nodes: Array[scala.collection.mutable.Buffer[Int]] = Array(ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer())

scala> def addMutualEdge(i: Int)(j: Int) = this.synchronized{nodes(i) += j; nodes(j) += i}
addMutualEdge: (i: Int)(j: Int)scala.collection.mutable.Buffer[Int]

You can also use java's Collections.synchronizedList(...) in combination with scala.collection.JavaConverters.asScala

import java.util._
import scala.collection.JavaConverters._
scala> val nodes = Array.fill(10){Collections.synchronizedList(new ArrayBuffer[Int]().asJava).asScala}
nodes: Array[scala.collection.mutable.Buffer[Int]] = Array(Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer())

Or you can use AtomicReferenceArray:

implicit class RichAtomic[T](a: AtomicReferenceArray[List[T]]) { def apply(i: Int) = (a,i); def update(i: Int, e: List[T]) = a.set(i, e)}
implicit class RichList[T](a: (AtomicReferenceArray[List[T]], Int)) { def ::=(e: T) = while({val lst = a._1.get(a._2);!a._1.compareAndSet(a._2, lst, e :: lst)}){}}
implicit def toList[T](a: (AtomicReferenceArray[List[T]], Int)) = a._1.get(a._2)

val nodes = new AtomicReferenceArray(Array.fill[List[Int]](10){Nil})

scala> def addMutualEdge(i: Int)(j: Int) = {nodes(i) ::= j; nodes(j) ::= i}
addMutualEdge: (i: Int)(j: Int)Unit

Implicits used to provide simillar interface as for just Array. Note, that ::= adds element to the start of list.

How to iterate on concurrentLinkedQueue by multiple threads?

I think you should not iterate but create 4 thread each polling data from the queue so that polled data will be deleted or in other words consumed

// your queue
ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();

// create 4 Threads
for (int i = 0; i < 4; i++) {
new Thread(() -> {
while (!concurrentLinkedQueue.isEmpty()) {
// consume element
var element = concurrentLinkedQueue.poll();

// do something with element
// here
}
}).start();
}

Is this ConcurrentLinkedQueue/wait/notify algorithm correct?

The answer to your question is yes. The consumer will see all of the updates.

However:

  1. This is not a sensible implementation. It looks like you are using the polling approach with wait / notify so that you don't need a busy loop to wait for the queue to become nonempty. But a better (simpler, more efficient) approach would be to use a BlockingQueue instead and use the blocking get() method.

    For what it is worth, you are negating any possible scalability advantages of using ConcurrentLinkedQueue by using the queue object as a mutex to do wait / notify signalling. (This would also apply if you used a different object as the mutex. The problem is the mutual exclusion!)

  2. If you are going to do it this way (for whatever reason), a notify() would be preferable to a notifyAll(). Only one consumer is going to be able to consume that (single) element you added to the queue. Waking up all of the consumers is unnecessary.

  3. It is not a good idea to extend Thread. A better way is to put your business logic into a Runnable (or a lambda) which you pass as a Thread constructor parameter. Read: "implements Runnable" vs "extends Thread" in Java


You also were interested in:

... what is exactly the imprecision when determining the size of ConcurrentLinkedQueue.

The answer to that is in the javadoc for ConcurrentLinkedQueue:

"Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal."

"Additionally, if elements are added or removed during execution of this method, the returned result may be inaccurate. Thus, this method is typically not very useful in concurrent applications."

In other words, ConcurrentLinkedQueue counts the queue elements, and does not give an accurate answer if elements are being added and removed at the same time.

Concurrent linked queue use CAS

For reference: example, code explanations of the algorithm are available on this page from the IBM Developer website.

The page linked above will give you the purposes of each operation A, B, C and D, and why they are required to allow what is referenced therein as constructive interference between threads concurrently updating the tail of the queue.

Your change corrupts the algorithm. The else clause must not be executed when C is unsuccessful. The role it plays instead is to reproduce the operation D when a thread intercepts an uncompleted [*] update of the tail from another thread.

[*]: That is, after C but before D.

To understand why and when it fails, consider the following scenario.

while (true) {
Node<E> curTail = tail.get();
Node<E> residue = curTail.next.get();

/* (i) */

if (curTail.next.compareAndSet(null, newNode)) /* C */ {
tail.compareAndSet(curTail, newNode) /* D */ ;
return true;
} else {
tail.compareAndSet(curTail, residue) /* B */;
}
}
  • Two threads T1 and T2 start simultaneously at position (i). Their stack holds the same references for curTail and residue. residue is supposed null (i.e. the queue is supposed to be in quiescent state).

  • T1 completes the first CAS C successfully. It does not execute D yet.

  • T2 fails the CAS C, enters the else, executes CAS B successfully, since the reference tail hasn't changed.

  • T1 fails the CAS D, since the reference assigned to tail has been set to null by C. There is no fallback, and the method exits. Element from T2 is not inserted.

  • Second attempt for T1, the reference assigned to tail is null and curTail.next throws a NullPointerException. The data structure is corrupted.

To summarize, A and B works in pairs. They exist to ensure interfering threads can help the convergence of the queue to a normal state and recover from a queue left in an intermediate state. Imagine a thread executes C but gets killed before having a chance to run D. Without A and B, the queue would be forever corrupted. A and B ensures the state can be reconstructed and the unfinished insertion completed.

ConcurrentLinkedQueue Code Explanation

The ConcurrentLinkedQueue allows concurrent modification of the internal list while traversing it. This implies that the node you are looking at could have been removed concurrently. To detect such situations the next pointer of a removed node is changed to point to itself. Look at updateHead (L302) for details.

Split a ConcurrentLinkedQueue into half using Spliterator

You can't split it in half in general, I mean to split in half this queue must have a size at each point in time. And while CLQ does have a size() method, it's documentation is pretty clear that this size requires O(n) traversal time and because this is a concurrent queue it's size might not be accurate at all (it is named concurrent for a reason after all). The current Spliterator from CLQ splits it in batches from what I can see.

If you want to split it in half logically and process the elements, then I would suggest moving to some Blocking implementation that has a drainTo method, this way you could drain the elements to an ArrayList for example, that will split much better (half, then half again and so on).

On a side note, why would you want to do the processing in different threads yourself? This seems very counter-intuitive, the Spliterator is designed to work for parallel streams. Calling trySplit once is probably not even enough - you have to call it until it returns null... Either way doing these things on your own sounds like a very bad idea to me.



Related Topics



Leave a reply



Submit