risingwave
sink message of debezium json without schema field
Is your feature request related to a problem? Please describe.
When creating a sink like:
CREATE SINK some_sink FROM some_view WITH (
connector = 'kafka',
properties.bootstrap.server = 'localhost:9092',
type = 'debezium',
topic = 'cloud_testees_with_station',
primary_key = 'id'
);
The output message looks like:
{
"schema": ...,
"payload": ""
}
As the "schema" part of the message is very large and rarely used, I'd like to remove this field, just like the following configuration in Debezium:
schemas.enable = false
Describe the solution you'd like
A configuration option in WITH to disable schema generation.
Describe alternatives you've considered
No response
Additional context
No response
I am not sure whether the debezium format definition allows it 🤔 @tabVersion can you please take a look?
Yes, Debezium allows this by setting key.converter.schemas.enable and value.converter.schemas.enable.
I am not sure if we can do this now because the generation logic is nested into kafka sink & kinesis sink. Maybe some prior refactoring is needed.
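For reference, the effect of the converter setting mentioned above can be sketched roughly as follows. This is a minimal illustration, not RisingWave's implementation: `render_message` is a hypothetical helper, and it assumes the schema and payload are already serialized as JSON strings. The assumption is that with schemas enabled the value is wrapped in a schema/payload envelope, and with schemas disabled only the bare payload is emitted.

```rust
/// Hypothetical helper: renders a Kafka Connect style JSON message from
/// pre-serialized JSON fragments.
///
/// - `schemas_enable = true`: wrap the value in a {"schema", "payload"} envelope.
/// - `schemas_enable = false`: emit only the bare payload.
pub fn render_message(schema_json: &str, payload_json: &str, schemas_enable: bool) -> String {
    if schemas_enable {
        // `{{` and `}}` are escaped braces; `{}` are the two placeholders.
        format!(r#"{{"schema":{},"payload":{}}}"#, schema_json, payload_json)
    } else {
        payload_json.to_string()
    }
}
```

With the flag off, consumers receive a much smaller message but have to learn the schema some other way.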
@masterclock Hi. Without the schema field, may I ask how you handle schema changes from the upstream? Do you use a schema registry?
@tabVersion Hi, is there any progress on this issue? I'd like to help in any way I can.
Sorry, there's no update yet. Contributions are welcome.
> I am not sure if we can do this now because the generation logic is nested into kafka sink & kinesis sink. Maybe some prior refactoring is needed.
There has been some refactoring on the sink side, which makes this more approachable.
~~First of all, the syntax would be (I suggest naming it disable so that the default is false):~~ (See the newer comment below: this should just reuse the same implementation as upsert for Kafka Connect compatibility.)
CREATE SINK some_sink FROM some_view WITH (connector = 'kafka', ...)
FORMAT DEBEZIUM ENCODE JSON (key.schemas.disable = true);
Then, this option can be read from format_desc.options: BTreeMap<String, String> and passed into DebeziumJsonFormatter::new
https://github.com/risingwavelabs/risingwave/blob/fb12bbaee99a91cb10f568874431d1825c3158e7/src/connector/src/sink/formatter/mod.rs#L94-L100
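A rough sketch of that wiring, under stated assumptions: `FormatterSketch` is a hypothetical stand-in for `DebeziumJsonFormatter` (the real constructor takes more arguments), `read_bool_option` and `build_formatter` are illustrative helpers, and the option name `schemas.enable` with a default of true is taken from the later discussion in this thread rather than from the actual code.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the real formatter; only the schema flag is modeled.
#[derive(Debug, PartialEq)]
pub struct FormatterSketch {
    pub include_schema: bool,
}

impl FormatterSketch {
    pub fn new(include_schema: bool) -> Self {
        Self { include_schema }
    }
}

/// Look up a boolean option by key. A missing key yields `default`;
/// anything other than "true"/"false" (case-insensitive) is an error.
pub fn read_bool_option(
    options: &BTreeMap<String, String>,
    key: &str,
    default: bool,
) -> Result<bool, String> {
    match options.get(key) {
        None => Ok(default),
        Some(raw) => raw
            .trim()
            .to_ascii_lowercase()
            .parse::<bool>()
            .map_err(|_| format!("invalid value `{}` for option `{}`", raw, key)),
    }
}

/// Read the flag from the format options and hand it to the constructor.
/// The option name and its default here are assumptions for illustration.
pub fn build_formatter(options: &BTreeMap<String, String>) -> Result<FormatterSketch, String> {
    let include_schema = read_bool_option(options, "schemas.enable", true)?;
    Ok(FormatterSketch::new(include_schema))
}
```

Rejecting unparsable values instead of silently falling back to the default keeps a typo in the option value from changing the message shape unnoticed.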
@tabVersion Let's make progress on this issue.
I agree that we can provide users with this flexibility, i.e., a WITH option schema.included that defaults to false, meaning that the schema field is not included unless specified.
Some updates: We already introduced schemas.enable for the upsert json sink in #12113, which is essentially the same option as in Debezium because both are based on the Java class org.apache.kafka.connect.json.JsonConverter. (I was not aware of this when posting the Oct 10th comment above, and proposed a different name.) There have also been some recent fixes by @xuefengze to how RisingWave types map to these "kafka connect types".
Moving forward, I would suggest the following:
- Refactor DebeziumJsonFormatter to leverage the kafka_connect option from JsonEncoder.
- Use schemas.enable as the user-facing name. When absent, default to false for upsert but true for debezium. This is necessary for backward compatibility.
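The per-format default suggested above could look roughly like this. It is only a sketch: `SinkFormat`, `default_schemas_enable`, and `resolve_schemas_enable` are hypothetical names, not identifiers from the RisingWave codebase.

```rust
/// Hypothetical enum covering only the two formats discussed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkFormat {
    Upsert,
    Debezium,
}

/// Default used when `schemas.enable` is absent: false for upsert,
/// true for debezium, so existing sinks keep their current output.
pub fn default_schemas_enable(format: SinkFormat) -> bool {
    match format {
        SinkFormat::Upsert => false,
        SinkFormat::Debezium => true,
    }
}

/// An explicit user setting always wins; otherwise fall back to the
/// per-format default.
pub fn resolve_schemas_enable(format: SinkFormat, explicit: Option<bool>) -> bool {
    explicit.unwrap_or_else(|| default_schemas_enable(format))
}
```

Keeping the default in one function makes the backward-compatibility rule easy to find if it ever needs to change.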