connectors
connectors copied to clipboard
rabbitmq inbound connector publish message with wrong name
Describe the Bug
For some process instances, the rabbimq inbound connector - intermediate catch event - publishes a message with the wrong name, the correlation key seems to be correct, however the wrong message name prevents the message to be correlated.
Steps to Reproduce
- create a process with more than 1 rabbitmq connector - intermediate catch event
- start the bpmn, wait for the token to reach the inbound connector, and push the message to the queue
Expected Behavior
Message should be correlated based on message name and correlation key
Environment
- OS: macos
- Library version: connector docker image v0.23.2
- Camunda version: 8.2.5
Test process
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0o5p122" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.13.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.2.0">
<bpmn:process id="Process_onboarding" isExecutable="true">
<bpmn:extensionElements />
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0m0flnh</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0m0flnh" sourceRef="StartEvent_1" targetRef="Gateway_0533885" />
<bpmn:endEvent id="Event_05x97di">
<bpmn:incoming>Flow_1ilduf1</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_14b6eot" sourceRef="Event_0k5c5rz" targetRef="Activity_1q6qehk" />
<bpmn:intermediateCatchEvent id="Event_0k5c5rz" name="Save success" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1" zeebe:modelerTemplateVersion="3" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns='http://www.w3.org/2000/svg' width='18' height='18' viewBox='-7.5 0 271 271' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z' fill='%23F60'/%3E%3C/svg%3E">
<bpmn:extensionElements>
<zeebe:properties>
<zeebe:property name="inbound.type" value="io.camunda:connector-rabbitmq-inbound:1" />
<zeebe:property name="authentication.authType" value="uri" />
<zeebe:property name="authentication.uri" value="amqp://admin:admin@rabbitmq:5672/devcon" />
<zeebe:property name="queueName" value="ack" />
<zeebe:property name="consumerTag" value="" />
<zeebe:property name="arguments" value="" />
<zeebe:property name="exclusive" value="false" />
<zeebe:property name="correlationKeyExpression" value="=string(message.body.ssn) + "-" + string(message.body.status)" />
</zeebe:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1n9ceqm</bpmn:incoming>
<bpmn:outgoing>Flow_14b6eot</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1ha0hu7" messageRef="Message_1ydgiu3" />
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_1n9ceqm" sourceRef="Gateway_0533885" targetRef="Event_0k5c5rz" />
<bpmn:eventBasedGateway id="Gateway_0533885">
<bpmn:incoming>Flow_0m0flnh</bpmn:incoming>
<bpmn:outgoing>Flow_1n9ceqm</bpmn:outgoing>
<bpmn:outgoing>Flow_1hv7n7i</bpmn:outgoing>
<bpmn:outgoing>Flow_12z54lh</bpmn:outgoing>
</bpmn:eventBasedGateway>
<bpmn:sequenceFlow id="Flow_1hv7n7i" sourceRef="Gateway_0533885" targetRef="Event_1jzlt5e" />
<bpmn:intermediateCatchEvent id="Event_1jzlt5e" name="Save failed" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1" zeebe:modelerTemplateVersion="3" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns='http://www.w3.org/2000/svg' width='18' height='18' viewBox='-7.5 0 271 271' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z' fill='%23F60'/%3E%3C/svg%3E">
<bpmn:extensionElements>
<zeebe:properties>
<zeebe:property name="inbound.type" value="io.camunda:connector-rabbitmq-inbound:1" />
<zeebe:property name="authentication.authType" value="uri" />
<zeebe:property name="authentication.uri" value="amqp://admin:admin@rabbitmq:5672/devcon" />
<zeebe:property name="queueName" value="ack" />
<zeebe:property name="consumerTag" value="" />
<zeebe:property name="arguments" value="" />
<zeebe:property name="exclusive" value="false" />
<zeebe:property name="correlationKeyExpression" value="=string(message.body.ssn) + "-" + string(message.body.status)" />
</zeebe:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1hv7n7i</bpmn:incoming>
<bpmn:outgoing>Flow_0fz3r38</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_01dgsqw" messageRef="Message_0fuk94b" />
</bpmn:intermediateCatchEvent>
<bpmn:endEvent id="Event_1jdze4c">
<bpmn:incoming>Flow_0ptaz4j</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0fz3r38" sourceRef="Event_1jzlt5e" targetRef="Activity_0o7tai8" />
<bpmn:sequenceFlow id="Flow_0ptaz4j" sourceRef="Activity_0o7tai8" targetRef="Event_1jdze4c" />
<bpmn:sequenceFlow id="Flow_1ilduf1" sourceRef="Activity_1q6qehk" targetRef="Event_05x97di" />
<bpmn:sendTask id="Activity_1q6qehk" name="Start backoffice process">
<bpmn:extensionElements>
<zeebe:taskDefinition type="start-backoffice-process" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_14b6eot</bpmn:incoming>
<bpmn:outgoing>Flow_1ilduf1</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_12z54lh" sourceRef="Gateway_0533885" targetRef="Event_0d5gade" />
<bpmn:intermediateCatchEvent id="Event_0d5gade" name="Already registered" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1" zeebe:modelerTemplateVersion="3" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns='http://www.w3.org/2000/svg' width='18' height='18' viewBox='-7.5 0 271 271' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z' fill='%23F60'/%3E%3C/svg%3E">
<bpmn:extensionElements>
<zeebe:properties>
<zeebe:property name="inbound.type" value="io.camunda:connector-rabbitmq-inbound:1" />
<zeebe:property name="authentication.authType" value="uri" />
<zeebe:property name="authentication.uri" value="amqp://admin:admin@rabbitmq:5672/devcon" />
<zeebe:property name="queueName" value="ack" />
<zeebe:property name="consumerTag" value="" />
<zeebe:property name="arguments" value="" />
<zeebe:property name="exclusive" value="false" />
<zeebe:property name="correlationKeyExpression" value="=string(message.body.ssn) + "-" + string(message.body.status)" />
</zeebe:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_12z54lh</bpmn:incoming>
<bpmn:outgoing>Flow_0bmmnn0</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_16dzqa3" messageRef="Message_0juxewj" />
</bpmn:intermediateCatchEvent>
<bpmn:endEvent id="Event_0si2p4a">
<bpmn:incoming>Flow_0bmmnn0</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0bmmnn0" sourceRef="Event_0d5gade" targetRef="Event_0si2p4a" />
<bpmn:serviceTask id="Activity_0o7tai8" name="Send email to CSR" zeebe:modelerTemplate="io.camunda.connectors.RabbitMQ.v1" zeebe:modelerTemplateVersion="1" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns='http://www.w3.org/2000/svg' width='18' height='18' viewBox='-7.5 0 271 271' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z' fill='%23F60'/%3E%3C/svg%3E">
<bpmn:extensionElements>
<zeebe:taskDefinition type="io.camunda:connector-rabbitmq:1" />
<zeebe:ioMapping>
<zeebe:input source="uri" target="authentication.authType" />
<zeebe:input source="amqp://admin:admin@rabbitmq:5672/devcon" target="authentication.uri" />
<zeebe:input source="customer" target="routing.exchange" />
<zeebe:input source="mail" target="routing.routingKey" />
<zeebe:input source="={ "message": "saved failed for customer " + customer.ssn, "to": "[email protected]" }" target="message.body" />
<zeebe:input source="{}" target="message.properties" />
</zeebe:ioMapping>
<zeebe:taskHeaders />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0fz3r38</bpmn:incoming>
<bpmn:outgoing>Flow_0ptaz4j</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmn:message id="Message_1ydgiu3" name="39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=string(customer.ssn) + "-success"" />
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_0fuk94b" name="77cd64b1-d37f-4583-89ae-6a14c712d907" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=string(customer.ssn) + "-failure"" />
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_0juxewj" name="292bcfdb-4f85-46f7-955e-d04fe87438de" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=string(customer.ssn) + "-existent"" />
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_onboarding">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="152" y="99" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_05x97di_di" bpmnElement="Event_05x97di">
<dc:Bounds x="1642" y="99" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_01wkmyk_di" bpmnElement="Event_0k5c5rz">
<dc:Bounds x="1312" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="1297" y="142" width="68" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_1pazu5r_di" bpmnElement="Gateway_0533885">
<dc:Bounds x="1205" y="92" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_134roqx_di" bpmnElement="Event_1jzlt5e">
<dc:Bounds x="1312" y="202" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="1303" y="245" width="55" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1jdze4c_di" bpmnElement="Event_1jdze4c">
<dc:Bounds x="1642" y="202" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1gvilfb_di" bpmnElement="Activity_1q6qehk">
<dc:Bounds x="1440" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_049f47u_di" bpmnElement="Event_0d5gade">
<dc:Bounds x="1312" y="312" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="1287" y="355" width="90" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0si2p4a_di" bpmnElement="Event_0si2p4a">
<dc:Bounds x="1642" y="312" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1jkwpvu_di" bpmnElement="Activity_0o7tai8">
<dc:Bounds x="1440" y="180" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0m0flnh_di" bpmnElement="Flow_0m0flnh">
<di:waypoint x="188" y="117" />
<di:waypoint x="1205" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_14b6eot_di" bpmnElement="Flow_14b6eot">
<di:waypoint x="1348" y="117" />
<di:waypoint x="1440" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1n9ceqm_di" bpmnElement="Flow_1n9ceqm">
<di:waypoint x="1255" y="117" />
<di:waypoint x="1312" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1hv7n7i_di" bpmnElement="Flow_1hv7n7i">
<di:waypoint x="1230" y="142" />
<di:waypoint x="1230" y="220" />
<di:waypoint x="1312" y="220" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0fz3r38_di" bpmnElement="Flow_0fz3r38">
<di:waypoint x="1348" y="220" />
<di:waypoint x="1440" y="220" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0ptaz4j_di" bpmnElement="Flow_0ptaz4j">
<di:waypoint x="1540" y="220" />
<di:waypoint x="1642" y="220" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ilduf1_di" bpmnElement="Flow_1ilduf1">
<di:waypoint x="1540" y="117" />
<di:waypoint x="1642" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_12z54lh_di" bpmnElement="Flow_12z54lh">
<di:waypoint x="1230" y="142" />
<di:waypoint x="1230" y="330" />
<di:waypoint x="1312" y="330" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0bmmnn0_di" bpmnElement="Flow_0bmmnn0">
<di:waypoint x="1348" y="330" />
<di:waypoint x="1642" y="330" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
ES message Working
{
"_index": "zeebe-record_message_8.2.5_2023-10-24",
"_type": "_doc",
"_id": "1-714",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"_routing": "1",
"found": true,
"_source": {
"partitionId": 1,
"value": {
"name": "39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3",
"timeToLive": 3600000,
"messageId": "",
"deadline": 1698152680347,
"variables": {},
"correlationKey": "1-success"
},
"key": 2251799813685601,
"timestamp": 1698149080349,
"valueType": "MESSAGE",
"brokerVersion": "8.2.5",
"recordType": "EVENT",
"sourceRecordPosition": 713,
"intent": "PUBLISHED",
"rejectionType": "NULL_VAL",
"rejectionReason": "",
"position": 714,
"sequence": 2251799813685249
}
}
Wrong message name - the message name belongs to a different catch event
{
"_index": "zeebe-record_message_8.2.5_2023-10-24",
"_type": "_doc",
"_id": "1-992",
"_version": 1,
"_seq_no": 2,
"_primary_term": 1,
"_routing": "1",
"found": true,
"_source": {
"partitionId": 1,
"value": {
"name": "77cd64b1-d37f-4583-89ae-6a14c712d907",
"timeToLive": 3600000,
"messageId": "",
"deadline": 1698152716088,
"variables": {},
"correlationKey": "2-success"
},
"key": 2251799813685719,
"timestamp": 1698149116088,
"valueType": "MESSAGE",
"brokerVersion": "8.2.5",
"recordType": "EVENT",
"sourceRecordPosition": 991,
"intent": "PUBLISHED",
"rejectionType": "NULL_VAL",
"rejectionReason": "",
"position": 992,
"sequence": 2251799813685251
}
}