[fix][broker] Fix multiple transfer corruption issues when TLS is enabled
Fixes #22601 #21892 #19460
Motivation
In Pulsar, there are multiple reported issues where the transferred output gets corrupted and fails with exceptions about an invalid reader and writer index. One group of these issues occurs only when TLS is enabled.
I found these Netty issues that provide a lot of context:
- https://github.com/netty/netty/issues/6184
- https://github.com/netty/netty/issues/2761
- https://github.com/netty/netty/issues/1865
- https://github.com/netty/netty/commit/a74149e9848fef73d909dd56843e0fbab23f877d
- https://github.com/netty/netty/issues/1801
- https://github.com/netty/netty/issues/1797
- https://github.com/netty/netty/commit/6f79291d5bf7c515549234ed93dc00b84ce008c9
- https://github.com/netty/netty/issues/1925
This appears to be a long-standing issue in Netty that has only been partially fixed. Many locations in the Netty code base remain unfixed, and it's not safe to share ByteBuf instances in all cases.
In Pulsar, ByteBuf instances are shared in this case at least via the broker cache (RangeEntryCacheManagerImpl) and the pending reads manager (PendingReadsManager).
The SslHandler related issue was originally reported in Pulsar in 2018 with #2401. The fix at that time was #2464.
The ByteBuf .copy() method was used to copy the ByteBuf. The problem with this change is that .copy() itself isn't thread safe and accesses the internalNioBuffer instance directly.
This happens at least when the ByteBuf instance contains a ReadOnlyByteBufferBuf wrapper. This can be seen in the code https://github.com/netty/netty/blob/243de91df2e9a9bf0ad938f54f76063c14ba6e3d/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBufferBuf.java#L412-L433 .
As a result, exceptions such as the following occur:
java.lang.IllegalArgumentException: newPosition > limit: (2094 > 88)
at java.base/java.nio.Buffer.createPositionException(Buffer.java:341)
at java.base/java.nio.Buffer.position(Buffer.java:316)
at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1516)
at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:185)
at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
java.nio.BufferUnderflowException
at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183)
at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
It is likely that "Failed to peek sticky key from the message metadata java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4" exceptions are also caused by the same root cause.
Exceptions of the type "java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))" on the broker side are possibly caused by the same root cause as well.
The root cause of such exceptions could also be different. A shared Netty ByteBuf must at least have an independent view created with duplicate, slice or retainedDuplicate if the readerIndex is mutated.
The ByteBuf instance must also be shared in a thread-safe way. Failing to do that could result in similar symptoms, and this PR doesn't fix that.
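The difference between copying through shared index state and copying through an independent view can be sketched with plain java.nio.ByteBuffer. This is only an analogy, assuming that the shared internal NIO buffer behaves like the `shared` instance below; the class and method names are hypothetical and this is not Netty's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration using java.nio only; not Netty or Pulsar code.
public class SharedBufferViews {

    // Copies a range by repositioning the shared buffer itself.
    // The position and limit set here are visible to every other user of
    // the same instance, so two threads doing this concurrently can
    // overwrite each other's position and limit.
    static byte[] copyViaSharedState(ByteBuffer shared, int index, int length) {
        shared.clear().position(index).limit(index + length);
        byte[] dst = new byte[length];
        shared.get(dst);
        return dst;
    }

    // Copies the same range through a private duplicate.
    // duplicate() shares the content but has its own position and limit,
    // so the shared instance is never repositioned.
    static byte[] copyViaDuplicate(ByteBuffer shared, int index, int length) {
        ByteBuffer view = shared.duplicate();
        view.clear().position(index).limit(index + length);
        byte[] dst = new byte[length];
        view.get(dst);
        return dst;
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap("0123456789".getBytes(StandardCharsets.US_ASCII));

        copyViaDuplicate(shared, 2, 3);
        System.out.println("after duplicate copy: position=" + shared.position()
                + " limit=" + shared.limit());

        copyViaSharedState(shared, 2, 3);
        System.out.println("after shared-state copy: position=" + shared.position()
                + " limit=" + shared.limit());
    }
}
```

In the single-threaded run the two methods return the same bytes; the only visible difference is that the shared-state variant leaves the shared instance's position and limit changed, which is the state that can race when several threads copy at once.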
Modifications
- Remove the ByteBufPair.CopyingEncoder and make the ByteBufPair.Encoder suitable for both use cases
- A read-only ByteBuf needs to be passed to SslHandler so that it doesn't get mutated. A deep copy isn't required. This solution is also more performant since there will be fewer memory copies.
- Work around an IntelliJ issue where it shows a red mark in PulsarChannelInitializer classes (casting to ChannelHandler fixes the issue).
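The idea behind the read-only approach can be sketched with java.nio.ByteBuffer, whose asReadOnlyBuffer() gives a view that shares content without a deep copy and rejects writes. This is an analogy under that assumption, with hypothetical class and method names; it does not show the actual ByteBufPair.Encoder change, where the Netty counterpart would presumably be a read-only ByteBuf view.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

// Hypothetical illustration using java.nio only; not Netty or Pulsar code.
public class ReadOnlyViewSketch {

    // Tries to overwrite the first byte through the given buffer.
    // Returns true if the write went through, false if it was rejected
    // because the buffer is a read-only view.
    static boolean tryOverwriteFirstByte(ByteBuffer target, byte value) {
        try {
            target.put(0, value);
            return true;
        } catch (ReadOnlyBufferException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer cached = ByteBuffer.wrap(new byte[] {'a', 'b', 'c'});

        // No bytes are copied here: the view shares the content of cached.
        ByteBuffer readOnlyView = cached.asReadOnlyBuffer();

        System.out.println("write via view accepted: "
                + tryOverwriteFirstByte(readOnlyView, (byte) 'X'));
        System.out.println("first byte of cached entry: " + (char) cached.get(0));
    }
}
```

A consumer that only receives the read-only view cannot modify the cached bytes, while the owner of the writable instance still can, and the view still observes those changes because the content is shared.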
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
On the Netty side, PooledByteBuf.getBytes was made thread safe by making a copy of the internalByteBuffer with the https://github.com/netty/netty/pull/9120 changes. However, this change is not reflected in the ReadOnlyByteBufferBuf implementation. (This doesn't have an impact on this PR, just sharing the observation.)
Issues reported to Netty:
- https://github.com/netty/netty/issues/14069
- https://github.com/netty/netty/issues/14068
Marking this as a draft until feedback is received on this comment: https://github.com/netty/netty/issues/14069#issuecomment-2125067746
I'll add background to the PR description to help reviewers understand it quickly.
@dao-jun Please share that as a comment instead. This is my PR and not yours. :) I'll update the description based on the feedback.
Just add additional context to help understand it quickly.
CopyingEncoder exists to handle the SslHandler case, because SslHandler can merge several ByteBufs into one (the first input ByteBuf).
For instance, if we write buf1, buf2 and buf3 to the handler, it writes buf2 and buf3 into buf1. If the EntryCache is enabled, this pollutes the entries in the EntryCache. So we use buf.copy to create a new instance and copy the bytes.
However, buf.copy is not thread safe, and copying from multiple threads may lead to data corruption.
The solution is to pass a ReadOnlyByteBuf to SslHandler so that it cannot compose ByteBufs (write buf2 and buf3 into buf1). Netty does take into account that the input ByteBuf may not be writable, but there seem to be some problems with this code:
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
    final int inReadableBytes = next.readableBytes();
    final int cumulationCapacity = cumulation.capacity();
    if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
            // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
            // Only copy if there is enough space available and the capacity is large enough, and attempt to
            // resize if the capacity is small.
            (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
                    cumulationCapacity < wrapDataSize &&
                            ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
        cumulation.writeBytes(next);
        next.release();
        return true;
    }
    return false;
}
It should return false immediately if cumulation.isWritable(inReadableBytes) == false or isReadOnly == true.
Waiting for the fix PR https://github.com/netty/netty/pull/14071 to be merged.
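The early-return guard suggested in this comment could look roughly like the sketch below. It uses java.nio.ByteBuffer and a hypothetical tryMerge helper purely for illustration, assuming `cumulation` is in write mode (its position is the write index); it is not the change made in the Netty PR.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration using java.nio only; not the Netty fix.
public class CumulationGuardSketch {

    // Merges next into cumulation only when cumulation is writable and has
    // enough room. Returns false without touching either buffer otherwise,
    // so the caller can keep the two buffers separate.
    static boolean tryMerge(ByteBuffer cumulation, ByteBuffer next) {
        int incoming = next.remaining();
        if (cumulation.isReadOnly() || cumulation.remaining() < incoming) {
            return false; // early return: never write into a read-only or full buffer
        }
        cumulation.put(next);
        return true;
    }

    public static void main(String[] args) {
        ByteBuffer writable = ByteBuffer.allocate(8);
        ByteBuffer readOnly = ByteBuffer.allocate(8).asReadOnlyBuffer();

        System.out.println("merge into writable: "
                + tryMerge(writable, ByteBuffer.wrap(new byte[] {1, 2})));
        System.out.println("merge into read-only: "
                + tryMerge(readOnly, ByteBuffer.wrap(new byte[] {1, 2})));
    }
}
```

Checking the read-only flag before any write means a shared, read-only first buffer is left untouched and the incoming data stays in its own buffer.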
Yes, of course, my bad, I didn't explain myself correctly.
Thanks for the useful summary @dao-jun. I'll revisit the description of this PR later once this comes to a conclusion.
> It should return false immediately if cumulation.isWritable(inReadableBytes) == false or isReadOnly == true.
> Waiting for the fix PR netty/netty#14071 to be merged.
There might be a workaround (explained in https://github.com/apache/pulsar/pull/22760#discussion_r1610605439). I will test that.
I think I'll need to add a repro test to the Pulsar code base. While testing the recent changes, I can see that the problem occurs even when there's the read only wrapper.
There are also other bugs in this area.
When TLS is enabled between Broker and Bookies, the Bookkeeper V3 protocol is used. In the Bookkeeper client, there's a bug related to reference counts when the V3 protocol is used. The PR to fix that issue is https://github.com/apache/bookkeeper/pull/4293 .
Closing this PR since the transfer corruption issues will be fixed by changes in Netty 4.1.111.Final (https://github.com/netty/netty/pull/14072, https://github.com/netty/netty/pull/14071, https://github.com/netty/netty/pull/14076 and https://github.com/netty/netty/pull/14078) and Bookkeeper 4.16.6 (https://github.com/apache/bookkeeper/pull/4289 and https://github.com/apache/bookkeeper/pull/4293).
Fix in Bookkeeper is https://github.com/apache/bookkeeper/pull/4404
UPDATE: This PR has been replaced by #22810 .