AsyncIterator icon indicating copy to clipboard operation
AsyncIterator copied to clipboard

feat: maybeIterator

Open jeswr opened this issue 3 years ago • 8 comments

Resolves #66

This PR introduces a maybeIterator which takes an iterator as input, and returns an iterator only if it is non-empty. Otherwise null is returned.

Happy to accept naming suggestions on this one.

Pinging @jacosaz

jeswr avatar Apr 28 '22 05:04 jeswr

Could you elaborate on use cases?

RubenVerborgh avatar Apr 28 '22 06:04 RubenVerborgh

The use case for me is to terminate forwards chaining reasoning here is some of the code for it, very WIP. Below is a minimal naive example to give you an idea of what the primary logic is

function getConsequents(quad: RDF.Quad): AsyncIterator<Quad> {
  // Gets any new consequences that can result from adding 'quad' to the dataset with the current rules
}

// New quad to be added to the dataset
let iterator = single(quad);
// N3 Store
const store = new Store();

while ((iterator = maybeIterator(iterator)) !== null) {
  // Get any new quads, by applying rules to quads generated in previous 'loop'
  iterator = union(iterator.map( q => getConsequents(q) ))
  // Add new quads to the store, and filter out from the iterator if already present in the store
  iterator = iterator.filter(q => store.addQuad(q))
}

Generally, it can be used to terminate any recursive operation that eventually stops generating new items.

I recognise that this is not as not as important as the performance issues that are open, so those should still be addressed before this.

jeswr avatar Apr 28 '22 12:04 jeswr

The while loop is a good example!

while ((iterator = maybeIterator(iterator)) !== null) {

I take it this is

while ((iterator = await maybeIterator(iterator)) !== null) {

but yes, I see the case now. Thinking perhaps to call it ensureNonEmpty.

Although for this case (and if this is the general pattern), it could also be

do  {
  // Get any new quads, by applying rules to quads generated in previous 'loop'
  iterator = union(iterator.map( q => getConsequents(q) ))
  // Add new quads to the store, and filter out from the iterator if already present in the store
  iterator = iterator.filter(q => store.addQuad(q))
} while (await hasItems(iterator))

with

function hasItems(iterator) {
  return new Promise((resolve, reject) => {
    iterator.once('error', reject);
    iterator.once('data', () => resolve(true));
    iterator.once('end', () => resolve(false));
  });
}

RubenVerborgh avatar Apr 28 '22 13:04 RubenVerborgh

await maybeIterator(iterator)

Ah yes, good catch, and indeed do-while is a better choice a lot of the time.

hasItems

The current hasItems implementation will result in one element being 'discarded' each time because of the once call, which is why there is need to return a new iterator with the item re-appended. Though the logic for retrieving the first element, and then re-appending it may well be improved.

I'm also hesitant about the use of once because the Readable API will leave an iterator in flowing mode once a when a data listener is unsubscribed (for backwards compat reasons) and I had made changes to align with that in https://github.com/RubenVerborgh/AsyncIterator/pull/43 - this means that an iterator would be left in flowing mode after one element has been emitted using once.

In addition I noticed some buggy behavior on a somewhat similar approach (due to the implementation of toArray) with

export async function maybeIterator<T>(source: AsyncIterator<T>): Promise<null | AsyncIterator<T>> {
  const elem = await source.take(1).toArray();
  return elem.length === 1 ? source.prepend(elem) : null;
}

where I get the following failing tests

 1) maybeIterator
       Should return an iterator with all elements if the iterator is not empty
         fromArray:

      AssertionError: expected [ 1 ] to deeply equal [ 1, 2, 3 ]
      + expected - actual

       [
         1
      +  2
      +  3
       ]
      
      at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:94:83)

  2) maybeIterator
       Should return an iterator with all elements if the iterator is not empty
         range 1-3:

      AssertionError: expected [ 1 ] to deeply equal [ 1, 2, 3 ]
      + expected - actual

       [
         1
      +  2
      +  3
       ]
      
      at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:97:74)

  3) maybeIterator
       Should return an iterator with all elements if the iterator is not empty
         MyItemBufferingIterator:

      AssertionError: expected [ 8 ] to deeply equal [ 8, 6, 4, 2, 0 ]
      + expected - actual

       [
         8
      +  6
      +  4
      +  2
      +  0
       ]
      
      at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:103:92)

jeswr avatar Apr 28 '22 13:04 jeswr

The current hasItems implementation will result in one element being 'discarded' each time because of the once call,

No, because it's after the filter has been applied? So it's just a second listener?

I'm also hesitant about the use of once

Yes, the better implementation is to remove the listeners yourself.

RubenVerborgh avatar Apr 28 '22 13:04 RubenVerborgh

No, because it's after the filter has been applied? So it's just a second listener?

But won't it still pull the first element out of the list before iterator = union(iterator.map( q => getConsequents(q) )) is applied in the next iteration of the do-while loop?

Yes, the better implementation is to remove the listeners yourself.

Even then the readable API leaves the iterator in flowing mode; it requires that you explicitly call iterator.pause() in order to stop flowing mode IIRC.


https://nodejs.org/api/stream.html#two-reading-modes

For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.

jeswr avatar Apr 28 '22 13:04 jeswr

But won't it still pull the first element out of the list before iterator = union(iterator.map( q => getConsequents(q) )) is applied in the next iteration of the do-while loop?

Oof, you're right.

RubenVerborgh avatar Apr 28 '22 13:04 RubenVerborgh

Coverage Status

Coverage decreased (-0.7%) to 99.337% when pulling 9d293453c681312dacb9def09f6ba110d3b4ca0d on maybeIterator into fec14f4d19e3d594bc385b927863ebf80a792580 on main.

coveralls avatar Apr 29 '22 00:04 coveralls