Piped streams not flowing with batch
We're dealing with an issue where piping streams through a Highland batched stream causes the pipeline to halt. The actual code streams data from HTTP, but I think I've narrowed it down to the following code.
let {Transform, Writable} = require('stream');
let _ = require('highland');
// Dummy transform
class NullTransform extends Transform {
  constructor(options) {
    options = options || {};
    options.writableObjectMode = true;
    options.readableObjectMode = true;
    super(options);
  }
  _transform(chunk, encoding, callback) {
    console.log('transform', typeof chunk);
    this.push(chunk);
    callback();
  }
  _flush(callback) {
    callback();
  }
}
// Dummy writer
class NullWriter extends Writable {
  constructor(options) {
    options = options || {};
    options.objectMode = true;
    // options.highWaterMark = 1;
    super(options);
    this._storage = [];
  }
  _write(chunk, encoding, callback) {
    console.log('write', chunk, typeof chunk, chunk.length);
    callback();
  }
}
// Infinite stream of random numbers
const readStream = _({
  next() {
    console.log('next!');
    return {
      done: false,
      value: Math.random(),
    };
  }
});
const transform = new NullTransform();
const writer = new NullWriter();
This will cause the application to exit:
readStream
.pipe(_().batch(1000))
.pipe(writer);
Strangely, adding the dummy transform before the batching causes it to flow properly:
readStream
.pipe(transform)
.pipe(_().batch(1000))
.pipe(writer);
Am I missing something?
At first glance, the problem is pipe(_().batch(1000)). The only Highland streams that you can pipe to are the ones returned by _() or _.pipeline(...). Calling batch on such a stream returns a new, read-only stream, so you can't pipe to the result.
It's pretty rare that you'll need to use _() at all. In this case, since readStream is already a Highland stream, you can call batch on it directly.
readStream
.batch(1000)
.pipe(writer);
If readStream is a Node stream (since you mentioned HTTP streaming), you can create a Highland stream out of it using the Highland stream constructor.
_(readStream)
.batch(1000)
.pipe(writer);
I don't know why the code you showed works when you add the dummy transform, but it only works by coincidence.