KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

Open C0urante opened this issue 2 years ago • 1 comments

Jira

Implements the new SourceConnector::exactlyOnceSupport method in the source connector used by MirrorMaker 2. Since the connector tracks offsets using the Kafka Connect framework, exactly-once support is possible. However, we require that the consumer used to read from the source cluster is configured with the read_committed isolation level, since otherwise records from aborted and uncommitted transactions would be replicated. In the future, we may relax this constraint and replicate transaction boundaries directly, which would arguably also provide exactly-once support, but we'll have to update the Java consumer library to surface transaction boundary information in order to do this.

Does not implement the new SourceConnector::canDefineTransactionBoundaries method, as the default is to return ConnectorTransactionBoundaries.UNSUPPORTED, which is correct for this connector since it is incapable of defining its own transaction boundaries.
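
A minimal sketch of the check described above, assuming the consumer's isolation level is read from a connector property named consumer.isolation.level (the name used later in this thread). The Support enum is a local, hypothetical stand-in for the Connect API's return type so that the example compiles without Kafka on the classpath, and the case-insensitive comparison is also an assumption. This is not the connector's actual implementation.

```java
import java.util.Locale;
import java.util.Map;

public class ExactlyOnceSupportSketch {
    // Hypothetical stand-in for the Connect API's return type.
    enum Support { SUPPORTED, UNSUPPORTED }

    // Property name as written in this thread; the real connector may resolve it differently.
    static final String ISOLATION_LEVEL = "consumer.isolation.level";

    // Reports exactly-once support only when the source consumer reads committed data,
    // so records from aborted or still-open transactions are never replicated.
    static Support exactlyOnceSupport(Map<String, String> connectorProps) {
        String level = connectorProps.get(ISOLATION_LEVEL);
        // Assumption: the value is compared case-insensitively.
        boolean readCommitted = level != null
                && "read_committed".equals(level.toLowerCase(Locale.ROOT));
        return readCommitted ? Support.SUPPORTED : Support.UNSUPPORTED;
    }

    public static void main(String[] args) {
        System.out.println(exactlyOnceSupport(Map.of(ISOLATION_LEVEL, "read_committed")));
        System.out.println(exactlyOnceSupport(Map.of()));
    }
}
```

With no isolation level configured, the sketch reports UNSUPPORTED, which matches the constraint stated above.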

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

C0urante avatar Jun 30 '22 01:06 C0urante

@OmniaGM You've been active recently with MirrorMaker 2 and seem quite familiar with it. Would you be willing to give this a review? I'd love it if someone could fact-check here and make sure that the MM2-specific details are correct.

For reference:

  • Docs for SourceConnector::exactlyOnceSupport: https://github.com/apache/kafka/blob/679e9e0cee67e7d3d2ece204a421ea7da31d73e9/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54
  • Docs and other useful info on the exactly.once.support connector property: https://github.com/apache/kafka/blob/679e9e0cee67e7d3d2ece204a421ea7da31d73e9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java#L60-L84

C0urante avatar Jul 24 '22 05:07 C0urante

@tombentley If you have a moment, would you mind taking a look?

C0urante avatar Oct 11 '22 16:10 C0urante

Hi, sorry for the very late response on this. I have one question:

  • What is the side effect of enabling exactly.once.source.support=enabled and consumer.isolation.level=READ_COMMITTED on the other SourceConnectors in MM2, such as MirrorHeartbeatConnector and MirrorCheckpointConnector (neither of which needs to be EOS)? Will these two connectors also be EOS when we configure MirrorSourceConnector to enable EOS? If so, KIP-618 needs to mention this and/or the MM2 README.md needs an update, as it isn't clear.

Note: At the moment, any client in MM2 is configured in one of the following ways:

  • Per herder pair, for example primary->backup.consumer.<CONSUMER_CONFIG>. Every consumer in this herder pair will use these consumer configs.
  • Per cluster, for example primary.consumer.<CONSUMER_CONFIG>. Every consumer needed for cluster primary will use these configs.
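
To illustrate the two scopes, here is a minimal sketch of prefix-based resolution. The helper names are hypothetical, and the precedence shown (herder-pair settings overriding cluster-wide ones) is an assumption rather than a description of MirrorMakerConfig. The point is that neither scope is specific to a single connector class.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixedConsumerConfigSketch {
    // Collects every entry whose key starts with the prefix, with the prefix stripped.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    // Resolves the consumer settings for one source->target pair.
    // Assumption: herder-pair settings override cluster-wide ones.
    static Map<String, String> consumerConfig(Map<String, String> mm2Props,
                                              String source, String target) {
        Map<String, String> resolved =
                new HashMap<>(withPrefix(mm2Props, source + ".consumer."));
        resolved.putAll(withPrefix(mm2Props, source + "->" + target + ".consumer."));
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("primary.consumer.isolation.level", "read_uncommitted");
        props.put("primary->backup.consumer.isolation.level", "read_committed");
        System.out.println(consumerConfig(props, "primary", "backup"));
    }
}
```

Every connector that builds its consumer from the same resolved map would see the same isolation level.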

Check connectorBaseConfig here https://github.com/apache/kafka/blob/215d4f93bd16efc8e9b7ccaa9fc99a1433a9bcfa/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L203

So, based on this implementation, I believe that enabling consumer.isolation.level=READ_COMMITTED and exactly.once.source.support will enable EOS for every connector in MM2 in that herder pair or cluster, not only for MirrorSourceConnector, which is not necessarily desired or needed behaviour.

OmniaGM avatar Oct 23 '22 23:10 OmniaGM

Hi @OmniaGM -- good questions! Due to KAFKA-10586, exactly-once isn't supported when running Mirror Maker 2 in dedicated mode (i.e., via bin/connect-mirror-maker.sh) and is instead only possible by deploying Mirror Maker 2 (or, specifically, one or more MirrorSourceConnector instances) onto a vanilla distributed Connect cluster.

Given that, I believe it's safe to say that the changes here should not require consideration of the heartbeat and checkpoint connectors. We may want to update the README to clarify the lack of support for exactly-once in dedicated mode, though.

Does that address your concerns?

C0urante avatar Oct 24 '22 15:10 C0urante

Can we also add an integration test for EOS in MM2 as well?

@C0urante Yeah, updating the README to clarify this should be enough.

OmniaGM avatar Oct 24 '22 17:10 OmniaGM

Thanks @OmniaGM, good idea. I've updated the README and added an integration test that verifies that MM2 can still run with exactly-once support enabled.

I should note that the testReplication test case is currently failing locally with timeout issues, but succeeds when I bump the timeout. Going to see how the Jenkins build goes; we may choose to increase the timeout in these tests if they also fail during CI.

C0urante avatar Oct 27 '22 15:10 C0urante

It turns out that the testReplication flakiness persisted in Jenkins, and was not solved by increasing timeouts.

Instead, the root of the problem was a change in the Connect framework's behavior when exactly-once support is enabled. Without exactly-once support, SourceTask::commitRecord is invoked as soon as the record is ack'd by the Kafka cluster, which usually causes those calls to be spread out over time. With exactly-once support, SourceTask::commitRecord is invoked for every record in a transaction once that transaction is committed, which causes a rapid series of calls to take place one after the other.
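
The difference in timing can be modelled with a logical clock. This is a simplified sketch, assuming record i is acknowledged at tick i and that the transaction commits once the last record is acknowledged; it does not model the Connect framework itself.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitRecordTimingSketch {
    // Returns the logical tick at which commitRecord would fire for each record,
    // assuming record i is acknowledged by the broker at tick i.
    static List<Integer> commitTicks(int recordCount, boolean exactlyOnce) {
        List<Integer> ticks = new ArrayList<>();
        // Assumption: the transaction commits when the last record is acknowledged.
        int commitTick = recordCount - 1;
        for (int i = 0; i < recordCount; i++) {
            // Without exactly-once: fire at the ack. With exactly-once: fire at the commit.
            ticks.add(exactlyOnce ? commitTick : i);
        }
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(commitTicks(4, false)); // [0, 1, 2, 3]
        System.out.println(commitTicks(4, true));  // [3, 3, 3, 3]
    }
}
```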

MirrorMaker 2 triggers a (potential) offset sync after every call to commitRecord, but it has logic to prevent too many outstanding offset syncs from accruing. The exact limit on the number of outstanding offset requests is ten, which is less than the total number of topic partitions being replicated during the integration test. As a result, the test became flaky, since sometimes MM2 would drop an offset sync for partition 0 of the test-topic-1 and then fail when checking for offset syncs for that topic partition.
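
The failure mode can be reproduced in miniature with a single shared semaphore. This sketch assumes one commitRecord call per partition arrives before any in-flight sync completes; the class and method names are hypothetical, and only the limit of ten comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class GlobalOffsetSyncLimitSketch {
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10; // limit stated in this thread

    // Simulates a burst of commitRecord calls, one per partition, arriving before
    // any in-flight offset sync completes. Returns the partitions whose sync was dropped.
    static List<Integer> droppedPartitions(int partitionCount) {
        Semaphore outstanding = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        List<Integer> dropped = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (!outstanding.tryAcquire()) {
                dropped.add(partition); // no permit left: this sync is skipped
            }
            // Permits would be released when a sync completes; none complete during the burst.
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println(droppedPartitions(12)); // [10, 11]
    }
}
```

Which partitions lose out depends on call order; in the integration test it was sometimes partition 0 of test-topic-1.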

Since the behavior change in the Connect framework may have an impact on MirrorMaker 2 outside of testing environments, I've tweaked the offset sync limit to apply on a per-topic-partition basis. This way, if a flurry of calls to commitRecord takes place when a transaction is committed, every topic partition should still get a chance for an offset sync, but there is still an upper bound on the number of outstanding offset syncs (although that bound is now proportional to the number of topic partitions being replicated, instead of a constant).
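
A minimal sketch of a per-topic-partition limit follows. The cap of one outstanding sync per partition and all names are hypothetical, since the value used is not stated here, and the class is single-threaded for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;

public class PerPartitionOffsetSyncLimitSketch {
    // Hypothetical per-partition cap; the thread does not state the value used.
    static final int MAX_OUTSTANDING_PER_PARTITION = 1;

    // One semaphore per topic partition, created lazily. Not thread-safe as written.
    private final Map<String, Semaphore> outstanding = new HashMap<>();

    // Returns true if an offset sync for this topic partition may be sent now.
    boolean tryStartSync(String topicPartition) {
        return outstanding
                .computeIfAbsent(topicPartition,
                        tp -> new Semaphore(MAX_OUTSTANDING_PER_PARTITION))
                .tryAcquire();
    }

    // Called when the sync completes, making room for the next one on that partition.
    void finishSync(String topicPartition) {
        Semaphore permits = outstanding.get(topicPartition);
        if (permits != null) {
            permits.release();
        }
    }

    public static void main(String[] args) {
        PerPartitionOffsetSyncLimitSketch limiter = new PerPartitionOffsetSyncLimitSketch();
        for (int p = 0; p < 12; p++) {
            System.out.println("partition " + p + ": " + limiter.tryStartSync("test-topic-1-" + p));
        }
    }
}
```

A burst across twelve partitions now lets every partition start one sync, while the total outstanding count stays bounded by the number of partitions times the per-partition cap.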

C0urante avatar Oct 28 '22 18:10 C0urante

I had a similar issue with the timeouts.

Good finding. Should we add logs in sendOffsetSync to make it easier to find out if MirrorMaker is hitting the limit of MAX_OUTSTANDING_OFFSET_SYNCS in the future?

OmniaGM avatar Nov 08 '22 12:11 OmniaGM

This was proposed in the original PR for MM2, but was decided against: https://github.com/apache/kafka/pull/6295#discussion_r290372816

I was on the fence about adding something now that we're changing the logic. At TRACE level it's probably worth the effort; will add a log message.

C0urante avatar Nov 08 '22 17:11 C0urante

Thanks @mimaison. I've rebased and addressed your comments.

C0urante avatar Dec 01 '22 16:12 C0urante

@C0urante Let's try to get this in. If you rebase, I'll take another look.

mimaison avatar Feb 07 '23 11:02 mimaison

@mimaison Thanks, I've rebased. I've also removed the changes to the offset sync semaphore since https://github.com/apache/kafka/pull/13181 should obviate them.

C0urante avatar Feb 07 '23 15:02 C0urante

Thanks Chris for the quick reply! I'll take a look this week.

mimaison avatar Feb 07 '23 16:02 mimaison

@mimaison I've updated the README and fixed a small bug in the integration test recently introduced in https://github.com/apache/kafka/pull/13137. Mind giving one more quick pass?

C0urante avatar Feb 10 '23 16:02 C0urante

Thanks Mickael!

C0urante avatar Feb 13 '23 15:02 C0urante