Node.js: Piping the same readable stream into multiple (writable) targets

You have to create a duplicate of the stream by piping it into two streams. A simple duplicate can be made with a PassThrough stream, which simply passes its input through to its output.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

// Pipe the same source into both PassThrough copies.
a.stdout.pipe(b);
a.stdout.pipe(c);

// Consume one copy to count bytes...
let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  // ...and only then consume the other copy.
  c.pipe(process.stdout);
});

Output:

8
hi user

Unable to use one readable stream to write to two different targets in Node.js

You have two alternatives; which one fits depends on how your software is designed.

If possible, I would avoid executing two transform operations on the same stream in the same "context", e.g. an API endpoint. I would rather separate those two different transforms so they do not work on the same input stream.

If that is not possible or would require too many changes, the solution is to fork the input stream and then pipe each fork into a different Writable. I normally use Highland.js's fork() for these tasks; a sketch is below.
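
A minimal sketch with Highland.js (npm install highland); the file names are hypothetical. fork() shares one source between consumers with shared back-pressure, so data flows at the pace of the slowest fork:

const _ = require('highland');
const fs = require('fs');

const source = _(fs.createReadStream('input.txt'));

// Each fork() adds a consumer; both must be consumed, or neither flows.
const fork1 = source.fork();
const fork2 = source.fork();

fork1.pipe(fs.createWriteStream('copy1.txt'));
fork2.pipe(fs.createWriteStream('copy2.txt'));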

Please also see my comments on how to properly handle streams with async/await to check when the write operation is finished.

Node.js: appending to readable stream WHILE it is piped to a writable stream?

Does this give you an answer?

const Stream = require('stream')

// Give the Readable a no-op read() so we can feed it manually with push().
const readableStream = new Stream.Readable({ read() {} })
const writableStream = new Stream.Writable({
  write(chunk, encoding, next) {
    console.log(chunk.toString())
    next()
  }
})

readableStream.pipe(writableStream)

readableStream.push('ping!')
readableStream.push('pong!')

// Signal end-of-data; pipe() ends the writable for us.
readableStream.push(null)
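
Since the readable stays open until push(null), the pushes do not have to happen up front; you can append while the pipe is already flowing. A sketch replacing the pushes at the end of the example above (the timer just stands in for any later data source):

readableStream.push('ping!')
setTimeout(() => {
  readableStream.push('pong!') // appended while the pipe is active
  readableStream.push(null)    // end of data
}, 100)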

Piping to the same Writable stream twice via different Readable streams

Found the reason why the second piping to the writable stream w wasn't working.

When the first .pipe(w) finishes, the writable at w is also automatically closed.

So instead of using s.pipe(zlib.createGzip()).pipe(w); I should have used:

s.pipe(zlib.createGzip()).pipe(w, {end: false});

Here {end: false} is passed to pipe() as the second argument, stating that the writable should not be closed when the piping is done.
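
Put together, a minimal sketch (the file names are hypothetical): two sources gzipped into the same output, where only the second pipe is allowed to end it:

const fs = require('fs');
const zlib = require('zlib');

const w = fs.createWriteStream('out.gz');
const first = fs.createReadStream('a.txt').pipe(zlib.createGzip());

first.pipe(w, { end: false }); // keep w open after the first source ends
first.on('end', () => {
  // The second pipe uses the default behavior and ends w when it finishes.
  fs.createReadStream('b.txt').pipe(zlib.createGzip()).pipe(w);
});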

Multiple listeners reading from the same stream in Node.js

  1. When the readable event is emitted, both listeners are invoked. However, the first to call the read() function gets the data, and the second gets nothing (read() returns null), because it's the same input stream being read twice and only a single chunk was present.

  2. When the data event is emitted, each listener receives the chunk of data read from the stream, so all the listeners receive the same data.

[EDIT] In detail, here is how it works:

Readable streams operate in one of two modes: flowing and paused. All Readable streams begin in paused mode by default, but they can be switched to flowing mode using any of these three methods:

1. Attaching a 'data' event handler to the stream

2. Calling the stream.resume() method

3. Calling the stream.pipe() method to send the data to a Writable.

When you use any of the above methods, the stream starts to flow. It doesn't care whether data listeners are attached to the stream, so there's a possibility of losing data. Internally, the read() method is called on the stream, and whatever data has accumulated in the internal buffer is read and emitted to the listeners. Memory usage stays quite low.
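
For example (a sketch; the file name is hypothetical), attaching a 'data' handler is enough to start the flow, so it should be attached before any asynchronous gap:

const fs = require('fs');

const s = fs.createReadStream('file1.txt');

// Attaching 'data' switches the stream to flowing mode immediately.
s.on('data', (chunk) => console.log('got', chunk.length, 'bytes'));
s.on('end', () => console.log('done'));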

When you attach a readable listener to your stream, it takes priority over the data listener, and hence your stream remains in paused mode. In paused mode, you have to explicitly read the data from the internal buffer by calling the read() method. Incoming data keeps accumulating in the internal buffer until read() is called explicitly or the stream is resumed. You can specify the size in bytes of the chunk to be read from the internal buffer; otherwise, all the available data is returned. When read() is called, a data event is also emitted with the chunk of data that was read.

After this data is consumed, the internal buffer is emptied. So when you have multiple readable listeners attached and they try to consume from the same internal buffer, they cannot each get the same data.

My suggestion would be to have just one readable listener and multiple data listeners. Having a readable listener gives you the flexibility to read when you want without missing any data, and with the data event handlers, you get that data in all the handlers.
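
A minimal sketch of that suggestion (the file name is hypothetical). The 'readable' listener stays in control of consumption, and since read() also emits a 'data' event for each chunk it returns, every data listener sees every chunk:

const fs = require('fs');

const stream = fs.createReadStream('file1.txt');

// One 'readable' listener drives consumption; the stream stays paused.
stream.on('readable', () => {
  let chunk;
  while ((chunk = stream.read()) !== null) {
    // each successful read() also emits a 'data' event with this chunk
  }
});

// Multiple 'data' listeners each receive every chunk.
stream.on('data', (chunk) => console.log('listener 1:', chunk.length, 'bytes'));
stream.on('data', (chunk) => console.log('listener 2:', chunk.length, 'bytes'));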

Piping data to a writable stream that is not ready to receive data yet

Note: I'm using an interval here to mimic the writer being ready to receive data or not. You can do this any way you want, e.g. if write() returns false you would update the state to start buffering, etc. I think the last line is what you want, i.e.

r.pipe(b).pipe(w);

This reads as follows:

readStream.pipe(transformBuffer).pipe(writeStream);
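
As an aside, honoring back-pressure directly, without a Transform, looks like the following sketch (hypothetical file names): pause the reader when write() returns false and resume on 'drain':

const fs = require('fs');

const r = fs.createReadStream('file1.txt');
const w = fs.createWriteStream('file2.txt');

r.on('data', function (chunk) {
  if (!w.write(chunk)) {
    r.pause();                         // the writer's buffer is full
    w.once('drain', () => r.resume()); // resume once it has emptied
  }
});
r.on('end', function () {
  w.end();
});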

As for the example code, there are some changes we can make to buffer all the data; I'll describe them after the code. Everything you need to know about streams and more is in the docs. I think they could do with more complete examples, but they're pretty good as is:

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

This is the code:

var fs = require('fs');
var stream = require('stream');
var util = require('util');
var Transform = stream.Transform;

var check_buff = 0;
var DRAIN_ME = 0;

var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');

var BufferStream = function () {
  Transform.apply(this, arguments);
  this.buffer = [];
};

util.inherits(BufferStream, Transform);

// Toggle DRAIN_ME periodically to mimic the writer being ready or not.
var intId = setInterval(function () {
  check_buff++;
  if (check_buff % 3 == 0) {
    DRAIN_ME = 1;
    return;
  }
  DRAIN_ME = 0;
}, 10);

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));
  // Only pass buffered chunks downstream while the "writer" is ready.
  while (DRAIN_ME > 0 && this.buffer.length > 0) {
    this.push(this.buffer.shift());
  }
  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};

var b = new BufferStream();
b.on('end', function () {
  clearInterval(intId);
});
r.pipe(b).pipe(w);

I am looking for the canonical way to implement a transform/through stream that buffers all data until pipe is called on it.

Make the following changes:

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));

  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};
......
// _flush runs after all input has been consumed; push everything buffered.
BufferStream.prototype._flush = function (cb) {
  var len = this.buffer.length;
  for (var i = 0; i < len; i++) {
    this.push(this.buffer.shift());
  }
  cb();
};
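
This works because _flush() is only called once the readable side has ended, i.e. after every incoming chunk has gone through _transform(), so nothing is pushed downstream until all of the input has been buffered.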

You can also pause the readable stream, which in effect pauses the writable stream because it stops receiving data.

To test this, create a fairly large file on disk, i.e. 100 MB or more, and run this:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

var ready = 0;
var intervalStarted = false;

readableStream.on('data', function (chunk) {
  // Pause immediately so early chunks don't race past the interval below.
  readableStream.pause();
  // Start the pause/resume toggle once, not once per chunk.
  if (!intervalStarted) {
    intervalStarted = true;
    setInterval(function () {
      if (ready == 0) {
        //console.log('pausing');
        readableStream.pause();
        ready = 1;
      } else {
        //console.log('resuming');
        readableStream.resume();
        ready = 0;
      }
    }, 100);
  }
  writableStream.write(chunk);
});

The reason for the immediate pause is that by the time the interval first fires, the file may already have been fully written. There are variations on this, e.g.:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

var ready = 0;
setInterval(function () {
  if (ready == 0) {
    //console.log('pausing');
    readableStream.pause();
    ready = 1;
  } else {
    //console.log('resuming');
    readableStream.resume();
    ready = 0;
  }
}, 100);

readableStream.on('data', function (chunk) {
  writableStream.write(chunk);
  readableStream.pause();
});

