moleculer icon indicating copy to clipboard operation
moleculer copied to clipboard

Dynamic event subscriptions

Open ngraef opened this issue 5 years ago • 4 comments

Is your feature request related to a problem? Please describe.

It is currently not possible (as far as I can tell) to modify a service's events after the service has started. Imagine a service where event subscriptions are created and updated by an action call (or other event trigger). For example, there could be a function-as-a-service platform that allows users to define functions and have them be executed when a particular event is received. When a function is created, the service needs to modify its event handlers to listen for that function's triggers.

Describe the solution you'd like

I'd like a simple way for an action or event handler to subscribe and unsubscribe event handlers while a service is running.

Describe alternatives you've considered

One alternative is to destroy and recreate a service with updated event subscriptions any time the list of subscriptions changes. This is not ideal.

ngraef avatar Dec 31 '19 22:12 ngraef

Could you show a pesudo code how do you think?

icebob avatar Jan 15 '20 19:01 icebob

@ngraef Another alternative is a ** event listener that dynamically filters incoming events and deals with them as needed. I've implemented a couple services this way personally. One service listens to all events and filters them through several notification definition objects which determine whether notification messages need to be created for users/teams associated to the event. The other service listens for all events and filters them for processing to an AWS EventBridge Event Bus. Works quite well and does not require dynamically updating which events the service is listening for.

With that being said, each service is created by a broker and stores a reference to that broker as a dependency for emitting/broadcasting events. The broker is similar to an event emitter in this case so it may make most sense to further implement that interface on the broker to support calls like .on(event, callback) and .once(event, callback). I would image that calls like this would update the internal event registry and announce those changes to the system.

ccampanale avatar Jan 15 '20 20:01 ccampanale

@ccampanale That is an alternative, though it's essentially reimplementing the responsibilities of the event subsystem (NATS in our case).

@icebob Here's an idea using the example of a function-as-a-service platform.

{
    name: 'functionManager',
    actions: {
        get(ctx) {
            // retrieve from database
            const fn = {
                id: 'fn-id',
                triggers: ['event.a', 'event.b'],
                payload: '...'
            };
            
            return fn;
        },
        create(ctx) {
            // ...
            ctx.emit('function.created', fn);
        },
        update(ctx) {
            // ...
            ctx.emit('function.updated', { old: orig, new: fn });
        },
        destroy(ctx) {
            // ...
            ctx.emit('function.destroyed', fn);
        }
    }
}
{
    name: 'functionExecutor',
    actions: {
        async execute(ctx) {
            const { id } = ctx.params;
            const { payload } = await ctx.call('functionManager.get', { id });
            // safely execute payload in a sandbox
            // ...
            ctx.emit('funciton.executed', { id });
        }
    }
}
{
    name: 'functionListener',
    events: {
        async 'function.created'(ctx) {
            const { id, triggers } = ctx.params;
            triggers.forEach(event => {
                this.subscribe(event, id, this.methods.buildTriggerHandler(id));
            });
        },
        async 'function.updated'(ctx) {
            const { old, new } = ctx.params;
            // subscribe added triggers and unsubscribe removed triggers
        }
        async 'function.destroyed'(ctx) {
            const { id, triggers } = ctx.params;
            triggers.forEach(event => {
                this.unsubscribe(event, id);
            });
        }
    },
    methods: {
        buildTriggerHandler(id) {
            return (ctx) => {
                ctx.call('functionExecutor.execute', { id });
            };
        }
    },
    started() {
        // get active triggers and register subscriptions
    }
}

Service Instance Methods

subscribe(eventName, subscriptionId, handler): subscriptionId

  • where subscriptionId is unique per eventName. If subscriptionId is null or undefined, one is generated and returned. If a listener for eventName with subscriptionId already exists, the handler is updated.

unsubscribe(eventName, subscriptionId): void

ngraef avatar Jan 28 '20 08:01 ngraef

Minor changed:

Service Instance Methods

subscribe(eventName, handler, subscriptionId?): subscriptionId

  • where subscriptionId is unique per eventName. If subscriptionId is null or undefined, one is generated and returned. If a listener for eventName with subscriptionId already exists, the handler is updated.

unsubscribe(eventName, subscriptionId?): boolean

unsubscribe all listeners if no subscriptionId. return false if no such subscriptionId or eventName.

snowyu avatar Jul 01 '21 00:07 snowyu