Introduce a way to interrupt `diagnostics_channel` flow
This PR introduces a way to interrupt the flow of diagnostics_channel.
Problem
Sometimes, a subscriber to a channel will want to stop further execution of the event. In a usual context this would be done by throwing an exception. However, diagnostics_channel has a mechanism to catch any exception coming from subscribers and redirect it to the process event uncaughtException. This makes sense because we don't want a publisher to crash only because a subscriber is doing something wrong.
But a way to circumvent this catch-all is necessary for some use cases.
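To make the catch-all concrete, here is a small sketch of the current behaviour using the existing diagnostics_channel API. The channel name `example.catch-all` and the variable names are made up for illustration, and the uncaughtException handler is installed only so that the sketch does not terminate the process.

```javascript
const dc = require('node:diagnostics_channel');

// Hypothetical channel name, used only for this illustration.
const channel = dc.channel('example.catch-all');

let secondSubscriberCalled = false;
let publishThrew = false;

// Installed so the redirected error does not terminate the process.
process.on('uncaughtException', (err) => {
  console.log('uncaughtException received:', err.message);
});

channel.subscribe(() => {
  throw new Error('subscriber failure');
});
channel.subscribe(() => {
  secondSubscriberCalled = true;
});

try {
  channel.publish({ foo: 'bar' });
} catch (err) {
  // Not expected to run: publish() swallows subscriber exceptions.
  publishThrew = true;
}

console.log({ publishThrew, secondSubscriberCalled });
```

The publisher never sees the error, and the remaining subscriber still runs. The error only surfaces later through uncaughtException.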
Use cases
An example of a very real use case would be to have a monitoring subscriber that would collect metrics and monitor the health of the app, and could decide to interrupt an action if it exceeds certain limits, contains invalid or malicious data, etc...
Another possibility is that a user would not want to go forward with a certain action if it is not successfully consumed by a subscriber.
The general idea is that we should let the user decide where an exception thrown inside a subscriber should go, by having a safe default (i.e. uncaughtException) and an explicit bypass (i.e. passed to the publisher).
Solution
The proposed mechanism would rethrow a subscriber exception if it is an instance of DCInterruptError (taking suggestions for a better name), not call the uncaughtException event, and not execute the subsequent subscribers.
The public API is very simple (more details available in doc/api/diagnostics_channel.md):
const dc = require('diagnostics_channel');
const channel = dc.channel('test');
channel.subscribe((data) => {
  throw new dc.DCInterruptError('Subscriber Interruption');
});
try {
  channel.publish({ foo: 'bar' });
} catch (err) {
  // DCInterruptError: Subscriber Interruption
}
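For readers who want to see the rethrow rule in isolation, here is a userland sketch of what such a publish loop might look like. `SketchChannel` and this `DCInterruptError` are stand-ins written for this illustration; they are not the PR's implementation and do not exist in Node.js.

```javascript
// Hypothetical stand-ins: neither class exists in Node.js today.
class DCInterruptError extends Error {}

class SketchChannel {
  constructor() {
    this.subscribers = [];
  }

  subscribe(onMessage) {
    this.subscribers.push(onMessage);
  }

  publish(data) {
    for (const onMessage of this.subscribers) {
      try {
        onMessage(data);
      } catch (err) {
        // Explicit bypass: hand the error to the publisher and stop the loop.
        if (err instanceof DCInterruptError) throw err;
        // Safe default: surface everything else as an uncaught exception.
        process.nextTick(() => { throw err; });
      }
    }
  }
}

const channel = new SketchChannel();
const calls = [];

channel.subscribe(() => { calls.push('first'); });
channel.subscribe(() => {
  calls.push('second');
  throw new DCInterruptError('Subscriber Interruption');
});
channel.subscribe(() => { calls.push('third'); });

let caught = null;
try {
  channel.publish({ foo: 'bar' });
} catch (err) {
  caught = err;
}

console.log(calls);          // [ 'first', 'second' ]
console.log(caught.message); // Subscriber Interruption
```

Note that the third subscriber never runs, which is the side effect debated later in this thread.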
Here is an alternative approach to the public API addition (courtesy of @Qard), that I could implement instead, if requested:
/////////////////////////////
// diagnostics_channel.js
// export an `interrupt` function that adds a symbol to a provided error object
const SymbolInterrupt = Symbol('DCInterrupt');
module.exports.interrupt = function interrupt (err) {
  err[SymbolInterrupt] = true;
  return err;
};
////////////////////////////
// app.js
channel.subscribe((data) => {
  throw dc.interrupt(new Error('Subscriber Interruption'));
});
////////////////////////////
// diagnostics_channel.js:ActiveChannel.publish()
// check if error contains the special symbol
try {
  onMessage(data);
} catch (err) {
  if (err[SymbolInterrupt]) throw err;
  // etc...
}
Since the diagnostics_channel module is still in experimental status, and early in its ecosystem adoption, I think it's a good idea to iterate on the idea now rather than later.
Is there a reason why all errors shouldn't be thrown up to the publisher? @Qard from what I can tell, passing exceptions to 'uncaughtException' was done as a response to "what do we do with exceptions thrown by subscribers?", but maybe it's reasonable enough just to consider them as synchronous function calls, and let errors be propagated as they would?
That was how it worked originally, but people didn't like the idea of issues in that code interfering with core behaviour. I think the specialized error idea is better than letting everything through.
It seems to me that the goal here is to pass some information to the publisher, rather than preventing other subscribers from receiving the message (which I think depends on the order of the calls to subscribe()).
Couldn't this be achieved with the message itself? For example by passing an abort function that the subscriber would call.
@targos That would require protection on the publisher side from abort being called more than once. That's certainly possible, but means more logic on the publisher side, which I think we probably want to avoid in core modules (maybe we don't care? I dunno).
A similar idea is to have an additional parameter to publish that is a callback (effectively your abort). This still requires some work on the publisher side, but at least it would be in a standardized place, normalizing its usage for library (and core) authors.
e.g. something to this effect:
--- a/doc/api/diagnostics_channel.md
+++ b/doc/api/diagnostics_channel.md
@@ -117,9 +117,11 @@ if (channel.hasSubscribers) {
 }
  ```
-#### `channel.publish(message)`
+#### `channel.publish(message, abort)`
 * `message` {any} The message to send to the channel subscribers
+* `abort` {Function} A function that will be called by subscribers to instruct
+  the publisher to abort the instrumented operation.
 Publish a message to any subscribers to the channel. This will trigger
 message handlers synchronously so they will execute within the same context.
@@ -131,6 +133,8 @@ const channel = diagnostics_channel.channel('my-channel');
 channel.publish({
   some: 'message'
+}, err => {
+  throw err;
 });
 ```
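The variant raised earlier in this exchange, passing an abort function inside the message itself, already works with the current API. Below is a minimal sketch that includes the "called more than once" guard mentioned above. The channel name and the `createAbort` and `doWork` helpers are hypothetical.

```javascript
const dc = require('node:diagnostics_channel');

// Hypothetical channel name for this sketch.
const channel = dc.channel('example.abortable');

// Hypothetical helper: builds an abort function that only records the first call.
function createAbort() {
  const state = { aborted: false, reason: undefined };
  const abort = (reason) => {
    if (state.aborted) return; // ignore repeated calls from other subscribers
    state.aborted = true;
    state.reason = reason;
  };
  return { state, abort };
}

// Two independent subscribers both ask to abort.
channel.subscribe(({ abort }) => abort(new Error('first objection')));
channel.subscribe(({ abort }) => abort(new Error('second objection')));

// Hypothetical publisher: checks the abort state once publish() returns.
function doWork(payload) {
  const { state, abort } = createAbort();
  channel.publish({ payload, abort });
  if (state.aborted) throw state.reason;
  return 'done';
}

let outcome;
try {
  outcome = doWork({ foo: 'bar' });
} catch (err) {
  // Holds the message from whichever subscriber ran first.
  outcome = err.message;
}
console.log(outcome);
```

Every subscriber still receives the message, and the publisher decides what an abort means. The cost is the extra bookkeeping on the publisher side.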
@targos the point of this exception mechanism is that I think it requires the least amount of effort on both sides of the channel (publisher and subscriber).
If we start adding a different parameter like bengl proposed, or a feedback/callback mechanism through the message object, it means added implementation bloat on every publisher. The reason I've chosen an exception is that it's a general concept that shouldn't need specific handling.
Since both publishers and subscribers might be third parties independent from each other and from the user app, we should probably use the most generic and universally supported syntax (for instance, a user would not be able to modify the code of fastify-diagnostics-channel to make it support this abort idea).
Also, blocking subsequent subscribers cannot be achieved with the callback mechanism. So if users do not need to block subsequent subscribers, they can use their own callback. But let's also give them a way to block subsequent subscribers.
I think it needs to be decided if the publisher should have to explicitly support being aborted/interrupted or if it should be considered a universal feature that a publish could throw some sort of abort/interrupt error.
it's a general concept that shouldn't need specific handling
try / catch is a specific handling
But with try/catch the publisher can opt to do nothing about it, and instead defer to calling code. This should simplify code on the publishing side to effectively just the current API/usage, which I think is an explicit goal here.
There are likely many places currently which do not expect to throw internally and would therefore potentially raise an exception up to an uncaughtException rather than, for example, reaching a callback. That may not be the desired effect. For example, if you throw somewhere in the internals of an http request it may not actually get to the point where it would trigger the appropriate events on the request object, so it may need explicit handling. 🤔
@Qard For sure. In cases where simply bubbling an error up isn't an option, publisher code would need to be wrapped and handled accordingly. That's true with either "throw everything" or "throw only the instances of this special error class", which is why I see them as mostly equivalent.
Passing a callback requires a bunch of work on the publisher side that's not needed in many (but not all) cases if using the try/catch approach.
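For a callback-style publisher, the wrapping discussed here could look roughly like the sketch below. It assumes the proposed semantics in which publish() may throw. Because the current publish() never does, a hypothetical `publishMayThrow` stand-in simulates the interruption. `readConfig` and its path check are likewise invented for the example.

```javascript
// Hypothetical stand-in for a publish() that can throw under the proposal.
function publishMayThrow(message) {
  if (message.path === '/etc/passwd') {
    throw new Error('access denied by subscriber');
  }
}

// Hypothetical callback-style API acting as the publisher.
function readConfig(path, callback) {
  try {
    publishMayThrow({ path });
  } catch (err) {
    // Route the interruption through this API's normal error path
    // instead of letting it escape as a synchronous throw.
    process.nextTick(callback, err);
    return;
  }
  // Placeholder for the real work.
  process.nextTick(callback, null, `contents of ${path}`);
}

const results = [];
readConfig('/etc/passwd', (err, data) => {
  results.push(err ? err.message : data);
});
readConfig('/tmp/app.json', (err, data) => {
  results.push(err ? err.message : data);
});
```

Publishers that can simply let the error bubble up need none of this, which is the asymmetry described above.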
Isn't the main idea of the diagnostics channel to have publisher and subscribers as decoupled as possible? Why should one subscriber decide that another subscriber is not getting an event? Besides that, the sequence in which subscribers are called is not specified, therefore it's hard to ensure that your "master" subscriber is first.
An example of a very real use case would be to have a monitoring subscriber that would collect metrics and monitor the health of the app, and could decide to interrupt an action if it exceeds certain limits, contains invalid or malicious data, etc...
But why should this have an impact on other subscribers, which may still be able to consume the data?
The intent is not to interrupt other subscribers, it is to interrupt the publisher. For example, if you tried to fs.readFile(...) the passwd file it could throw some error denying access to it.
Welcome @simon-id ! I'm so excited by your first PR!
@targos would this mean something like an AbortController in the end ?
would this mean something like an AbortController in the end ?
Maybe. I don't really care about what it is in the end. What bothers me is the fact that it currently interrupts other remaining subscribers in addition to the publisher.
The intent is not to interrupt other subscribers, it is to interrupt the publisher. For example, if you tried to fs.readFile(...) the passwd file it could throw some error denying access to it.
But this is not what is implemented: the loop iterating over all subscribers is exited as soon as the error is thrown.
And I still don't agree that diagnostics channel is the right thing to use for such use cases. The main idea is to decouple a publisher from N subscribers to efficiently send out data. But this use case is more like a publisher "asking" its subscribers.
So if there were a try...catch in the loop and the error were rethrown afterwards (what happens if we have multiple ones?), it would guarantee that all subscribers get the info but the emitter would get (an) error(s). Would that make sense?
So if there were a try...catch in the loop and the error were rethrown afterwards (what happens if we have multiple ones?), it would guarantee that all subscribers get the info but the emitter would get (an) error(s). Would that make sense?
As mentioned above, I don't think diagnostics channel was designed to be bidirectional. What if one subscriber says stop but the other two say go?
@Flarna If I reimplemented it without the subsequent subscriber interruption, and instead threw after the loop, would you be +1 on it?
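A rough userland sketch of that "throw after the loop" variant follows. `SketchChannel` and `DCInterruptError` are hypothetical stand-ins, and using AggregateError for the multiple-interrupt case is just one possible answer to the question raised above, not something the PR settled on.

```javascript
// Hypothetical stand-ins: neither class exists in Node.js today.
class DCInterruptError extends Error {}

class SketchChannel {
  constructor() {
    this.subscribers = [];
  }

  subscribe(onMessage) {
    this.subscribers.push(onMessage);
  }

  publish(data) {
    const interrupts = [];
    for (const onMessage of this.subscribers) {
      try {
        onMessage(data);
      } catch (err) {
        if (err instanceof DCInterruptError) {
          interrupts.push(err); // remember it, keep notifying the others
        } else {
          process.nextTick(() => { throw err; });
        }
      }
    }
    // Only after every subscriber has run is the publisher interrupted.
    if (interrupts.length === 1) throw interrupts[0];
    if (interrupts.length > 1) {
      throw new AggregateError(interrupts, 'Multiple subscribers interrupted');
    }
  }
}

const channel = new SketchChannel();
const calls = [];

channel.subscribe(() => {
  calls.push('first');
  throw new DCInterruptError('stop (first)');
});
channel.subscribe(() => { calls.push('second'); });
channel.subscribe(() => {
  calls.push('third');
  throw new DCInterruptError('stop (third)');
});

let caught = null;
try {
  channel.publish({ foo: 'bar' });
} catch (err) {
  caught = err;
}

console.log(calls);                // [ 'first', 'second', 'third' ]
console.log(caught.errors.length); // 2
```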
I still don't understand the use case where this is needed. As a result I have the feeling this breaks the main idea of diagnostics channel, which is to allow efficient collection of data. Usually diagnostic tools should be as invisible as possible, therefore throwing exceptions into unknown code areas sounds wrong to me.
I think we should also specify what a publisher should do if it receives a DCInterruptError. In the end this change implies that every publisher now needs to add a try/catch block, and it should be clear what is expected from them if such an error arrives.
As the one that originally wrote the diagnostics_channel stuff, this was very much on my mind when designing it, but we chose to simplify at the time because the specifics of how to use the data to interrupt execution was still under debate and I wanted to at least get the information gathering component in as that was the main part APM wanted. The ultimate goal though is to, as much as possible, have a unified hooks system that can be used for monitoring application behaviour and acting on it accordingly.
Personally, I'm leaning a bit toward something more explicit like an extra publish(...) argument to pass through an AbortController. As I implied before, being able to interrupt anything arbitrarily could have some unexpected side effects.
Having something like this be more explicit and enabled by the publisher sounds good to me. I could even imagine having more features in this backchannel, e.g. adding http headers, but that's most likely for future extensions.
+1 on this then ?
const dc = require('diagnostics_channel');
const channel = dc.channel('test');
// subscriber
channel.subscribe((message, name, abortController) => {
  abortController.abort();
});
// publisher
const ac = new AbortController();
channel.publish({data}, ac); // optional AbortController parameter
if (ac.signal.aborted) {
  // stop whatever it's doing
}
This looks better as the event in question still arrives on all subscribers. And it's the publisher who decides if abort is supported or not (assuming here that AbortController is optional to use).
But I'm not sure if AbortController is the right thing. The doc says "A utility class used to signal cancelation in selected Promise-based APIs." This is not promise based. But maybe it's just a matter of updating the doc there.
The doc says "A utility class used to signal cancelation in selected Promise-based APIs."
Yes, I've seen that too, but I found that there is also a property, AC.signal.aborted, that makes it usable in synchronous environments. I've also checked the code, and abort events are supposed to fire synchronously. So as you say, it might just be a question of documentation.
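The synchronous behaviour is easy to check with nothing but the global AbortController. This small sketch only records the order in which things happen:

```javascript
const ac = new AbortController();
const order = [];

// The listener is dispatched during abort(), not on a later tick.
ac.signal.addEventListener('abort', () => {
  order.push('listener');
});

order.push('before abort');
ac.abort();
order.push('after abort');

console.log(order);             // [ 'before abort', 'listener', 'after abort' ]
console.log(ac.signal.aborted); // true
```

So a publisher can check `signal.aborted` right after a synchronous publish() call without awaiting anything.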
What's the status here? If we still want to land this, it would need to be rebased on top of main.
This issue/PR was marked as stalled, it will be automatically closed in 30 days. If it should remain open, please leave a comment explaining why it should remain open.
@aduh95 Honestly I don't know. We worked around this by passing an object to publish() that includes an abort controller along with the rest of the data. However, we can only do this with channels that we own, as this is not a standardized way to interrupt DC flow.
I could still implement it again with modifications if you guys want, but I don't have more ideas on how to implement this, or if we should even implement it at all tbh.
I was trying to plan ahead for a future where every major library potentially reports data through DC, and to give a standard way to provide flow interruption, but maybe we're still too early?
I think it's reasonable, given that abort support should be left as a choice to the publisher, that it just be a suggested pattern: an abortController property exists on the event object whenever the API wants to let subscribers abort it. It's always left up to the channel provider what they wish to share through their channels, and the intent was always for us to describe standards to adhere to for common cases such as this. That is what TracingChannel already does for the tracing use case, with some sugar API on top of the documented pattern. The same could be done for other use cases, like supporting AbortController or whatever other scenarios we think of in the future.
An example of the use:
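// `start` is assumed to be a diagnostics_channel channel owned by this library
function get(url) {
  return new Promise((resolve, reject) => {
    if (start.hasSubscribers) {
      const abortController = new AbortController()
      // AbortSignal is an EventTarget, so it uses addEventListener rather than .on()
      abortController.signal.addEventListener('abort', () => {
        reject(abortController.signal.reason)
      })
      start.publish({ url, abortController })
    }

    // Actual get logic goes here...
  })
}
// elsewhere...
start.subscribe(({ url, abortController }) => {
  if (isBlocked(url)) {
    abortController.abort()
  }
})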
A benefit of defining suggested patterns rather than explicit API support is that we don't have to deal with API compatibility differences over time. A channel provider may assume a second argument to channel.publish(...) will flow through to the subscriber even though that would not be the case for any existing versions of the API. Similarly, the subscriber would be unable to tell if an abortController is absent because it was not provided or because the API level doesn't yet support passing one through.
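Under the suggested-pattern approach, the presence of the property is itself the opt-in, so a subscriber can feature-detect it on the message. Here is a minimal sketch using the existing API; the channel name and URLs are made up for illustration.

```javascript
const dc = require('node:diagnostics_channel');

// Hypothetical channel name for this sketch.
const channel = dc.channel('example.maybe-abortable');
const seen = [];

channel.subscribe((message) => {
  const { url, abortController } = message;
  if (!(abortController instanceof AbortController)) {
    // The publisher did not opt in to being aborted: observe only.
    seen.push(`${url}: observe only`);
    return;
  }
  abortController.abort();
  seen.push(`${url}: aborted`);
});

// One publish opts in to being abortable, the other does not.
const abortController = new AbortController();
channel.publish({ url: 'https://example.com/a', abortController });
channel.publish({ url: 'https://example.com/b' });

console.log(seen);
```

The same subscriber then works against publishers that follow the pattern and those that do not, regardless of the Node.js version in use.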
@Qard I agree with your approach here. If we all agree not to put this directly in an explicit API but rather to treat it as a suggested pattern, should I make a replacement PR that writes up an example like that in the documentation instead? Do we have other ways to "suggest" patterns?
We currently have this section explaining the TracingChannel pattern. You could write something similar for making things abortable.