
Fix rpc exception kafka

Open RaiMX opened this issue 3 years ago • 1 comments

PR Checklist

Please check if your PR fulfills the following requirements:

  • [x] The commit message follows our guidelines: https://github.com/nestjs/nest/blob/master/CONTRIBUTING.md
  • [ ] Tests for the changes have been added (for bug fixes / features)
  • [ ] Docs have been added / updated (for bug fixes / features)

PR Type

What kind of change does this PR introduce?

  • [x] Bugfix
  • [ ] Feature
  • [ ] Code style update (formatting, local variables)
  • [ ] Refactoring (no functional changes, no api changes)
  • [ ] Build related changes
  • [ ] CI related changes
  • [ ] Other... Please describe:

What is the current behavior?

https://github.com/nestjs/nest/issues/10113

Issue Number: 10113

What is the new behavior?

The microservice returns the custom RPC error through the Kafka transport.

Does this PR introduce a breaking change?

  • [ ] Yes
  • [x] No

Other information

RaiMX avatar Oct 07 '22 10:10 RaiMX

Pull Request Test Coverage Report for Build d8d9a5ab-12b5-4810-9ec6-34797051705d

  • 1 of 3 (33.33%) changed or added relevant lines in 1 file are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-0.003%) to 93.784%

Changes Missing Coverage:

  • packages/microservices/server/server-kafka.ts: 1 covered line out of 3 changed/added lines (33.33%)
  • Total: 1 covered line out of 3 changed/added lines

Totals (Coverage Status):

  • Change from base Build 6b637f64-9c7a-4d7b-8537-b6a8d098f58e: -0.003%
  • Covered Lines: 6110
  • Relevant Lines: 6515

💛 - Coveralls

coveralls avatar Oct 07 '22 10:10 coveralls

Hey @kamilmysliwiec could you take a quick look at this PR? It seems to fix an issue introduced by this commit: cfe1191d225cd40a225ca98e875e15e307702e55

EugeneKorshenko avatar Oct 18 '22 14:10 EugeneKorshenko

So no updates at the moment?

RaiMX avatar Nov 18 '22 09:11 RaiMX

I was about to open a PR to add the missing resolve() call; without it, execution stays blocked after an error is thrown. Is something missing from this PR that prevents it from being merged? This PR would fix my issue as well.

TheVoid-0 avatar Nov 18 '22 14:11 TheVoid-0

@kamilmysliwiec no updates yet? Because of this bug I can't use version 9 in production... sad

RaiMX avatar Dec 02 '22 07:12 RaiMX

+1. Could someone please look into this? @kamilmysliwiec

JKKGBE avatar Dec 14 '22 13:12 JKKGBE

I added the following code to my application as a workaround for now:

// created file kafka-server.ts

import { KafkaRetriableException, ServerKafka } from '@nestjs/microservices';

// HACK - This is a hack to fix the issue with the @nestjs/microservices package tracked in https://github.com/nestjs/nest/pull/10379
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
export class KafkaServer extends ServerKafka {
  private combineStreamsAndThrowIfRetriable(
    response$: { subscribe: (arg0: { next: (val: any) => void; error: (err: any) => void; complete: () => any }) => void },
    replayStream$: { next: (arg0: any) => void; error: (arg0: any) => void; complete: () => any },
  ) {
    return new Promise<void>((resolve, reject) => {
      let isPromiseResolved = false;
      response$.subscribe({
        next: val => {
          replayStream$.next(val);
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
        error: err => {
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
            isPromiseResolved = true;
            reject(err);
          }
          replayStream$.error(err);
          resolve(); // This line is needed to prevent server stall
        },
        complete: () => replayStream$.complete(),
      });
    });
  }
}

// on main.ts

app.connectMicroservice<MicroserviceOptions>(
  {
    strategy: new KafkaServer({
      client: {
        brokers: environment.KAFKA_URL.map(url => `${url}:${environment.KAFKA_PORT}`),
        clientId: 'logs',
      },
      consumer: { groupId: 'logs' },
    }),
  },
  { inheritAppConfig: true },
);

await app.startAllMicroservices();
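
To make it clearer why the extra resolve() matters, here is a minimal, dependency-free sketch of the same pattern. It is not the @nestjs/microservices code: `Observer`, `Source`, `RetriableError`, `failingSource`, and `track` are hypothetical stand-ins made up for illustration. It only shows that a promise wrapped around a failing stream stays pending forever unless the error branch settles it.

```typescript
// Hypothetical stand-ins for illustration only; none of these names come
// from @nestjs/microservices or rxjs.
type Observer = {
  next: (val: unknown) => void;
  error: (err: unknown) => void;
  complete: () => void;
};
type Source = { subscribe: (observer: Observer) => void };

class RetriableError extends Error {}

// A source that fails synchronously, like a handler throwing a non-retriable error.
const failingSource: Source = {
  subscribe: observer => observer.error(new Error('rpc failure')),
};

// Mirrors the shape of the workaround above: settle on the first value,
// reject on a retriable error, and optionally resolve on any other error.
function track(source: Source, resolveOnError: boolean) {
  let settled = false;
  const promise = new Promise<void>((resolve, reject) => {
    source.subscribe({
      next: () => {
        if (!settled) {
          settled = true;
          resolve();
        }
      },
      error: err => {
        if (err instanceof RetriableError && !settled) {
          settled = true;
          reject(err);
        } else if (resolveOnError && !settled) {
          settled = true;
          resolve(); // the call that keeps the caller from waiting forever
        }
      },
      complete: () => undefined,
    });
  });
  // Swallow rejections so the retriable case does not log an unhandled rejection.
  promise.catch(() => undefined);
  return { promise, isSettled: () => settled };
}

const withoutResolve = track(failingSource, false);
const withResolve = track(failingSource, true);
console.log(withoutResolve.isSettled(), withResolve.isSettled());
```

In this sketch, anything awaiting `withoutResolve.promise` would never continue, which matches the stall I was seeing; `withResolve.promise` settles as soon as the error arrives.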

TheVoid-0 avatar Dec 28 '22 17:12 TheVoid-0

Fixed in this PR https://github.com/nestjs/nest/pull/10982

kamilmysliwiec avatar Feb 01 '23 13:02 kamilmysliwiec