Finer-grained transaction control needed - Feature request
Hi there. Love the project and am considering using it. However, the GroupTransactSession semantics seem to be a bit limiting for concurrent processing of partitions. I have a consumer application that interacts with external systems, and it seems there is no good way to exclude a partition from a group transaction.
In the EOS example given:
if err := sess.Begin(); err != nil {
	// Similar to above, we only encounter errors here if
	// we are not transactional or are already in a
	// transaction. We should not hit this error.
	die("unable to start transaction: %v", err)
}
e := kgo.AbortingFirstErrPromise(sess.Client())
fetches.EachRecord(func(r *kgo.Record) {
	sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise())
})
committed, err := sess.End(ctx, e.Err() == nil)
I could throw each partition's records into a goroutine and then wait for all of them to finish before consuming the next set of records, but if one partition is running slow, the entire batch returned in fetches is held up. This seems like both a throughput and latency issue for my use case.
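For concreteness, a minimal sketch of that per-partition fan-out inside one session, assuming the sess, ctx, fetches, and the e promise from the example above, plus a sync import; the wg.Wait() is the forced synchronization point:
var wg sync.WaitGroup
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, r := range p.Records {
			// per-partition work would happen here before producing
			sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise())
		}
	}()
})
wg.Wait() // one slow partition holds the whole session here; nothing can End until every goroutine returns
// ...then sess.End as in the example above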
The shorthand of what I would like to be able to do is something like this:
fetches := client.PollFetches(ctx)
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
	go func() {
		producerClient := transactionalProducerPool.Borrow()
		defer transactionalProducerPool.Release(producerClient)
		producerClient.BeginTransaction()
		process(p.Records)
		producerClient.Flush()
		producerClient.CommitOffsetsForTransaction(getHighestOffsets(p.Records))
		producerClient.EndTransaction()
	}()
})
The CommitOffsetsForTransaction method existed at one point as it is referenced in the documentation for EndTransaction. It seems that this method was made package private. Is there any way to export this method again (or some version of it)?
Transactions are so filled with edge cases that I'd have to think a good bit to see what would break by handing over a bit more control -- the main reason for GroupTransactSession is to hide the edge cases. I agree that the current session aspect is basically a big forced synchronization point on every session end / begin.
In your shorthand, are you also thinking that PollFetches can be called again before the goroutines return?
Note that the consuming side is half of the problem. Grabbing offsets to commit for a single partition is conceptually simple. The other half of the problem is the concurrent flushing. Rolling back on error could also be a big problem.
With producing, if one goroutine flushes and ends a transaction, the other goroutines are actually still producing, violating the requirements on flush & end. Deep in the client, if you produce while ending a transaction, some records could be produced outside of the context of a transaction. You can work around some of this by using EndBeginTxnUnsafe, but it has some caveats as described in the documentation for why it's technically unsafe.
The easier approach is to use a bunch of EOS clients in the same application, with the assumption that if you use enough EOS clients, each client will get only one or two partitions assigned. Then, each session truly is only working on a small set of partitions.
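A rough sketch of that multi-client setup; the kgo options shown are real, while numClients, brokers, group, topic, and txnIDPrefix are placeholders, and the usual imports (fmt, kgo) are assumed:
sessions := make([]*kgo.GroupTransactSession, 0, numClients)
for i := 0; i < numClients; i++ {
	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers(brokers...),
		kgo.ConsumerGroup(group),
		kgo.ConsumeTopics(topic),
		// one transactional ID per client so each session owns its own transaction
		kgo.TransactionalID(fmt.Sprintf("%s-%d", txnIDPrefix, i)),
	)
	if err != nil {
		// handle startup error
	}
	sessions = append(sessions, sess)
}
// each session then runs its own poll / Begin / produce / End loop in a goroutine,
// so a slow partition only stalls the session that happens to own it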
Those are fair points; however, you could keep GroupTransactSession and still give the developer the option to manage their transactions manually. The GroupTransactSession would just be using the standard AddOffsetsToTransaction of the underlying client.
"In your shorthand, are you also thinking that PollFetches can be called again before the goroutines return?"
Yes, the consumer would feed a small channel per partition (this blocks when full, so it provides back pressure if the consumer gets too far ahead of the processor). My current codebase creates a context.Context for every topic/partition assignment that gets passed along with each record/txn. When an assignment is invalidated, the context is canceled. Anything pending in the channel for that context is dumped and any transactions would be aborted, avoiding the race condition you're guarding against by synchronizing rebalances with consumption.
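Roughly, that wiring might look like the sketch below. OnPartitionsRevoked is a real kgo option; the assignment type and workers map are illustrative, not kgo APIs, and a real implementation needs the shown locking between the callback and the poll loop:
type assignment struct {
	ctx    context.Context
	cancel context.CancelFunc
	recs   chan *kgo.Record // small buffer: blocks when full, providing back pressure
}

var mu sync.Mutex
workers := make(map[string]map[int32]*assignment) // topic -> partition -> worker

onRevoked := kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, lost map[string][]int32) {
	mu.Lock()
	defer mu.Unlock()
	for topic, partitions := range lost {
		for _, partition := range partitions {
			if a := workers[topic][partition]; a != nil {
				a.cancel() // pending records for this assignment are dumped, open txns aborted
				delete(workers[topic], partition)
			}
		}
	}
})
// onRevoked is then passed as an option when creating the consumer client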
The real blocker for me on GroupTransactSession is that it is an all-or-nothing approach. Let's say a PollFetches receives 1000 records for a single partition. Using the current semantics, I must commit or abort all 1000 records. Let's say there's a problem with the 999th record and some retries must happen internally: processing for the entire consumer would be halted. Any other consumers further down the chain would not receive any records (as they would be using READ_COMMITTED isolation). This approach also leads to a very bursty consume/produce/consume cycle if the partitions are full of data.
"The easier approach is to use a bunch of EOS clients in the same application, with the assumption that if you use enough EOS, then each client will get one or two partitions assigned. Then, each session truly is only working on a small set of partitions."
The production app I'm building for has 1000 partitions; 1000 consumer instances would be extreme unless we managed to keep them all busy simultaneously. But again, the issue is not so much the number of partitions, it's that we cannot commit a single record (or even 2 or 100): we must commit or abort every record in the batch from the fetch.
"real blocker"
So, if I add the ability to mark individual records (partitions, really -- strictly advance within a partition) as available to commit, so that a session isn't all or nothing, that would largely solve your issue? I don't think it's so easy to get rid of the forced synchronization point, but it would be pretty easy to allow individual partition advancement.
I'm not sure about exposing the ability to commit manually (the timing here is where a lot of problems come in), but the client has the existing ability to mark records as ready for commit, and I could piggyback off of this feature. The feature is currently used for autocommitting, but an additional option could be added for the transact session that provides ~roughly the same behavior. wdyt?
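For reference, the existing mark machinery looks roughly like this with a plain (non-transactional) consumer; AutoCommitMarks and MarkCommitRecords are existing kgo APIs, while brokers, group, topic, and process are placeholders. The idea above would be an analogous option for the transact session:
cl, err := kgo.NewClient(
	kgo.SeedBrokers(brokers...),
	kgo.ConsumerGroup(group),
	kgo.ConsumeTopics(topic),
	kgo.AutoCommitMarks(), // autocommit only commits records that were explicitly marked
)
if err != nil {
	// handle startup error
}

fetches := cl.PollFetches(ctx)
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
	for _, r := range p.Records {
		if err := process(r); err != nil {
			return // stop marking this partition; nothing past the failure is committed
		}
		cl.MarkCommitRecords(r) // eligible for the next autocommit
	}
})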
Those are good ideas, but I'd rather manage the transaction myself TBH, or at least have the option to. The standard consumer (non-GroupTransactSession) supports transactions. What's the harm in exposing AddOffsetsToTransaction (like the confluent driver does, for example)? That way you can leave the GroupTransactSession semantics as whatever works best for existing users. The GroupTransactSession could just use that method on *Client (which it already does, just under a different name).
With AddOffsetsToTransaction, are you thinking this is a 1:1 mapping to the underlying TxnOffsetCommitRequest?
A problem with exposing this is that there are three different cases that should be handled by end users, and it's not clear what or why: https://github.com/twmb/franz-go/blob/2b746f5b0fcfad4f34ccc3434a6df6c9ba47375b/pkg/kgo/txn.go#L266-L291
- the request fails to be issued entirely: abort
- abortable error within an individual partition: abort
- KIP-447 behavior: opt into the happy path, rather than falling back to the sad path described here: https://github.com/twmb/franz-go/blob/master/docs/transactions.md#kip-447
The Java client expects end users to start one EOS consumer/producer per consumed partition. KIP-447 eventually fixed this: https://wiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics. This behavior isn't really obvious, and this client takes the safest approach possible across all versions while avoiding the consumer-per-partition requirement as much as possible. The short version of the problem: a rebalance can occur while you think you still have a transaction, so you will process records twice and produce duplicates.
The reason I un-exported all the txn functions before stabilization is because most of how Kafka exposes transactions is a huge footgun that requires mostly undocumented knowledge to get correct. I'd be happy to expose new APIs so long as we can be sure they won't open end users up to big problems from unintentional footguns. With transactions / EOS, I want to be sure that an API won't accidentally cause unexpected duplicates.
Do you have a proposal for avoiding the issues described above^?
I guess I'm still not seeing an issue with exposing the underlying Kafka request. Technically it is already exposed through the kmsg package; however, the necessary data to populate the request (producer epoch, producer ID, etc.) is unfortunately not exposed.
The err response from AddOffsetsToTransaction is either retryable, abortable or fatal. So long as that error state is exposed to the developer, I don't see an issue.
err := cl.AddOffsetsToTransaction(ctx, offsets)
switch {
case err == nil:
	// success, carry on
case err.IsRetryable():
	retry()
case err.Abortable():
	abort()
default:
	handleFatal(err)
}
As far as synchronizing the rebalance with transactions, I'm already managing that, but I believe the issue to be tangential. For users that want to use the GroupTransactSession, you are providing the safety they need. For users that have sensitive throughput and latency requirements (like myself), GroupTransactSession does not really work, as we cannot run concurrent transactions. Does that mean I can't use transactions at all?
The confluent driver exposes this through SendOffsetsToTransaction() on their producer, and the examples are pretty straightforward. I really don't want to use cgo, and your consumer is much more efficient. Also, you provide the ability to migrate from an eager to cooperative balancer, which I am in need of. So, in short, your package is better in every way except for Txn control. I've even managed to write an "Incremental Cooperative Sticky" group balancer for it (slowly migrates topic partitions to new group members, rather than all at once).
If a developer wants to shoot themselves in the foot, they're going to do so...I'll probably end up forking this project to get the functionality I need but I'd really like to avoid that :) It seems like you're already performing the above logic in your code, is there a strong reason to not allow consumers of your lib to do the same?
Is SendOffsetsToTransaction synchronous? Also, I'm open to a PR so long as there's a lot of documentation on when something should or shouldn't be used, and the safety concerns around it -- EndBeginTxnUnsafe started as a proposal from a PR (and a lot of talk in discord for how to do it as safe as possible)
It is, as it would create a ProducerFencedException in Java land. I've been looking at the confluent implementation; I hate sifting through C, and I'm not all that familiar with the Kafka wire protocol.
But it sounds like the ask is to implement the equivalent of the SendOffsetsToTransaction that is part of the standard Java Producer API. I'll dig into it a bit further. It also sounds like it would not interfere with anything you have going in the GroupTransactSession, especially since it would be a separate client from the one used by the consumer.
How do you feel about isolating this functionality in a separate type? TransactionalProducer for example. I noticed there is no Producer API, everything is wrapped up in the Client. Similar to the admin.Client, you could initialize the TransactionalProducer with a fully initialized Client, or with an array of Opts
IIRC the Java client is implemented such that the producer and consumer have no knowledge of each other, which actually makes safety guarantees a lot harder. Separating this into a separate type seems like overkill; I think one or two functions on the existing client would be preferable, with docs indicating when/why to use them, and an example. I originally made the mistake of only having CommitOffsets and CommitOffsetsSync, and those are more challenging to use than the later-introduced CommitRecords and CommitUncommittedOffsets.
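For comparison, the record-level helper mentioned above is an existing *Client method and keeps the call site simple; in this sketch, processed is a placeholder []*kgo.Record of fully handled records on a non-transactional consumer:
// commit exactly the records that were fully processed, without building
// an offset map by hand
if err := cl.CommitRecords(ctx, processed...); err != nil {
	// handle commit error
}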
From what I can tell, there are multiple asks in this thread:
1. Add functions that expose internal client metadata:
// ProducerID returns, loading if necessary, the current producer ID and epoch.
// This returns an error if the producer ID could not be loaded.
func (*Client) ProducerID(context.Context) (int64, int16, error)
// GroupMetadata returns the current group member ID and generation,
// or an empty string and -1 if not in the group
func (*Client) GroupMetadata() (string, int32)
// PartitionLeaderEpoch returns the given topic partition's leader epoch,
// or -1 if the topic / partition has not been loaded.
func (*Client) PartitionLeaderEpoch(string, int32) int32
This supports the use case for crafting a custom TxnOffsetCommitRequest. The safe way to use this would be to use BlockRebalanceOnPoll and then to AllowRebalance, or to manually add locks to the rebalance callbacks. A user also must send a manual AddOffsetsToTxnRequest to tie what is being consumed to what is being produced (see txn.go); a sketch of this flow follows after these proposals. I don't think GroupMetadata is a good function name, but I can't think of something better. Effectively, these three functions support the super-advanced admin use case, and all the unsafety and lack of control that comes with this.
2. Different from your idea above, but I think simpler: add an option, TxnCommitMarks.
This would modify the GroupTransactSession to only commit records that you have marked with MarkCommitRecords. This supports the behavior requested above: you can process partitions in parallel, and you can partially process each partition and commit only what you want. You can fail any individual record in that partition and just stop marking that partition.
This option replaces SendOffsetsToTransaction, which itself is a useless function because it must be paired with an EndTxn. The GroupTransactSession.End is SendOffsetsToTransaction and EndTxn. There is no need for an individual SendOffsetsToTransaction.
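A heavily hedged sketch of what the "expose the guts" path from proposal 1 could look like, assuming the three proposed methods exist exactly as declared above; txnID, group, and the offsets map are placeholders, the kmsg field names follow the protocol but should be checked against the generated structs, and proper retryable/abortable/fatal handling plus per-partition response error codes are omitted:
// Assumes the client was created with kgo.BlockRebalanceOnPoll so the group
// cannot rebalance away while we hold offsets to commit.
pid, epoch, err := cl.ProducerID(ctx)
if err != nil {
	// cannot load producer ID: abort the transaction
}
member, generation := cl.GroupMetadata()

// Tie what is being consumed to what is being produced first (see txn.go).
add := kmsg.NewPtrAddOffsetsToTxnRequest()
add.TransactionalID = txnID
add.ProducerID = pid
add.ProducerEpoch = epoch
add.Group = group
if _, err := add.RequestWith(ctx, cl); err != nil {
	// abort
}

// Then commit the offsets within the transaction.
req := kmsg.NewPtrTxnOffsetCommitRequest()
req.TransactionalID = txnID
req.Group = group
req.ProducerID = pid
req.ProducerEpoch = epoch
req.Generation = generation
req.MemberID = member
for topic, partitionOffsets := range offsets { // offsets: map[string]map[int32]int64, illustrative
	t := kmsg.NewTxnOffsetCommitRequestTopic()
	t.Topic = topic
	for partition, offset := range partitionOffsets {
		p := kmsg.NewTxnOffsetCommitRequestTopicPartition()
		p.Partition = partition
		p.Offset = offset
		p.LeaderEpoch = cl.PartitionLeaderEpoch(topic, partition)
		t.Partitions = append(t.Partitions, p)
	}
	req.Topics = append(req.Topics, t)
}
if _, err := req.RequestWith(ctx, cl); err != nil {
	// abort; a real implementation must also inspect per-partition error codes
}
cl.AllowRebalance() // let the group rebalance again once the commit is in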
re: "GroupTransactSession does not really work as we can not run concurrent transactions", this just isn't a thing with how Kafka transactions are designed internally. You can only have one transaction per transactional ID, and you cannot be producing between ending a transaction and beginning a new one. There is no safe way to concurrently process more records for a transaction while ending a transaction. The EndBeginTxnUnsafe option exists for a very specific niche use case but is more specific to the produce-only transactions use case, less so for EOS.
What do you think about the two feature proposals above?
Ping
Hi @salsorrentino! This will be my last ping on this issue before closing it. I think my proposal above addresses your needs, lmk what you think if possible. I'm probably going to implement my proposal (and if so, will leave this issue open until I implement the proposal).
Thanks for your attention on this. I decided to go a different route for managing the committed offsets. I'm working on implementing state stores so I can do Kafka Streams-style processing, and decided to just track the commit offset in a compacted topic. This way I can provide EOS by including the last successful consumer group offset inside the produce transaction for the state store itself. The official consumer group offsets will be committed using the standard process but will lag behind the state store offsets a bit (depending on auto-commit frequency). This is okay: when the state store bootstraps on a partition, I know the last processed offset and can ignore any consumed messages with a lower offset.
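A small sketch of that bootstrap check; stateStore, LastProcessedOffset, and process are illustrative and not part of franz-go:
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
	// last offset that made it into the state store's transaction for this partition
	last := stateStore.LastProcessedOffset(p.Topic, p.Partition)
	for _, r := range p.Records {
		if r.Offset <= last {
			continue // already reflected in the state store; skip to stay exactly-once
		}
		process(r)
	}
})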
@twmb Feel free to close the issue unless you feel it has some worth to other users
"2. Different from your idea above, but I think simpler: add an option, TxnCommitMarks."
I think this is a good option. If the consumer is configured with AutoCommitMarks(), would it behave this way already?
If so, then I don't think there is an ask here at all. If not, it feels like the GroupTransactSession should respect this option, and that is the real ask.
"Feel free to close the issue unless you feel it has some worth to other users"
I'll leave this open and implement both proposals I have above for v1.7.0. The priority is low, though, since this is the only issue asking for it at the moment.
After looking into implementing it, I'm going to just expose the three new methods and not add a mark behavior. The problem with partially marking what is polled is that the group transact session never rewinds. If you eventually want to commit only marked records (ending the session), you have to buffer the uncommitted records because you will never receive them again. And if the problem is one partition falling behind, more and more has to be buffered within your client. Things eventually don't play out that well. Implementation wise, it gets a bit hairier than I was hoping for, and given the potentially problematic usage, I'm less interested in working it in.
I'll implement the three exposing-guts functions and call it good.
This has been released in v1.7.0