kafka-connect-jdbc icon indicating copy to clipboard operation
kafka-connect-jdbc copied to clipboard

1088: Add connection ttl config

Open Bradleywboggs opened this issue 4 years ago • 4 comments
trafficstars

Problem

Original issue: https://github.com/confluentinc/kafka-connect-jdbc/issues/1088

The current implementation of the source and sink connectors maintains a single database connection indefinitely across all connectors and tasks, only cycling if the connection becomes unhealthy. However, in some circumstances, long-running connections can eat up too many resources and eventually crash the db server, resulting in outage, data corruption, or data loss.

Solution

Overview

  • Whenever CachedConnnectionProvider::getConnection is called, check if the connectionTTL is expired, and if so, cycle the connection.
  • During long polling intervals (source) or when no new messages are available (sink), check if connection is expired, if so, close it until there is more work to do.

Details

Change to Config

  • Add a config property to the source and sink connectors: connection.ttl.ms. If this property is not present, default to the current infinite connection behavior (the ttl value for an infinite connection is -1)

Changes to CachedConnectionProvider

  • Pass connectionTTL to the CachedConnnectionProvider constructor from: JdbcSourceConnector, JdbcSourceTask, JdbcDBWriter .
  • Add additional internal property to the CachedConnectionProvider: connectionStartTs which is set with the current timestamp in millis whenever a connection object is successfully returned from provider.getConnection().
  • Add connectionIsExpired method to the CachedConnectionProvider class, which checks the connectionTTL against the current time and the connectionStartTs.
  • Add a check for connectionIsExpired in the CachedConnectionProvider::getConnection() method. If false cycle the connection.
Change to JdbcSourceTask
  • Add a check for connectionProvider.connectionIsExpired prior to time.sleep() in poll method, and then connectionProvider.close() if it's past its ttl.
Change to JdbcSinkTask and JdbcWriter
  • When JdbcSinkTask::put has empty records, call new method JdbcWriter::handleIdling which checks for an expired connection and closes it if it's expired.
Does this solution apply anywhere else?
  • [ ] yes
  • [x] no

Test Strategy

Testing done:
  • [x] Unit tests
  • [x] Integration tests
  • [x] System tests
  • [x] Manual tests

Release Plan

This would be a part of a new minor version release, as it adds functionality but is completely backwards compatible with the existing interface and behavior.

This would be a merge to master.

This doesn't require a revert or roll-back.

Bradleywboggs avatar Oct 21 '21 02:10 Bradleywboggs

Connect logs demonstrating connection ttl functionality: image

image

Bradleywboggs avatar Oct 21 '21 19:10 Bradleywboggs

Database query results demonstrating connections are being cycled as expected: image

Bradleywboggs avatar Oct 21 '21 19:10 Bradleywboggs

@ncliang @ddasarathan @Cyril-Engels, @rhauch, @C0urante or @gharris1727, I’d love a review of this PR when you get a chance! This functionality is necessary for continued use of this connector at work, due to my company’s database connection standard practices.

Bradleywboggs avatar Oct 23 '21 14:10 Bradleywboggs

@ncliang Do you have any additional pointers on trying to get this to work as I need, based on my config above? Otherwise, you can feel free to close the PR if you don't think this is right for the project, and I'll just operate from my fork for now. Thanks again!!

Bradleywboggs avatar Nov 16 '21 18:11 Bradleywboggs

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

cla-assistant[bot] avatar Aug 01 '23 23:08 cla-assistant[bot]