[Bug][io] JdbcSink: potential memory leak/OOM due to lack of backpressure in internal buffering
Search before reporting
- [x] I searched in the issues and found nothing similar.
Read release policy
- [x] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
- Client library type: Java
- Client library version: 4.1.0
- Client Java version: 21
Issue Description
The JdbcAbstractSink.write method adds records to an unbounded incomingList without checking whether it already holds batchSize records.
The JavaInstanceRunnable runtime pushes messages as fast as possible because write never blocks.
If the database is slower than the consumption rate, incomingList grows indefinitely until the JVM throws an OutOfMemoryError.
Proposed fix: use a wait/notify mechanism to block write while incomingList.size() >= batchSize.
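The proposed wait/notify fix can be sketched as a bounded buffer. This is a minimal illustration under stated assumptions, not the actual JdbcAbstractSink change. BoundedBatchBuffer, add and drainBatch are hypothetical names. add plays the role of write and blocks while batchSize records are buffered. drainBatch plays the role of the flush thread taking a batch. In the real sink, write would presumably still need to schedule a flush when the batch fills, so that a blocked writer does not wait for the timeoutMs flush. Records would also still be acked or failed after the database write.

```java
import java.util.ArrayList;
import java.util.List;

class BackpressureDemo {
    public static void main(String[] args) throws Exception {
        final int batchSize = 5;
        final int total = 12;
        BoundedBatchBuffer<Integer> buffer = new BoundedBatchBuffer<>(batchSize);

        // Simulated instance thread: calls add() (i.e. write()) as fast as it can.
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    buffer.add(i); // blocks whenever batchSize records are already buffered
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        // Simulated flush thread: drain whatever is buffered, then pause like a slow database.
        int flushed = 0;
        while (flushed < total) {
            List<Integer> batch = buffer.drainBatch();
            flushed += batch.size();
            Thread.sleep(20); // stand-in for a slow database round trip
        }
        writer.join();
        System.out.println("flushed " + flushed + " records");
    }
}

// Hypothetical helper, not a Pulsar API: a buffer capped at batchSize using wait/notifyAll.
class BoundedBatchBuffer<T> {
    private final List<T> incoming = new ArrayList<>();
    private final int batchSize;

    BoundedBatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Producer side (write): waits while the buffer already holds batchSize records.
    synchronized void add(T record) throws InterruptedException {
        while (incoming.size() >= batchSize) {
            wait(); // releases the monitor until drainBatch() calls notifyAll()
        }
        incoming.add(record);
    }

    // Consumer side (flush): takes the current batch and wakes any blocked producers.
    // The caller should do the slow database write after this returns, outside the monitor.
    synchronized List<T> drainBatch() {
        List<T> batch = new ArrayList<>(incoming);
        incoming.clear();
        notifyAll();
        return batch;
    }

    synchronized int size() {
        return incoming.size();
    }
}
```

Because drainBatch copies the batch and releases the monitor before the slow write, memory stays bounded at roughly one buffered batch plus one batch in flight. A java.util.concurrent.ArrayBlockingQueue with put/drainTo would give similar blocking semantics without hand-written wait/notify.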
Error messages
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /pulsar/tmp/heapdump-%p.hprof
Reproducing the issue
- Set up a JDBC Sink with a constrained batchSize (e.g., 5) and a very high timeoutMs (e.g., 100000) to prevent time-based flushing from clearing the queue immediately.
- Use a mock or slow database connection that cannot process records instantly (or simply pause the flush thread in a debugger/test).
- Produce messages rapidly to the input topic.
Expected behavior: The sink.write() method should block once the incomingList reaches the batchSize, waiting for the flush thread to clear the batch.
Actual behavior: The Pulsar Function keeps consuming messages from the topic regardless of receiverQueueSize and the JDBC sink's batchSize, calling sink.write for each one. This method returns immediately for every received message, so the internal incomingList in JdbcAbstractSink grows without bound. If the database cannot keep up, the result is an OutOfMemoryError.
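The reproduction can be approximated without Pulsar or a database. The sketch below is a hypothetical model of the pattern described in this issue, not the actual JdbcAbstractSink source. In it, write appends to a list and schedules an asynchronous flush. A CountDownLatch that is only released in close() stands in for a paused flush thread or a stuck database. UnboundedSinkModel, buffered and close are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class UnboundedBufferRepro {
    public static void main(String[] args) throws Exception {
        UnboundedSinkModel sink = new UnboundedSinkModel(5); // batchSize = 5, as in the repro steps
        for (int i = 0; i < 10_000; i++) {
            sink.write(i); // returns immediately even though the "database" is stalled
        }
        System.out.println("buffered records: " + sink.buffered());
        sink.close();
    }
}

// Hypothetical model of the behavior described in this issue, not the real sink code.
class UnboundedSinkModel {
    private final List<Integer> incomingList = new ArrayList<>();
    private final int batchSize;
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    // Not released until close(): stands in for a paused flush thread or stuck database.
    private final CountDownLatch databaseStall = new CountDownLatch(1);

    UnboundedSinkModel(int batchSize) {
        this.batchSize = batchSize;
    }

    void write(int record) {
        int size;
        synchronized (incomingList) {
            incomingList.add(record); // no upper bound is enforced here
            size = incomingList.size();
        }
        // Schedule at most one flush at a time; write() itself never waits for it.
        if (size >= batchSize && isFlushing.compareAndSet(false, true)) {
            flushExecutor.execute(this::flush);
        }
    }

    private void flush() {
        try {
            databaseStall.await(); // simulated slow or stuck database write
            synchronized (incomingList) {
                incomingList.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            isFlushing.set(false);
        }
    }

    int buffered() {
        synchronized (incomingList) {
            return incomingList.size();
        }
    }

    void close() throws InterruptedException {
        databaseStall.countDown(); // let the pending flush finish
        flushExecutor.shutdown();
        flushExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With batchSize 5, the model reports all 10000 records still buffered while the flush is stalled, which mirrors the unbounded growth described above.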
Additional information
Proof of the leak:
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
@peterh-wob Thanks for a good issue report and great analysis! Looking forward to your PR.
@jiangpengcheng Could you also help track this issue?