Sink messages in Debezium JSON format without the schema field

Open masterclock opened this issue 2 years ago • 8 comments

Is your feature request related to a problem? Please describe.

When creating a sink like:

CREATE SINK some_sink FROM some_view WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    type = 'debezium',
    topic = 'cloud_testees_with_station',
    primary_key = 'id'
);

The output messages look like:

{
  "schema": ...,
  "payload": ""
}

As the "schema" part of each message is very large and rarely used, I'd like to remove this field, just like the following configuration in Debezium:

schemas.enable = false
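A minimal sketch of what such a switch changes, assuming the behavior of Kafka Connect's JsonConverter (the converter whose schemas.enable setting Debezium users toggle): with schemas enabled, each message is a {"schema": ..., "payload": ...} envelope; with schemas disabled, only the payload object is written. The function encode_value and the abbreviated schema and payload strings are hypothetical, for illustration only.

```rust
/// Wraps an already-serialized JSON value the way Kafka Connect's JsonConverter
/// does (assumption): with `schemas_enable = true` the value is nested in a
/// {"schema": ..., "payload": ...} envelope; with `false` only the payload is emitted.
fn encode_value(schema_json: &str, payload_json: &str, schemas_enable: bool) -> String {
    if schemas_enable {
        // `{{` and `}}` are literal braces in a format string.
        format!(r#"{{"schema":{},"payload":{}}}"#, schema_json, payload_json)
    } else {
        payload_json.to_string()
    }
}

fn main() {
    // Hypothetical, heavily abbreviated schema and Debezium-style payload for a row with id = 1.
    let schema = r#"{"type":"struct","fields":[],"optional":false,"name":"some_sink.Envelope"}"#;
    let payload = r#"{"before":null,"after":{"id":1},"op":"c"}"#;

    let with_schema = encode_value(schema, payload, true);
    let without_schema = encode_value(schema, payload, false);

    println!("{}", with_schema);
    println!("{}", without_schema);

    // The schema-less message is just the payload, so it is strictly shorter.
    assert!(without_schema.len() < with_schema.len());
}
```

In a real sink the schema part describes every column, which is why it tends to dominate the message size for wide tables.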

Describe the solution you'd like

A configuration option in the WITH clause to disable schema generation.

Describe alternatives you've considered

No response

Additional context

No response

masterclock, Aug 08 '23 07:08

I am not sure whether the Debezium format definition allows it 🤔 @tabVersion, can you please take a look?

fuyufjh, Aug 08 '23 15:08

Yes, Debezium allows this by setting key.converter.schemas.enable and value.converter.schemas.enable. I am not sure we can do this now, because the generation logic is nested inside the Kafka sink and the Kinesis sink. Some prior refactoring may be needed.

tabVersion, Aug 11 '23 09:08

@masterclock Hi, without the schema field, how do you handle schema changes from upstream? Do you use a schema registry?

neverchanje, Aug 15 '23 02:08

@tabVersion Hi, is there any progress on this issue? I'd like to help in any way I can.

devillove084, Oct 03 '23 14:10

Sorry, there's no update yet. Contributions are welcome.

tabVersion, Oct 10 '23 14:10

Quoting @tabVersion: "I am not sure if we can do this now because the generation logic is nested into kafka sink & kinesis sink. Maybe some prior refactoring is needed."

There has been some refactoring on the sink side, which makes this more approachable.

~~First of all, the syntax would be (I suggest naming it disable so that the default is false):~~ (See the newer comment below: it should simply reuse the same implementation as upsert, for Kafka Connect compatibility.)

CREATE SINK some_sink FROM some_view WITH (connector = 'kafka', ...)
FORMAT DEBEZIUM ENCODE JSON (key.schemas.disable = true);

Then, this option can be read from format_desc.options: BTreeMap<String, String> and passed into DebeziumJsonFormatter::new https://github.com/risingwavelabs/risingwave/blob/fb12bbaee99a91cb10f568874431d1825c3158e7/src/connector/src/sink/formatter/mod.rs#L94-L100
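A sketch of that plumbing, assuming the option ends up named schemas.enable (the name the thread later settles on). FormatterSketch is a hypothetical stand-in for DebeziumJsonFormatter, whose real constructor takes additional arguments, and parse_bool_option is likewise an illustrative helper, not an existing RisingWave function.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a formatter that needs to know whether to emit the schema.
#[derive(Debug)]
struct FormatterSketch {
    schemas_enable: bool,
}

impl FormatterSketch {
    fn new(schemas_enable: bool) -> Self {
        FormatterSketch { schemas_enable }
    }
}

/// Reads a boolean option from the FORMAT ... ENCODE ... (...) options map.
/// A missing key falls back to `default`; anything other than "true"/"false"
/// (case-insensitive) is rejected instead of being silently ignored.
fn parse_bool_option(
    options: &BTreeMap<String, String>,
    key: &str,
    default: bool,
) -> Result<bool, String> {
    match options.get(key) {
        None => Ok(default),
        Some(v) if v.eq_ignore_ascii_case("true") => Ok(true),
        Some(v) if v.eq_ignore_ascii_case("false") => Ok(false),
        Some(v) => Err(format!("invalid value for {key}: {v:?}, expected true or false")),
    }
}

fn main() {
    // Options as they might arrive from ENCODE JSON (schemas.enable = 'false').
    let mut options = BTreeMap::new();
    options.insert("schemas.enable".to_string(), "false".to_string());

    let schemas_enable = parse_bool_option(&options, "schemas.enable", true).unwrap();
    let formatter = FormatterSketch::new(schemas_enable);

    assert!(!formatter.schemas_enable);
    println!("{:?}", formatter); // FormatterSketch { schemas_enable: false }
}
```

Rejecting unrecognized values, rather than treating them as false, avoids silently changing the output format because of a typo.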

xiangjinwu, Oct 10 '23 14:10

@tabVersion Let's make progress on this issue.

I agree that we can give users this flexibility, i.e., a WITH option schema.included that defaults to false, meaning the schema field is omitted unless specified.

neverchanje, Dec 14 '23 05:12

An update: we already introduced schemas.enable for the upsert JSON sink in #12113. It is essentially the same option as Debezium's, because both are based on the Java class org.apache.kafka.connect.json.JsonConverter. (I was not aware of this when posting the Oct 10th comment above, and proposed a different name.) There have also been some recent fixes by @xuefengze regarding how RisingWave types map to these "Kafka Connect types".
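For readers unfamiliar with these "Kafka Connect types": with schemas enabled, every column must be described using Kafka Connect's own type names. The sketch below shows the idea for a handful of primitive types only. RwType, connect_type_name and field_schema are hypothetical; the right-hand names ("int32", "float", "double", "string", ...) are the ones JsonConverter uses in its schema JSON. RisingWave's actual mapping, particularly for decimal, temporal, and nested types, is not reproduced here.

```rust
/// A few RisingWave-like column types (hypothetical enum for illustration).
#[derive(Debug, Clone, Copy)]
enum RwType {
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Boolean,
    Varchar,
    Bytea,
}

/// Kafka Connect schema type names as written in the "type" field of
/// JsonConverter's schema JSON (primitive types only).
fn connect_type_name(t: RwType) -> &'static str {
    match t {
        RwType::Int16 => "int16",
        RwType::Int32 => "int32",
        RwType::Int64 => "int64",
        RwType::Float32 => "float",
        RwType::Float64 => "double",
        RwType::Boolean => "boolean",
        RwType::Varchar => "string",
        RwType::Bytea => "bytes",
    }
}

/// Renders one field entry of a Kafka Connect "struct" schema.
fn field_schema(name: &str, t: RwType, optional: bool) -> String {
    format!(
        r#"{{"type":"{}","optional":{},"field":"{}"}}"#,
        connect_type_name(t),
        optional,
        name
    )
}

fn main() {
    // Hypothetical columns: a non-null id and a nullable name.
    let fields = [("id", RwType::Int32, false), ("name", RwType::Varchar, true)];
    let rendered: Vec<String> = fields
        .iter()
        .map(|&(n, t, o)| field_schema(n, t, o))
        .collect();
    println!("[{}]", rendered.join(","));
}
```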

Moving forward, I would suggest that we:

  • Refactor DebeziumJsonFormatter to leverage the kafka_connect option from JsonEncoder.
  • Use schemas.enable as the user facing name. When absent, default to false for upsert but true for debezium. This is necessary for backward compatibility.
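The proposed defaulting rule can be sketched as follows. SinkFormat and resolve_schemas_enable are hypothetical names for illustration, not RisingWave's API; only the option name schemas.enable and the per-format defaults come from the suggestion above.

```rust
use std::collections::BTreeMap;

/// Sink formats sharing the Kafka Connect-style JSON encoding (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SinkFormat {
    Upsert,
    Debezium,
}

/// Resolves the user-facing `schemas.enable` option. When it is absent, the default
/// depends on the format: false for upsert, true for Debezium, so existing Debezium
/// sinks keep emitting the schema field (backward compatibility).
fn resolve_schemas_enable(
    format: SinkFormat,
    options: &BTreeMap<String, String>,
) -> Result<bool, String> {
    let default = match format {
        SinkFormat::Upsert => false,
        SinkFormat::Debezium => true,
    };
    match options.get("schemas.enable").map(|v| v.to_ascii_lowercase()) {
        None => Ok(default),
        Some(v) if v == "true" => Ok(true),
        Some(v) if v == "false" => Ok(false),
        Some(v) => Err(format!("schemas.enable must be true or false, got {v:?}")),
    }
}

fn main() {
    let empty = BTreeMap::new();
    let mut disabled = BTreeMap::new();
    disabled.insert("schemas.enable".to_string(), "false".to_string());

    for format in [SinkFormat::Upsert, SinkFormat::Debezium] {
        println!(
            "{:?}: default = {:?}, with schemas.enable = false -> {:?}",
            format,
            resolve_schemas_enable(format, &empty),
            resolve_schemas_enable(format, &disabled),
        );
    }
}
```

Keeping the default per format, rather than one global default, is what lets existing Debezium sinks keep their current output while upsert sinks stay schema-less unless asked.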

xiangjinwu, Dec 14 '23 06:12