Initial support for OpenJDK CRaC snapshotting
This change adds initial support for an application using the Kafka clients to perform Checkpoint and Restore on a JVM implementing it, specifically OpenJDK CRaC or future versions of OpenJDK. The org.crac package is a facade that either forwards invocations to the actual implementation or provides a no-op implementation.
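To illustrate the contract the facade exposes, here is a minimal, self-contained sketch. The Resource interface below is a simplified stand-in with the same shape as org.crac's (the real one passes a Context parameter and is registered via Core.getGlobalContext().register(...)), and NetworkComponent is a hypothetical component, not Kafka code:

```java
// Simplified stand-in for org.crac.Resource: a component registers itself
// and receives callbacks around the snapshot.
interface Resource {
    void beforeCheckpoint() throws Exception;
    void afterRestore() throws Exception;
}

// Hypothetical network-holding component that participates in C/R.
class NetworkComponent implements Resource {
    volatile boolean connected = true;

    @Override
    public void beforeCheckpoint() {
        // Close sockets so no open file descriptors end up in the snapshot.
        connected = false;
    }

    @Override
    public void afterRestore() {
        // Re-establish connections in the restored process.
        connected = true;
    }
}

public class CracFacadeSketch {
    public static void main(String[] args) throws Exception {
        NetworkComponent component = new NetworkComponent();
        // With the real facade the JVM drives these callbacks around the
        // checkpoint; here we invoke them directly to show the lifecycle.
        component.beforeCheckpoint();
        component.afterRestore();
        System.out.println("connected after restore: " + component.connected);
    }
}
```

On a non-CRaC JVM the facade's no-op implementation simply never invokes these callbacks, which is why the change is safe to carry even where the feature is unavailable.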
Right now a test of the actual behaviour is not provided; without running on a CRaC-enabled JVM there is nothing that would invoke the Resource methods, and making this part of the test suite would be complicated (probably requiring a containerized test). If needed, I could try to put together a test that does not involve an actual checkpoint but verifies that the code does not deadlock and that connections are eventually re-created.
It is not entirely clear to me what level of API visibility the KafkaClient and Selectable interfaces have, in what version this could land, or whether I should make the changes only on the implementation classes.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Hi @rvansa Thank you for your first contribution to Apache Kafka! Thanks to you, today I learnt something new (CRaC).
For an effective discussion on this topic, could you start a discussion thread with the community at [email protected] (preferably accompanied by a KIP)? I am asking because we need to dig more into the problem this is solving for us, the alternative solutions, and the tradeoffs of using a non-GA feature of OpenJDK (and I understand that you have tried to address this using the facade package). You can find the process for creating a KIP at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
My initial thoughts (based on limited understanding, please correct me if I am wrong):
- What is the downside of adding checkpoint & restore to the producer threads? What expense does it add?
- Looks like we are trying to save resources by suspending the threads (on the producer) that are not actively doing anything and restoring them when they are needed? Is that right?
Let's talk more on the mailing list.
> It is not entirely clear to me what level of API visibility the KafkaClient and Selectable interfaces have
In general, you can find all public APIs for Kafka at https://kafka.apache.org/34/javadoc/allclasses-index.html
Sure, thanks for the pointers! I'll go through the docs and compose a proposal on the mailing list. If you don't mind I'll keep this PR open in the meantime.
> What is the downside of adding checkpoint & restore to the producer threads? What expense does it add?
My take on this is that unless the checkpoint itself is performed, there should be no (or very minimal) performance overhead. In this PR the sender performs a volatile read in its loop, which is cheap (unless contended by frequent writes). Components that handle the checkpoint process also tend to need a little memory for tracking, but applications usually have only one or a few instances of each component. The cost of the checkpoint itself, on the other hand, can be significant, but it happens in a controlled manner, sometimes even outside the production environment.
> Looks like we are trying to save resources by suspending the threads (on the producer) that are not actively doing anything and restoring them when they are needed? Is that right?
The sender thread is paused, but not to save resources, only to achieve correctness. Before performing the checkpoint we need to close all network connections, and we do not want them re-created unexpectedly before restore. From my reading of the code the affected components are used exclusively by the Sender thread (which processes the request queues), so the most natural and performant option was to block that thread entirely rather than synchronize with locks (which would add non-trivial overhead even without a checkpoint).
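The pause mechanism described above can be sketched as follows. This is not Kafka's actual Sender, just a minimal run loop whose hot path pays only a volatile read per iteration; the checkpoint thread sets the flag and waits for the loop to confirm it has parked before closing connections:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of pausing a sender-style run loop with a volatile
// flag, so the loop body pays only a volatile read when no checkpoint is
// in progress.
class PausableSender implements Runnable {
    private volatile boolean paused;
    private volatile boolean running = true;
    private final Object pauseLock = new Object();
    // Lets the checkpoint thread wait until the loop has actually parked.
    final CountDownLatch parked = new CountDownLatch(1);

    @Override
    public void run() {
        while (running) {
            if (paused) {                      // cheap volatile read on the hot path
                synchronized (pauseLock) {
                    parked.countDown();        // safe to close connections now
                    while (paused && running) {
                        try {
                            pauseLock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
            // ... poll request queues, do network I/O ...
        }
    }

    void pause() { paused = true; }            // called before the checkpoint

    void resume() {                            // called after restore
        synchronized (pauseLock) {
            paused = false;
            pauseLock.notifyAll();
        }
    }

    void shutdown() {
        synchronized (pauseLock) {
            running = false;
            pauseLock.notifyAll();
        }
    }
}
```

Blocking the single owning thread this way avoids taking a lock on every loop iteration, which matches the reasoning above about not adding overhead outside the checkpoint path.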
Hi @Hangleton , in this PR I am not addressing the broker but a client. Since I am not that familiar with the whole project, I am following a whack-a-mole strategy; in my case I am trying to demo C/R of the Quarkus Super-Heroes example application, which uses the Kafka clients to report some data to another component.
I can imagine that for the broker you don't see the need for frequent scaling, as it would be counter-productive. For applications that don't hold (Kafka) state, a startup time measured in seconds is quite long, especially if we're talking about serverless architectures and the like. Quarkus in native mode strives for sub-second startup; with CRaC we can get to tens of milliseconds.
Hi, Radim,
Thank you for the follow-up and the clarification; I missed the fact that the targeted components are the Kafka clients.
Some of the previous statements regarding state may still be valid. Typically, a Kafka client holds cluster and topic metadata, and one of its first operations on start-up, once a connection with a bootstrap broker is established, is to fetch that metadata to get an up-to-date view of the cluster (e.g. broker membership).
But, I lack the background to fully understand this approach.
@Hangleton You're right, this process should be repeated; for example in the JDK itself we flush DNS caches before checkpoint. I was hoping that the code in https://github.com/apache/kafka/pull/13619/files#diff-dcc1af531d191de8da1e23ad6d878a3efc463ba4670dbcf2896295a9dacd1c18R658 would reload the cluster view; is that not the case?
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Hi, any development on this? We've been using the Spring Kafka reactive streams and recently started testing the Azul CRaC JDK. I wonder, is there any way to gracefully pause the Kafka stream before checkpointing and then re-enable it after? Thanks
Hi @tzvetkovg , there has not been much reaction from the community so far, so basically we need your feedback. You can use the artifacts at https://mvnrepository.com/artifact/io.github.crac.org.apache.kafka/kafka-clients/3.3.1.CRAC.0 as a kind of preview until Kafka decides to integrate this. It should work for the sender; to be honest, I haven't integrated any hooks for the event receiver. Your feedback is welcome.
Hi @rvansa @tzvetkovg @Hangleton ,
we use Quarkus for AWS Lambdas and have SnapStart (which uses the CRaC API) active, so a snapshot is created when the lambda is started and is reloaded during cold start, which makes cold starts very fast. We use SmallRye Messaging and the Kafka connector to send messages to Kafka. After a cold start we always get an exception that the connection is lost. The client reconnects and everything works.
Of course the connection is lost: when AWS took the snapshot there was one working connection open, and when the lambda is used two hours later that connection no longer exists.
So we would appreciate this feature :) A simple close-and-reconnect method on the Kafka client would work, called in the beforeCheckpoint and afterRestore methods.
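The shape described above can be sketched with a wrapper that closes the live client before the snapshot and rebuilds it from a factory after restore. The Client interface and ReconnectingClient class here are hypothetical; a real integration would wrap the Kafka producer and wire these methods into a CRaC Resource:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a network client with a close() method.
interface Client extends AutoCloseable {
    @Override
    void close();
}

// Closes the delegate before a checkpoint and re-creates it after restore,
// so the snapshot never contains a live connection.
class ReconnectingClient {
    private final Supplier<Client> factory;
    private Client delegate;

    ReconnectingClient(Supplier<Client> factory) {
        this.factory = factory;
        this.delegate = factory.get();
    }

    synchronized Client get() { return delegate; }

    // Would be invoked from a CRaC Resource's beforeCheckpoint().
    synchronized void beforeCheckpoint() {
        delegate.close();          // drop all network connections pre-snapshot
        delegate = null;
    }

    // Would be invoked from a CRaC Resource's afterRestore().
    synchronized void afterRestore() {
        delegate = factory.get();  // fresh connections in the restored process
    }
}
```

This avoids the "stale connection" exception after a cold start, because the restored process never tries to reuse a socket that only existed in the snapshotted one.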
https://github.com/quarkusio/quarkus/issues/42286
Cheers Michael
@hamburml Thanks for your use case! Have you tried the modified artifact I linked above? Since your concern is the sender side, I hope it might work...
No sorry, I did not. For Kafka we use SmallRye Reactive Messaging, which has a transitive dependency on kafka-clients. I haven't checked whether the modified artifacts are compatible with the ones used there.
Edit: Additionally, I would not download artifacts from GitHub directly, for security reasons. And even if I tried, it would not work, because in my case the dependencies are loaded from a dependency repository. This would need integrated support.
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.