moleculer
moleculer copied to clipboard
Dynamic event subscriptions
Is your feature request related to a problem? Please describe.
It is currently not possible (as far as I can tell) to modify a service's events after the service has started. Imagine a service where event subscriptions are created and updated by an action call (or other event trigger). For example, there could be a function-as-a-service platform that allows users to define functions and have them be executed when a particular event is received. When a function is created, the service needs to modify its event handlers to listen for that function's triggers.
Describe the solution you'd like
I'd like a simple way for an action or event handler to subscribe and unsubscribe event handlers while a service is running.
Describe alternatives you've considered
One alternative is to destroy and recreate a service with updated event subscriptions any time the list of subscriptions changes. This is not ideal.
Could you show a pesudo code how do you think?
@ngraef Another alternative is a ** event listener that dynamically filters incoming events and deals with them as needed. I've implemented a couple services this way personally. One service listens to all events and filters them through several notification definition objects which determine whether notification messages need to be created for users/teams associated to the event. The other service listens for all events and filters them for processing to an AWS EventBridge Event Bus. Works quite well and does not require dynamically updating which events the service is listening for.
With that being said, each service is created by a broker and stores a reference to that broker as a dependency for emitting/broadcasting events. The broker is similar to an event emitter in this case so it may make most sense to further implement that interface on the broker to support calls like .on(event, callback) and .once(event, callback). I would image that calls like this would update the internal event registry and announce those changes to the system.
@ccampanale That is an alternative, though it's essentially reimplementing the responsibilities of the event subsystem (NATS in our case).
@icebob Here's an idea using the example of a function-as-a-service platform.
{
name: 'functionManager',
actions: {
get(ctx) {
// retrieve from database
const fn = {
id: 'fn-id',
triggers: ['event.a', 'event.b'],
payload: '...'
};
return fn;
},
create(ctx) {
// ...
ctx.emit('function.created', fn);
},
update(ctx) {
// ...
ctx.emit('function.updated', { old: orig, new: fn });
},
destroy(ctx) {
// ...
ctx.emit('function.destroyed', fn);
}
}
}
{
name: 'functionExecutor',
actions: {
async execute(ctx) {
const { id } = ctx.params;
const { payload } = await ctx.call('functionManager.get', { id });
// safely execute payload in a sandbox
// ...
ctx.emit('funciton.executed', { id });
}
}
}
{
name: 'functionListener',
events: {
async 'function.created'(ctx) {
const { id, triggers } = ctx.params;
triggers.forEach(event => {
this.subscribe(event, id, this.methods.buildTriggerHandler(id));
});
},
async 'function.updated'(ctx) {
const { old, new } = ctx.params;
// subscribe added triggers and unsubscribe removed triggers
}
async 'function.destroyed'(ctx) {
const { id, triggers } = ctx.params;
triggers.forEach(event => {
this.unsubscribe(event, id);
});
}
},
methods: {
buildTriggerHandler(id) {
return (ctx) => {
ctx.call('functionExecutor.execute', { id });
};
}
},
started() {
// get active triggers and register subscriptions
}
}
Service Instance Methods
subscribe(eventName, subscriptionId, handler): subscriptionId
- where
subscriptionIdis unique pereventName. IfsubscriptionIdisnullorundefined, one is generated and returned. If a listener foreventNamewithsubscriptionIdalready exists, the handler is updated.
unsubscribe(eventName, subscriptionId): void
Minor changed:
Service Instance Methods
subscribe(eventName, handler, subscriptionId?): subscriptionId
- where
subscriptionIdis unique pereventName. IfsubscriptionIdisnullorundefined, one is generated and returned. If a listener foreventNamewithsubscriptionIdalready exists, the handler is updated.
unsubscribe(eventName, subscriptionId?): boolean
unsubscribe all listeners if no subscriptionId. return false if no such subscriptionId or eventName.