rabbitmq-dotnet-client
Reduce memory usage by using the body directly instead of copying in BasicPublishAsync
Proposed Changes
This PR adds the ability for BasicPublishAsync to use the body directly when sending it to RabbitMQ, instead of copying it into a temporary byte array.
Technical information
Currently, BasicPublishAsync rents an array from the ArrayPool sized to the frame plus the body length:
https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/3794b4c49e0f672f31b9871ec833afa1fa56ffab/projects/RabbitMQ.Client/client/impl/Frame.cs#L173-L178
The body is then copied into this array:
https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/3794b4c49e0f672f31b9871ec833afa1fa56ffab/projects/RabbitMQ.Client/client/impl/Frame.cs#L183-L188
In our application, we sometimes forward large bodies to RabbitMQ. The problem is that ArrayPool<byte>.Shared uses separate buckets per thread, which causes a lot of new byte arrays to be allocated.
In addition, BasicPublishAsync writes the rented array to a channel:
https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/3794b4c49e0f672f31b9871ec833afa1fa56ffab/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs#L308-L309
The rented array is later written to the pipe:
https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/3794b4c49e0f672f31b9871ec833afa1fa56ffab/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs#L333-L342
If RabbitMQ is slow to accept data or we're trying to send a lot of data at once, the channel can fill up quickly, causing it to hold up to 128 rented arrays.
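For illustration, here is a minimal sketch (not the actual client code) of the rent-and-copy pattern described above; the frame-header size and helper name are made up:

```csharp
using System;
using System.Buffers;

static class CopyPatternSketch
{
    // Hypothetical frame-header size, for illustration only.
    private const int FrameHeaderSize = 8;

    public static byte[] RentAndCopy(ReadOnlyMemory<byte> body)
    {
        // An array of frame + body length is rented from the shared pool...
        byte[] rented = ArrayPool<byte>.Shared.Rent(FrameHeaderSize + body.Length);

        // ...and the body is copied into it. Large bodies published from many
        // threads tend to miss the pool's per-thread buckets, so new arrays
        // end up being allocated.
        body.Span.CopyTo(rented.AsSpan(FrameHeaderSize));

        // The rented array is then queued for the socket writer; the channel
        // can hold up to 128 of these while the pipe drains.
        return rented;
    }
}
```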
New API
ConnectionFactory
The factory has a new property called CopyBodyToMemoryThreshold:
ConnectionFactory factory = new ConnectionFactory
{
    CopyBodyToMemoryThreshold = 4096
};
When this value is set, any body larger than the threshold (in this example, 4096 bytes) is used directly instead of being copied to a new array. Bodies smaller than the threshold are still copied to a new array.
When we use the data directly, we cannot send it in the background like we currently do: we have to wait until the bytes are written to the pipe, otherwise the application could modify the buffer in the meantime. So, whenever the buffer is used directly, BasicPublishAsync waits until the bytes are written to the pipe.
This solution is a middle ground: smaller bodies are sent in the background but allocate, while larger bodies are sent directly but block until written.
The default value of CopyBodyToMemoryThreshold is int.MaxValue, so there is no breaking change.
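For example, a minimal usage sketch of the proposed property (assuming the client's async connection/channel APIs and BasicProperties; the host, exchange, and routing-key names are placeholders):

```csharp
using System.Text;
using RabbitMQ.Client;

ConnectionFactory factory = new ConnectionFactory
{
    HostName = "localhost",
    // Bodies of 4096 bytes or more are written to the pipe directly;
    // smaller bodies keep the current copy-and-send-in-background behavior.
    CopyBodyToMemoryThreshold = 4096
};

using IConnection connection = await factory.CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();

// Small body: copied to a rented array and sent in the background.
byte[] small = Encoding.UTF8.GetBytes("ping");
await channel.BasicPublishAsync("my-exchange", "my-key", new BasicProperties(), small);

// Large body: used directly, so the await completes only after the bytes
// have been written to the pipe and the buffer can be reused safely.
byte[] large = new byte[1024 * 1024];
await channel.BasicPublishAsync("my-exchange", "my-key", new BasicProperties(), large);
```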
IChannel
BasicPublishAsync has a new parameter, bool? copyBody = null:
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
When the value is:
- true: the body is copied to a new array and sent in the background.
- false: the body is written directly, but the publish waits until the bytes are written to the pipe.
- null: the body length is compared to the connection's configured CopyBodyToMemoryThreshold.
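Continuing the channel from the sketch above, forcing the behavior per publish could look like this (exchange/routing-key names and the payload are placeholders):

```csharp
byte[] payload = new byte[256 * 1024];

// copyBody: false — the buffer is used directly; do not mutate or reuse it
// until the returned ValueTask has completed.
await channel.BasicPublishAsync("my-exchange", "my-key", new BasicProperties(),
    payload, mandatory: false, copyBody: false);

// copyBody: true — always copy, even above the threshold, so the caller can
// reuse the buffer immediately after the call returns.
await channel.BasicPublishAsync("my-exchange", "my-key", new BasicProperties(),
    payload, mandatory: false, copyBody: true);
```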
There are two new overloads that allow a ReadOnlySequence<byte> to be used as the body:
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool? copyBody = null)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
This way you can use System.IO.Pipelines without flattening the sequence into a temporary ReadOnlyMemory.
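For example, a sketch of forwarding data read from a PipeReader without flattening it first (the ForwardAsync helper, exchange, and routing-key names are made up):

```csharp
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;
using RabbitMQ.Client;

static async Task ForwardAsync(PipeReader reader, IChannel channel)
{
    ReadResult result = await reader.ReadAsync();
    ReadOnlySequence<byte> body = result.Buffer;

    // The sequence is passed to the broker without copying it into a
    // temporary ReadOnlyMemory<byte>; since the segments are written to the
    // pipe directly, the reader must not be advanced until the publish completes.
    await channel.BasicPublishAsync("my-exchange", "my-key", new BasicProperties(),
        body, mandatory: false, copyBody: false);

    reader.AdvanceTo(body.End);
}
```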
Types of Changes
What types of changes does your code introduce to this project?
- [ ] Bug fix (non-breaking change which fixes issue #NNNN)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause an observable behavior change in existing systems)
- [ ] Documentation improvements (corrections, new content, etc)
- [ ] Cosmetic change (whitespace, formatting, etc)
Checklist
- [x] I have read the CONTRIBUTING.md document
- [x] I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
- [ ] All tests pass locally with my changes
  Some tests timed out.
- [x] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added necessary documentation (if appropriate)
  I'll wait until this API is approved/changes have been made.
- [x] ~~Any dependent changes have been merged and published in related repositories~~
Further Comments
- Maybe the naming of the properties and/or parameters is confusing and they should be renamed.
- I did my best to write unit tests that check whether CopyBodyToMemoryThreshold works as intended, but to do so I had to make some members internal instead of private. I also had to track the number of allocated bytes in the connection. Is this OK?