Producer/Consumer Threads Using a Queue

Producer/Consumer threads using a Queue

Java 5+ has all the tools you need for this kind of thing. You will want to:

  1. Put all your Producers in one ExecutorService;
  2. Put all your Consumers in another ExecutorService;
  3. If necessary, communicate between the two using a BlockingQueue.

I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

So the producers submit directly to consumers.

Producer-consumers design pattern with blocking Queue java

My application works fine. but I found that each thread that consumer and producer thread pool creates stay alive in waiting state even after it finished the send SMS task.

That's correct. Your consumer thread task is inside of:

 while (true) {

It will never stop. There are many ways to fix this. One way is to put constant "done" message on the queue. You could either put the same number of done messages on the queue as there are consumer or use something like this logic:

private static final Message DONE_MESSAGE = new Message();
...
// producer puts it into the queue when it is done
sharedQueue.add(DONE_MESSAGE);
...
// consumer takes it from the queue and quits
Message message = sharedQueue.take();
if (message == DONE_MESSAGE) {
// put it back on the queue to stop the other consumer threads
sharedQueue.put(DONE_MESSAGE);
// quit the consumer thread
return;
}

is it a problem for long running application or can I use the the consumer and producer thread pool for all the time instead of creating new thread pool each time when initializingSendMessage(RawData trxn) method invoked?

I would recommend keeping the same thread pool running for the life of the application. The whole point is to reuse threads and not have them created and shutdown all of the time. Also, there's no point in having 2 fixed thread pools. One with a size of 6 or even just used a cached thread pool and it will create a thread for each of the producer and consumer jobs.

Producer Consumer using ReentrantLock and Queue

In your consumer you try to acquire lock:

if(lock.tryLock())

But tryLock acquires the lock only if it is not held by another thread at the time of invocation. Just because you start producer first it is highly likely that it is already acquired by Producer. You try to do unlock but next instruction is tryLock (in a loop) so there is no any sort of yield for other thread. In other words Consumer thread almost has no chance to acquire lock because Producer thread reacquires it. And just because you have finite loop (only 20) your Consumer just finishes.

If you add

class Producer implements Runnable{
Queue<Integer> list;
Condition con;
ReentrantLock lock;
int size;

Producer(Queue q1, Condition con, ReentrantLock l1,int size)
{
this.list=q1;
this.con=con;
this.lock=l1;
this.size=size;
}

public void run()
{
for(int i =0;i<20;i++)
{
if(lock.tryLock())
{
while(list.size()==size)
{
try
{
con.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
list.add(i);
System.out.println("Producer "+ Thread.currentThread() +"added "+i+" to the List" );
con.signalAll();
lock.unlock();

try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

after

 lock.unlock();

in your Producer loop you will give chance to Consumer thread to acquire lock and you will get result as expected.

Concurrent queue with only one consumer and producer threads for Java

The consumer is slower while the producer is producing because each time it reads, it experiences a cache miss, since a new element will always be present.
If all elements are already present, they can be fetched together, which improves throughput.

When busy-waiting consider using Thread.onSpinWait(): while it adds latency, it also enables certain performance optimizations.

// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}

The JDK does not have queues optimized for SPSC (Single-Producer Single-Consumer) scenarios. There are libraries for that. You can use Agrona or JCTools. Implementing these is not easy.

// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);


Related Topics



Leave a reply



Submit