nest icon indicating copy to clipboard operation
nest copied to clipboard

It is not possible to use RegEx for event matching on the @EventPattern() adn @MessagePattern()

Open alfredoperez opened this issue 4 years ago • 31 comments

Bug Report

.

Current behavior

It is not possible to use a RegEx for event and message patterns. This is possible when using KafkaJS library but is not possible in NestJS

Input Code

Having a controller emitting a message 'pre-notify-post'

@Post('notify-with-regex')
  async sendNotificationForRegExMatching(): Promise<any> {
    return this.client.emit('pre-notify-post', { notify: true });
  }

Also having a controller with the event handler and expecting messages that match the regular expression of /.*notify.*/ :

 @EventPattern(/.*notify.*/)
  secondEventHandler(data: any) {
    KafkaController.IS_NOTIFIED_WITH_REGEX = data.value.notify;
  }

Currently, the event handler never gets called, because the handler is not matched using regular expressions.

Expected behavior

It is expected to listen to events and messages by using RegEx patterns. This is possible in KafkaJS library and it is useful when having a set of a topic that the consumer wants to listen to.

Alternatively, you can subscribe to multiple topics at once using a RegExp:

await consumer.connect() await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })

https://kafka.js.org/docs/consuming

Possible Solution

This could be fixed by getting the matching routes.

Environment

Nest version: 6.7

alfredoperez avatar Oct 01 '19 12:10 alfredoperez

I can work on the fix for this, but I want to get your opinion first.

alfredoperez avatar Oct 01 '19 12:10 alfredoperez

I'd love to see the draft PR. Would you like to create one with your idea?

kamilmysliwiec avatar Oct 03 '19 06:10 kamilmysliwiec

Any updates on this? Without this feature kafka is pretty much useless for us.

Vetm avatar May 10 '21 11:05 Vetm

I also need this feature. Some microservices need to handle different kafka topics in the same way. We have to write a separate listener for each topic.

inmativ avatar Jul 15 '21 07:07 inmativ

Hello! Any updates? We also stuck with this.

vviital avatar Jul 29 '21 13:07 vviital

I'm just a random with some experience in creating own microservice transport (actually at work we replaced nearly whole nest/microservices to match our needs), but i can try to implement it in free time. @alfredoperez if i understand it correctly, this kind of pattern matching would require an array of regexes checking if given event has a regex matching it pattern. However, unlike for string matching, which is exact, more than one regex can match one string. What should happen (/what is currently happening in kafka in implementation you mentioned) when it is happening?

For example in case when the event is "someString", and there are 2 handlers: someHandler matching /some.*/ and string handler matching for /.*string/i. Should the first (which is first?) handler be executed, or both of them?

Sorry for mentioning, but @kamilmysliwiec how would you handle this situation? Is there any place in code when there is similiar (regex based) patter matching?

kuskoman avatar Aug 03 '21 21:08 kuskoman

Hi, any update on this?

Idan747 avatar Oct 13 '21 09:10 Idan747

We have a similar requirement, either the MessagePattern decorator takes in an array of strings or a regex could also work.

Jaikant avatar Nov 22 '21 15:11 Jaikant

If anyone needs, then there is a workaround for regexp support. You just need to add custom strategy and override bindEvents method as shown below.

import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 2),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }
}

And use regexp like this

@EventPattern('/my-super-topic.retry.[0-9]+$/', { flags: 'i' })
async mySuperTopicRetry() {}

fjodor-rybakov avatar Mar 24 '22 10:03 fjodor-rybakov

@fjodor-rybakov Was there anything else you had to change? I copy / pasted your class and added it as the strategy for my microservice to just try it and it doesn't call the regular expression event pattern. I get the following Error instead.

@EventPattern('/myevent-.*/', { flags: 'i' })
async onSomeEvent(data: any) {
  // this never gets called
  console.log('->', data);
}

// ... elsewhere in the code I call the emit
this.client.emit('myevent-1', obj);
ERROR [ServerKafka] There is no matching event handler defined in the remote service. Event pattern: myevent-1

If I hard-code the event pattern it works...

@EventPattern('myevent-1')
async onSomeEvent(data: any) {
  // this never gets called
  console.log('->', data);
}

jfelicianiats avatar Jul 22 '22 16:07 jfelicianiats

@fjodor-rybakov Was there anything else you had to change? I copy / pasted your class and added it as the strategy for my microservice to just try it and it doesn't call the regular expression event pattern. I get the following Error instead.

@EventPattern('/myevent-.*/', { flags: 'i' })
async onSomeEvent(data: any) {
  // this never gets called
  console.log('->', data);
}

// ... elsewhere in the code I call the emit
this.client.emit('myevent-1', obj);
ERROR [ServerKafka] There is no matching event handler defined in the remote service. Event pattern: myevent-1

If I hard-code the event pattern it works...

@EventPattern('myevent-1')
async onSomeEvent(data: any) {
  // this never gets called
  console.log('->', data);
}

Sry, i forgot...

You also need to override the getHandlerByPattern method, that it looks for a handler with a suitable regex for the consumer topic

fjodor-rybakov avatar Jul 25 '22 09:07 fjodor-rybakov

So thanks for the reply @fjodor-rybakov I got it working somewhat now but there are definite issues still. For example, if you have multiple EventPatterns using Regular Expressions that overlap on the event that should fire (ie. /myevent-[0-9]+/ and /myevent-.*/) only the first one found will execute or if there is an exact string event name only that then would fire.

Here is the updated class that I was testing with...

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  override getHandlerByPattern(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    return this.messageHandlers.has(route)
      ? this.messageHandlers.get(route)
      : this.testRegularExpressions(route) || null;
  }

  private testRegularExpressions(pattern: string) {
    for (const [key, val] of this.messageHandlers.entries()) {
      if (!key.startsWith('/') || !key.endsWith('/')) continue;

      const regex = new RegExp(
        key.slice(1, pattern.length - 1),
        val.extras.flags,
      );
      if (regex.test(pattern)) {
        return val;
      }
    }
  }
}

I also attempted to resolve these issues but this is over my head atm for the amount of time I have to work with it. I took a peek at the source code and attempted to mimic chaining the funcitons via the next property but it still doesn't work correctly. There is probably something else happening behind the scenes with the way next is setup and used I didn't see or understand.

Here is the code for that if anyone wants to expand off the idea...

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  override getHandlerByPattern(pattern: string) {
    const route = this.getRouteFromPattern(pattern);
    const handlers: MessageHandler[] = [];

    if (this.messageHandlers.has(route))
      handlers.push(this.messageHandlers.get(route));

    for (const [key, val] of this.messageHandlers.entries()) {
      if (!key.startsWith('/') || !key.endsWith('/')) continue;

      const regex = new RegExp(key.slice(1, key.length - 1), val.extras.flags);

      if (regex.test(pattern)) {
        handlers.push(val);
      }
    }

    const allHandlers: MessageHandler[][] = [];
    for (let i = 0; i < handlers.length; i++) {
      const handler = handlers[i];
      const hierarchy: MessageHandler[] = [];

      let nextChild = handler;
      while (nextChild) {
        hierarchy.push(this.cloneHandle(nextChild));
        nextChild = nextChild.next;
      }

      allHandlers.push(hierarchy);
    }

    const flattened = allHandlers.flat();
    for (let i = flattened.length - 1; i >= 0; i--) {
      const handler = flattened[i];
      const prev = flattened[i - 1];

      if (prev) prev.next = handler;
    }

    return flattened.length > 0 ? flattened[0] : null;
  }

  private cloneHandle(handle: MessageHandler) {
    const dup = handle.bind({}) as MessageHandler;
    dup.isEventHandler = handle.isEventHandler;
    dup.extras = { ...handle.extras };
    return dup;
  }
}

jfelicianiats avatar Jul 25 '22 14:07 jfelicianiats

@jfelicianiats

This should work

import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 2),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  public override getHandlerByPattern(pattern: string) {
    const handler = super.getHandlerByPattern(pattern);
    if (handler) {
      return handler;
    }

    return this.getHandlerByRegExp(pattern);
  }

  private getHandlerByRegExp(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    const keys = this.messageHandlers.keys();
    for (const key of keys) {
      const regexp = new RegExp(key);
      if (regexp.test(route)) return this.messageHandlers.get(key);
    }

    return null;
  }
}

fjodor-rybakov avatar Jul 29 '22 11:07 fjodor-rybakov

Hi everybody,

Thanks for the discussion and posting workarounds.

My question is: do these workarounds also support topics that get created in future? If my app is deployed, and then a topic whose name match the regex gets created, will the app get subscribed to this new topic?

Thank you!

Cheers, Wei

ww917352 avatar Aug 04 '22 09:08 ww917352

Hi everybody,

Thanks for the discussion and posting workarounds.

My question is: do these workarounds also support topics that get created in future? If my app is deployed, and then a topic whose name match the regex gets created, will the app get subscribed to this new topic?

Thank you!

Cheers, Wei

No, subscribe on topic only happens when consumer starts

You can pass a list of matching topics at once

@EventPattern(['topic.one', 'topic.two'])

I think this will solve your problem

fjodor-rybakov avatar Aug 04 '22 09:08 fjodor-rybakov

Thank you @fjodor-rybakov

We have a topic for each customer such as <<customerId>>.traffic. It is impossible to list them all in @EventPattern. Also when new customers are onboarded, new topics will be added.

We try to do with @EventPattern('/*.traffic/'), but it does not work.

Or, do I misunderstand your suggestion?

ww917352 avatar Aug 04 '22 10:08 ww917352

Same issue as @ww917352, any updates on this?

lfowlie avatar Sep 01 '22 13:09 lfowlie

Adding +1, this is greatly needed since any message that comes in with a malformed pattern will stop the app from consuming completely.

NishKebab avatar Sep 01 '22 14:09 NishKebab

+1 for this

NenadJovicic avatar Sep 09 '22 15:09 NenadJovicic

+1

g-capasso avatar Oct 16 '22 15:10 g-capasso

+1

beebeebeeebeee avatar Oct 27 '22 07:10 beebeebeeebeee

Before deciding on an official Api for this, please consider the same partial matching in Event/MessagePattern, when the pattern is an object.

Eg. The Nest microservice consumes messages from a queue and handler qualifies based on a sub-set of the message properties/attributes

Given message handler definition

@MessagePattern({ foo: 'foo' }, { partialMatch: true }) // ?
public handlerMethod() {}

it qualifies incoming message patterns that partially match

{ foo: 'foo', bar: 'bar', ... }

katrotz avatar Nov 02 '22 14:11 katrotz

+1

eugene-chernyshenko avatar Nov 19 '22 23:11 eugene-chernyshenko

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 2),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  public override getHandlerByPattern(pattern: string) {
    const handler = super.getHandlerByPattern(pattern);
    if (handler) {
      return handler;
    }

    return this.getHandlerByRegExp(pattern);
  }

  private getHandlerByRegExp(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    const keys = this.messageHandlers.keys();
    for (const key of keys) {
      const regexp = new RegExp(key);
      if (regexp.test(route)) return this.messageHandlers.get(key);
    }

    return null;
  }
}

Some fix

const regexp = new RegExp(key.slice(1, key.length - 1));

eugene-chernyshenko avatar Nov 20 '22 01:11 eugene-chernyshenko

Any update?

bozorgmehr96 avatar Nov 28 '22 11:11 bozorgmehr96

+1

ryakoviv avatar Mar 21 '23 16:03 ryakoviv

Also would like somthing like this (more specifically with rabbitMq) - I was slightly misled by the docs here https://docs.nestjs.com/microservices/basics#decorators which implies to me that

@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
  return new Date().toLocaleTimeString(...);
}

would match "time.us.west", "time.us.east" etc.

johansenja avatar Apr 05 '23 12:04 johansenja

@kamilmysliwiec Hello! Do you know if this was fixed? Or there wasn't a solution found? We could really use this feature since we have different envs, hence different topics to consume from.

abrudane avatar May 15 '23 14:05 abrudane

NestJS feels like a collection of leaky abstractions with missing features and edge cases creeping there to surprise you every now and then and the official docs don't go into the details deeply enough to prepare for those things so you discover them only as you go :(

PiotrOwsiak avatar Jun 07 '23 13:06 PiotrOwsiak

Updates?

buccfer-knauf avatar Oct 05 '23 13:10 buccfer-knauf