nest icon indicating copy to clipboard operation
nest copied to clipboard

Custom RPC exception when using Kafka based microservice is not returning error

Open RaiMX opened this issue 3 years ago • 9 comments

Did you read the migration guide?

  • [X] I have read the whole migration guide

Is there an existing issue that is already proposing this?

  • [X] I have searched the existing issues

Potential Commit/PR that introduced the regression

No response

NestJS version

8.4.6 -> 9.0.9

Describe the regression

Custom RPC exception when using Kafka based microservice is not returning error. Error is displayed in console, but not getting to Kafka topic. Before upgrading to v9 it was working fine.

Minimum reproduction code

No response

Input code

import { ArgumentsHost, Catch, HttpException, RpcExceptionFilter } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

@Catch()
export class CustomRpcExceptionFilter implements RpcExceptionFilter<RpcException> {
    
    catch(exception: RpcException | HttpException, host: ArgumentsHost): Observable<any> {
    ...
    return throwError(() => JSON.stringify(error)); // before migrating to v9 this line worked fine
    }
}

Expected behavior

Custom RPC exception should return error

Other

"dependencies": {
        "@nestjs/common": "^9.0.9",
        "@nestjs/config": "^2.2.0",
        "@nestjs/core": "^9.0.9",
        "@nestjs/microservices": "^9.0.9",
        "@nestjs/platform-express": "^9.0.9",
        "@nestjs/swagger": "^6.0.5",
        "@nestjs/typeorm": "^9.0.1",
        "@nestjsx/crud": "^5.0.0-alpha.3",
        "@nestjsx/crud-typeorm": "^5.0.0-alpha.3",
        "bcrypt": "^5.0.1",
        "cache-manager": "^4.1.0",
        "cache-manager-redis-store": "^2.0.0",
        "class-transformer": "^0.5.1",
        "class-validator": "^0.13.2",
        "ioredis": "^5.2.0",
        "kafkajs": "^2.1.0",
        "kafkajs-snappy": "^1.1.0",
        "mongoose": "^6.4.4",
        "nest-winston": "^1.7.0",
        "pg": "^8.7.3",
        "redis": "^4.2.0",
        "reflect-metadata": "^0.1.13",
        "rimraf": "^3.0.2",
        "rxjs": "^7.5.6",
        "typeorm": "^0.3.7",
        "typeorm-naming-strategies": "^4.1.0",
        "winston": "^3.8.1",
        "winston-daily-rotate-file": "^4.7.1"
    },
    "devDependencies": {
        "@nestjs/cli": "^9.0.0",
        "@nestjs/schematics": "^9.0.1",
        "@nestjs/testing": "^9.0.9",
        "@types/cache-manager": "^4.0.1",
        "@types/cache-manager-redis-store": "^2.0.1",
        "@types/express": "^4.17.13",
        "@types/jest": "^28.1.5",
        "@types/node": "^18.0.3",
        "@types/supertest": "^2.0.12",
        "@typescript-eslint/eslint-plugin": "^5.30.6",
        "@typescript-eslint/parser": "^5.30.6",
        "eslint": "^8.19.0",
        "eslint-config-prettier": "^8.5.0",
        "eslint-plugin-prettier": "^4.2.1",
        "jest": "^28.1.2",
        "prettier": "^2.7.1",
        "source-map-support": "^0.5.21",
        "supertest": "^6.2.4",
        "ts-jest": "^28.0.5",
        "ts-loader": "^9.3.1",
        "ts-node": "^10.8.2",
        "tsconfig-paths": "^4.0.0",
        "typescript": "^4.7.4"
    }

RaiMX avatar Aug 14 '22 21:08 RaiMX

Please provide a minimum reproduction repository (Git repository/StackBlitz/CodeSandbox project).

jmcdo29 avatar Aug 14 '22 23:08 jmcdo29

Please provide a minimum reproduction repository (Git repository/StackBlitz/CodeSandbox project).

Here is the repo https://github.com/RaiMX/nestjs-kafka-exceptions

RaiMX avatar Aug 15 '22 08:08 RaiMX

any updates?

RaiMX avatar Sep 09 '22 06:09 RaiMX

any updates?

peshkov3 avatar Sep 29 '22 15:09 peshkov3

Please anybody?

RaiMX avatar Oct 06 '22 08:10 RaiMX

forcing me to downgrade to version 8...

RaiMX avatar Oct 06 '22 08:10 RaiMX

Faced same problem week ago, there are bug in ServerKafka. Extend ServerKafka and override method combineStreamsAndThrowIfRetriable with:

private combineStreamsAndThrowIfRetriable(response$: Observable<any>, replayStream$: ReplaySubject<unknown>) {
  return new Promise<void>((resolve, reject) => {
    response$.subscribe({
      next: (val) => {
        replayStream$.next(val);
        resolve();
      },
      error: (err) => {
        if (err instanceof KafkaRetriableException) {
          reject(err);
        }
        replayStream$.error(err);
        resolve();
      },
      complete: () => replayStream$.complete(),
    });
  });
}

Also in new version you need to provide Exception instance instead of message of exception in RpcExceptionFilter:

catch(exception: RpcException | HttpException, host: ArgumentsHost): Observable<any> {
  ...
  return throwError(() => exception);
}

kimvladis avatar Oct 07 '22 04:10 kimvladis

Faced same problem week ago, there are bug in ServerKafka. Extend ServerKafka and override method

Thank you! I confirm that this solution works!

RaiMX avatar Oct 07 '22 07:10 RaiMX

I confirm that this solution works!

@kimvladis Could you submit a pull request?

RaiMX avatar Oct 07 '22 08:10 RaiMX

Let's track this here https://github.com/nestjs/nest/pull/10379

kamilmysliwiec avatar Oct 21 '22 15:10 kamilmysliwiec

Faced same problem week ago, there are bug in ServerKafka. Extend ServerKafka and override method combineStreamsAndThrowIfRetriable with:

private combineStreamsAndThrowIfRetriable(response$: Observable<any>, replayStream$: ReplaySubject<unknown>) {
  return new Promise<void>((resolve, reject) => {
    response$.subscribe({
      next: (val) => {
        replayStream$.next(val);
        resolve();
      },
      error: (err) => {
        if (err instanceof KafkaRetriableException) {
          reject(err);
        }
        replayStream$.error(err);
        resolve();
      },
      complete: () => replayStream$.complete(),
    });
  });
}

Also in new version you need to provide Exception instance instead of message of exception in RpcExceptionFilter:

catch(exception: RpcException | HttpException, host: ArgumentsHost): Observable<any> {
  ...
  return throwError(() => exception);
}

How would you extend ServerKafka? I mean extend whole chain of classes?

RaiMX avatar Dec 04 '22 07:12 RaiMX