nestjs-google-pubsub-microservice
nestjs-google-pubsub-microservice copied to clipboard
Bug: PubSub client is not connected: topic undefined does not exist
I'm experiencing an awkward bug. I've two microservices with the same MessageService class
export class MessageService implements OnApplicationShutdown, MessageServiceInterface {
private client: GCPubSubClient;
private readonly projectId: string;
private readonly apiEndpoint: string;
constructor(private config: ConfigService) {
this.projectId = this.config.get('PUBSUB_PROJECT_ID') ?? this.config.getOrThrow('GCP_PROJECT_ID');
this.apiEndpoint = this.config.getOrThrow('PUBSUB_HOST');
}
createClient(topic: string, subscription?: string): GCPubSubClient {
return (this.client = new GCPubSubClient({
init: false,
checkExistence: true,
noAck: false,
topic: topic,
subscription: subscription,
publisher: {
batching: { maxMessages: 10 },
},
client: {
projectId: this.projectId,
apiEndpoint: this.apiEndpoint,
},
}));
}
public onApplicationShutdown(): Promise<void> {
return this.client.close();
}
}
And then I'm using it in one on my controllers in the next way
await firstValueFrom(
this.messageService
.createClient(this.paymentsCalculatorSuccLevTopic)
.emit(this.paymentsCalculatorSuccLevPattern, data),
)
In one of the services it's working perfectly but in the other I get all the time the next error:
PubSub client is not connected: topic undefined does not exist
I've confirmed that the topic is there, and that the connection is done and all the configuration is working. Also, if I call the PubSub functions like the example above it works with no problem
await this.messageService
.createClient()
.topic(this.paymentsCalculatorSuccLevTopic)
.publishMessage({
json: {
pattern: this.paymentsCalculatorSuccLevPattern,
data,
},
});
Here is a log of the client
GCPubSubClient {
routingMap: Map(0) {},
options: {
init: false,
checkExistence: true,
noAck: false,
topic: 'payment_calculator_suc_lev_test',
subscription: undefined,
publisher: { batching: [Object] },
client: { projectId: 'emulator', apiEndpoint: '127.0.0.1:8085' }
},
logger: Logger { context: 'GCPubSubClient', options: {} },
client: null,
replySubscription: null,
topic: null,
clientConfig: { projectId: 'emulator', apiEndpoint: '127.0.0.1:8085' },
scopedEnvKey: '',
topicName: 'payment_calculator_suc_lev_test',
subscriberConfig: {
topic: Topic {
getSubscriptionsStream: [Function (anonymous)],
name: 'projects/emulator/topics/payment_calculator_update_test',
publisher: [Publisher],
pubsub: [PubSub],
parent: [PubSub],
request: [Function: bound request],
iam: [IAM],
metadata: [Object]
}
},
publisherConfig: { batching: { maxMessages: 10 } },
replyTopicName: 'undefined',
replySubscriptionName: 'undefined',
noAck: false,
init: false,
useAttributes: false,
checkExistence: true,
serializer: IdentitySerializer {},
deserializer: IncomingResponseDeserializer {}
}
The log from the other microservice looks the same but just with different topic and subscription.
Do you know what could be happening?
Hi, please check that this.paymentsCalculatorSuccLevTopic
is defined when you create a client createClient(this.paymentsCalculatorSuccLevTopic)
. Seems like in one case it passes undefined
value
That variable is defined in the constructor of the controller and with a getOrThrow. This is the entire controller
import { Controller, Logger } from '@nestjs/common';
import { Ctx, EventPattern, Payload } from '@nestjs/microservices';
import { CalculationResponse, CalculationsService } from './calculations.service';
import { GCPubSubContext } from 'nestjs-google-pubsub-microservice';
import * as process from 'process';
import { MessageService } from '../message/message.service';
import { ConfigService } from '@nestjs/config';
import { Calculation } from '../common/interfaces/payment-strategy.interface';
import { firstValueFrom } from 'rxjs';
@Controller('calculations')
export class CalculationsController {
private readonly paymentsCalculatorSuccLevPattern;
private readonly paymentsCalculatorSuccLevTopic;
constructor(
private calculationService: CalculationsService,
private logger: Logger,
private messageService: MessageService,
private readonly config: ConfigService,
) {
this.paymentsCalculatorSuccLevPattern = this.config.getOrThrow('PAYMENT_CALCULATOR_SUCCESS_LEV_PATTERN');
this.paymentsCalculatorSuccLevTopic = this.config.getOrThrow('PAYMENT_CALCULATOR_SUCCESS_LEV_TOPIC');
}
@EventPattern(process.env.PAYMENT_CALCULATOR_RESULT_PATTERN)
async publish(@Payload() rawInput: Calculation, @Ctx() context: GCPubSubContext): Promise<void> {
const originalMsg = context.getMessage();
originalMsg.ack();
this.logger.log(`Event received: ${JSON.stringify(rawInput)}`, 'Calculation Controller');
const data: CalculationResponse | void = await this.calculationService.createCalculation(rawInput);
if (data && !data.error && data.calculationId) {
try {
await firstValueFrom(
this.messageService
.createClient(this.paymentsCalculatorSuccLevTopic)
.emit(this.paymentsCalculatorSuccLevPattern, data),
);
this.logger.log(
`Event emitted with pattern: ${this.paymentsCalculatorSuccLevPattern} and payload: ${JSON.stringify(data)}`,
'Calculation Controller',
);
} catch (e) {
this.logger.log(
{
context: 'Calculation Controller',
message: `Error emitting event.`,
service: 'Calculation Controller',
component: 'controller',
action: 'event-emited',
pattern: this.paymentsCalculatorSuccLevPattern,
},
'Payment Controller',
);
}
}
}
}
Hi, any comment on this?
Hi, here is the PR fixing the issue