Embed task metadata in record header
Motivation
- Currently, `DecatonClient` serializes tasks in `DecatonTaskRequest` protobuf format, because Kafka didn't have record headers yet when Decaton started
- Since Kafka added record header support quite a long time ago, it's natural to use it to embed task metadata
Summary of changes
protocol
- Rename the `DecatonTaskRequest` protobuf message to `LegacyDecatonTaskRequest` and mark it as `@Deprecated`
- Add `TaskMetadataHeaders`, which is responsible for reading/writing `TaskMetadata` from/to `Headers`
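To make the role of `TaskMetadataHeaders` concrete, here is a minimal sketch of reading/writing metadata through headers. The header key names and the `Map`-based stand-in for Kafka's `Headers` are illustrative assumptions, not Decaton's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch: header keys ("dt_timestamp", "dt_source") and the
// Map-based "Headers" stand-in are assumptions for illustration only.
class TaskMetadataHeaders {
    static final String KEY_TIMESTAMP = "dt_timestamp"; // assumed key name
    static final String KEY_SOURCE = "dt_source";       // assumed key name

    // Embed task metadata into record headers instead of a protobuf envelope
    static void write(Map<String, byte[]> headers, long timestampMillis, String sourceApplicationId) {
        headers.put(KEY_TIMESTAMP, Long.toString(timestampMillis).getBytes(StandardCharsets.UTF_8));
        headers.put(KEY_SOURCE, sourceApplicationId.getBytes(StandardCharsets.UTF_8));
    }

    static long readTimestamp(Map<String, byte[]> headers) {
        return Long.parseLong(new String(headers.get(KEY_TIMESTAMP), StandardCharsets.UTF_8));
    }

    static String readSource(Map<String, byte[]> headers) {
        return new String(headers.get(KEY_SOURCE), StandardCharsets.UTF_8);
    }
}
```

The key point is that the task payload itself stays untouched as the record value; only the metadata moves into headers.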
client
- Add a new `DecatonClient` which produces tasks as the record value directly and embeds metadata in headers, and make it the default
- Add a new method `DecatonClientBuilder#buildLegacyClient`, which instantiates the old decaton client implementation that produces tasks in `DecatonTaskRequest` format, for backward compatibility
processor
- Change the signature of `TaskExtractor#extract` to receive a `ConsumerRecord` instead of `byte[]`, to allow users to access record headers when extracting a task
- Make `DefaultTaskExtractor` able to parse both task formats, `DecatonTaskRequest` protobuf and header-based, for compatibility when rolling-upgrading a decaton client application
- Introduce the `CONFIG_UPGRADE_FROM` processor property, which denotes the previous version when upgrading a decaton processor (analogous to Kafka Streams's `upgrade.from` property ref)
  - `DecatonTaskRetryQueueingProcessor` switches the decaton task request format based on `CONFIG_UPGRADE_FROM`
    - If it is `V0_X_X` => produce in `DecatonTaskRequest` format, same as current
    - Otherwise => produce in header-based format
  - This switch is necessary to prevent producing retry tasks in the new header-based format while rolling-upgrading a decaton processor application, since that format is unparseable by the old decaton processor version (0.x.x)
- Thus, the upgrade process will be like below:
  - upgrade the decaton processor, setting `CONFIG_UPGRADE_FROM` to `V0_X_X`
  - do one more restart with `CONFIG_UPGRADE_FROM` unset
    - the decaton processor starts producing retry tasks in header-based format
  - upgrade the decaton client
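The format switch described above can be sketched as a small pure function. The enum and method names here are illustrative assumptions, not Decaton's actual API.

```java
// Illustrative sketch of the CONFIG_UPGRADE_FROM switch: while the property
// is set to V0_X_X, retry tasks keep the legacy DecatonTaskRequest format so
// that not-yet-upgraded 0.x.x processors can still parse them; once unset,
// the new header-based format is used. Names are assumptions for illustration.
class RetryTaskFormatSwitch {
    enum UpgradeFrom { V0_X_X, NONE }
    enum TaskFormat { LEGACY_PROTOBUF, HEADER_BASED }

    static TaskFormat retryTaskFormat(UpgradeFrom upgradeFrom) {
        return upgradeFrom == UpgradeFrom.V0_X_X
                ? TaskFormat.LEGACY_PROTOBUF
                : TaskFormat.HEADER_BASED;
    }
}
```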
Want discussion
- As you noticed, the current design forces users to restart the processor twice, and to pay attention not to upgrade decaton-client first, which could produce unparseable tasks
- Besides, introducing `CONFIG_UPGRADE_FROM` looks a bit awkward
- Isn't the upgrade procedure too complicated and error-prone?
This PR is still WIP. I'd like to complete it after we agree on the overall design.
Thanks for starting this 👍 I read your proposal and can agree with most parts.
Some comments inline:
> Rename the DecatonTaskRequest protobuf message to LegacyDecatonTaskRequest and mark it as @Deprecated

While I agree that we'll eventually deprecate it, do we need to rename it as a step before removing?
> Add a new method DecatonClientBuilder#buildLegacyClient, which instantiates the old decaton client implementation that produces tasks in DecatonTaskRequest format

Also, wrt the rename to `LegacyDecatonClient`, I'm not sure how meaningful it is to keep the legacy implementation if we're going to break backward API compatibility anyway.
I think we should discuss and take one of the following ways rather than attempting to do both:
- Move to the new way immediately, breaking any necessary API compatibility, and release 2.0
- Preserve all backward compatibility and introduce some interfaces for using the new way, persuading users to migrate gracefully
> Introduce the CONFIG_UPGRADE_FROM processor property, which denotes the previous version when upgrading a decaton processor (analogous to Kafka Streams's upgrade.from property ref)

Can't we distinguish it based on the presence of the metadata header field in a given record?
- Rolling-restart processors to the new version, which supports both formats
- During the rolling restart, there will be no new tasks produced in the new protocol because all the source tasks are in the older format (`DecatonTaskRequest`)
- After completing the upgrade of all processors, restart producers with the newer `DecatonClient`, so it starts producing tasks in the newer format. By then, all processor instances are already capable of handling it.
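The "distinguish by header presence" idea could look roughly like the dispatcher below. The metadata header key name and the `Map`-based headers are assumptions for illustration; this is not Decaton's actual extractor.

```java
import java.util.Map;
import java.util.function.Function;

// Sketch of a DefaultTaskExtractor-like dispatcher supporting both formats
// during a rolling upgrade: new-format records carry an (assumed) metadata
// header, legacy records don't, so presence of that header selects the parser.
class DualFormatExtractor<T> {
    static final String METADATA_HEADER_KEY = "dt_meta"; // assumed key name

    private final Function<byte[], T> legacyParser; // parses DecatonTaskRequest protobuf envelope
    private final Function<byte[], T> valueParser;  // parses the raw task value (new format)

    DualFormatExtractor(Function<byte[], T> legacyParser, Function<byte[], T> valueParser) {
        this.legacyParser = legacyParser;
        this.valueParser = valueParser;
    }

    T extract(Map<String, byte[]> headers, byte[] value) {
        if (headers != null && headers.containsKey(METADATA_HEADER_KEY)) {
            return valueParser.apply(value);
        }
        return legacyParser.apply(value);
    }
}
```

As the discussion below notes, this breaks down for non-Decaton source topics consumed via a custom `TaskExtractor`, where no metadata header is ever present.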
> While I agree that we'll eventually deprecate it, do we need to rename it as a step before removing?

Good point. It seems we don't have to rename it; just adding the deprecation annotation (to let users know the protocol will eventually be deprecated) would be fine.
> Also, wrt the rename to LegacyDecatonClient, I'm not sure how meaningful it is to keep the legacy implementation

I thought we had to keep the legacy impl, because the processor needs both implementations for a graceful upgrade.
... but I realized that `DecatonRetryQueueingProcessor` uses the raw `DecatonTaskProducer` rather than `DecatonClient`, so it seems we don't need to keep both impls.
Then "Move to the new way immediately" sounds better for simplicity. (If we preserved all API compatibility and provided a separate method to opt into the new way (like `DecatonClientBuilder#enableHeaderMeta()` or similar), users would have to change their code twice: 1. add `#enableHeaderMeta` when upgrading to 1.2.0, and 2. delete `#enableHeaderMeta` when upgrading to 2.0 because it's now the default. That sounds bothersome.)
By the way, maybe a bit off-topic: would adding a method to `TaskExtractor` be a breaking change?
For now, `TaskExtractor` is an "effectively functional interface" because it has only one method, so we often use lambdas.
If we add a method to `TaskExtractor`, we have to rewrite such code.
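To illustrate the concern, here is a simplified stand-in for `TaskExtractor` (not Decaton's actual interface): with a single abstract method it can be written as a lambda, while adding a second abstract method would break every such lambda at compile time.

```java
// Simplified, hypothetical stand-in for TaskExtractor used only to show
// the "effectively functional interface" property described above.
class ExtractorLambdaDemo {
    interface SimpleExtractor<T> {
        // Single abstract method => instances can be lambdas.
        // Adding a second abstract method here would make every lambda
        // implementing this interface a compile error.
        T extract(byte[] bytes);
    }

    static String run(byte[] payload) {
        SimpleExtractor<String> extractor = bytes -> new String(bytes);
        return extractor.extract(payload);
    }
}
```

This is why the `default` method workaround discussed later in the thread matters: a `default` method doesn't add a second abstract method, so existing lambdas keep compiling.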
> Can't we distinguish it based on the presence of the metadata header field in a given record?

The problem is when we use `TaskExtractor` with the retry feature.
When we consume a non-Decaton topic, the source metadata header is always null, so there's no trigger to switch to the new format.
Or should the upgrade path be like below?
- Publish Decaton 2.0, which can understand both formats, but produces retry tasks in the new format only when the metadata header is not null (or always uses the old format)
- Publish Decaton 2.1, which always produces retry tasks in the new format
  - Decaton < 2.0 users must upgrade to 2.0 first
- Publish Decaton 3.0, which removes old-format-related code completely
> By the way, maybe a bit off-topic: would adding a method to TaskExtractor be a breaking change?

Not sure if we should really avoid that, but we could possibly work around it with a default method:

```java
default DecatonTask<T> extract(ConsumerRecord<String, byte[]> record) {
    return extract(record.value());
}
```
> The problem is when we use TaskExtractor with the retry feature.

That's true, good point.

> Or should the upgrade path be like below?

No, I think the current approach suits best. Maybe we can document two ways: one is to shut down all processors at once, and one is a graceful rolling restart. Some users might be okay with stopping their processors for a short duration and prefer the simpler way.
Can you rebase onto the latest master branch? I'll start looking into the details then.
Rebased and reworked based on the discussion so far. (I still haven't fixed the tests, since I think there are still points to argue about in the overall direction.)
Then, I came up with two more points:
- It's necessary to "synthesize" a `ConsumerRecord` when a record was produced by a retry-queueing processor < 2.x: https://github.com/line/decaton/pull/80/files#diff-8cac54935d1a5169c71e281024e7449fceab06a8f7a57e4439bec2f35d8a6a17R84-R96
  - I'm not sure if such synthesizing is valid, because in principle a `ConsumerRecord` denotes a single message which `KafkaConsumer` receives
  - A possible solution is defining another data type (like `DecatonRecord`?) which holds a subset of `ConsumerRecord`
  - Or make the `TaskExtractor#extract` signature something like `extract(Headers headers, byte[] taskBytes)`
- Once we start embedding task metadata in record headers, decaton-common's `Serializer`/`Deserializer` will be effectively the same as kafka-clients's `Serializer`/`Deserializer`
  - Shouldn't we deprecate those as well? Or is that better done in another PR?