Inter Thread Communication in Java

Java Inter Process communication and Inter Thread communication?

The fundamental difference is that threads live in the same address spaces, but processes live in the different address spaces. This means that inter-thread communication is about passing references to objects, and changing shared objects, but processes is about passing serialized copies of objects.

In practice, Java interthread communication can be implemented as plain Java method calls on shared object with appropriate synchronization thrown in. Alternatively, you can use the new concurrency classes to hide some of the nitty-gritty (and error prone) synchronization issues.

By contrast, Java interprocess communication is based at the lowest level on turning state, requests, etc into sequences of bytes that can be sent as messages or as a stream to another Java process. You can do this work yourself, or you can use a variety of "middleware" technologies of various levels of complexity to abstract away the implementation details. Technologies that may be used include, Java object serialization, XML, JSON, RMI, CORBA, SOAP / "web services", message queing, and so on.

At a practical level, interthread communication is many orders of magnitude faster than interprocess communication, and allows you to do many things a lot more simply. But the downside is that everything has to live in the same JVM, so there are potential scalability issues, security issues, robustness issues and so on.

How to do 3 thread communication in JAVA?

You don't necessarily need to create 3 separate Thread classes for 3 users. You also don't need 3 separate methods for each user. It's hard to read, maintain, and debug.

I wrote a very simple program for you. In your application, synchronised is used to wrap the whole method. It's hard to read, and it's unclear why you synchronised the entire method body. In my example, there is a single point where synchronisation is required and it's well defined: when a user wants to say something to the chat.

Have a look.

class Application {

public static void main(String[] args) {
final Chat chat = new Chat();

chat.registerUser(new User("user1", chat));
chat.registerUser(new User("user2", chat));
chat.registerUser(new User("user3", chat));
}

}

class Chat {

private final Scanner scanner = new Scanner(System.in);
private final List<User> users = new ArrayList<>();

private void registerUser(User user) {
users.add(user);
System.out.format("'%s' connected to the chat.\n", user);
user.start();
}

public void sendMessage(User user) {
final String reply = scanner.nextLine();
System.out.format("%s: %s\n", user, reply);

if (reply.equalsIgnoreCase("bye")) {
users.forEach(Thread::interrupt);
System.out.println("The chat is over.");
} else {
notifyAll();
}
}

}

class User extends Thread {

private final String id;
private final Chat chat;

public User(String id, Chat chat) {
this.id = id;
this.chat = chat;
}

public void run() {
while (true) {
try {
synchronized (chat) {
chat.sendMessage(this);
chat.wait();
}
} catch (InterruptedException e) {
throw new IllegalArgumentException("Someone left. I am done as well.");
}
}
}

@Override
public String toString() {
return id;
}

}

An example would be

'user1' connected to the chat.
'user2' connected to the chat.
'user3' connected to the chat.
hi
user1: hi
hello
user3: hello
what's up?
user2: what's up?
nothing
user3: nothing
what?
user1: what?
bye
user2: bye
The chat is over.

To read:

the producer–consumer problem

What's the inter-thread communication mechanism that can await and release underlying thread resource at the same time

The only way to release underlying thread resource is to completely return from the main method of the task (usually Runnable::run). To await at the same time, the event producer should be subscribed in asynchronous way. Not every producer has asynchronous interface. CompletbleFuture has (method whenComplete), but CountDownLatch has not. However, you can extend CountDownLatch with asynchronous finctionality, subscribe to its completion, return from run() and wait. I did so in my DF4J library: AsyncCountDownLatch.java

Interthread communication between producer and consumer threads?

One way I am thinking, there could be other better ways also:

@Override
public void run() {
String name = Thread.currentThread().getName();
while (true) {

while (queue.peek() == null) {
//some sleep time
}

synchronized (lock) {
while (queue.peek() != null && !name.equals(String.valueOf(queue.peek().intValue() % 2 ))) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(queue.peek() != null) {
try {
System.out.println(name + ",consumed," + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.notify();
}
}
}

Another Way: To have anotherLock that will be notified by producer thread whenever element is added to queue.

@Override
public void run() {
String name = Thread.currentThread().getName();
while (true) {

synchronized (anotherLock) {
while (queue.peek() == null) {
anotherLock.wait();
}
}

synchronized (lock) {
while (queue.peek() != null && !name.equals(String.valueOf(queue.peek().intValue() % 2 ))) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(queue.peek() != null) {
try {
System.out.println(name + ",consumed," + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.notify();
}
}
}

Why do we need notify() for inter thread communication

Good question. Will point you to take a look at the Thread State Class.

A thread that calls the Object.notify method enables a thread that previously called Object.wait is now enabled to be scheduled by the thread scheduler. In parlance, the thread that was waiting is now "runnable". Although it is "runnable", it is not "running".

It can only continue running when the thread invoking notify releases the lock - one way is when it exits out of the synchronized block.

There are a lot of schematics on the web on the Thread States. Some of them are completely incorrect or confusing since they introduce terminology not in the official docs. Here is one that makes sense to me.

Inter-thread communication in Java

I've tried your test in JUnit and I get the opposite from what you experienced:

  1. When Thread.sleep is commented away, the test runs fine and it prints "Resumed in <>"
  2. When Thread.sleep is in the code (actually executed), the JVM terminates and the "Resumed in ..." is not printed.

The reason is that JUnit terminates the VM when the tests are complete. It does a System.exit();. You're lucky that you get the full output in case 1., because it's printed in a separate thread, and JUnit is not waiting for that thread.

If you want to make sure that the Thread is complete before the end of the test method, either you need to have your API wait for the thread, or you need to have the test wait for the thread.

If your stoppingViaMonitorWait method returns the Thread it creates, you can wait in the test.

@Test
public void testStoppingViaMonitorWait() throws Exception {
Thread x = cut.stoppingViaMonitorWait();
x.join();
}

Another option is that you inject a thread pool (an instance of ExecutorService) into the class that you're testing, have it schedule its thread on the pool (that's nicer in any case), and in your test method you can call ExecutorService.awaitTermination.

inter thread communication in java

That depends on the nature of the communication.

  • Is it duplex (ie A talks to B and B talks to A)?
  • Is it communication of data or communication of completion?

and so on.

The simplest and most advisable form of inter-thread communication is simply to wait for the completion of other threads. That's most easily done by using Future:

ExecutorService exec = Executors.newFixedThreadPool(50);
final Future f = exec.submit(task1);
exec.submit(new Runnable() {
@Override
public void run() {
f.get();
// do stuff
}
});

The second task won't execute until the first completes.

Java 5+ has many concurrent utilities for dealing with this kind of thing. This could mean using LinkedBlockingQueues, CountDownLatch or many, many others.

For an in-depth examination of concurrency Java Concurrency in Practice is a must-read.

Interthread communication Producer Consumer Problem

You're issue is a pretty basic flow issue: When 'notify' is called and the thread that was waiting starts again, the method completes and the thread stops running.

Note that both of your methods are synchronized on the same object, so they will never run at the same time. Eg. one thread will get the monitor, and increment/decrement the box until it finally waits. Then the other thread will go, until it waits.

You have a few other issues that are pretty common when using wait and notify.

    synchronized void withdrawBoxConsumer() {
while( !Thread.currentThread().isInterrupted() ) {
if(box > 0){
box --;
System.out.println("Took one box now boxes left "+ box);
}
while(box == 0) {
System.out.println("Please put more boxes");
notifyAll();
try{
wait();
}catch(Exception e){
throw new RuntimeException(e);
}
}
}
}

synchronized void putBoxProducer() {
while( !Thread.currentThread().isInterrupted() ) {
if(box < 10){
box ++;
System.out.println("Put one box now boxes are "+ box);
}
while(box == 10) {
System.out.println("Please Consume boxes");
notifyAll();
try{
wait();
}catch(Exception e){
throw new RuntimeException(e);
}
}
}
}
  • I made it non-recursive because the way you were doing it, the stack will overflow.
  • The wait condition is in a loop because of spurious wake-ups.
  • I switched to notifyAll notify would only wake one waiting thread, in this case it should be okay, but it is better to be safe.
  • box should ideally be a concurrent class or volatile, but since you're always working in a synchronized method it should be ok.
  • Likewise box++ and box-- are race conditions.
  • e.fillInStackTrace() is not what you want to use.


Related Topics



Leave a reply



Submit