Node.js Piping the same readable stream into multiple (writable) targets
You have to create duplicate of the stream by piping it to two streams. You can create a simple stream with a PassThrough stream, it simply passes the input to the output.
const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;
const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();
a.stdout.pipe(b);
a.stdout.pipe(c);
let count = 0;
b.on('data', function (chunk) {
count += chunk.length;
});
b.on('end', function () {
console.log(count);
c.pipe(process.stdout);
});
Output:8
hi user
Unable to use one readable stream to write to two different targets in Node JS
You have 2 alternatives, which depends on how your software is designed.
If possible, I would avoid to execute two transform operations on the same stream in the same "context", eg: an API endpoint. I would rather separate those two different tranform so they do not work on the same input stream.
If that is not possible or would require too many changes, the solution is to fork the input stream and the pipe it into two different Writable. I normally use Highland.js fork for these tasks.
Please also see my comments on how to properly handle streams with async/await to check when the write operation is finished.
Node.js: appending to readable stream WHILE it is piped to a writable stream?
Does this give you answer
const Stream = require('stream')
const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('ping!')
readableStream.push('pong!')
writableStream.end()
Piping to same Writable stream twice via different Readable stream
Found the reason why was the second piping to the writable stream w
wasn't working.
When the first .pipe(w) finishes, the writable at w is also automatically closed.
So instead of using s.pipe(zlib.createGzip()).pipe(w);
I should have used:
s.pipe(zlib.createGzip()).pipe(w, {end: false});
Where the {end: false} is passed to pipe() as the second argument, stating that the writable should not get closed when the piping is done. Multiple listeners reading from the same stream in NodeJS
When the readable event is emitted, both the listeners listen. However, the first to call the read() function would get the data and second one would get an empty buffer because it's the same input stream being read twice and only a single chunk was present.
When the data event is emitted the listener receives the chunk of data being read from the stream. So all the listeners receive the same data.
All readable streams begin in 2 modes: flowing and paused. All Readable streams begin in paused mode by default but they can be switched to the flowing mode using any of the three methods:
1. Attaching a 'data' event handler to the stream
2. Calling the stream.resume() method
3. Calling the stream.pipe() method to send the data to a Writable.
When you use any of the above method the stream starts to flow. It doesn't care if the data listeners are attached to the stream and there's a possibility of losing the data. Internally, the read() method is called on the stream and whatever data that is being accumulated in the internal buffer is read and emitted to the listeners. The memory usage is quite low.
When you attach a readable listener on your stream, it takes priority over the data listener and hence your stream remains in the paused mode. In the paused mode, you've to explicitly read the data from the internal buffer by calling the read() method. When the readable data is available it keeps on getting accumulated in the internal buffer until the read() method is called explicitly or the stream is resumed. You can specify the size in bytes of the chunk to be read from the internal buffer or all the available data is returned. When read() is called data event is also emitted with the chunk of data read. After consuming this data, the internal buffer is emptied. So when you have multiple readable events attached and try to consume from the same internal buffer, you are not able to get the same data multiple times.
My suggestion to you would be have just one readable listener and multiple data listeners. Having a readable would give you the flexibility to read when you want without missing any data. And with the data event handlers, you would be able to get that data in all the handlers.
Piping data to writable stream that is not ready to receive data yet
Note, I'm using an interval here to mimic the writer being able to read or not. You can do this any way you want ie if the writer returns false you would update the state to start buffering etc. I think the last line is what you want ie
r.pipe(b).pipe(w);
This reads as followsreadStrem.pipe(transformBbuffer).pipe(writeStream);
The example code, there are some changes we can make to buffer all data. I'll describe after the code. Everything you need to know about streams and more are in the docs, I think they could do with more complete examples but they're pretty good as is...https://nodejs.org/api/stream.html#stream_class_stream_transform_1
This the code.
var fs = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff = 0;
var DRAIN_ME = 0;
var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');
var BufferStream = function () {
stream.Transform.apply(this, arguments);
this.buffer = [];
};
util.inherits(BufferStream, stream.Transform);
var intId;
intId = setInterval(function(){
if(check_buff % 3 == 0) {
DRAIN_ME = 1;
return;
}
DRAIN_ME = 0;
},10);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
while(DRAIN_ME > 0 && this.buffer.length > 0) {
this.push(this.buffer.shift());
}
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
var b = new BufferStream();
b.on('end', function(chunk) {
clearInterval(intId);
});
r.pipe(b).pipe(w);
Make the following changesI am looking for the canonical way to implement a transform/through
stream, that buffers all data until pipe is call on it.
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
......
BufferStream.prototype._flush = function (cb) {
var len = this.buffer.length;
for (var i = 0; i < len; i++) {
this.push(this.buffer.shift());
};
cb();
};
You can also pause the readable stream which will in effect pause the writable stream because it stops receiving data ie...To test this create a fairly large file on disk ie 100MB or more and run this...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
var ready = 0;
readableStream.pause();
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
writableStream.write(chunk);
});
The reason for the immediate pause is because by the time the interval has fired 10ms
the file may already have been written. There are variations on this ie...var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
var ready = 0;
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
readableStream.pause();
});
Related Topics
What Is the Most Efficient Way to Reverse an Array in JavaScript
Read a Text File Using Node.Js
Handling Key-Press Events (F1-F12) Using JavaScript and Jquery, Cross-Browser
How to Sort an Associative Array by Its Values in JavaScript
Invoke a Callback at the End of a Transition
JavaScript - Telling Setinterval to Only Fire X Amount of Times
How to Bind Bootstrap Popover on Dynamic Elements
Get the Current Year in JavaScript
Removing Duplicate Objects with Underscore for JavaScript
I Can't Reference an Image in Next.Js
Parse String to Date with Moment.Js
Getting a Union of Two Arrays in JavaScript
How to 'Await' on an Rx Observable
Write Elements into a Child Iframe Using JavaScript or Jquery