moleculer
moleculer copied to clipboard
Possible error in broker.emit(). I cannot recognize that an event has been sent to the transport successfully.
If I invoke broker.emit()
, I will never recognize that the event was sent to the transport successfully. The problem is that you return this.transit.sendEvent(ctx)
or this.Promise.all(promises)
, where promises are the results of this.transit.sendEvent(newCtx)
, in ServiceBroker.emit()
. Inside Transit.sendEvent(ctx)
, you return
return this.publish(
new Packet(P.PACKET_EVENT, ctx.endpoint ? ctx.nodeID : null, {
id: ctx.id,
event: ctx.eventName,
data: ctx.params,
groups,
broadcast: ctx.eventType == "broadcast",
meta: ctx.meta,
level: ctx.level,
tracing: ctx.tracing,
parentID: ctx.parentID,
requestID: ctx.requestID,
caller: ctx.caller,
needAck: ctx.needAck
})
).catch(
/* istanbul ignore next */ err => {
this.logger.error(`Unable to send '${ctx.eventName}' event to groups.`, err);
this.broker.broadcastLocal("$transit.error", {
error: err,
module: "transit",
type: C.FAILED_SEND_EVENT_PACKET
});
}
);
In such situation, the returned promise will never be rejected if the transporter exception has raised. I think it needs to save the result promise of publish method in a variable and return it, and apply catch to this promise too. Like
const publishResult = this.publish(
new Packet(P.PACKET_EVENT, ctx.endpoint ? ctx.nodeID : null, {
id: ctx.id,
event: ctx.eventName,
data: ctx.params,
groups,
broadcast: ctx.eventType == "broadcast",
meta: ctx.meta,
level: ctx.level,
tracing: ctx.tracing,
parentID: ctx.parentID,
requestID: ctx.requestID,
caller: ctx.caller,
needAck: ctx.needAck
})
);
publishResult.catch(
/* istanbul ignore next */ err => {
this.logger.error(`Unable to send '${ctx.eventName}' event to groups.`, err);
this.broker.broadcastLocal("$transit.error", {
error: err,
module: "transit",
type: C.FAILED_SEND_EVENT_PACKET
});
}
);
return publishResult;
It gives a possibility to recognize a problem with transport.
Hi @Vslava can you please take a look at https://github.com/moleculerjs/moleculer/issues/1065 and see if it's somehow related to the issue that you've reported?
Hi @AndreMaz It seems no. They tell that the local emit request is waiting when the code is completed. I tell that I don't get any information about the emit has a problem while it is running. For example, if the transport get down and after that I emit an event, I will get that the operation has been resolved successfully, but I see in the log that actually the event hasn't been sent to the transport and I cannot react to the problem, for example, to resend the event again.
For example, I have two files. I use RabbitMQ as a transport.
--- reciver.js ---
const { Context, ServiceBroker } = require('moleculer');
const broker = new ServiceBroker({
nodeID: 'node-1',
transporter: 'amqp://rabbitmq:5672',
logLevel: 'debug',
serializer: 'CBOR',
});
broker.createService({
name: 'test',
events: {
'created': {
handler(ctx) {
console.log('---- event is got');
}
},
}
});
broker.start()
.catch((err) => console.error('!!! Error 1 !!!', err));
--- sender.js ---
const util = require('util');
const { ServiceBroker } = require('moleculer');
const setTimeoutProm = util.promisify(setTimeout);
const broker = new ServiceBroker({
nodeID: 'node-2',
transporter: 'amqp://rabbitmq:5672',
logLevel: 'debug',
serializer: 'CBOR',
});
broker.start()
.then(async () => {
while (true) {
broker.emit('created', { aaa: 111 })
.then(() => { console.log('----- resolved'); })
.catch((err) => { console.log('----- error', err); })
await setTimeoutProm(2000);
}
})
.catch((err) => console.error('!!! Error 2 !!!', err));
I started them and after that I shut down RabbitMQ. On the screen for sender.js I got
[2022-07-25T01:45:20.971Z] INFO node-2/BROKER: Moleculer v0.14.21 is starting...
[2022-07-25T01:45:20.972Z] INFO node-2/BROKER: Namespace: <not defined>
[2022-07-25T01:45:20.972Z] INFO node-2/BROKER: Node ID: node-2
[2022-07-25T01:45:20.973Z] INFO node-2/REGISTRY: Strategy: RoundRobinStrategy
[2022-07-25T01:45:20.973Z] INFO node-2/REGISTRY: Discoverer: LocalDiscoverer
[2022-07-25T01:45:20.978Z] INFO node-2/BROKER: Serializer: CborSerializer
[2022-07-25T01:45:20.981Z] INFO node-2/BROKER: Validator: FastestValidator
[2022-07-25T01:45:20.982Z] INFO node-2/BROKER: Registered 13 middleware(s).
[2022-07-25T01:45:20.983Z] INFO node-2/BROKER: Transporter: AmqpTransporter
[2022-07-25T01:45:20.989Z] DEBUG node-2/$NODE: Service '$node' is creating...
[2022-07-25T01:45:20.989Z] DEBUG node-2/$NODE: Service '$node' created.
[2022-07-25T01:45:20.990Z] INFO node-2/TRANSIT: Connecting to the transporter...
[2022-07-25T01:45:21.023Z] INFO node-2/TRANSPORTER: AMQP is connected.
[2022-07-25T01:45:21.025Z] INFO node-2/TRANSPORTER: AMQP channel is created.
[2022-07-25T01:45:21.064Z] DEBUG node-2/BROKER: Broadcast '$services.changed' local event.
[2022-07-25T01:45:21.064Z] DEBUG node-2/BROKER: Broadcast '$node.connected' local event.
[2022-07-25T01:45:21.065Z] INFO node-2/REGISTRY: Node 'node-1' connected.
[2022-07-25T01:45:21.563Z] DEBUG node-2/BROKER: Broadcast '$transporter.connected' local event.
[2022-07-25T01:45:21.564Z] DEBUG node-2/$NODE: Service '$node' is starting...
[2022-07-25T01:45:21.569Z] INFO node-2/REGISTRY: '$node' service is registered.
[2022-07-25T01:45:21.569Z] DEBUG node-2/BROKER: Broadcast '$services.changed' local event.
[2022-07-25T01:45:21.569Z] INFO node-2/$NODE: Service '$node' started.
[2022-07-25T01:45:21.569Z] DEBUG node-2/BROKER: Broadcast '$broker.started' local event.
[2022-07-25T01:45:21.570Z] INFO node-2/BROKER: ✔ ServiceBroker with 1 service(s) started successfully in 580ms.
[2022-07-25T01:45:21.570Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:21.570Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:23.572Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:23.572Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:25.576Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:25.576Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:27.578Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:27.578Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:29.579Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:29.579Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:29.813Z] WARN node-2/TRANSPORTER: AMQP channel is closed.
[2022-07-25T01:45:29.813Z] ERROR node-2/TRANSPORTER: AMQP connection is closed.
[2022-07-25T01:45:29.814Z] WARN node-2/TRANSIT: Connection is failed. Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
[2022-07-25T01:45:29.814Z] DEBUG node-2/TRANSIT: Error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
at Object.accept (/app/node_modules/amqplib/lib/connection.js:91:15)
at Connection.mainAccept (/app/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/app/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (node:events:390:28)
at emitReadable_ (node:internal/streams/readable:578:12)
at processTicksAndRejections (node:internal/process/task_queues:82:21) {
code: 320
}
[2022-07-25T01:45:31.580Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:31.580Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
[2022-07-25T01:45:31.581Z] ERROR node-2/TRANSIT: Unable to send 'created' event to groups. [BrokerDisconnectedError: The broker's transporter has disconnected. Please try again when a connection is reestablished.] { code: 502, type: 'BAD_GATEWAY', data: undefined, retryable: true }
[2022-07-25T01:45:31.582Z] DEBUG node-2/BROKER: Broadcast '$transit.error' local event.
----- resolved
[2022-07-25T01:45:33.582Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:33.583Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
[2022-07-25T01:45:33.583Z] ERROR node-2/TRANSIT: Unable to send 'created' event to groups. [BrokerDisconnectedError: The broker's transporter has disconnected. Please try again when a connection is reestablished.] { code: 502, type: 'BAD_GATEWAY', data: undefined, retryable: true }
[2022-07-25T01:45:33.583Z] DEBUG node-2/BROKER: Broadcast '$transit.error' local event.
----- resolved
As we can see, the emit got failed but it returned that it had executed successfully because the promise was resolved, not rejected. I think that the emit result promise must be rejected in such situation.
I think that this need further discussion.
The overall idea is that events are fire-and-forget, i.e., you don't get any guarantees about the delivery of the event.
I see in the log that actually the event hasn't been sent to the transport and I cannot react to the problem, for example, to resend the event again.
Even if the event is sent again (and it reaches the transporter) there's still no guarantee that it will be delivered to the consumer
. The producer
does not know if the consumer
is connected to the transporter or not. Not fully related but this kind of reminds me an old issue/RFC that we have https://github.com/moleculerjs/rfcs/issues/2
Nevertheless, there are some internal events (e.g., $transit.error
, $transporter.error
) that can be used to detect issues with the transporter.
Overall, for guaranteed event delivery https://github.com/moleculerjs/moleculer-channels might be a good solution. It was designed to provide a "secure" and persistent event system.
In any case, @Vslava @Embraser01, since this issue and the https://github.com/moleculerjs/moleculer/issues/1065 are related I would like to hear your ideas about this.
What should be the behavior for the local and remote emit()
?
@AndreMaz, this issue #1065 has a different issue. There, a local call to emit
and broadcast
occurs through await
while waiting for a resolve Promise
, so it turns to wait of executing all the code in the handler.
@Vslava the fact is that you need to separate two different systems.
Namely, the moleculer transport is a message broker that does not have a guaranteed delivery task, so it only is a publication of messages.
You can process sending messages asynchronously through events, which is the best option in node.js.
If you need message assurance, you must consider streaming systems (Kafka, JetStream, etc.) or message queuing systems (RabbitMQ, Bull, etc.).
And as Andre already said, the guarantee of delivery at least once is achieved due to the observation of consumers and ACK packets.
The implementation of ACK for events at the level of the moleculer transport protocol is redundant functionality because there is no built-in storage. We can use call
instead of events, which implements a guaranteed call with ACK.
@AndreMaz, this issue #1065 has a different issue. There, a local call to
emit
andbroadcast
occurs throughawait
while waiting for a resolvePromise
, so it turns to wait of executing all the code in the handler.
You're right @intech it's a different issue but I think that emit()
should have a consistent behavior regardless of being processed locally or remotely.
In the case of local emit()
, that waits unit the handler finishes the processing, it is possible to catch an error. On the other hand, in remote emit()
this is not possible. Ideally, the devs should expect the same behavior in both cases.
@AndreMaz @intech
The overall idea is that events are fire-and-forget, i.e., you don't get any guarantees about the delivery of the event.
I agree that there’s no guarantee that an event will be delivered because you don’t know that somebody is listening to the event or not. The event sender cannot know who is subscribed for the event.
However, a moleculer user has to be sure that an event will reach the transporter and be accepted for delivery and the event will be accepted by the consumer successfully but you don’t guarantee these all.
Now, if the transporter has a problem, the sender cannot recognize that this problem exists. broker.emit() doesn’t return any problem which it has bumped into. Such implementation doesn’t give the user any possibility to create a simple subsystem to resend events. Yes, Moleculer doesn’t have a built-in storage but it can give a possibility to the user to implement this storage on their side. You can reject the promise returned by broker.emit() if there is some problem when the event is being transmitted to the transporter. If the sender knows about the problem, the sender can react on the problem and resend the event.
As I understood, Moleculer doesn’t guarantee that the event will be accepted by the consumer successfully. If the event starts sending the event, the transporter forgets about the event. There is’t any ACK implementation which the cosumer can send to the transporter to confirm that the event has been accepted successfully. I think that not all transaporters have this feature but it can be implemented where possible.
About $transit.error, $transporter.error. How can I understand that an error was caused by the event I sent? Maybe my event was sent successfully and after that the error was happened. Especially It is important when the error is happend in the same time when I am sending the event. There is nothing inside the error except a message and a code. Maybe it is simpler to throw an exception from broker.emit() ?
About https://github.com/moleculerjs/moleculer-channels. Yes, it can be good choice but now It has a bug with RabbitMQ. It doesn’t recover connection with RabbitMQ when RabbitMQ got down and started again. I will make an issue about this. So It is not choice just now. And… with all respect but it is strange that Moleculer is a microservices framework and it has own feature for events but I need to use another module to use events instead of using native events out of box.
@Vslava as I see, your main problem is that in the catch
case we didn't throw further the original error. So if we add a throw err
into the catch
block, the error goes to the caller and you can catch it on your code as well, right?
As @icebob suggested, adding a throw
would propagate the error, allowing you to catch and react to it.
However, introducing this could be a breaking change. I'm saying could because I saw several times situation like
await ctx.emit("some.event", {data}) // <--- No catch
Throwing an error without the catch()
would cause an unhandled promise rejection, causing the process to exit. I'm not discussing if this is a good (or bad) code, I'm just saying that I saw this situation several times. So this (breaking) change can only come in v0.15
.
The workaround for you would be to overwrite the sendEvent()
method of Transit class and throw the error further.
it is strange that Moleculer is a microservices framework and it has own feature for events but I need to use another module to use events instead of using native events out of box.
I think that this statement is a little unfair. We simply can't compete against project like NATS, Kafka, RabitMQ, etc. They have big budgets and lots of people dedicated to design a message delivery solution. I want to highlight, the sole purpose of those projects is reliable delivery and nothing more. Moleculer is a little bit different as it provides more features.
If we were to implement a custom (and reliable) event system, then the discussion would be different. We would be discussing the performance of our solution against NATS, Kafka, etc. Again, we can't compete with those projects without having the same resources as they have.
To summarize, I think that our decision of creating moleculer/channels was correct. Instead of competing with the previously mentioned solutions we provide a common and easy-to-use interface to interact with them, alongside with all the benefits that they provide.
@icebob Yes. You are right.