kafka-connect-jdbc
1088: Add connection ttl config
Problem
Original issue: https://github.com/confluentinc/kafka-connect-jdbc/issues/1088
The current implementation of the source and sink connectors maintains a single database connection indefinitely across all connectors and tasks, only cycling it if the connection becomes unhealthy. However, in some circumstances, long-running connections can consume excessive resources and eventually crash the database server, resulting in an outage, data corruption, or data loss.
Solution
Overview
- Whenever CachedConnectionProvider::getConnection is called, check whether the connectionTTL has expired, and if so, cycle the connection.
- During long polling intervals (source) or when no new messages are available (sink), check whether the connection has expired, and if so, close it until there is more work to do.
Details
Change to Config
- Add a config property to the source and sink connectors: connection.ttl.ms. If this property is not present, default to the current infinite connection behavior (the TTL value for an infinite connection is -1).
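To illustrate the defaulting rule, here is a minimal sketch of reading connection.ttl.ms from a plain property map. The class and method names are hypothetical; the actual connector would presumably declare the property through its existing config classes rather than parse it by hand.

```java
import java.util.Map;

// Hypothetical helper, not part of the connector: shows only the defaulting rule.
final class ConnectionTtlConfig {
    static final String CONNECTION_TTL_MS = "connection.ttl.ms";
    // -1 means "never expire", matching the current infinite-connection behavior.
    static final long INFINITE_TTL = -1L;

    // Returns the configured TTL in milliseconds, or -1 when the property is absent or blank.
    // A non-numeric value surfaces as NumberFormatException from Long.parseLong.
    static long connectionTtlMs(Map<String, String> props) {
        String raw = props.get(CONNECTION_TTL_MS);
        if (raw == null || raw.trim().isEmpty()) {
            return INFINITE_TTL;
        }
        return Long.parseLong(raw.trim());
    }
}
```

Existing connector configs that never mention connection.ttl.ms resolve to -1 and keep their current behavior.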
Changes to CachedConnectionProvider
- Pass connectionTTL to the CachedConnectionProvider constructor from: JdbcSourceConnector, JdbcSourceTask, JdbcDbWriter.
- Add an additional internal property to the CachedConnectionProvider: connectionStartTs, which is set to the current timestamp in millis whenever a connection object is successfully returned from provider.getConnection().
- Add a connectionIsExpired method to the CachedConnectionProvider class, which checks the connectionTTL against the current time and the connectionStartTs.
- Add a check for connectionIsExpired in the CachedConnectionProvider::getConnection() method. If it returns true, cycle the connection.
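The provider changes listed above could be sketched roughly as follows. This is a simplified stand-in, not the real CachedConnectionProvider: it is generic over the cached resource and takes an injected clock so it runs without a database. The class name TtlCachedProvider, the factory and clock parameters, and the use of >= for the expiry comparison are assumptions made for illustration.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Illustrative stand-in for CachedConnectionProvider (assumed shape, not the PR's code).
final class TtlCachedProvider<C extends AutoCloseable> {
    private final Supplier<C> factory;   // stands in for provider.getConnection()
    private final long connectionTTL;    // milliseconds; -1 means never expire
    private final LongSupplier clock;    // current time in millis, injectable for tests
    private C connection;
    private long connectionStartTs;

    TtlCachedProvider(Supplier<C> factory, long connectionTTL, LongSupplier clock) {
        this.factory = factory;
        this.connectionTTL = connectionTTL;
        this.clock = clock;
    }

    // Returns the cached connection, cycling it first if its TTL has elapsed.
    synchronized C getConnection() {
        if (connectionIsExpired()) {
            close();
        }
        if (connection == null) {
            connection = factory.get();
            connectionStartTs = clock.getAsLong(); // recorded only after a successful open
        }
        return connection;
    }

    // True only when a connection is open, a finite TTL is set, and the TTL has elapsed.
    synchronized boolean connectionIsExpired() {
        return connection != null
            && connectionTTL >= 0
            && clock.getAsLong() - connectionStartTs >= connectionTTL;
    }

    // Closes and forgets the cached connection; the next getConnection() opens a new one.
    synchronized void close() {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                // Best-effort close in this sketch; a real provider would log the failure.
            } finally {
                connection = null;
            }
        }
    }
}
```

With connectionTTL set to -1 the expiry check is always false, so the sketch degrades to the existing cache-forever behavior.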
Change to JdbcSourceTask
- Add a check for connectionProvider.connectionIsExpired prior to time.sleep() in the poll method, and then call connectionProvider.close() if it's past its TTL.
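A rough sketch of the ordering described for poll: check expiry, close if needed, then sleep. The ExpirableProvider interface and the SourcePollIdle class are hypothetical names that expose only the two provider calls mentioned above, and the sleeper parameter stands in for time.sleep() so the sketch can run without actually waiting.

```java
import java.util.function.LongConsumer;

// Hypothetical minimal view of the connection provider used by the source task.
interface ExpirableProvider {
    boolean connectionIsExpired();
    void close();
}

final class SourcePollIdle {
    // Releases an expired connection before the task sleeps until its next poll,
    // so the database is not left holding a stale connection during the idle period.
    static void beforeSleep(ExpirableProvider provider, long sleepMs, LongConsumer sleeper) {
        if (provider.connectionIsExpired()) {
            provider.close();
        }
        sleeper.accept(sleepMs);
    }
}
```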
Changes to JdbcSinkTask and JdbcDbWriter
- When JdbcSinkTask::put receives empty records, call the new method JdbcDbWriter::handleIdling, which checks for an expired connection and closes it if it has expired.
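The sink-side branch might look something like the sketch below. IdleWriter and SinkPut are hypothetical stand-ins for the writer and the task; only handleIdling, put, connectionIsExpired, and close correspond to names in this description, and the write method merely counts records in place of real database writes.

```java
import java.util.Collection;

// Hypothetical stand-in for the sink writer; only the idle handling is modeled.
final class IdleWriter {
    interface Provider {
        boolean connectionIsExpired();
        void close();
    }

    private final Provider provider;
    int written = 0; // counts records in place of real database writes

    IdleWriter(Provider provider) {
        this.provider = provider;
    }

    void write(Collection<?> records) {
        written += records.size();
    }

    // Called when there is nothing to write: release the connection if its TTL has elapsed.
    void handleIdling() {
        if (provider.connectionIsExpired()) {
            provider.close();
        }
    }
}

final class SinkPut {
    // Mirrors the described put branch: empty batch means idle handling, otherwise write.
    static void put(Collection<?> records, IdleWriter writer) {
        if (records.isEmpty()) {
            writer.handleIdling();
            return;
        }
        writer.write(records);
    }
}
```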
Does this solution apply anywhere else?
- [ ] yes
- [x] no
Test Strategy
Testing done:
- [x] Unit tests
- [x] Integration tests
- [x] System tests
- [x] Manual tests
Release Plan
This would be a part of a new minor version release, as it adds functionality but is completely backwards compatible with the existing interface and behavior.
This would be a merge to master.
This doesn't require a revert or roll-back.
Connect logs demonstrating connection ttl functionality:


Database query results demonstrating connections are being cycled as expected:

@ncliang @ddasarathan @Cyril-Engels, @rhauch, @C0urante or @gharris1727, I’d love a review of this PR when you get a chance! This functionality is necessary for continued use of this connector at work, due to my company’s database connection standard practices.
@ncliang Do you have any additional pointers on trying to get this to work as I need, based on my config above? Otherwise, you can feel free to close the PR if you don't think this is right for the project, and I'll just operate from my fork for now. Thanks again!!