async-csp
A different approach to filtering?
I love CSP and think it may be a better approach than asynchronous iteration. For filtering channels, I'd prefer an approach other than constructor + callback. What do you think? If this makes sense, it could be added as, e.g., Channel.filter().
Error handling and the sentinel value END_OF_CHANNEL feel like workarounds. Are there ways of handling closing and errors that are more in line with the style of the API?
Example:
```js
import fs from 'fs';
import Channel from 'async-csp';

// Sentinel signalling that no more values will be put on a channel
const END_OF_CHANNEL = Symbol('END_OF_CHANNEL');

// Chain the filter functions: each one reads from the previous channel
// and writes to a fresh output channel
function filter(inputChannel, ...filterFuncs) {
  for (const filterFunc of filterFuncs) {
    const outputChannel = new Channel();
    filterFunc(inputChannel, outputChannel);
    inputChannel = outputChannel;
  }
  return inputChannel;
}

function readFile(fileName) {
  const channel = new Channel();
  // highWaterMark (not bufferSize) sets the read chunk size; with an
  // encoding set, 'data' events already deliver strings
  const readStream = fs.createReadStream(fileName,
    { encoding: 'utf8', highWaterMark: 1024 });
  readStream.on('data', chunk => {
    channel.put(chunk);
  });
  readStream.on('error', err => {
    channel.put(err);
  });
  readStream.on('end', () => {
    // Signal end of output sequence
    channel.put(END_OF_CHANNEL);
  });
  return channel;
}

async function splitLines(input, output) {
  let previous = '';
  while (true) {
    const chunk = await input.take();
    if (chunk === END_OF_CHANNEL) break;
    if (chunk instanceof Error) {
      output.put(chunk);
      return;
    }
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      const line = previous.slice(0, eolIndex);
      output.put(line);
      previous = previous.slice(eolIndex + 1);
    }
  }
  // Flush a final line that has no trailing newline
  if (previous.length > 0) {
    output.put(previous);
  }
  output.put(END_OF_CHANNEL);
}

async function prefixLines(input, output) {
  while (true) {
    const line = await input.take();
    if (line === END_OF_CHANNEL) {
      output.put(END_OF_CHANNEL);
      return;
    }
    if (line instanceof Error) {
      output.put(line);
      return;
    }
    output.put('> ' + line);
  }
}

async function main() {
  const fileName = process.argv[2];
  const ch = filter(readFile(fileName), splitLines, prefixLines);
  while (true) {
    const line = await ch.take();
    if (line === END_OF_CHANNEL) break;
    if (line instanceof Error) {
      // Errors travel as values and no END_OF_CHANNEL follows them
      console.error(line);
      process.exitCode = 1;
      break;
    }
    console.log(line);
  }
}

main();
```
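On the question of closing and errors: one option is to give a channel an explicit close and a failure state, and let stages consume it with `for await`, so neither a sentinel nor `instanceof Error` checks are needed. The sketch below assumes a hypothetical `MiniChannel` class with `close()` and `fail()` methods; these names are invented for illustration and are not async-csp's API (its documentation would say what closing mechanism it actually offers).

```javascript
// Hypothetical minimal channel, NOT the async-csp API: put() enqueues,
// close() ends the stream, fail(err) ends it with an error.
class MiniChannel {
  constructor() {
    this.buffer = [];   // values put but not yet taken
    this.waiters = [];  // pending takers: { resolve, reject }
    this.closed = false;
    this.error = null;
  }
  put(value) {
    if (this.closed) throw new Error('put() on a closed channel');
    const waiter = this.waiters.shift();
    if (waiter) waiter.resolve({ value, done: false });
    else this.buffer.push(value);
  }
  close() {
    this.closed = true;
    // Wake every pending taker with "done"
    for (const w of this.waiters.splice(0)) w.resolve({ value: undefined, done: true });
  }
  fail(err) {
    this.error = err;
    this.closed = true;
    for (const w of this.waiters.splice(0)) w.reject(err);
  }
  // Async iterator protocol: buffered values first, then the error or "done"
  next() {
    if (this.buffer.length > 0) {
      return Promise.resolve({ value: this.buffer.shift(), done: false });
    }
    if (this.error) return Promise.reject(this.error);
    if (this.closed) return Promise.resolve({ value: undefined, done: true });
    return new Promise((resolve, reject) => this.waiters.push({ resolve, reject }));
  }
  [Symbol.asyncIterator]() { return this; }
}

// prefixLines() without sentinel checks: for-await stops on close()
// and throws on fail(), so the stage only has to forward the error
async function prefixLines(input, output) {
  try {
    for await (const line of input) output.put('> ' + line);
    output.close();
  } catch (err) {
    output.fail(err);
  }
}

async function demo() {
  const input = new MiniChannel();
  const output = new MiniChannel();
  prefixLines(input, output);
  input.put('a');
  input.put('b');
  input.close();
  const lines = [];
  for await (const line of output) lines.push(line);
  return lines; // ['> a', '> b']
}
```

With this shape, `main()` would become a plain `for await` loop inside a `try`/`catch`, and an error raised anywhere upstream arrives as a rejection rather than as a value.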
@rauschma This is an interesting example. How would you write it using the constructor+callback pattern? I'm curious to see how it compares to this idea, and whether it would be less clean.
@trusktr IINM, each of the filterFuncs would be a callback of a channel and the channels would be piped. Obviously, they’d have to be restructured appropriately; not trivial in the case of splitLines().
I have actually considered going the whole nine yards and trying to support things like filter, map, reduce, etc., or transducers directly. Is that in line with what you were thinking? The constructor callback tries to be one-size-fits-all, which in turn means it probably does too much.
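For reference, the transducer idea in plain JavaScript: each stage transforms a reducing function, so one composed pipeline can drive an array reduce or, hypothetically, a channel's put(). The helper names (`mapping`, `filtering`, `compose`, `putThrough`) are illustrative only and are not part of async-csp.

```javascript
// A transducer takes a reducing function step(acc, x) and returns a new one.
const mapping = f => step => (acc, x) => step(acc, f(x));
const filtering = pred => step => (acc, x) => (pred(x) ? step(acc, x) : acc);

// compose(a, b)(step) === a(b(step)): values pass through a first, then b
const compose = (...xfs) => step => xfs.reduceRight((s, xf) => xf(s), step);

const xform = compose(mapping(x => x + 1), filtering(x => x % 2 === 0));

// Driving it with an array reduce
const push = (arr, x) => { arr.push(x); return arr; };
const evens = [1, 2, 3, 4, 5].reduce(xform(push), []); // [2, 4, 6]

// Driving it one value at a time, as a channel's put() hypothetically could
const received = [];
const putThrough = xform((acc, x) => { received.push(x); return acc; });
for (const x of [1, 2, 3, 4, 5]) putThrough(null, x);
// received is now [2, 4, 6]
```

A stateful stage such as splitLines() would also need a completion step to flush its leftover partial line when the input closes; Clojure's transducers provide this as a separate completion arity of the reducing function.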
One other thing to consider: humor me and pretend .filter and .map exist in the following setup, and assume they are "immutable" (in the sense that each returns a new Channel, setting up the pipes as expected):
```js
const ch = new Channel([1, 2, 3, 4, 5])
  .map(x => {
    if (x === 3) { throw new Error(); }
    return x + 1;
  })
  .filter(x => x % 2 === 0);

await ch.take(); //=> 2
await ch.take(); //=> Error lifted into a throw / rejection here, from the `#map()` call? (2 maps to 3, which the filter drops)
```
It seems like it may be tricky to get the error all the way through to the "userland" #take(). But I suppose that would be the least surprising implementation?
So if that's the case, I would need a way to pipe the error from the mapped channel to the filtered channel, at least until I find a user-specified #take() call that isn't part of the pipe.
But that introduces async issues to consider. Even if no #take() is attached to the mapped channel right now, one may be attached in the future. If we pipe the error all the way through, that later #take() never sees it. Is that a problem, or intended?
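Async generators already behave like the least surprising option: an error thrown inside an upstream map surfaces at the consumer's next pull, and the whole chain is finished afterwards. The sketch below uses them as stand-ins for channels (`from`, `map`, `filter` and `pull` are hypothetical helpers). With these inputs the error reaches the second pull, because 2 maps to 3, which the filter drops. It also gives one concrete answer to the late-#take() question: the intermediate `mapped` generator has already completed, so a later pull on it only sees "done".

```javascript
// Async generators as stand-ins for channels (hypothetical helpers)
async function* from(values) { for (const v of values) yield v; }
async function* map(source, f) { for await (const x of source) yield f(x); }
async function* filter(source, pred) {
  for await (const x of source) if (pred(x)) yield x;
}

const mapped = map(from([1, 2, 3, 4, 5]), x => {
  if (x === 3) { throw new Error('map failed on 3'); }
  return x + 1;
});
const ch = filter(mapped, x => x % 2 === 0);

async function pull() {
  const first = await ch.next();        // { value: 2, done: false }
  let error = null;
  try {
    await ch.next();                    // 2 -> 3 is dropped, then 3 throws
  } catch (err) {
    error = err;                        // the map() error, rethrown by filter()
  }
  const after = await ch.next();        // { value: undefined, done: true }
  const lateTake = await mapped.next(); // { value: undefined, done: true }
  return { first, error, after, lateTake };
}
```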
I'm considering making an error in one channel propagate to every channel in the pipeline, closing them all.
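A minimal sketch of closing a whole pipeline on error: keep its channels together and close each with the same error, so a producer still putting into the first channel and a consumer waiting on the last both see it. `ClosableChannel` and `closeAll()` are hypothetical and not async-csp's API; dropping buffered values on error is just one possible choice.

```javascript
// Hypothetical channel: close(err) rejects every pending and future
// take()/put() with err. NOT the async-csp API.
class ClosableChannel {
  constructor() {
    this.values = [];  // buffered values
    this.takers = [];  // pending takers: { resolve, reject }
    this.error = null;
  }
  put(value) {
    if (this.error) return Promise.reject(this.error);
    const taker = this.takers.shift();
    if (taker) taker.resolve(value);
    else this.values.push(value);
    return Promise.resolve();
  }
  take() {
    if (this.error) return Promise.reject(this.error);
    if (this.values.length > 0) return Promise.resolve(this.values.shift());
    return new Promise((resolve, reject) => this.takers.push({ resolve, reject }));
  }
  close(err) {
    this.error = err;
    this.values.length = 0;  // design choice: drop buffered values
    for (const t of this.takers.splice(0)) t.reject(err);
  }
}

// Close every channel of a pipeline with the same error
function closeAll(channels, err) {
  for (const ch of channels) ch.close(err);
}

async function demo() {
  const chans = [new ClosableChannel(), new ClosableChannel(), new ClosableChannel()];
  const pendingTake = chans[2].take();          // consumer waiting at the end
  closeAll(chans, new Error('stage 2 failed')); // a middle stage failed
  const results = await Promise.allSettled([pendingTake, chans[0].put('more input')]);
  return results.map(r => r.status);            // ['rejected', 'rejected']
}
```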
There's some discussion here about whether to do this in native streams. It seems unclear whether they settled on a solution.
To give you a look at what I'm considering, I have started a new branch here.
Currently, the only new method implemented is #map() (ignoring #toArray(), anyway). I included some functor law tests as well. Other methods like #filter() and #reduce() would likely follow soon, if I like the way it plays out. I'm going to mull this one over.
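The two functor laws say that mapping the identity function changes nothing, and that mapping f and then g gives the same result as mapping their composition once. A minimal sketch of such checks, assuming async generators as stand-ins for channels and a standalone, hypothetical toArray() helper (not the branch's #toArray()):

```javascript
// Stand-ins for channels (hypothetical, not the branch's code): an async
// generator as a source, map() as a transform, toArray() to drain it.
async function* from(values) { for (const v of values) yield v; }
async function* map(source, f) { for await (const x of source) yield f(x); }
async function toArray(source) {
  const out = [];
  for await (const x of source) out.push(x);
  return out;
}

const same = (a, b) => JSON.stringify(a) === JSON.stringify(b);
const input = [1, 2, 3, 4, 5];
const f = x => x + 1;
const g = x => x * 2;

async function checkFunctorLaws() {
  // Identity: mapping the identity function changes nothing
  const identity = same(await toArray(map(from(input), x => x)), input);
  // Composition: map(f) then map(g) equals a single map of g(f(x))
  const composition = same(
    await toArray(map(map(from(input), f), g)),
    await toArray(map(from(input), x => g(f(x)))),
  );
  return { identity, composition };
}
```

Each check builds a fresh source because a generator, like a drained channel, can only be consumed once.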