
Add new Stream() class to expose a duplex stream

Open rprieto opened this issue 10 years ago • 10 comments

As discussed in #10, this adds a new Stream class:

var fs      = require('fs');
var sax     = require('sax');
var saxpath = require('saxpath');

var fileStream = fs.createReadStream('test/bookstore.xml');
var saxParser  = sax.createStream(true);

// normal usage
var streamer = new saxpath.SaXPath(saxParser, '//book' /*, optional recorder */);
streamer.on('match', function (xml) { /* ... */ });

// new streaming version
var streamer = new saxpath.Stream(saxParser, '//book' /*, optional recorder, stream args */);
fileStream.pipe(streamer).pipe(process.stdout);

rprieto avatar Jan 26 '15 23:01 rprieto

I like the simple API.

A concern is pause / resume behavior, which can be hard to test… did you have a chance to look into that?

PaulMougel avatar Jan 27 '15 06:01 PaulMougel

Hi @PaulMougel, since this is based on the new Readable, my understanding is that it will handle backpressure automatically, buffering all pushed data in memory. However it doesn't propagate the backpressure by pausing the saxParser itself, so it would just keep getting matches and buffering them. Would that be OK?

I'll try adding some tests to prove/cover that, if you have any tips/advice please let me know!

rprieto avatar Jan 28 '15 22:01 rprieto

> However it doesn't propagate the backpressure by pausing the saxParser itself, so it would just keep getting matches and buffering them.

My point indeed. If your XML is large or even infinite, it can cause some issues. I'm not 100% sure about that, because saxParser is sort of wrapped by your stream… So if data comes out of streamer too fast (streamer really being the output of saxParser), fileStream will stop feeding input to streamer, which will in turn stop feeding input to saxParser.

Will everything be fine? Not quite sure about that :/

PaulMougel avatar Jan 29 '15 08:01 PaulMougel

Agreed, back-pressure would be really nice to have. I'll investigate to see if the saxParser can be paused, but it's not a readable stream itself so it might not be possible.

However the new Stream class could listen to this.on('pipe'), and keep track of the upstream (most likely a file, or the infinite source you mentioned). Then it can pause that upstream directly when the downstream is full. I'm happy to have a play with this and push a separate commit for review.

rprieto avatar Jan 29 '15 09:01 rprieto

Nice work, please look into it if you want! I'm fine with merging this pull request as is, though.

StevenLooman avatar Jan 30 '15 06:01 StevenLooman

I've updated package.json to include mocha 2.1.0, so it no longer needs to be globally installed and is run locally instead. (In hindsight, requiring it to be installed locally but using the global one doesn't make sense to me...) You'll run into merge conflicts now, but they should be easy to fix.

StevenLooman avatar Jan 30 '15 06:01 StevenLooman

Good catch, I hadn't updated the "test": "mocha" part of package.json. I'll rebase on top of master so it doesn't conflict. And I'll give back-pressure a try over the weekend and let you know. Cheers

rprieto avatar Jan 30 '15 07:01 rprieto

Great!

StevenLooman avatar Jan 31 '15 10:01 StevenLooman

@rprieto Any luck with the back-pressure testing?

StevenLooman avatar Feb 15 '15 10:02 StevenLooman

Sorry @StevenLooman, I played with it a bit but haven't found a good way to test it yet. I'm still keen to have a look if you don't mind, to see if this other implementation makes a difference:

  • push the item to an array in streamer.on('match')
  • read items from the array in readable._read, until push() returns false

rprieto avatar Feb 15 '15 22:02 rprieto