Custom RPC exception when using Kafka based microservice is not returning error
Did you read the migration guide?
- [X] I have read the whole migration guide
Is there an existing issue that is already proposing this?
- [X] I have searched the existing issues
Potential Commit/PR that introduced the regression
No response
NestJS version
8.4.6 -> 9.0.9
Describe the regression
A custom RPC exception filter in a Kafka-based microservice no longer returns the error. The error is printed to the console but never reaches the Kafka topic. This worked fine before upgrading to v9.
Minimum reproduction code
No response
Input code
import { ArgumentsHost, Catch, HttpException, RpcExceptionFilter } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

@Catch()
export class CustomRpcExceptionFilter implements RpcExceptionFilter<RpcException> {
  catch(exception: RpcException | HttpException, host: ArgumentsHost): Observable<any> {
    ...
    return throwError(() => JSON.stringify(error)); // before migrating to v9 this line worked fine
  }
}
Expected behavior
Custom RPC exception should return error
Other
"dependencies": {
"@nestjs/common": "^9.0.9",
"@nestjs/config": "^2.2.0",
"@nestjs/core": "^9.0.9",
"@nestjs/microservices": "^9.0.9",
"@nestjs/platform-express": "^9.0.9",
"@nestjs/swagger": "^6.0.5",
"@nestjs/typeorm": "^9.0.1",
"@nestjsx/crud": "^5.0.0-alpha.3",
"@nestjsx/crud-typeorm": "^5.0.0-alpha.3",
"bcrypt": "^5.0.1",
"cache-manager": "^4.1.0",
"cache-manager-redis-store": "^2.0.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.13.2",
"ioredis": "^5.2.0",
"kafkajs": "^2.1.0",
"kafkajs-snappy": "^1.1.0",
"mongoose": "^6.4.4",
"nest-winston": "^1.7.0",
"pg": "^8.7.3",
"redis": "^4.2.0",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.5.6",
"typeorm": "^0.3.7",
"typeorm-naming-strategies": "^4.1.0",
"winston": "^3.8.1",
"winston-daily-rotate-file": "^4.7.1"
},
"devDependencies": {
"@nestjs/cli": "^9.0.0",
"@nestjs/schematics": "^9.0.1",
"@nestjs/testing": "^9.0.9",
"@types/cache-manager": "^4.0.1",
"@types/cache-manager-redis-store": "^2.0.1",
"@types/express": "^4.17.13",
"@types/jest": "^28.1.5",
"@types/node": "^18.0.3",
"@types/supertest": "^2.0.12",
"@typescript-eslint/eslint-plugin": "^5.30.6",
"@typescript-eslint/parser": "^5.30.6",
"eslint": "^8.19.0",
"eslint-config-prettier": "^8.5.0",
"eslint-plugin-prettier": "^4.2.1",
"jest": "^28.1.2",
"prettier": "^2.7.1",
"source-map-support": "^0.5.21",
"supertest": "^6.2.4",
"ts-jest": "^28.0.5",
"ts-loader": "^9.3.1",
"ts-node": "^10.8.2",
"tsconfig-paths": "^4.0.0",
"typescript": "^4.7.4"
}
Please provide a minimum reproduction repository (Git repository/StackBlitz/CodeSandbox project).
Here is the repo https://github.com/RaiMX/nestjs-kafka-exceptions
any updates?
Please anybody?
forcing me to downgrade to version 8...
I faced the same problem a week ago; there is a bug in ServerKafka. Extend ServerKafka and override the method combineStreamsAndThrowIfRetriable with:
private combineStreamsAndThrowIfRetriable(response$: Observable<any>, replayStream$: ReplaySubject<unknown>) {
  return new Promise<void>((resolve, reject) => {
    response$.subscribe({
      next: (val) => {
        replayStream$.next(val);
        resolve();
      },
      error: (err) => {
        if (err instanceof KafkaRetriableException) {
          reject(err);
        }
        replayStream$.error(err);
        resolve();
      },
      complete: () => replayStream$.complete(),
    });
  });
}
Also, in the new version the RpcExceptionFilter has to rethrow the exception instance instead of the exception message:
catch(exception: RpcException | HttpException, host: ArgumentsHost): Observable<any> {
  ...
  return throwError(() => exception);
}
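For anyone trying to follow what the combineStreamsAndThrowIfRetriable override does, here is a minimal sketch of the same control flow that runs without NestJS, rxjs or a broker. Everything in it is a stand-in made up for illustration: RetriableFailure plays the role of KafkaRetriableException, Sink the observer side of the ReplaySubject, Source the Observable, and RecordingSink just records what would be sent back. It only models the promise and stream handling, not the real ServerKafka.

```typescript
// Stand-in for KafkaRetriableException (hypothetical, not the NestJS class).
class RetriableFailure {
  constructor(readonly message: string) {}
}

// Stand-in for the observer side of an rxjs ReplaySubject.
interface Sink<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Stand-in for an Observable: a function that pushes events into a sink.
type Source<T> = (sink: Sink<T>) => void;

// Records what reaches the reply stream so the outcome can be inspected.
class RecordingSink<T> implements Sink<T> {
  readonly events: string[] = [];
  next(value: T): void {
    this.events.push(`next:${String(value)}`);
  }
  error(err: unknown): void {
    const text = err instanceof RetriableFailure ? err.message : String(err);
    this.events.push(`error:${text}`);
  }
  complete(): void {
    this.events.push('complete');
  }
}

// Same control flow as the override, written against the stand-ins.
function combineAndThrowIfRetriable<T>(source: Source<T>, reply: Sink<T>): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    source({
      next: (value) => {
        reply.next(value);
        resolve();
      },
      error: (err) => {
        if (err instanceof RetriableFailure) {
          reject(err); // only retriable errors reject the handler promise
        }
        reply.error(err); // every error is still forwarded to the reply stream
        resolve(); // has no effect if the promise was already rejected
      },
      complete: () => reply.complete(),
    });
  });
}

// A non-retriable error is forwarded to the reply stream and the promise resolves.
const plainReply = new RecordingSink<string>();
combineAndThrowIfRetriable<string>((sink) => sink.error('bad input'), plainReply);
console.log(plainReply.events); // ['error:bad input']

// A retriable error is forwarded as well, but the promise rejects.
const retriableReply = new RecordingSink<string>();
combineAndThrowIfRetriable<string>(
  (sink) => sink.error(new RetriableFailure('try again')),
  retriableReply,
).catch(() => console.log('rejected, so the caller can retry'));
```

The point of the sketch is that a non-retriable error still ends up on the reply stream, which matches the behaviour the issue expects: the error should be returned, not only logged.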
Thank you! I confirm that this solution works!
@kimvladis Could you submit a pull request?
Let's track this here https://github.com/nestjs/nest/pull/10379
How would you extend ServerKafka? I mean extend whole chain of classes?