node-stream-transform
node-stream-transform copied to clipboard
Info Request
Hi, with a Transformer stream, is there a mechanism to get when a "_flush" occurs? By the time I get "finish" and "end", the stream can no longer be written to.
My use case is the following:
- Upstream, I have a transform stream running multiple instances in parallel (web scraping)
- I want to collect the data from the upstream data in transform stream, tweak it and cache it, but not output it right away (basically, it would be a "sink")
- Upon completion of the upstream stream but before finish or end, when the stream is closed and I can't push anymore, I want to push the sorted data downstream
I can do this with extending a standard stream.Transform stream; but would like to use Transformer, running parallel instances. The problem is I can't figure out a way to know when the inbound stream is "done" before I can't write out anymore.
If I were to do the legwork of adding support for a "flush" callback/listener, is it something that you would accept into the project?
Transformer.prototype._flush = function(cb) {
this._ending = function() {
if (this.state.running === 0) {
// *** Call "flush" callback here if one is defined... ***
return cb();
}
};
return this._ending();
};
Sorry I didn't get back to you on time. We entirely leverage the Node.js Stream API so if there is an existing solution then we can apply it to stream-transform
. It seems like you could plug another stream.Transform
stream before which catch the end event of the input stream and then do what need to be done before instructing the output stream to finish. Otherwise, I'll have to looks more closely at your pull request and the supported tests to validate them, I'm still not entirely sore to grasp the use case.
Lookahead and lookbehind are common requirements for stream algorithms. Often, multiple data records may be retained or processed before producing an output record, and if the stream ends before the final retained data can be processed, it can make implementing those algorithms impossible with this library.
I just faced this issue today, and spent a lot of time trying to work around the lack of a flush callback in this library. When transforming a stream by examining multiple records, such as aggregating data over a window, the stream may end with some retained data not fully processed.
In my use case, I was consuming CSV timeseries data and looking for a maximum value for each hour. The records representing the partial hour at the end of the dataset were being simply discarded, since there was no way to model the end of the stream.
Since "Skip and multiply records" is one of the stated use cases of the library, I believe it ought to give the user code some way to know when a stream is ending.
I ended up having to wrap _flush in a very messy fashion to get it to work.
var maxPerHour = (function () {
let end = null;
let max = null;
let stream = new Transform((x) => {
if (!max || max.sessions > x.sessions)
max = x;
if (!end) {
end = new Date(x.time);
end.setHours(end.getHours() + 1);
}
if (x.time > end) {
out = max;
end = null;
max = null
return out;
}
});
let original_flush = stream._flush;
stream._flush = (cb) => {
stream.push(max);
original_flush.call(stream, cb);
};
return stream;
})();
I ended up having to wrap _flush in a very messy fashion to get it to work.
How would you have expected/anticapted/wished your code to look like ?
It would work with a lot of different interfaces, but any of the following would have worked for me:
- Transformer(dataHandler, endingHandler)
- Transformer emitting a 'flush' event at the line @jasonterando commented "Call 'flush' callback here"
- Transformer calling a 'flush' or 'ending' callback, as per the OP.
Alternately, though this would be a major rearchitecture, refactoring Transformer as an es6 class so it can be more easily extended. My first approach was to try subclassing it and overloading _flush(), calling super._flush, but this turned out to be non-trivial, since it's a util.inherits class. But an ES6 class would be easily subclassed at the point of use to what I was calling "FlushableTransformer".
Whether or not it's implemented any of the ways I've suggested, without passing forward the information that the stream is ending in some way, Transformer misses any use case involving aggregating blocks of records and emitting records afterwards, so a use case involving a windowed aggregate would be a useful test.
Another option might be calling the transform callback with a null upon a _flush to let the transformer know the inbound stream is done.
I like the null arg idea. I didn't suggest it because I wasn't sure if it would be backwards-compatible. Maybe an option at creation for sendNullOnEnding? That way existing code doesn't have its contract broken.