feat: maybeIterator
Resolves #66
This PR introduces a maybeIterator which takes an iterator as input, and returns an iterator only if it is non-empty. Otherwise null is returned.
Happy to accept naming suggestions on this one.
Pinging @jacosaz
Could you elaborate on use cases?
The use case for me is to terminate forwards chaining reasoning here is some of the code for it, very WIP. Below is a minimal naive example to give you an idea of what the primary logic is
function getConsequents(quad: RDF.Quad): AsyncIterator<Quad> {
// Gets any new consequences that can result from adding 'quad' to the dataset with the current rules
}
// New quad to be added to the dataset
let iterator = single(quad);
// N3 Store
const store = new Store();
while ((iterator = maybeIterator(iterator)) !== null) {
// Get any new quads, by applying rules to quads generated in previous 'loop'
iterator = union(iterator.map( q => getConsequents(q) ))
// Add new quads to the store, and filter out from the iterator if already present in the store
iterator = iterator.filter(q => store.addQuad(q))
}
Generally, it can be used to terminate any recursive operation that eventually stops generating new items.
I recognise that this is not as not as important as the performance issues that are open, so those should still be addressed before this.
The while loop is a good example!
while ((iterator = maybeIterator(iterator)) !== null) {
I take it this is
while ((iterator = await maybeIterator(iterator)) !== null) {
but yes, I see the case now. Thinking perhaps to call it ensureNonEmpty.
Although for this case (and if this is the general pattern), it could also be
do {
// Get any new quads, by applying rules to quads generated in previous 'loop'
iterator = union(iterator.map( q => getConsequents(q) ))
// Add new quads to the store, and filter out from the iterator if already present in the store
iterator = iterator.filter(q => store.addQuad(q))
} while (await hasItems(iterator))
with
function hasItems(iterator) {
return new Promise((resolve, reject) => {
iterator.once('error', reject);
iterator.once('data', () => resolve(true));
iterator.once('end', () => resolve(false));
});
}
await maybeIterator(iterator)
Ah yes, good catch, and indeed do-while is a better choice a lot of the time.
hasItems
The current hasItems implementation will result in one element being 'discarded' each time because of the once call, which is why there is need to return a new iterator with the item re-appended. Though the logic for retrieving the first element, and then re-appending it may well be improved.
I'm also hesitant about the use of once because the Readable API will leave an iterator in flowing mode once a when a data listener is unsubscribed (for backwards compat reasons) and I had made changes to align with that in https://github.com/RubenVerborgh/AsyncIterator/pull/43 - this means that an iterator would be left in flowing mode after one element has been emitted using once.
In addition I noticed some buggy behavior on a somewhat similar approach (due to the implementation of toArray) with
export async function maybeIterator<T>(source: AsyncIterator<T>): Promise<null | AsyncIterator<T>> {
const elem = await source.take(1).toArray();
return elem.length === 1 ? source.prepend(elem) : null;
}
where I get the following failing tests
1) maybeIterator
Should return an iterator with all elements if the iterator is not empty
fromArray:
AssertionError: expected [ 1 ] to deeply equal [ 1, 2, 3 ]
+ expected - actual
[
1
+ 2
+ 3
]
at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:94:83)
2) maybeIterator
Should return an iterator with all elements if the iterator is not empty
range 1-3:
AssertionError: expected [ 1 ] to deeply equal [ 1, 2, 3 ]
+ expected - actual
[
1
+ 2
+ 3
]
at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:97:74)
3) maybeIterator
Should return an iterator with all elements if the iterator is not empty
MyItemBufferingIterator:
AssertionError: expected [ 8 ] to deeply equal [ 8, 6, 4, 2, 0 ]
+ expected - actual
[
8
+ 6
+ 4
+ 2
+ 0
]
at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:103:92)
The current
hasItemsimplementation will result in one element being 'discarded' each time because of theoncecall,
No, because it's after the filter has been applied? So it's just a second listener?
I'm also hesitant about the use of once
Yes, the better implementation is to remove the listeners yourself.
No, because it's after the filter has been applied? So it's just a second listener?
But won't it still pull the first element out of the list before iterator = union(iterator.map( q => getConsequents(q) )) is applied in the next iteration of the do-while loop?
Yes, the better implementation is to remove the listeners yourself.
Even then the readable API leaves the iterator in flowing mode; it requires that you explicitly call iterator.pause() in order to stop flowing mode IIRC.
https://nodejs.org/api/stream.html#two-reading-modes
For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.
But won't it still pull the first element out of the list before
iterator = union(iterator.map( q => getConsequents(q) ))is applied in the next iteration of thedo-whileloop?
Oof, you're right.