Pulsar blocks vertx thread when blockIfQueueFull is true
When blockIfQueueFull is set to true for pulsar producer it might happen that when the queue limit is reached producer is blocked and it block the event loop.
Sending message should be done in different thread as it's done e.g for Kafka
Do you have a reproducer?
I guess that we would need to dispatch writes on a separate thread as we do for Kafka.
Hey @cescoffier thank you for a quick response. To reproduce we just need to enable blockIfQueueFull and produce more messages that Pulsar is able to receive.
https://quarkus.io/guides/pulsar
By default Pulsar throws an exception if the maxPendingMessages limit is exceeded. This is sometimes expected, but by default it leads to Nacks, and puts even more pressure on the service. After enabling blockIfQueueFull the event-loop is getting blocked.
I think that dispatching writes on an executor could be a right solution.
So to sum up, to reproduce the issue one needs to:
- Set relatively small
maxPendingMessages(i.e. 10) - Enable
blockIfQueueFull - Send more messages than Pulsar can write (i.e. 1000 using emitter) This will block the event loop.
We did not use a sender thread because the Pulsar producer client has the maxPendingMessages property.
The connector sender honors the maxPendingMessages and effectively applies back-pressure by not requesting more messages to process from the upstream.
So, in theory, you would not need to use blockIfQueueFull=true. As the name mentions and you've noticed it blocks the client therefore not compatible with event loop threads.
Is there a specific behavior you are looking for when enabling blockIfQueueFull?
The real issue is that you are seeing maxPendingMessages limit exceeded. Normally that should not happen.
We have ad-hoc "performance" tests for the Pulsar producer here: https://github.com/smallrye/smallrye-reactive-messaging/blob/3c2663b8e91f71251f43fa7c0e808e7add6ea644/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/perf/PerformanceProducerTest.java
It would be great if you can reproduce it with a similar test.
Thanks for the response,
The whole code is:
@ApplicationScoped
public class Relay {
public static final String INCOMING_PAGES_CHANNEL = "incoming-pages";
public static final String OUTGOING_PAGES_CHANNEL = "outgoing-pages";
public static final String INCOMING_PAGE_FRAGMENTS_CHANNEL = "incoming-page-fragments";
public static final String OUTGOING_PAGE_FRAGMENTS_CHANNEL = "outgoing-page-fragments";
public static final String INCOMING_ASSETS_CHANNEL = "incoming-assets";
public static final String OUTGOING_ASSETS_CHANNEL = "outgoing-assets";
public static final String INCOMING_WEB_RESOURCES_CHANNEL = "incoming-web-resources";
public static final String OUTGOING_WEB_RESOURCES_CHANNEL = "outgoing-web-resources";
@Inject
Logger log;
@Incoming(INCOMING_PAGES_CHANNEL)
@Outgoing(OUTGOING_PAGES_CHANNEL)
public Message<Page> relayPage(Message<Page> incoming) {
return relay(incoming);
}
@Incoming(INCOMING_PAGE_FRAGMENTS_CHANNEL)
@Outgoing(OUTGOING_PAGE_FRAGMENTS_CHANNEL)
public Message<PageFragment> relayPageFragment(Message<PageFragment> incoming) {
return relay(incoming);
}
@Incoming(INCOMING_ASSETS_CHANNEL)
@Outgoing(OUTGOING_ASSETS_CHANNEL)
public Message<Asset> relayAsset(Message<Asset> incoming) {
return relay(incoming);
}
@Incoming(INCOMING_WEB_RESOURCES_CHANNEL)
@Outgoing(OUTGOING_WEB_RESOURCES_CHANNEL)
public Message<WebResource> relayWebResource(Message<WebResource> incoming) {
return relay(incoming);
}
private <T> Message<T> relay(Message<T> incoming) {
String key = MetadataUtils.extractKey(incoming);
Action action = MetadataUtils.extractAction(incoming);
Long eventTime = MetadataUtils.extractEventTime(incoming);
Message<T> outgoing;
if (key == null || action == null || eventTime == null) {
log.trace("Skipping relaying of message without required metadata");
incoming.ack();
outgoing = null;
} else {
log.tracef("Relaying: key %s, action %s, event time %s",
key, action, eventTime);
outgoing = incoming;
}
return outgoing;
}
}
So I was wrong, there is no emitter. Does it change anything? We noticed this error under load.
The settings are:
maxPendingMessages 16
blockIfQueueFull true
We can work on a reproducible test, but not in upcoming days, as we are on a conference this week.
So I was wrong, there is no emitter. Does it change anything?
No I don't think so.
Looking at the code I see that the manual call to the ack is erroneous, but I can't be sure if that's the reason for the wrong application of the backpressure
if (key == null || action == null || eventTime == null) { log.trace("Skipping relaying of message without required metadata"); incoming.ack(); outgoing = null; } else {
Does this case actually happen or is there for safeguarding?
Because incoming.ack(); is an async operation and you need to subscribe to it, otherwise you would never call the ack of the pulsar message underneath.
If you want to continue using the Message<T> you can do the following:
private <T> Uni<Message<T>> relay(Message<T> incoming) {
String key = MetadataUtils.extractKey(incoming);
Action action = MetadataUtils.extractAction(incoming);
Long eventTime = MetadataUtils.extractEventTime(incoming);
if (key == null || action == null || eventTime == null) {
log.trace("Skipping relaying of message without required metadata");
return Uni.createFrom().completionStage(incoming::ack)
.replaceWith(() -> null);
} else {
log.tracef("Relaying: key %s, action %s, event time %s",
key, action, eventTime);
return Uni.createFrom().item(incoming);
}
}
Or you can use the simpler signatures with payload directly and inject the PulsarIncomingMessageMetadata to the incoming methods and extract key, action, eventtime from the metadata. The handling of acks, also in the case of returning null, will be handled automatically.
private <T> T relay(T incomingPayload, PulsarIncomingMessageMetadata metadata) {
String key = MetadataUtils.extractKey(metadata);
Action action = MetadataUtils.extractAction(metadata);
Long eventTime = MetadataUtils.extractEventTime(metadata);
if (key == null || action == null || eventTime == null) {
log.trace("Skipping relaying of message without required metadata");
return null;
} else {
log.tracef("Relaying: key %s, action %s, event time %s",
key, action, eventTime);
return incomingPayload;
}
}
In your processor definitions you can simply inject the metadata after the payload :
@Incoming(INCOMING_ASSETS_CHANNEL)
@Outgoing(OUTGOING_ASSETS_CHANNEL)
public Asset relayAsset(Asset incoming, PulsarIncomingMessageMetadata metadata) {
return relay(incoming, metadata);
}
It's only for safeguarding. This block of code should 'never' happen. Anyway I do not think it has something to do with incoming message acknowledging. There might be something wrong with backPressure on pulsar.
Originally we faced the issue with Pulsar producer queue full.
As you can see we have two services (2 replicas for each) that reads from 'pages' channel and tries to send messages to pulsar topic via smallrye channel.
No custom settings for maxPendingMessages nor waitForWriteCompletion have been introduced.
At first it seemed like a good idea to set blockIfQueueFull to true to wait until pulsar consume the pending messages, but as you mentioned it's not.
It looks like by default Pulsar controls the memory using MemoryLimitController https://github.com/apache/pulsar/pull/13344/files#diff-dada7e51eed6861aa3ebc13dcf1571dc0b6998f881a61d0cf4ced7e10b06628a
The ProducerQueueIsFullError error is raised, when we can't reserve a memory (ProducerImpl.java - Pulsar):
private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) {
try {
if (conf.isBlockIfQueueFull()) {
if (semaphore.isPresent()) {
semaphore.get().acquire();
}
client.getMemoryLimitController().reserveMemory(payloadSize);
} else {
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError(
"Producer send queue is full", sequenceId));
return false;
}
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError(
"Client memory buffer is full", sequenceId));
return false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
callback.sendComplete(new PulsarClientException(e, sequenceId));
return false;
}
return true;
}
So before we limited the maxPendingMessages and enabled blockIfQueueFull, we get errors because of the memory limit.
Some calculations:
- 6 partitions = 6000 messages that Pulsar can enqueue
- SmallRye allows to send 1000 messages (inflights, SmallRye does not take partitions into the consideration)
- 64MB memory limit (default)
- we were sending 30kb messages
I think we could reach the limit of 64MB.
The solution could be to create SenderProcessor based on MemoryLimitController?
We can access the memory limit controller:
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
SenderProcessor processor = new MemoryAwareSenderProcessor(memoryLimitController);
And use something like (forgive me the pseudo code):
if (memoryLimitController.isMemoryLimited()) {
semaphore.acquire();
memoryLimitController.tryReserveMemory(messageSize); // There may be more elegant way for checking it
memoryLimitController.releaseMemory(messageSize); // There may be more elegant way for checking it
sendMessage(message);
semaphore.release();
}
The takeways:
- Creating MemoryLimitController
- Another potential issue, is that if the client is not memory limited, the partitions are not taken into the consideration when calculating
inflightsinSenderProcessor.
What do you think @marekczajkowski @ozangunalp ?
Of course I may be wrong, haven't tested that yet.
Regarding the memory limit, I encountered the error while using default settings and a basic relay function. By sending several thousand messages of approximately 30KB each to a Pulsar topic, the relay began to reject messages after consuming around 1.6k messages due to insufficient memory.
The error message received was:
SRMSG19024: Failed to process a message sent to channel incoming-pages, proceeding with nack, failure is: Client memory buffer is full.
It's essential that backpressure takes into account memory constraints, not just the number of messages, as the size of individual messages can vary unpredictably.
As for the original issue, I've been making efforts to reproduce the 'queue full' error, but haven't encountered it yet.
@michalcukierman @marekczajkowski Thanks for your inputs. I'd assume the memory management would be according to the max inflight messages setting but apparently, we need to look into this a little bit deeper.
I'll spend more time next week to reproduce this issue.
I've been looking into this and couldn't find a real problem with the connector code. I've been using the EndToEndPayloadPerfTest, (it was disabled on main for a stupid reason.) Initially, it runs with 10kb payload sizes but I've increased it up to 50kb for testing.
Here are my observations:
- It is hard (and also easy) for me to trigger the
Client memory buffer is fullexception: ThePulsarContainerused in tests sets up a tmpfs for the data directory. It was there to run the tests a bit faster but didn't have a big impact at the end. However, when you are hammering the broker with big payloads it fills up the bookie disk space fast and kills the container if its a tmpfs. Just before getting killed, the broker stops acking sent messages and the client memory gets filled. - When I remove the tmpfs, it is not really possible for me to reproduce the
client memory buffer is full. For further tests, we need to set up a latency with a toxiproxy like we do for Kafka tests. Contributions are welcome! - So I continued my tests with tmpfs enabled. I pushed 30k * 50kb sized payloads and processed them with a relay (in-out). I've noticed that I filled the disk space around 7k relayed messages, which is coherent with 2Gb of tmpfs disk space (50kb * 37k = 1.85Gb).
- In this scenario the client should process and relay messages and eventually fill the bookie's disk without hitting the
Client memory buffer is full. It should stop processing more messages as it "detects" that no more message sends are getting confirmed. (ThemaxPendingMessagesinSenderProcessorworking) - I recognized that the
MemoryLimitControllerindeed works effectively to track the message payload sizes. However, it maxes out at the half of the set size, ex for 64mb (default)memoryLimitBytesmaxes out at 650 50kb messages pending for send confirms. - Initially, I thought this was because of received messages but this is the case even when using different
PulsarClientinstances for in and out channels. (Tip: Pulsar connector shares pulsar client instances between different channels if client configurations are the same. If you want to force different clients, you can set thedescriptionattribute.) - But it turns out it was because of producer batching. When producer batching is enabled the same payload counts twice in the memory limit, once for send queueing and second for the batch allocation. The batching is enabled by default with 128kb size limit, but I guess when the broker is not accepting any more sends, batches also stack up.
The conclusion is this happens when the broker cannot ingest messages at the rate your app produces them. The proper way is to adjust the send buffer, both in terms of memory and in terms of messages pending for send confirms. That way the back-pressure can be applied to slow the new message ingestion without filling the client memory buffer. Many course of actions are possible:
- You can adjust the
memoryLimitBytes(defaults to 64mb) - You can disable batching if payloads are already very big, that way batches do not count against the memory limit.
- You can adjust the
maxPendingMessages(applied by the connector to 1000) : For example 500 * 50kb stays in the 30mb limit.
Note for partitioned topics : The memory limiter is on the pulsar client and shared for all producers/consumers created with that client. Therefore for a partitioned producer the limit will be the same 64mb default.
It is not possible for the connector to apply per-partition back-pressure to the upstream. And the maxPendingMessagesAcrossPartitions config being deprecated, I've chosen to honor only the maxPendingMessages and not send more than that to the producer.
Currently, there is an issue on how the client configs are merged so when specifying maxPendingMessages you also need to specify maxPendingMessagesAcrossPartitions but it is easy to fix.
If someone is willing to contribute a MemoryLimitAwareSendProcessor, I'd be glad to review it.
Hope all this helps for your further investigation.
@ozangunalp thanks a lot for your input. Here are my thoughts on this
The bottleneck arises when setting maxPendingMessages for partitioned topics. When maxPendingMessages is configured, SmallRye applies backpressure, preventing the sending of more messages to the broker than the specified limit. However, with partitioned topics, each partition has its own producer under the hood, and each producer has its maxPendingMessages set.
The challenge lies in the fact that setting maxPendingMessages is applied individually for each producer. Consider a scenario where there are 3 partitions and the default maxPendingMessages is set to 1000. From Pulsar's perspective, brokers can accept 3 x 1000 = 3000 messages. However, SmallRye does not honor this setting. This discrepancy can be addressed by using the deprecated maxPendingMessagesAcrossPartitions setting, which should be set to the value of numOfPartitions x maxPendingMessages.
Unfortunately, the maxPendingMessagesAcrossPartitions setting has no effect in the SmallRye connector. When maxPendingMessagesAcrossPartitions is set to a lower number than numOfPartitions x maxPendingMessages, SmallRye will send messages via the producer up to the limit of maxPendingMessages. Consequently, individual producers may have a lower queue size (maxPendingMessagesAcrossPartitions / numOfPartitions) than the SmallRye sender. This mismatch can lead to errors, as SmallRye may send more messages to an individual producer than it can accept, potentially resulting in a ProducerQueueIsFullError or MemoryBufferIsFullError.
Example:
numberOfPartitions=2
maxPendingMessages=100
maxPendingMessagesAcrossPartitions=102
smallrye backpressure inflights = maxPendingMessages (100)
maxPendingMessage per individual partition producer in pulsar client = 51
smallrye can send 100 messages when producer can accept 51.
Given the dynamic nature of partition changes at runtime, I believe there are limited options available to address this issue effectively. Consequently, I suggest updating the documentation to emphasize the importance of carefully configuring maxPendingMessagesAcrossPartitions, taking into account the aforementioned scenario. Additionally, it would be beneficial to include a note in the SmallRye log messages, indicating that if encountering a ProducerQueueIsFull error, users should verify their maxMessagesAcrossPartitions settings.
As for the MemoryLimitAwareSendProcessor I will raise another issue and we are willing to contribute but it's not the highest priority at the moment
@marekczajkowski you are right, the producer queue size is somewhat related but different from the client memory full error.
Maybe the best course of action is to rename the Pulsar connector attribute maxPendingMessages to something else like max.inflight.messages like in Kafka.
Initiallly, I've named the attribute the same to be able to set the producer config together, but I was unaware of this partitioned topic issue.
If we rename the connector attribute to max.inflight.messages with 1000 default, the maxPendingMessages and maxPendingMessagesAcrossPartitions can be set independently, defaulting both to 0.
WDYT?
I believe separating the backpressure configuration from the producer configuration is a beneficial approach. This way, the default mechanism can rely on SmallRye configuration, allowing producers to operate without limitations and providing room for more sophisticated custom configurations when used in conjunction with parameters like maxPendingMessages and maxPendingMessagesAcrossPartitions. Additionally, I suggest updating the documentation to include detailed explanations of these configurations.