I/O in Concurrent Program

Concurrent IO on single core Java 11

"The default threadpool uses core# - 1, which, on a single core machine, that I am using, has NO concurrency." - Why? A concurrent program can very well run on a single core machine. It has nothing to do with parallelism.

When a Java thread is waiting for I/O, the kernel's scheduler will move it to the wait queue, and some other thread that requires CPU time will be run. So you can create a thread pool with as many threads as you want, and the scheduler will take care of the concurrency. And this will work fine even on a single core machine.

The only limit here is the number of threads you will create. The default stack size of a thread varies b/w 512K to 1M. So this does not scale very well, and at some point, you'll run out of threads. On my system, I could create around 5k of them. Languages like Go manage this by multiplexing multiple goroutines on a limited number of kernel threads. This requires scheduling by the Go runtime.

If you want to alleviate this, you should look into NIO. I wrote a quick program that you can use to find out how many concurrent connections you can actually support this way. This should run as-is after the imports:

public class ConcurrentBlockingServer {

private ExecutorService pool = Executors.newCachedThreadPool();

public static void main(String[] args) {
ConcurrentBlockingServer bs = new ConcurrentBlockingServer();
try {
bs.listen();
} catch (IOException e) {
e.printStackTrace();
}
}

private void listen() throws IOException {
int connectionId = 0;
ServerSocket ss = new ServerSocket(8080);
while (true) {
Socket s = ss.accept(); // blocking call, never null
System.out.println("Connection: " + (++connectionId));
process(s);
}
}

private void process(Socket s) {
Runnable task =
() -> {
try (InputStream is = s.getInputStream();
OutputStream os = s.getOutputStream()) {
int data;
// -1 is EOF, .read() is blocking
while (-1 != (data = is.read())) {
os.write(flipCase(data));
os.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
};
pool.submit(task);
}

private int flipCase(int input) {
if (input >= 65 && input <= 90) {
return input + 32;
} else if (input >= 97 && input <= 122) {
return input - 32;
} else {
return input;
}
}
}

Run this program and see how many connections you could make.

public class RogueClient {

private static long noClients = 9000;

public static void main(String[] args) {
for (int i = 0; i < noClients; i++) {
try {
new Socket("localhost", 8080);
System.out.println("Connection No: " + i);
} catch (IOException e) {
System.err.println("Exception: " + e.getMessage() + ", for connection: " + i);
}
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Edit: The pool size should depend upon the nature of your program. If it's an I/O bound task, you could go ahead and create many threads. But for CPU bound programs, the number of threads should be equal to the number of cores.

Java Socket Programming with concurrent I/O

This tutorial should show you the basics on concurrent servers. This is another tutorial you might want to look at as well. The second deals more with threading and concurrency.

Haskell -- Concurrent I/O Routing

Implementation always matters. It turns out that runJack called waitForBreak in Sound.JACK, which has the definition:

 waitForBreak :: IO ()
waitForBreak =
let go = getLine >> go
in go

Using collectInput in its place solved the problem.

Why do we need IO?

That wrapper exists. It’s called Prelude.interact. I use the Data.ByteString versions of it often. The wrapper to a pure String -> String function works, because strings are lazily-evaluated singly-linked lists that can process each line of input as it’s read in, but singly-linked lists of UCS-4 characters are a very inefficient data structure.

You still need to use IO for the wrapper because the operations depend on the state of the universe and need to be sequenced with the outside world. In particular, if your program is interactive, you want it to respond to a new keyboard command immediately, and run all the OS system calls in sequence, not (say) process all the input and display all the output at once when it’s ready to quit the program.

A simple program to demonstrate this is:

module Main where
import Data.Char (toUpper)

main :: IO ()
main = interact (map toUpper)

Try running this interactively. Type control-D to quit on Linux or the MacOS console, and control-Z to quit on Windows.

As I mentioned before, though String is not an efficient data structure at all. For a more complicated example, here is the Main module of a program I wrote to normalize UTF-8 input to NFC form.

module Main ( lazyNormalize, main ) where

import Data.ByteString.Lazy as BL ( fromChunks, interact )
import Data.Text.Encoding as E (encodeUtf8)
import Data.Text.Lazy as TL (toChunks)
import Data.Text.Lazy.Encoding as LE (decodeUtf8)
import Data.Text.ICU ( NormalizationMode (NFC) )
import TextLazyICUNormalize (lazyNormalize)

main :: IO ()
main = BL.interact (
BL.fromChunks .
map E.encodeUtf8 .
TL.toChunks . -- Workaround for buffer not always flushing on newline.
lazyNormalize NFC .
LE.decodeUtf8 )

This is an Data.Bytestring.Lazy.interact wrapper around a Data.Text.Lazy.Text -> Data.Text.Lazy.Text function, lazyNormalize with the NormalizationMode constant NFC as its curried first argument. Everything else just converts from the lazy ByteString strings I use to do I/O to the lazy Text strings the ICU library understands, and back. You will probably see more programs written with the & operator than in this point-free style.

How to synchronize file I/O of many independent applications on Linux?

I believe there are three possible solutions

1) Make all programs to use a custom file I/O library that implement the features that you need. This solution may not be feasible if you do not have access to the source code. You may also consider to use mmap so that changes are written to memory. You use a background process to synchronize dirty pages to existing or new files.

2) Replace standard C/C++ libraries (such as libc.so) that affected programs would use. You could use ldd to find out library dependency. You need to update source code for standard C/C++ to implement features that you need. This may be too difficult for most people.

3) Create your file system. You may refer to many articles in the internet, such as https://kukuruku.co/post/writing-a-file-system-in-linux-kernel/. This is the best and cleanest solution.

Hope it helps.



Related Topics



Leave a reply



Submit