node-stream-transform

Info Request

Open: jasonterando opened this issue 4 years ago • 6 comments

Hi, with a Transformer stream, is there a mechanism to be notified when "_flush" occurs? By the time I get "finish" and "end", the stream can no longer be written to.

My use case is the following:

  • Upstream, I have a transform stream running multiple instances in parallel (web scraping)
  • I want to collect the upstream data in a transform stream, tweak it, and cache it, but not output it right away (basically, it would be a "sink")
  • When the upstream stream completes, but before "finish" or "end" (after which the stream is closed and I can no longer push), I want to push the sorted data downstream

I can do this by extending a standard stream.Transform, but I would like to use Transformer so I can run parallel instances. The problem is that I can't find a way to know when the inbound stream is "done" while I can still write out.
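For comparison, here is a minimal sketch of the plain `stream.Transform` approach described above, using Node's built-in `flush` option: records are cached in `transform` and only sorted and pushed once the input has ended. `createSortingSink` is a hypothetical helper name, and this version deliberately does not run anything in parallel.

```javascript
const { Transform } = require('stream');

// Hypothetical helper: holds every record back, then emits them sorted
// once the input has ended.
function createSortingSink(compare) {
  const buffered = [];
  return new Transform({
    objectMode: true,
    transform(record, _encoding, callback) {
      buffered.push(record); // cache instead of pushing downstream
      callback();
    },
    // Runs after all input has been consumed but before 'end' is emitted,
    // so push() is still allowed here.
    flush(callback) {
      buffered.sort(compare);
      for (const record of buffered) this.push(record);
      callback();
    },
  });
}
```

This is the non-parallel baseline the request is trying to reproduce with Transformer.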

If I were to do the legwork of adding support for a "flush" callback/listener, is it something that you would accept into the project?

Transformer.prototype._flush = function(cb) {
  this._ending = function() {
    if (this.state.running === 0) {
      // *** Call "flush" callback here if one is defined... ***
      return cb();
    }
  };
  return this._ending();
};
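To make the proposal concrete, here is a self-contained sketch of a parallel object-mode transform with an optional `flush` hook that runs only after the input has ended and every in-flight handler has settled, mirroring the `state.running === 0` check in the snippet above. `ParallelTransform` and its `parallel`, `handler`, and `flush` options are hypothetical names for illustration; this is not the stream-transform implementation or API.

```javascript
const { Transform } = require('stream');

// Hypothetical sketch, NOT the stream-transform API: runs up to `parallel`
// async handlers at once and calls an optional `flush(push)` hook after the
// input has ended AND every in-flight handler has settled.
class ParallelTransform extends Transform {
  constructor({ parallel = 4, handler, flush } = {}) {
    super({ objectMode: true });
    this.parallel = parallel;   // maximum number of handlers in flight
    this.handler = handler;     // (record) => result | Promise<result>; null/undefined = skip
    this.userFlush = flush;     // optional (push) => void
    this.inFlight = 0;
    this.heldCallback = null;   // _transform callback held back while all slots are busy
    this.checkEnd = null;       // installed by _flush, re-checked as handlers settle
  }

  _transform(record, _encoding, callback) {
    this.inFlight++;
    Promise.resolve()
      .then(() => this.handler(record))
      .then(
        (result) => {
          if (result !== null && result !== undefined) this.push(result);
        },
        (err) => this.destroy(err)
      )
      .finally(() => {
        this.inFlight--;
        if (this.heldCallback) {
          const next = this.heldCallback; // a slot is free: accept the next record
          this.heldCallback = null;
          next();
        }
        if (this.checkEnd) this.checkEnd();
      });
    if (this.inFlight < this.parallel) callback();
    else this.heldCallback = callback;
  }

  _flush(callback) {
    this.checkEnd = () => {
      if (this.inFlight !== 0) return; // handlers still running: try again later
      this.checkEnd = null;            // make sure the hook runs only once
      if (this.userFlush) this.userFlush((record) => this.push(record));
      callback();
    };
    this.checkEnd();
  }
}
```

Results are pushed in completion order, not input order, which is usually acceptable when the flush hook re-sorts cached data anyway.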

jasonterando, May 05 '20 22:05

Sorry I didn't get back to you sooner. We rely entirely on the Node.js Stream API, so if there is an existing solution, we can apply it to stream-transform. It seems like you could plug in another stream.Transform before it that catches the end event of the input stream and does what needs to be done before instructing the output stream to finish. Otherwise, I'll have to look more closely at your pull request and the accompanying tests to validate them; I'm still not entirely sure I grasp the use case.
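One way to read the "plug another stream before it" suggestion, sketched with Node built-ins only: an upstream pass-through stage uses its own `flush` to append a sentinel record, so the record-by-record handler learns that the input has ended and can emit what it cached. `END_OF_INPUT`, `markEndOfInput`, `applyHandler`, and `sumUntilEnd` are hypothetical names, and `applyHandler` is only a stand-in for stream-transform. Assumption: handlers run one record at a time; with asynchronous handlers running in parallel, the sentinel could be handled before earlier records finish.

```javascript
const { Transform } = require('stream');

// Hypothetical sentinel record meaning "the input has ended".
const END_OF_INPUT = Object.freeze({ endOfInput: true });

// Stage placed BEFORE the record-by-record transformer: it forwards every
// record and, once its own input ends, pushes the sentinel as a final record.
function markEndOfInput() {
  return new Transform({
    objectMode: true,
    transform(record, _encoding, callback) {
      callback(null, record); // pass through unchanged
    },
    flush(callback) {
      this.push(END_OF_INPUT);
      callback();
    },
  });
}

// Stand-in for a transformer that calls handler(record) and pushes any
// non-null result.
function applyHandler(handler) {
  return new Transform({
    objectMode: true,
    transform(record, _encoding, callback) {
      const result = handler(record);
      if (result !== null && result !== undefined) this.push(result);
      callback();
    },
  });
}

// Example handler: accumulates a total and emits it only when the sentinel arrives.
function sumUntilEnd() {
  let total = 0;
  return (record) => {
    if (record === END_OF_INPUT) return { total };
    total += record.value;
    return null; // nothing to emit yet
  };
}
```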

wdavidw, May 20 '20 13:05

Lookahead and lookbehind are common requirements for stream algorithms. Often, several records must be retained or processed before an output record can be produced; if the stream ends before the final retained data is processed, those algorithms become impossible to implement with this library.

I just faced this issue today, and spent a lot of time trying to work around the lack of a flush callback in this library. When transforming a stream by examining multiple records, such as aggregating data over a window, the stream may end with some retained data not fully processed.

In my use case, I was consuming CSV timeseries data and looking for the maximum value in each hour. The records representing the partial hour at the end of the dataset were simply discarded, since there was no way to model the end of the stream.
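A minimal sketch of this hourly-maximum case using Node's built-in `stream.Transform` and its `flush` option rather than stream-transform. It assumes records shaped `{ time, sessions }` (as in the snippet in this comment) that arrive sorted by `time`, and it buckets by UTC clock hour (`Math.floor(ms / 3600000)`) rather than one hour from the first record; `createHourlyMax` is a hypothetical name.

```javascript
const { Transform } = require('stream');

const HOUR_MS = 60 * 60 * 1000;

// Hypothetical helper: emits, per UTC clock hour, the record with the highest
// `sessions` value. Assumes input sorted by `time`.
function createHourlyMax() {
  let bucket = null; // hour index of the window being accumulated
  let max = null;    // best record seen so far in that window
  return new Transform({
    objectMode: true,
    transform(record, _encoding, callback) {
      const hour = Math.floor(new Date(record.time).getTime() / HOUR_MS);
      if (bucket !== null && hour !== bucket) {
        this.push(max); // the previous hour is complete
        max = null;
      }
      bucket = hour;
      if (max === null || record.sessions > max.sessions) max = record;
      callback();
    },
    flush(callback) {
      // Without this, the last (possibly partial) hour would be lost.
      if (max !== null) this.push(max);
      callback();
    },
  });
}
```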

Since "Skip and multiply records" is one of the stated use cases of the library, I believe it ought to give the user code some way to know when a stream is ending.

I ended up having to wrap _flush in a very messy fashion to get it to work.

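var maxPerHour = (function () {
	let end = null; // end of the current one-hour window
	let max = null; // record with the highest `sessions` in that window

	let stream = new Transform((x) => {
		const time = new Date(x.time); // compare Dates, not a string against a Date
		let out = null; // returning null (or undefined) skips the record

		// This record is past the current window: emit the window's maximum
		// and let this record start the next window.
		if (end && time >= end) {
			out = max;
			end = null;
			max = null;
		}

		if (!end) {
			end = new Date(time);
			end.setHours(end.getHours() + 1);
		}

		if (!max || x.sessions > max.sessions)
			max = x;

		return out;
	});

	// Emit the last (partial) window before the stream ends.
	let original_flush = stream._flush;
	stream._flush = (cb) => {
		if (max !== null)
			stream.push(max); // pushing null would end the stream early
		original_flush.call(stream, cb);
	};

	return stream;
})();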

sehrgut, Jun 16 '20 22:06

> I ended up having to wrap _flush in a very messy fashion to get it to work.

How would you have expected, anticipated, or wished your code to look?

wdavidw, Jun 17 '20 07:06

Many different interfaces would work, but any of the following would have worked for me:

  1. Transformer(dataHandler, endingHandler)
  2. Transformer emitting a 'flush' event at the line where @jasonterando commented "Call 'flush' callback here"
  3. Transformer calling a 'flush' or 'ending' callback, as per the OP.

Alternatively, though this would be a major rearchitecture, Transformer could be refactored as an ES6 class so that it can be extended more easily. My first approach was to subclass it and override _flush(), calling super._flush, but this turned out to be non-trivial, since it's a util.inherits class. An ES6 class, however, could easily be subclassed at the point of use into what I was calling "FlushableTransformer".
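For what it's worth, in plain JavaScript an ES6 `class` can extend a `util.inherits`-style constructor. The sketch below uses a hypothetical `LegacyTransformer` stand-in that only imitates the `_ending` pattern from the first post (it is not stream-transform's code) and a hypothetical `FlushableTransformer` whose `_flush` wraps the parent's callback instead of pushing before calling it, so the extra records go out only after in-flight handlers finish. Whether the real Transformer subclasses this cleanly depends on its internals.

```javascript
const stream = require('stream');
const util = require('util');

// Stand-in for a util.inherits-style transformer whose _flush defers its
// callback until no handler is running (the pattern from the first post).
function LegacyTransformer(handler) {
  stream.Transform.call(this, { objectMode: true });
  this.handler = handler;
  this.state = { running: 0 };
}
util.inherits(LegacyTransformer, stream.Transform);

LegacyTransformer.prototype._transform = function (record, _encoding, cb) {
  this.state.running++;
  setImmediate(() => { // simulate an asynchronous handler
    const out = this.handler(record);
    if (out !== null && out !== undefined) this.push(out);
    this.state.running--;
    if (this._ending) this._ending();
  });
  cb();
};

LegacyTransformer.prototype._flush = function (cb) {
  this._ending = function () {
    if (this.state.running === 0) return cb();
  };
  return this._ending();
};

// ES6 subclass: wrap the parent's callback so the extra records are pushed
// only once the parent has decided that all handlers are done.
class FlushableTransformer extends LegacyTransformer {
  constructor(handler, onFlush) {
    super(handler);
    this.onFlush = onFlush; // hypothetical hook: (push) => void
  }

  _flush(cb) {
    super._flush((err) => {
      if (!err && this.onFlush) this.onFlush((record) => this.push(record));
      cb(err);
    });
  }
}
```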

Whichever way it's implemented, as long as Transformer doesn't pass on the information that the stream is ending, it can't handle any use case that aggregates blocks of records and emits records afterwards. A windowed aggregate would make a useful test case.

sehrgut, Jun 17 '20 16:06

Another option might be calling the transform handler with null during _flush, to let the handler know the inbound stream is done.

jasonterando, Jun 17 '20 19:06

I like the null arg idea. I didn't suggest it because I wasn't sure it would be backward-compatible. Maybe an option at creation, such as sendNullOnEnding? That way existing code doesn't have its contract broken.
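A hedged sketch of how the opt-in null call could behave, built on Node's `stream.Transform` rather than stream-transform. `transformRecords`, its `sendNullOnEnding` option, and the `sumPairs` handler are hypothetical. `null` works as an unambiguous end marker here because a Node stream cannot carry `null` as a data record (it signals end-of-stream), and handlers created without the option are never called with it.

```javascript
const { Transform } = require('stream');

// Hypothetical sketch of the suggested opt-in: with sendNullOnEnding set, the
// handler is called one extra time with `null` after the input ends.
function transformRecords(handler, { sendNullOnEnding = false } = {}) {
  const emit = (stream, result) => {
    // null/undefined results are skipped rather than pushed
    if (result !== null && result !== undefined) stream.push(result);
  };
  return new Transform({
    objectMode: true,
    transform(record, _encoding, callback) {
      let result;
      try {
        result = handler(record);
      } catch (err) {
        return callback(err);
      }
      emit(this, result);
      callback();
    },
    flush(callback) {
      if (!sendNullOnEnding) return callback(); // old contract: no extra call
      let result;
      try {
        result = handler(null); // "the inbound stream is done"
      } catch (err) {
        return callback(err);
      }
      emit(this, result);
      callback();
    },
  });
}

// Example handler: sums records two at a time; the final null call
// releases a leftover record that has no partner.
function sumPairs() {
  let pending = null;
  return (record) => {
    if (record === null) return pending;
    if (pending === null) {
      pending = record;
      return null;
    }
    const sum = pending + record;
    pending = null;
    return sum;
  };
}
```

With input `1, 2, 3, 4, 5`, the option on yields `3, 7, 5`; with it off, the trailing `5` is dropped, which is exactly the behavior the thread describes.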

sehrgut, Jun 18 '20 16:06