SimpleAmqpClient

Message properties not being sent to RabbitMQ hub

Open humayunkhan opened this issue 4 years ago • 3 comments

I used vcpkg to build rabbitmq-c, following the build instructions at https://github.com/alanxz/rabbitmq-c:

git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
./vcpkg integrate install
./vcpkg install librabbitmq
./vcpkg install boost

Then I built https://github.com/alanxz/SimpleAmqpClient (the latest release, version 2.5.1).

The problem I am facing is that message properties such as CorrelationId, Type, etc. are not being sent to the RabbitMQ hub. The same code worked before I upgraded to the latest release.

I publish the message using the following code:

void RabbitMqNode::SendResponse(google::protobuf::Message *response, NodeEnvelope *envelope, std::map<string, string> headers)
{
	auto message = GetBasicResponseMessage(response, envelope, &headers);
	auto returnAddress = envelope->returnAddress;
	channel->BasicPublish(std::string(), returnAddress, message);
}

AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicResponseMessage(google::protobuf::Message *response, NodeEnvelope * envelope, std::map<string, string>* headers)
{
	auto message = GetBasicMessage(response, headers);
	message->CorrelationId(envelope->corelationId);
	return message;
}

AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicMessage(google::protobuf::Message *inputMessage, std::map<string, string> *headers)
{
	auto body = serializer.SerializeMessageToJson(inputMessage);
	return GetBasicMessage(inputMessage, body, headers);
}

AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicMessage(google::protobuf::Message *inputMessage, std::string messageBody, std::map<string, string> *headers)
{
	auto name = GetMessageName(inputMessage);
	auto message = AmqpClient::BasicMessage::Create(messageBody);
	message->DeliveryMode(AmqpClient::BasicMessage::delivery_mode_t::dm_nonpersistent);
	message->ContentType(MIME_JSON);
	message->Type(name);
	message->AppId(nodeId);
	auto headerTable = message->HeaderTable();
	InitializeHeaders(&headerTable, headers);
	message->HeaderTable(headerTable);
	message->Timestamp(utility.GetCurrentUnixTimestampMilliseconds());

	return message;
}

void RabbitMqNode::InitializeHeaders(AmqpClient::Table *headersTable, std::map<string, string> *headers)
{
	if (headers->empty())
	{
		return;
	}
	std::map<string, string>::iterator it = headers->begin();
	while (it != headers->end())
	{
		auto key = (std::string)it->first;
		auto value = it->second;

		headersTable->insert(std::pair<string, string>(key, value));

		// Increment the Iterator to point to next entry
		it++;
	}
}

Note: The body of the message and the headers are being sent without any issues.

Observation: If I don't initialize the headers, only the AppId property is sent correctly. If I do initialize the headers, only the headers are sent and no other property reaches the hub. The body is always sent correctly.

Could someone please identify the mistake?

humayunkhan, Feb 18 '21 09:02

@alanxz I think I know where the problem is. In Channel.cpp, the function "CreateAmqpProperties" seems to handle the flags incorrectly. For example:

if (mes.TimestampIsSet()) {
  ret.timestamp = mes.Timestamp();
  ret._flags = AMQP_BASIC_TIMESTAMP_FLAG;
}

This overwrites any flags that were set earlier, so I replaced it with:

if (mes.TimestampIsSet()) {
  ret.timestamp = mes.Timestamp();
  ret._flags |= AMQP_BASIC_TIMESTAMP_FLAG;
}

I made the same change for every other place where _flags was being overwritten. After this change, I can see the properties and things seem to be working fine.

Could you please look into this and let me know whether this is the correct fix?
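
For readers who want to see the effect in isolation, here is a minimal sketch of the difference between assigning a flag with = and accumulating it with |=. It does not use the rabbitmq-c headers. The constants kTypeFlag, kTimestampFlag, kAppIdFlag and kHeadersFlag and the two helper functions are hypothetical stand-ins, and the order in which the properties are processed (headers last, AppId just before) is an assumption chosen to match the observation reported above.

```cpp
#include <cstdint>

// Hypothetical stand-ins for the AMQP_BASIC_*_FLAG bits. The numeric values
// are illustrative only and are not copied from the rabbitmq-c headers.
const std::uint32_t kTypeFlag = 1u << 0;
const std::uint32_t kTimestampFlag = 1u << 1;
const std::uint32_t kAppIdFlag = 1u << 2;
const std::uint32_t kHeadersFlag = 1u << 3;

// Buggy pattern: every property assigns its flag with '=', so each
// assignment discards the flags recorded before it. Only the last one survives.
std::uint32_t BuildFlagsBuggy(bool with_headers) {
  std::uint32_t flags = 0;
  flags = kTypeFlag;
  flags = kTimestampFlag;
  flags = kAppIdFlag;
  if (with_headers) {
    flags = kHeadersFlag;  // assumed to be processed last
  }
  return flags;
}

// Fixed pattern: every property ORs its flag in with '|=', so all of the
// properties that were set remain marked as present.
std::uint32_t BuildFlagsFixed(bool with_headers) {
  std::uint32_t flags = 0;
  flags |= kTypeFlag;
  flags |= kTimestampFlag;
  flags |= kAppIdFlag;
  if (with_headers) {
    flags |= kHeadersFlag;
  }
  return flags;
}
```

Under these assumptions, BuildFlagsBuggy(false) leaves only the AppId bit set and BuildFlagsBuggy(true) leaves only the headers bit set, which would explain why only AppId arrived without headers and only the headers arrived with them. BuildFlagsFixed keeps every bit in both cases.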

Thank you.

humayunkhan, Feb 18 '21 15:02

This appears to be fixed in https://github.com/alanxz/SimpleAmqpClient/commit/4818655450f385d7a9b68c3ebc60dc71fe42e2a3.

alanxz, Feb 18 '21 16:02

That is correct. I had only taken the latest release. Maybe it would be better to create a new release with this fix, or at least update the release notes to mention this known issue.

It would save a lot of time for people who are unaware of this.

humayunkhan, Feb 18 '21 17:02