Otherwise not picking up empty stream
In my application repo I have the following function. I stream an immutable object down the pipeline. The pipeline filters it to see whether we need to run a git diff, which determines whether we should copy the last version's assets or run a full build. However, it appears that after `filter` runs and returns false, `otherwise` never fires. The stream appears to end, yet it works exactly as expected in my unit test. Does any cause come to mind as to why the stream stops if that filter function returns false? I've put a console log statement in the stream-returning function I give to `otherwise`, but it never fires.
```javascript
function getBuildAction (stream) {
  let stateStream = stream.observe();
  return stream
    // Only process when we know there is a newer version of the source
    .filter((state) => state.lastAssetVersion !== state.currentAssetVersion)
    // Diff the repo against the target branch & emit diff files
    .flatMap((state) => runGitDiff(state.sourceDir))
    // Flip back to original state stream if diff was not required
    .otherwise(stateStream);
}
```
The code before it also runs from an observe like this:
```javascript
function createBuildStream (stateStream) {
  let buildStream = stateStream.observe();
  buildStream
    .through(getBuildAction) // Where the stream stops
    .through(buildProcess)   // Runs a bunch of commands
    .pipe(writeToLog());     // returns fs.createWriteStream
  return stateStream;
}
```
I can't think of any reason why this wouldn't work. However, the implementation of `otherwise` uses the stream redirect feature, and that feature has always been a bit brittle in 2.x, so I wouldn't be surprised if this is a race-condition bug in Highland.
Can you provide a test case that I can run that causes this behavior? I can't tell you much more without one.
Alternatively, the workaround is to implement `getBuildAction` without using `otherwise`:
```javascript
function getBuildAction (stream) {
  return stream
    .flatMap(state => {
      if (state.lastAssetVersion !== state.currentAssetVersion) {
        return runGitDiff(state.sourceDir);
      } else {
        return _.of(state);
      }
    });
}
```
Hah, the solution I've put in place of `otherwise` is almost exactly the same as what you proposed. Guess I'm slowly starting to get a feel for it! Still, it would be all the more rewarding to build these elaborate flows without `if` statements, if possible.
I've been trying to isolate it in a test case, but unfortunately it's a little challenging to track down. Below is the closest I've come to reproducing the error: none of the `getBuildAction` steps are executed and the test case times out. It's not quite the exact situation, but I think it's similar? I will continue to hunt down the exact case tomorrow.
```javascript
let _ = require('highland');
let assert = require('assert');

function getBuildAction (stream) {
  let stateStream = stream.observe();
  return stream
    // Only process when we know there is a newer version of the source
    .filter((state) => state.lastAssetVersion !== state.currentAssetVersion)
    // Diff the repo against the target branch & emit diff files
    .flatMap((state) => _.of({ action: 'build' }))
    // Flip back to original state stream if diff was not required
    .otherwise(stateStream);
}

it('Should return a non-buildable state if not new source', (done) => {
  let state = {
    action: null,
    lastAssetVersion: 'abcdefgh',
    currentAssetVersion: 'abcdefgh'
  };
  let stream = _.of(state).observe();
  return stream
    .through(getBuildAction)
    .toCallback((err, buildState) => {
      assert.equal(buildState.action, null, 'Build action should be unmodified and left as null.');
      done(err, buildState);
    });
});
```
Your test case doesn't run properly because you only observe your source stream and never consume it. Thus, your observer stream never sees anything.
If you change your code to this, it should work.
```javascript
let stream = _.of(state);
stream.observe()
  .through(getBuildAction)
  .toCallback((err, buildState) => {
    assert.equal(buildState.action, null, 'Build action should be unmodified and left as null.');
    done(err, buildState);
  });
stream.resume();
```
This is probably not the root cause of your problem, right?
Oh, my mistake. Unfortunately, the reported issue isn't that simple: it fires that first `filter` operation and then stops. My hunch is that, since this function works fine in the unit test, it has more to do with the state of the stream coming into it.
Hmm, no luck so far, but I do have a couple of questions:
1. When are `.through` functions called? It seems like it happens a lot sooner than when their stream transforms run.
2. What's the best way to debug the state of the stream? Just add console statements to `node_modules/highland/src/index.js`?
1. `through` functions are called immediately. `stream.through(fn)` is syntactic sugar for `fn(stream)`.
2. Yes. If you want to see the state of the stream, add console statements to `index.js`. There are already a bunch of commented-out debug statements that you might find useful.
You'll want to reduce the test case as much as possible first. The stream state being printed out can get verbose very quickly as your pipeline becomes more complicated. Try starting with a failing pipeline and removing transforms until it stops breaking. For example, is .through(buildProcess) necessary? What about .pipe(writeToLog())? Can that be replaced with .resume() or .pipe(process.stdout)?
Hey again, it's been a while. Today I was updating one of the util functions this project uses and while writing unit tests for it I stumbled upon the cause of this issue and confirmed it with a separate, isolated test.
In short, upstream of the aforementioned `through` function I was calling `.consume((err, x, push, next) => { ... })` and pushing data but not calling `next`, because I didn't think it was always necessary, just as it isn't always necessary in a stream generator.
```javascript
describe('consume', () => {
  it('Should fail if next is not called', (done) => {
    let preFilterSpy = expect.createSpy();
    let endSpy = expect.createSpy();
    _([ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ])
      .consume((err, x, push) => {
        push(err, x);
      })
      .tap(preFilterSpy)
      .filter(x => x === 'h')
      .otherwise(_.of('h'))
      .each(x => {
        expect(x).toBe('h');
      })
      .done(endSpy);
    expect(preFilterSpy).toHaveBeenCalled();
    expect(preFilterSpy.calls.length).toBe(2);
    expect(endSpy).toNotHaveBeenCalled();
    done();
  });

  it('Should succeed if next is called', (done) => {
    let preFilterSpy = expect.createSpy();
    let endSpy = expect.createSpy();
    _([ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ])
      .consume((err, x, push, next) => {
        push(err, x);
        if (x !== _.nil) next();
      })
      .tap(preFilterSpy)
      .filter(x => x === 'h')
      .otherwise(_.of('h'))
      .each(x => {
        expect(x).toBe('h');
      })
      .done(endSpy);
    expect(preFilterSpy).toHaveBeenCalled();
    expect(preFilterSpy.calls.length).toBe(7);
    expect(endSpy).toHaveBeenCalled();
    done();
  });
});
```
While I'm ecstatic this issue is resolved, I still have a couple of questions for you @vqvu if it's no trouble.
- What is the difference between calling `next()` in a value generator vs. `next()` in a `consume` function? For clarity, "value generator" refers to the `_((push, next) => { ... });` API.
- When would you not call `next()` in both a value generator function and a `consume` method?
I'm glad you found the problem! And thanks for posting your repro case. It actually shows an unrelated bug in Highland. In the failure case, preFilterSpy.calls.length should be 1 and not 2, since you never call next() in your consume handler. Fix in #608.
To answer your questions:
> What is the difference between calling `next()` in a value generator vs `next()` in a `consume` function?
They're not all that different. next is used in both cases to signal that you are ready for the handler to be called again. Typically, you call it once you are done calling push.
For value generators, you'll generally have code that looks like this:
- The generator is called.
- The generator produces some values and emits them with `push`. It may do this asynchronously.
- The generator calls `push(null, _.nil)` if there's no more data, or it calls `next` to ask Highland to call it again when the stream needs more data.
For `consume` handlers, you have a very similar thing:
- The handler is called with the first value that the source emits.
- The handler transforms that value into 0 or more other values and emits them with `push`. It may do this asynchronously.
- The handler calls `push(null, _.nil)` if it doesn't want to emit any more data, or it calls `next` to ask Highland for the next value that the source emits. As before, Highland will only call the handler again when the stream needs more data.
> When would you not call `next()` in both a value generator function and a `consume` method?
I can think of two reasons.
- You are not allowed to call `next()` after pushing `_.nil`. For a `consume` handler, this is true even if you have not received a `_.nil` from the source stream. For example, `take` will push `nil` when it has pushed enough values, and never call `next()` again, even though the source stream never emitted `nil`.
- You are implementing a generator for a "hot" source where it does not make sense to support backpressure. For example, if you are wrapping click events from the DOM, you would just call `push` whenever the event fires. There's no reason to ever call `next`.
> It actually shows an unrelated bug in Highland. In the failure case, `preFilterSpy.calls.length` should be 1 and not 2, since you never call `next()` in your consume handler.
FYI, I've released ~~2.10.3~~ 2.10.4, which should address this issue. The correct assert statement is
```javascript
expect(preFilterSpy.calls.length).toBe(1);
```
Let me know if the new release causes any issues.
Edit: Fix typos.
Cool, glad it uncovered something that may help others. Will give it a shot and report any issues that may come up.
Thanks for the explanation of the next() function in stream generation and consumption.
So for reading from an array, you would probably want to use `next` so that data is only sent downstream when the consumers request it:
```javascript
let data = ['one', 'two', 'three', 'four'];
_((push, next) => {
  push(data.shift());
  if (data.length) next();
  else push(null, _.nil);
});
```
But for a one-and-done operation like `child_process.exec`, you wouldn't call `next`?
```javascript
const child_process = require('child_process');

_((push) => {
  child_process.exec('ls', (err, stdout, stderr) => {
    if (err === null || err.code === 0) {
      push(null, stdout.toString('utf8'));
    }
    else if (err.code > 0) {
      // Non-zero exit code: the combined output is pushed as the stream error
      push(stdout.toString('utf8') + '\n' + stderr.toString('utf8'));
    }
    else {
      push(err);
    }
    push(null, _.nil);
  });
});
```
Yes to both cases, though there's a typo in your array example. It should be using `push(null, data.shift())`.