Producer/Consumer threads using a Queue
Java 5+ has all the tools you need for this kind of thing. You will want to:
- Put all your Producers in one
ExecutorService
; - Put all your Consumers in another
ExecutorService
; - If necessary, communicate between the two using a
BlockingQueue
.
I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
So the producers
submit directly to consumers
.
Producer-consumers design pattern with blocking Queue java
My application works fine. but I found that each thread that consumer and producer thread pool creates stay alive in waiting state even after it finished the send SMS task.
That's correct. Your consumer thread task is inside of:
while (true) {
It will never stop. There are many ways to fix this. One way is to put constant "done" message on the queue. You could either put the same number of done messages on the queue as there are consumer or use something like this logic:
private static final Message DONE_MESSAGE = new Message();
...
// producer puts it into the queue when it is done
sharedQueue.add(DONE_MESSAGE);
...
// consumer takes it from the queue and quits
Message message = sharedQueue.take();
if (message == DONE_MESSAGE) {
// put it back on the queue to stop the other consumer threads
sharedQueue.put(DONE_MESSAGE);
// quit the consumer thread
return;
}
is it a problem for long running application or can I use the the consumer and producer thread pool for all the time instead of creating new thread pool each time when initializingSendMessage(RawData trxn) method invoked?
I would recommend keeping the same thread pool running for the life of the application. The whole point is to reuse threads and not have them created and shutdown all of the time. Also, there's no point in having 2 fixed thread pools. One with a size of 6 or even just used a cached thread pool and it will create a thread for each of the producer and consumer jobs.
Producer Consumer using ReentrantLock and Queue
In your consumer you try to acquire lock:
if(lock.tryLock())
But tryLock
acquires the lock only if it is not held by another thread at the time of invocation. Just because you start producer first it is highly likely that it is already acquired by Producer. You try to do unlock
but next instruction is tryLock
(in a loop) so there is no any sort of yield for other thread. In other words Consumer thread almost has no chance to acquire lock because Producer thread reacquires it. And just because you have finite loop (only 20) your Consumer just finishes.
If you add
class Producer implements Runnable{
Queue<Integer> list;
Condition con;
ReentrantLock lock;
int size;
Producer(Queue q1, Condition con, ReentrantLock l1,int size)
{
this.list=q1;
this.con=con;
this.lock=l1;
this.size=size;
}
public void run()
{
for(int i =0;i<20;i++)
{
if(lock.tryLock())
{
while(list.size()==size)
{
try
{
con.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
list.add(i);
System.out.println("Producer "+ Thread.currentThread() +"added "+i+" to the List" );
con.signalAll();
lock.unlock();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
after
lock.unlock();
in your Producer loop you will give chance to Consumer thread to acquire lock and you will get result as expected.
Concurrent queue with only one consumer and producer threads for Java
The consumer is slower while the producer is producing because each time it reads, it experiences a cache miss, since a new element will always be present.
If all elements are already present, they can be fetched together, which improves throughput.
When busy-waiting consider using Thread.onSpinWait()
: while it adds latency, it also enables certain performance optimizations.
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
The JDK does not have queues optimized for SPSC (Single-Producer Single-Consumer) scenarios. There are libraries for that. You can use Agrona or JCTools. Implementing these is not easy.
// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);
Related Topics
Parse String to Date with Different Format in Java
How to Execute Cmd Commands via Java
Spring Data JPA - Zoneddatetime Format for JSON Serialization
Why Does Integer Division Code Give the Wrong Answer
Difference Between @Mock and @Injectmocks
Encode Base64 Cannot Find Symbol Error
How to Map Calculated Properties with JPA and Hibernate
Setting User Agent of a Java Urlconnection
Implementing a Simple File Download Servlet
Way to Get Number of Digits in an Int
Why Do We Use Autoboxing and Unboxing in Java
How to Stop a Java Thread Gracefully
How to Reference Javafx Fxml Files in Resource Folder
Is There a Performance Difference Between a for Loop and a For-Each Loop