Fix rpc exception kafka
PR Checklist
Please check if your PR fulfills the following requirements:
- [x] The commit message follows our guidelines: https://github.com/nestjs/nest/blob/master/CONTRIBUTING.md
- [ ] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)
PR Type
What kind of change does this PR introduce?
- [x] Bugfix
- [ ] Feature
- [ ] Code style update (formatting, local variables)
- [ ] Refactoring (no functional changes, no api changes)
- [ ] Build related changes
- [ ] CI related changes
- [ ] Other... Please describe:
What is the current behavior?
https://github.com/nestjs/nest/issues/10113
Issue Number: 10113
What is the new behavior?
Microservices return custom RPC errors through the Kafka transport.
Does this PR introduce a breaking change?
- [ ] Yes
- [x] No
Other information
Pull Request Test Coverage Report for Build d8d9a5ab-12b5-4810-9ec6-34797051705d
- 1 of 3 (33.33%) changed or added relevant lines in 1 file are covered.
- No unchanged relevant lines lost coverage.
- Overall coverage decreased (-0.003%) to 93.784%
| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
|---|---|---|---|
| packages/microservices/server/server-kafka.ts | 1 | 3 | 33.33% |
| **Total:** | 1 | 3 | 33.33% |

| Totals | |
|---|---|
| Change from base Build 6b637f64-9c7a-4d7b-8537-b6a8d098f58e: | -0.003% |
| Covered Lines: | 6110 |
| Relevant Lines: | 6515 |
💛 - Coveralls
Hey @kamilmysliwiec could you take a quick look at this PR? It seems to fix an issue introduced by this commit: cfe1191d225cd40a225ca98e875e15e307702e55
So no updates at the moment?
I was about to open a PR to add the `resolve()` call whose absence blocks execution after an error is thrown. Is something missing from this PR that prevents it from being merged? This PR would fix my issue as well.
@kamilmysliwiec no updates yet? Because of this bug I can't use version 9 in production... sad
+1. Could someone please look into this? @kamilmysliwiec
I added the following code to my application as a workaround for now:
// kafka-server.ts
// HACK: works around the @nestjs/microservices issue tracked in https://github.com/nestjs/nest/pull/10379
import { KafkaRetriableException, ServerKafka } from '@nestjs/microservices';

// ServerKafka declares this method as private, so redeclaring it in a subclass
// is a type error; the @ts-ignore below suppresses it.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
export class KafkaServer extends ServerKafka {
  private combineStreamsAndThrowIfRetriable(
    response$: { subscribe: (arg0: { next: (val: any) => void; error: (err: any) => void; complete: () => any }) => void },
    replayStream$: { next: (arg0: any) => void; error: (arg0: any) => void; complete: () => any },
  ) {
    return new Promise<void>((resolve, reject) => {
      let isPromiseResolved = false;
      response$.subscribe({
        next: val => {
          replayStream$.next(val);
          // Settle on the first emitted value only.
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
        error: err => {
          // Retriable errors reject so the message can be retried.
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
            isPromiseResolved = true;
            reject(err);
          }
          replayStream$.error(err);
          // Settle for every other error too; otherwise the promise stays
          // pending and the server stalls. No-op if reject() already ran.
          resolve();
        },
        complete: () => replayStream$.complete(),
      });
    });
  }
}
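// main.ts
import { MicroserviceOptions } from '@nestjs/microservices';
import { KafkaServer } from './kafka-server'; // adjust to wherever kafka-server.ts lives

// `app` and `environment` come from the surrounding application bootstrap code.
app.connectMicroservice<MicroserviceOptions>(
  {
    strategy: new KafkaServer({
      client: {
        brokers: environment.KAFKA_URL.map(url => `${url}:${environment.KAFKA_PORT}`),
        clientId: 'logs',
      },
      consumer: { groupId: 'logs' },
    }),
  },
  { inheritAppConfig: true },
);
await app.startAllMicroservices();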
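For anyone wondering why the extra `resolve()` in the error handler matters, here is a minimal, dependency-free sketch of the same pattern. It is not the Nest implementation: `RetriableError`, `settleFromStream`, and `failWith` are hypothetical stand-ins, and the source is assumed to emit synchronously on subscribe. It only illustrates that a promise settled from a stream stays pending forever when a non-retriable error arrives and nothing resolves it.

```typescript
// Hypothetical stand-in for KafkaRetriableException; not part of @nestjs/microservices.
class RetriableError {
  constructor(readonly message: string) {}
}

type Observer = {
  next: (value: unknown) => void;
  error: (err: unknown) => void;
  complete: () => void;
};
type Source = { subscribe: (observer: Observer) => void };
type Outcome = 'pending' | 'resolved' | 'rejected';

// Settles a promise from the first event of a stream.
// `resolveOnError` toggles the extra resolve() for non-retriable errors.
function settleFromStream(source: Source, resolveOnError: boolean) {
  let outcome: Outcome = 'pending';
  const promise = new Promise<void>((resolve, reject) => {
    const settle = (next: 'resolved' | 'rejected', err?: unknown) => {
      if (outcome !== 'pending') return; // a promise settles only once
      outcome = next;
      if (next === 'rejected') reject(err);
      else resolve();
    };
    source.subscribe({
      next: () => settle('resolved'),
      error: err => {
        if (err instanceof RetriableError) {
          settle('rejected', err);
        } else if (resolveOnError) {
          settle('resolved');
        }
        // Without resolveOnError, a non-retriable error settles nothing.
      },
      complete: () => undefined,
    });
  });
  promise.catch(() => undefined); // keep the demo free of unhandled rejections
  return { promise, getOutcome: () => outcome };
}

// A source that fails immediately and synchronously with the given error.
const failWith = (err: unknown): Source => ({
  subscribe: observer => observer.error(err),
});

const withoutFix = settleFromStream(failWith(new Error('rpc error')), false);
const withFix = settleFromStream(failWith(new Error('rpc error')), true);
const retriable = settleFromStream(failWith(new RetriableError('retry me')), true);

console.log(withoutFix.getOutcome()); // 'pending'
console.log(withFix.getOutcome()); // 'resolved'
console.log(retriable.getOutcome()); // 'rejected'
```

In this sketch, whatever awaits `withoutFix.promise` would wait indefinitely, which corresponds to the stall described above; the retriable case still rejects as before.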
Fixed in this PR https://github.com/nestjs/nest/pull/10982