
[BUG] Critical Memory Leak in DefaultWriter.uploadBulk0()

Open rahulmishra117 opened this issue 3 months ago • 7 comments

AutoMQ Version

1.6.0

Operating System

macOS

Installation Method

source

Hardware Configuration

No response

Other Relevant Software

No response

What Went Wrong?

Critical ByteBuf Memory Leak when an exception is thrown during bulk upload preparation.


Location:

s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java, lines 321-384


The Problem:

private void uploadBulk0(Bulk bulk) {
    try {
        long startTime = time.nanoseconds();
        List<Record> records = bulk.records;
        // Order by <streamId, offset>
        records.sort(...);  // CAN THROW EXCEPTION

        long firstOffset = bulk.baseOffset;
        long nextOffset = firstOffset;
        long lastRecordOffset = nextOffset;
        CompositeByteBuf dataBuffer = ByteBufAlloc.compositeByteBuffer();  // LINE 340: ALLOCATE
        for (Record record : records) {
            record.offset = nextOffset;
            lastRecordOffset = record.offset;
            ByteBuf data = record.streamRecordBatch.encoded();
            ByteBuf header = BYTE_BUF_ALLOC.byteBuffer(RECORD_HEADER_SIZE);  // LINE 345: ALLOCATE
            header = WALUtil.generateHeader(data, header, 0, nextOffset);  // CAN THROW EXCEPTION
            nextOffset += record.size;
            dataBuffer.addComponent(true, header);
            dataBuffer.addComponent(true, data);
        }

        // Build object buffer.
        long dataLength = dataBuffer.readableBytes();
        nextOffset = ObjectUtils.ceilAlignOffset(nextOffset);  // CAN THROW EXCEPTION
        long endOffset = nextOffset;

        CompositeByteBuf objectBuffer = ByteBufAlloc.compositeByteBuffer();  // LINE 357: ALLOCATE
        WALObjectHeader header = new WALObjectHeader(...);
        objectBuffer.addComponent(true, header.marshal());  // CAN THROW EXCEPTION
        objectBuffer.addComponent(true, dataBuffer);

        // ... more code ...

        objectStorage.write(writeOptions, path, objectBuffer);  // LINE 368: OWNERSHIP TRANSFERRED

    } catch (Throwable ex) {  // LINE 381
        bulk.uploadCf.completeExceptionally(ex);  // LINE 382: NO CLEANUP!
    }
}


Scenarios of Memory Leak

If ANY exception occurs between lines 322-368 (before objectStorage.write() is called):

  • Lines 322-340: Exception during sort or before dataBuffer allocation → no leak (no buffers allocated yet)
  • Lines 340-357: Exception during data assembly (e.g., in WALUtil.generateHeader()) → LEAK: dataBuffer and all header ByteBufs added to it
  • Lines 357-368: Exception during object buffer assembly (e.g., in header.marshal()) → LEAK: both dataBuffer AND objectBuffer plus all components
  • After line 368: Once objectStorage.write() is called → no leak (write() takes ownership and releases on failure)

What Should Have Happened Instead?

| Expected | Actual (Bug) |
| --- | --- |
| Exception → buffers released → no leak | Exception → buffers NOT released → LEAK |
| Memory stable under failures | Memory grows with each failure |
| Broker recovers from transient errors | Broker crashes with OOM after enough failures |
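
A minimal sketch of the leak-safe pattern (illustrative only, not the actual patch; it reuses the names from the snippet above and tracks buffer ownership so the catch block can release whatever has not yet been handed off):

// Sketch only, not the actual fix. Uses io.netty.util.ReferenceCountUtil.
private void uploadBulk0(Bulk bulk) {
    CompositeByteBuf dataBuffer = null;
    CompositeByteBuf objectBuffer = null;
    try {
        dataBuffer = ByteBufAlloc.compositeByteBuffer();
        // ... append per-record headers and payloads to dataBuffer ...

        objectBuffer = ByteBufAlloc.compositeByteBuffer();
        // ... add the marshalled WAL object header ...
        objectBuffer.addComponent(true, dataBuffer);
        dataBuffer = null;  // now owned by objectBuffer; releasing objectBuffer releases it too

        CompositeByteBuf toWrite = objectBuffer;
        objectBuffer = null;  // write() takes ownership and releases it even on failure
        objectStorage.write(writeOptions, path, toWrite);
    } catch (Throwable ex) {
        // Release whatever this method still owns before failing the future.
        // (Per-record header/data buffers not yet added to the composite would
        // need the same treatment inside the assembly loop.)
        ReferenceCountUtil.safeRelease(dataBuffer);
        ReferenceCountUtil.safeRelease(objectBuffer);
        bulk.uploadCf.completeExceptionally(ex);
    }
}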

Steps to Reproduce

Scenario with Network Issues

Steps

1. Set up AutoMQ with S3/MinIO:

docker-compose up -d

2. Create a topic and start a producer:

./bin/kafka-topics.sh --create --topic leak-test \
  --bootstrap-server localhost:9092 \
  --partitions 10 --replication-factor 1

3. Inject network chaos:

# Simulate intermittent S3 failures
docker exec minio killall -STOP minio   # Pause MinIO briefly
sleep 2
docker exec minio killall -CONT minio   # Resume

Run a high-throughput producer during the chaos:

./bin/kafka-producer-perf-test.sh \
  --topic leak-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 100000 \
  --producer-props bootstrap.servers=localhost:9092

4. Monitor the memory leak:

# Watch direct memory grow
watch -n 1 'jcmd $(pgrep -f kafka) VM.native_memory summary | grep -A1 "Direct"'

Expected Behavior:

  • When MinIO pauses, some uploadBulk0() calls fail during buffer assembly.
  • The exceptions are caught, but the ByteBufs are not released.
  • Direct memory increases steadily.
  • Eventually, OutOfDirectMemoryError crashes the broker.

Additional Information

rahulmishra117 · Oct 11, 2025

Thank you for your bug report. We will try to reproduce the issue. Please feel free to share any additional information in this issue thread.

Gezi-lzq · Oct 13, 2025

@rahulmishra117 When the code is working as intended, the design rests on the following assumptions:

  1. DefaultWriter#uploadBulk0 is not expected to throw an exception and fail;
  2. objectStorage#write will keep retrying when S3 is unavailable;

For writes, the following inflight memory limits apply:

  • ObjectWALConfig.maxUnflushedBytes = 1GiB
  • ElasticLog.APPEND_PERMIT = HEAP_MEMORY_SIZE / 6GiB * 100MiB
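
For context, a limit like maxUnflushedBytes is typically enforced by acquiring byte permits before buffering and releasing them once the data is flushed; a rough illustrative sketch (hypothetical names, not AutoMQ's actual implementation):

import java.util.concurrent.Semaphore;

// Illustrative only: caps in-flight bytes so producers block instead of
// allocating unbounded direct memory. Class and method names are hypothetical.
final class InflightByteLimiter {
    private final Semaphore permits;

    InflightByteLimiter(int maxUnflushedBytes) {
        this.permits = new Semaphore(maxUnflushedBytes);
    }

    void acquire(int bytes) throws InterruptedException {
        permits.acquire(bytes);   // blocks appends once the cap is reached
    }

    void release(int bytes) {
        permits.release(bytes);   // must run on success AND failure paths
    }
}

Note that a cap like this only tracks bytes it knows about: if a failed upload returns its permits but never releases its ByteBufs, actual direct-memory usage can still grow past the configured limit.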

Could you share your KAFKA_HEAP_OPTS and broker config?

superhx · Oct 15, 2025

Hi, below are the steps I performed to reproduce this issue.


Memory Leak Test Flow

1. Setup

  • Start Docker containers for AutoMQ and MinIO.
  • Create a topic named leak-test with 10 partitions.

2. Monitoring

  • Start the Memory Monitor to track direct memory allocation and usage.
  • Continuously watch for memory leaks by monitoring for:
    • ByteBuf leak warnings (see the leak-detection sketch after this list)
    • OutOfMemoryError exceptions

3. Chaos Injection (Background Thread)

  • Run a background thread that executes every N seconds:
    1. Pause MinIO using killall -STOP minio
    2. Wait for 2 seconds
    3. Resume MinIO using killall -CONT minio
  • Repeat this process throughout the test duration to simulate instability.

4. Producer Load (Main Thread)

  • Run a high-throughput producer that sends:
    • 100,000 messages per second
    • 1 KB per message
  • This load triggers frequent buffer allocations.
  • During MinIO pauses:
    • S3 upload attempts fail
    • uploadBulk0() throws exceptions
    • ByteBuf objects are not released, leading to potential memory leaks.

5. Leak Manifestation

  • Observe the direct memory growth over time:
    • T+0 min: 50–100 MB
    • T+2 min: 300–500 MB
    • T+5 min: >1 GB
    • T+X min: Application crashes with OutOfMemoryError
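
To make the ByteBuf leak warnings mentioned in step 2 show up reliably, Netty's leak detector can be switched to its most aggressive sampling level (the same effect as starting the broker with -Dio.netty.leakDetection.level=paranoid); a minimal sketch, with a hypothetical class name:

import io.netty.util.ResourceLeakDetector;

public final class EnableLeakDetection {
    public static void main(String[] args) {
        // Report every leaked buffer instead of sampling a small fraction of allocations.
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
        // ... run the workload; leaked buffers are logged with
        // "LEAK: ByteBuf.release() was not called before it's garbage-collected."
    }
}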

I also collected logs while running a script to reproduce the leak; they are attached below.

docker stats samples for container 650594d1fddc (automq-single-server), Mon Oct 13 2025 IST:

| Time | CPU % | Mem usage / limit | Mem % | Net I/O | Block I/O | PIDs |
| --- | --- | --- | --- | --- | --- | --- |
| 22:47:50 | 35.95% | 1.253GiB / 3.841GiB | 32.61% | 31.9kB / 57.6kB | 88.6MB / 3.08MB | 233 |
| 22:47:54 | 31.31% | 1.255GiB / 3.841GiB | 32.66% | 33.6kB / 60.5kB | 88.6MB / 3.14MB | 233 |
| 22:47:57 | 167.68% | 2.292GiB / 3.841GiB | 59.67% | 33.7kB / 60.7kB | 88.6MB / 3.24MB | 250 |
| 22:48:01 | 247.82% | 2.914GiB / 3.841GiB | 75.87% | 1.2MB / 264MB | 91.7MB / 85.4MB | 266 |
| 22:48:05 | 120.60% | 3.16GiB / 3.841GiB | 82.26% | 3.61MB / 703MB | 647MB / 807MB | 27 |
| 22:48:11 to 22:48:43 | 0.00% | 0B / 0B | 0.00% | 0B / 0B | 0B / 0B | 0 |

(From 22:48:11 onward every sample reports zeros: the container has exited.)

  • 22:47:50 - Test starts, normal operation
  • 22:47:57 - First S3 chaos event + high load: leak begins
  • 22:48:01 - Continued chaos + buffer accumulation
  • 22:48:05 - Memory near limit, system struggling
  • 22:48:11 - OutOfDirectMemoryError → container crash

This is some additional information for reproducing the bug.

rahulmishra117 · Oct 15, 2025

Below is a screenshot showing how the container's memory usage increases until it exits.

[screenshot attached]

rahulmishra117 · Oct 15, 2025

Commands to Reproduce Memory Leak

Step 1: Start AutoMQ and MinIO

cd /Users/rahulm/AutoMQ/automq/docker
docker-compose up -d
sleep 15

Step 2: Create Test Topic

cd /Users/rahulm/AutoMQ/automq/tests
docker exec automq-single-server /opt/automq/kafka/bin/kafka-topics.sh \
    --create --topic leak-test \
    --bootstrap-server localhost:9092 \
    --partitions 10 --replication-factor 1

Step 3: Create Chaos Injection Script

cd /Users/rahulm/AutoMQ/automq/tests
cat > chaos-docker.sh << 'EOF'
#!/bin/bash
# Alternative chaos injection using Docker pause/resume

DURATION=${1:-120}
INTERVAL=${2:-3}

echo "Starting Docker-based chaos injection for $DURATION seconds..."
END_TIME=$((SECONDS + DURATION))

while [ $SECONDS -lt $END_TIME ]; do
    echo "[$(date)] Pausing MinIO container..."
    docker pause minio 2>/dev/null || echo "MinIO pause failed"
    
    sleep 2
    
    echo "[$(date)] Resuming MinIO container..."
    docker unpause minio 2>/dev/null || echo "MinIO resume failed"
    
    sleep $INTERVAL
done

echo "Chaos injection completed"
EOF

chmod +x chaos-docker.sh

Step 4: Start Chaos Injection (Background)

cd /Users/rahulm/AutoMQ/automq/tests
./chaos-docker.sh 60 3 &

Step 5: Start Memory Monitoring (Background)

# Alternative to watch command (since watch wasn't available)
while true; do
    echo "=== $(date) ==="
    docker stats automq-single-server --no-stream
    sleep 1
done &

Step 6: Run High-Throughput Producer

cd /Users/rahulm/AutoMQ/automq/tests
docker exec automq-single-server /opt/automq/kafka/bin/kafka-producer-perf-test.sh \
    --topic leak-test \
    --num-records 1000000 \
    --record-size 1024 \
    --throughput 100000 \
    --producer-props bootstrap.servers=localhost:9092 \
        acks=1 \
        linger.ms=100 \
        batch.size=524288 \
        buffer.memory=134217728 \
        compression.type=lz4

rahulmishra117 · Oct 15, 2025

@rahulmishra117 Hi, I tried your steps and the broker works fine. It may be because my Docker memory limit is 6 GiB; you could try increasing your Docker memory limit.

[screenshots attached]

superhx · Oct 16, 2025

I have implemented a comprehensive fix for this critical memory leak issue.

Pull Request: #2965

The fix addresses the ByteBuf cleanup in the uploadBulk0() exception handler and includes comprehensive test coverage to prevent regressions. This resolves the OutOfDirectMemoryError scenario described in your reproduction steps.
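
For anyone wanting to sanity-check a fix of this shape locally (illustrative only, not taken from the PR), the core property is that a failure during assembly leaves no live buffers behind:

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.ReferenceCountUtil;

// Illustrative check: simulate a failure mid-assembly, release in the catch
// block, and verify the composite buffer's refCnt() has dropped to zero.
public final class LeakFreeOnFailureCheck {
    public static void main(String[] args) {
        CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer();
        try {
            buf.addComponent(true, ByteBufAllocator.DEFAULT.buffer(64).writeZero(64));
            throw new RuntimeException("simulated failure during assembly");
        } catch (Throwable t) {
            ReferenceCountUtil.safeRelease(buf);  // the behaviour the fix must guarantee
        }
        if (buf.refCnt() != 0) {
            throw new AssertionError("buffer still referenced: leak");
        }
        System.out.println("no leak: refCnt=" + buf.refCnt());
    }
}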

aaron-seq · Oct 17, 2025

Cannot reproduce

superhx · Dec 12, 2025