kafka
KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector
Implements the new `SourceConnector::exactlyOnceSupport` method in the source connector used by MirrorMaker 2. Since the connector tracks offsets using the Kafka Connect framework, exactly-once support is possible. However, we require that the consumer used to read from the source cluster is configured with the `read_committed` isolation level, since otherwise records from aborted and uncommitted transactions would be replicated. In the future, we may relax this constraint and replicate transaction boundaries directly, which would arguably also provide exactly-once support, but we'll have to update the Java consumer library to surface transaction boundary information in order to do this.
Does not implement the new `SourceConnector::canDefineTransactionBoundaries` method as the default is to return `ConnectorTransactionBoundaries.UNSUPPORTED`, which is correct for this connector as it is incapable of defining its own transaction boundaries.
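A minimal sketch of the check described above, assuming the isolation level is read from a `consumer.isolation.level` entry in the connector properties. The enum and class below are self-contained stand-ins, not the real `org.apache.kafka.connect.source.ExactlyOnceSupport` type or the actual `MirrorSourceConnector` code, and the real connector may resolve the source consumer's configuration differently.

```java
import java.util.Locale;
import java.util.Map;

// Stand-in for the Connect API's ExactlyOnceSupport enum (assumption: only
// the two values relevant to this sketch are modeled).
enum ExactlyOnceSupportSketch { SUPPORTED, UNSUPPORTED }

final class MirrorSourceEosSketch {
    // Hypothetical property key, taken from the discussion in this PR.
    static final String ISOLATION_LEVEL_KEY = "consumer.isolation.level";

    // Report exactly-once support only when the source consumer is
    // configured to skip aborted and uncommitted transactional records.
    // Accepting either letter case is an assumption of this sketch.
    static ExactlyOnceSupportSketch exactlyOnceSupport(Map<String, String> props) {
        String level = props.get(ISOLATION_LEVEL_KEY);
        boolean readCommitted = level != null
                && level.toLowerCase(Locale.ROOT).equals("read_committed");
        return readCommitted
                ? ExactlyOnceSupportSketch.SUPPORTED
                : ExactlyOnceSupportSketch.UNSUPPORTED;
    }
}
```

Under these assumptions, a connector config without the property (or with `read_uncommitted`) reports `UNSUPPORTED`, so a preflight check that requires exactly-once support would reject it.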
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
@OmniaGM You've been active recently with MirrorMaker 2 and seem quite familiar with it. Would you be willing to give this a review? I'd love it if someone could fact-check here and make sure that the MM2-specific details are correct.
For reference:
- Docs for `SourceConnector::exactlyOnceSupport`: https://github.com/apache/kafka/blob/679e9e0cee67e7d3d2ece204a421ea7da31d73e9/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54
- Docs and other useful info on the `exactly.once.support` connector property: https://github.com/apache/kafka/blob/679e9e0cee67e7d3d2ece204a421ea7da31d73e9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java#L60-L84
@tombentley If you have a moment, would you mind taking a look?
Hi, sorry for the very late response on this. I have one question:
- What is the side effect of enabling `exactly.once.source.support=enabled` and `consumer.isolation.level=READ_COMMITTED` on the other `SourceConnector`s in MM2, like `MirrorHeartbeatConnector` and `MirrorCheckpointConnector` (neither of which needs EOS)? Will these two `SourceConnector`s also be EOS when we configure `MirrorSourceConnector` to enable EOS? If this is the case, then KIP-618 needs to mention this and/or the MM2 README.md should be updated, as it isn't clear.
Note: At the moment, any client in MM2 is configured in one of the following ways:
- Per herder pair, for example `primary->backup.consumer.<CONSUMER_CONFIG>`. Every consumer in this herder pair will use these consumer configs.
- Per cluster, for example `primary.consumer.<CONSUMER_CONFIG>`. Every consumer needed for cluster `primary` will use these configs.

Check `connectorBaseConfig` here: https://github.com/apache/kafka/blob/215d4f93bd16efc8e9b7ccaa9fc99a1433a9bcfa/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L203
So, based on this implementation, I believe that enabling `consumer.isolation.level=READ_COMMITTED` and `exactly.once.source.support` will enable EOS for every connector in MM2, and not only the `MirrorSourceConnector` used for the herder pair or cluster. This is not necessarily desired or needed behaviour.
Hi @OmniaGM -- good questions! Due to KAFKA-10586, exactly-once isn't supported when running Mirror Maker 2 in dedicated mode (i.e., via `bin/connect-mirror-maker.sh`) and is instead only possible by deploying Mirror Maker 2 (or, specifically, one or more `MirrorSourceConnector` instances) onto a vanilla distributed Connect cluster.
Given that, I believe it's safe to say that the changes here should not require consideration of the heartbeat and checkpoint connectors. We may want to update the README to clarify the lack of support for exactly-once in dedicated mode, though.
Does that address your concerns?
Can we also add an integration test for EOS in MM2 as well?
@C0urante Yeah, updating the README to clarify this should be enough.
Thanks @OmniaGM, good idea. I've updated the README and added an integration test that verifies that MM2 can still run with exactly-once support enabled.
I should note that the `testReplication` test case is currently failing locally with timeout issues, but succeeds when I bump the timeout. Going to see how the Jenkins build goes; we may choose to increase the timeout in these tests if they also fail during CI.
It turns out that the `testReplication` flakiness persisted in Jenkins, and was not solved by increasing timeouts.
Instead, the root of the problem was a change in the Connect framework's behavior when exactly-once support is enabled. Without exactly-once support, `SourceTask::commitRecord` is invoked as soon as the record is ack'd by the Kafka cluster, which usually causes those calls to be spread out over time. With exactly-once support, `SourceTask::commitRecord` is invoked for every record in a transaction once that transaction is committed, which causes a rapid series of calls to take place one after the other.
MirrorMaker 2 triggers a (potential) offset sync after every call to `commitRecord`, but it has logic to prevent too many outstanding offset syncs from accruing. The exact limit on the number of outstanding offset requests is ten, which is less than the total number of topic partitions being replicated during the integration test. As a result, the test became flaky, since sometimes MM2 would drop an offset sync for partition 0 of the `test-topic-1` topic and then fail when checking for offset syncs for that topic partition.
Since the behavior change in the Connect framework may have an impact on MirrorMaker 2 outside of testing environments, I've tweaked the offset sync limit to apply on a per-topic-partition basis. This way, if a flurry of calls to `commitRecord` takes place when a transaction is committed, every topic partition should still get a chance for an offset sync, but there is still an upper bound on the number of outstanding offset syncs (although that bound is now proportional to the number of topic partitions being replicated, instead of a constant).
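To illustrate the difference, here is a rough, self-contained sketch contrasting a single global limit with a per-topic-partition limit. The class name, the string partition keys, and the per-partition limit of one are hypothetical; this is not the actual MirrorMaker 2 code, only a model of the behavior described above using `java.util.concurrent.Semaphore`.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

final class OffsetSyncLimitSketch {
    // Global limit from the discussion above: at most ten outstanding syncs.
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    // Original behavior: one shared budget; syncs beyond it are dropped.
    static Set<String> burstWithGlobalLimit(Iterable<String> partitions) {
        Semaphore global = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        Set<String> synced = new LinkedHashSet<>();
        for (String tp : partitions) {
            // No permit is released here because, in a rapid burst, none of
            // the earlier sync requests has completed yet.
            if (global.tryAcquire()) {
                synced.add(tp);
            }
        }
        return synced;
    }

    // Tweaked behavior: each topic partition gets its own budget
    // (assumed to be one outstanding sync per partition in this sketch).
    static Set<String> burstWithPerPartitionLimit(Iterable<String> partitions) {
        Map<String, Semaphore> perPartition = new ConcurrentHashMap<>();
        Set<String> synced = new LinkedHashSet<>();
        for (String tp : partitions) {
            Semaphore s = perPartition.computeIfAbsent(tp, k -> new Semaphore(1));
            if (s.tryAcquire()) {
                synced.add(tp);
            }
        }
        return synced;
    }
}
```

In this model, a burst of `commitRecord` calls covering twelve distinct partitions gets ten syncs through the global limit and drops two, while the per-partition variant lets all twelve through. The number of outstanding syncs is still bounded, but by the number of partitions rather than a constant.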
I had a similar issue with the timeouts.
Good finding. Should we add logs in `sendOffsetSync` to make it easier to find out if MirrorMaker is hitting the limit of `MAX_OUTSTANDING_OFFSET_SYNCS` in the future?
This was proposed in the original PR for MM2, but was decided against: https://github.com/apache/kafka/pull/6295#discussion_r290372816
I was on the fence about adding something now that we're changing the logic. At `TRACE` level it's probably worth the effort; will add a log message.
Thanks @mimaison. I've rebased and addressed your comments.
@C0urante Let's try to get this in. If you rebase, I'll take another look.
@mimaison Thanks, I've rebased. I've also removed the changes to the offset sync semaphore since https://github.com/apache/kafka/pull/13181 should obviate them.
Thanks Chris for the quick reply! I'll take a look this week.
@mimaison I've updated the README and fixed a small bug in the integration test recently introduced in https://github.com/apache/kafka/pull/13137. Mind giving one more quick pass?
Thanks Mickael!