
Embed task metadata in record header

Open ocadaruma opened this issue 3 years ago • 5 comments

Motivation

  • Currently, DecatonClient serializes tasks in the DecatonTaskRequest protobuf format because, when Decaton was first developed, Kafka didn't have record headers yet
    • Since Kafka has supported record headers for quite a long time now, it's natural to use them to embed task metadata

Summary of changes

protocol

  • Rename the DecatonTaskRequest protobuf message to LegacyDecatonTaskRequest and mark it as @Deprecated
  • Add TaskMetadataHeaders, which is responsible for reading/writing TaskMetadata from/to Headers
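A minimal sketch of what such a helper could look like. This is an illustration, not the actual implementation: Kafka's Headers interface is simulated with a plain Map<String, byte[]> so the example is self-contained, and the header key names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a TaskMetadataHeaders-style helper. Kafka's Headers
// is simulated with Map<String, byte[]>; the key names are illustrative only.
public class TaskMetadataHeaders {
    static final String TIMESTAMP_KEY = "dt_meta_timestamp";
    static final String SOURCE_APP_KEY = "dt_meta_source_application_id";

    // Serialize metadata fields into record headers before producing.
    public static void write(Map<String, byte[]> headers,
                             long timestampMillis,
                             String sourceApplicationId) {
        headers.put(TIMESTAMP_KEY,
                ByteBuffer.allocate(Long.BYTES).putLong(timestampMillis).array());
        headers.put(SOURCE_APP_KEY,
                sourceApplicationId.getBytes(StandardCharsets.UTF_8));
    }

    // Read metadata fields back from record headers on the consumer side.
    public static long readTimestamp(Map<String, byte[]> headers) {
        return ByteBuffer.wrap(headers.get(TIMESTAMP_KEY)).getLong();
    }

    public static String readSourceApplicationId(Map<String, byte[]> headers) {
        return new String(headers.get(SOURCE_APP_KEY), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        write(headers, 1610000000000L, "my-app");
        System.out.println(readTimestamp(headers) + " " + readSourceApplicationId(headers));
    }
}
```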

client

  • Add a new DecatonClient which produces tasks directly as the record value and embeds metadata in headers, and make it the default
  • Add a new method, DecatonClientBuilder#buildLegacyClient, that instantiates the old Decaton client implementation, which produces tasks in DecatonTaskRequest format
    • for backward compatibility

processor

  • Change the signature of TaskExtractor#extract to receive a ConsumerRecord instead of byte[], to allow users to access record headers when extracting a task
  • Make DefaultTaskExtractor able to parse both task formats, DecatonTaskRequest protobuf and header-based tasks
    • for compatibility when rolling-upgrading Decaton client applications
  • Introduce the CONFIG_UPGRADE_FROM processor property, which denotes the previous version when upgrading a Decaton processor (analogous to Kafka Streams's upgrade.from property)
  • DecatonTaskRetryQueueingProcessor switches the Decaton task request format based on CONFIG_UPGRADE_FROM
    • If it is V0_X_X => produce in DecatonTaskRequest format, same as now
    • otherwise => produce in header-based format
    • This switch is necessary to avoid producing retry tasks in the new header-based format while rolling-upgrading a Decaton processor application, since old processor versions (0.x.x) cannot parse it
  • Thus, the upgrade process will look like this:
    • upgrade the Decaton processor with CONFIG_UPGRADE_FROM set to V0_X_X
    • restart once more with CONFIG_UPGRADE_FROM unset
      • the Decaton processor starts producing retry tasks in header-based format
    • upgrade the Decaton client
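The compatibility scheme above boils down to two small switches, which can be sketched as below. This is an illustration under stated assumptions, not Decaton's actual code: Kafka's Headers is simulated with Map<String, byte[]>, and the header key and enum names are hypothetical.

```java
import java.util.Map;

// Illustrative sketch of the two compatibility switches described above.
// Names and the metadata header key are hypothetical.
public class UpgradeSwitches {
    static final String METADATA_HEADER_KEY = "dt_meta";

    enum UpgradeFrom { NONE, V0_X_X }

    // Extractor side: a record carrying the metadata header is in the new
    // format (value == raw task bytes); otherwise the value must be parsed
    // as a legacy DecatonTaskRequest protobuf.
    public static boolean isHeaderFormat(Map<String, byte[]> headers) {
        return headers.containsKey(METADATA_HEADER_KEY);
    }

    // Retry-queueing side: while CONFIG_UPGRADE_FROM is set to V0_X_X, keep
    // producing retry tasks in the legacy format so that not-yet-upgraded
    // 0.x.x processor instances can still parse them.
    public static boolean produceRetryInLegacyFormat(UpgradeFrom upgradeFrom) {
        return upgradeFrom == UpgradeFrom.V0_X_X;
    }

    public static void main(String[] args) {
        System.out.println(produceRetryInLegacyFormat(UpgradeFrom.V0_X_X));
        System.out.println(produceRetryInLegacyFormat(UpgradeFrom.NONE));
    }
}
```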

Want discussion

  • As you may have noticed, the current design forces users to restart the processor twice, and to take care not to upgrade decaton-client first, which could produce unparseable tasks
    • Besides, introducing CONFIG_UPGRADE_FROM looks a bit awkward
  • Isn't the upgrade procedure too complicated and error-prone?

This PR is still WIP. I'd like to complete it after we agree on the overall design.

ocadaruma avatar Jan 09 '21 14:01 ocadaruma

Thanks for starting this 👍 Read your proposal and can agree with most of the parts.

Some comments inline:

Rename DecatonTaskRequest protobuf message to LegacyDecatonTaskRequest and mark it as @Deprecated

While I agree that we'll eventually deprecate it, do we need to rename it as a step before removing it?

Add new method DecatonClientBuilder#buildLegacyClient, that instantiates old decaton client impl which produces tasks in DecatonTaskRequest format

Also, wrt the rename to LegacyDecatonClient, I'm not sure how meaningful it is to keep the legacy implementation if we're going to break backward API compatibility anyway. I think we should discuss and take one of the following ways rather than attempting to do both:

  • Move to the new way immediately, breaking any necessary API compatibility, and release 2.0
  • Preserve all backward compatibility and introduce some interfaces for using the new way, persuading users to migrate gracefully

Introduce CONFIG_UPGRADE_FROM processor property, which denotes the previous version when upgrading decaton processor (analogous to Kafka Streams's upgrade.from property ref)

Can't we distinguish it based on the presence of the metadata header field in a given record?

  1. Rolling-restart processors to the new version, which supports both formats
  2. During the rolling restart, no new task will be produced in the new protocol, because all the source tasks are in the older format (DecatonTaskRequest)
  3. After completing the upgrade of all processors, restart producers with the newer DecatonClient so they start producing tasks in the newer format. By then, all processor instances are already capable of handling it.

kawamuray avatar Feb 25 '21 11:02 kawamuray

While I agree that we'll eventually deprecate it, do we need to rename it as a step before removing it?

Good point. It seems we don't have to rename it; just adding a deprecation annotation (to let users know the protocol will eventually be deprecated) would be fine.

Also wrt rename to LegacyDecatonClient, I'm not sure how is it meaningful to keep the legacy implementation

I thought we had to keep the legacy impl, because the processor needs both implementations for a graceful upgrade. ... but I realized that DecatonTaskRetryQueueingProcessor uses the raw DecatonTaskProducer rather than DecatonClient, so it seems we don't need to keep both impls.

Then "Move to the new way immediately" sounds better for simplicity. (If we preserved all API compatibility and provided a different method to use the new way (like DecatonClientBuilder#enableHeaderMeta() or similar), users would have to change their code twice: 1. add #enableHeaderMeta when upgrading to 1.2.0, and 2. delete #enableHeaderMeta, since it's now the default, when upgrading to 2.0. That sounds bothersome.)

By the way, maybe a bit off-topic: would adding a method to TaskExtractor be a breaking change?

For now, TaskExtractor is effectively a functional interface because it has only one method, so we often use lambdas. If we add a method to TaskExtractor, we'd have to rewrite such code.

Can't we distinguish it based on the presence of the metadata header field in a given record?

The problem is when we use TaskExtractor with the retry feature. When we consume a non-Decaton topic, the source metadata header is always null, so there's no trigger to switch to the new format.

Or should the upgrade path look like the following?

  1. Publish Decaton 2.0, which can understand both formats but produces retry tasks in the new format only when the metadata header is non-null (or always uses the old format)
  2. Publish Decaton 2.1, which always produces retry tasks in the new format
    • Decaton < 2.0 users must upgrade to 2.0 first
  3. Publish Decaton 3.0, which removes the old format-related code completely

ocadaruma avatar Feb 25 '21 23:02 ocadaruma

By the way, maybe a bit off-topic: would adding a method to TaskExtractor be a breaking change?

I'm not sure if we should really avoid that, but we could possibly work around it with:

    // Default method delegating to the existing byte[]-based method, so that
    // existing lambda/single-method implementations keep working unchanged.
    default DecatonTask<T> extract(ConsumerRecord<String, byte[]> record) {
        return extract(record.value());
    }

The problem is when we use TaskExtractor with retry feature.

That's true, good point.

Or should the upgrade path be like below?

No, I think the current approach suits best. Maybe we can guide two ways: one is to shut down all processors at once, and the other is a graceful rolling restart. Some users might be okay with stopping their processors for a short duration and prefer the simpler way.

kawamuray avatar Feb 26 '21 09:02 kawamuray

Can you rebase onto the latest master branch then? I'll start looking into the details.

kawamuray avatar Feb 26 '21 09:02 kawamuray

Rebased and reworked based on the discussion so far. (I still haven't fixed the tests, since I think there are points to argue about regarding the overall direction.)

Then, I came up with two more points:

  • It's necessary to "synthesize" a ConsumerRecord for the case where a record was produced by a retry-queueing processor < 2.x
    • https://github.com/line/decaton/pull/80/files#diff-8cac54935d1a5169c71e281024e7449fceab06a8f7a57e4439bec2f35d8a6a17R84-R96
    • I'm not sure such synthesizing is valid, because in principle a ConsumerRecord denotes a single message that KafkaConsumer receives
    • A possible solution is defining another data type (DecatonRecord?) which holds a subset of ConsumerRecord
      • Or change the TaskExtractor#extract signature to something like extract(Headers headers, byte[] taskBytes)
  • Once we start embedding task metadata in record headers, decaton-common's Serializer/Deserializer will be effectively the same as kafka-clients's Serializer/Deserializer
    • Shouldn't we deprecate that as well? Or is it better to do that in another PR?
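For what it's worth, the extract(Headers headers, byte[] taskBytes) idea floated above could look like the sketch below. This is purely illustrative: Kafka's Headers is simulated with a Map, and DecatonTask is reduced to its payload for the sake of a self-contained example. One nicety of this shape is that a single abstract method keeps the interface lambda-friendly, addressing the earlier concern about breaking lambda-based TaskExtractor implementations.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch of the alternative TaskExtractor signature. Taking
// headers and task bytes as separate arguments avoids synthesizing a
// ConsumerRecord entirely.
public class ExtractorSketch {
    @FunctionalInterface
    interface TaskExtractor<T> {
        // headers: record headers (simulated); taskBytes: raw task payload.
        T extract(Map<String, byte[]> headers, byte[] taskBytes);
    }

    public static void main(String[] args) {
        // Still usable as a lambda, just like the current byte[]-only signature.
        TaskExtractor<String> extractor =
                (headers, bytes) -> new String(bytes, StandardCharsets.UTF_8);
        System.out.println(extractor.extract(Map.of(), "hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```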

ocadaruma avatar Mar 02 '21 00:03 ocadaruma