clickhouse-kafka-connect

Support StringConverter && ByteArray value converter

Open AdisNezirovic opened this issue 1 year ago • 7 comments

I'd like to have the option to pass data through from Kafka to ClickHouse by specifying a String or ByteArray converter. The data would go to a table with a String column.

ClickHouse could then reinterpret the data as necessary. In my specific case, the data is a variable-size JSON array of primitive types (i.e. a tuple), and ClickHouse can use a materialized view to extract the structure (I'm being creative with untuple(JSONExtract(data, 'Tuple(...)'))).

I've played around a little, and with something like the code below I'm able to get data through with a hardcoded data column name.

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
// plus the connector-internal imports for RecordConvertor, Record, Data,
// SchemaType, OffsetContainer and StructToJsonMap

public class ByteArrayRecordConvertor implements RecordConvertor {
    @Override
    public Record convert(SinkRecord sinkRecord) {
        String topic = sinkRecord.topic();
        int partition = sinkRecord.kafkaPartition();
        long offset = sinkRecord.kafkaOffset();
        // Wrap the raw record value in a schema with a single hardcoded "data" String field
        Schema schema = SchemaBuilder.struct().name("StringData")
                .field("data", Schema.STRING_SCHEMA).build();
        Struct struct = new Struct(schema).put("data", sinkRecord.value().toString());
        Map<String, Data> data = StructToJsonMap.toJsonMap(struct);
        return new Record(SchemaType.SCHEMA, new OffsetContainer(topic, partition, offset), struct.schema().fields(), data, sinkRecord);
    }
}

The alternative I'm currently using in my code is to collect/batch the data and insert it into ClickHouse over HTTP with 'INSERT INTO tbl FORMAT LineAsString' (which works just fine without knowing the column name), but I'd really like to have the other goodies the ClickHouse connector provides, and to be able to deploy the code to AWS MSK or similar.
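The batch-over-HTTP approach described above can be sketched as follows. This is a minimal illustration, not the connector's code: the class name, base URL, and table name are assumptions, and a real version would add authentication, retries, and error handling.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class LineAsStringInserter {

    // Request body for FORMAT LineAsString: one raw line per row, newline-separated.
    static String buildBody(List<String> lines) {
        return String.join("\n", lines) + "\n";
    }

    // ClickHouse HTTP URL with the INSERT statement passed as the `query` parameter.
    static String buildUrl(String baseUrl, String table) {
        String query = "INSERT INTO " + table + " FORMAT LineAsString";
        return baseUrl + "/?query=" + URLEncoder.encode(query, StandardCharsets.UTF_8);
    }

    // Sends one batch; assumes a ClickHouse server at baseUrl (e.g. http://localhost:8123).
    static int insertBatch(String baseUrl, String table, List<String> lines) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(buildUrl(baseUrl, table)))
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(lines)))
                .build();
        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return resp.statusCode();
    }
}
```

Each row arrives in ClickHouse exactly as sent, so the String column needs no name in the insert itself.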

I've briefly considered Confluent HTTP Sink Connector, but it is not freely available.

Even some CSV support would be helpful, but I wasn't able to find anything specific in the code or docs. The current code really expects either a full schema or "normal" JSON.

AdisNezirovic avatar Nov 29 '23 13:11 AdisNezirovic

This would be useful for inserting binary encoded formats such as Cap'n Proto and Protobuf.

ladislavmacoun avatar Nov 29 '23 19:11 ladislavmacoun

I'll definitely take a look at this, @AdisNezirovic and @ladislavmacoun !

Re: CSV - we actually do support CSVs now (it was added recently). Check out https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#string-support for the properties to enable that (assuming your files are in that format, of course).
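For the plain string passthrough case, the converter side of the connector configuration would look roughly like this. The connector-specific properties for CSV handling are in the linked docs; this sketch shows only the standard Kafka Connect converter settings:

```properties
# Deliver the raw record value as a single string, no schema required
value.converter=org.apache.kafka.connect.storage.StringConverter
# Keys can stay as strings too (or use ByteArrayConverter for opaque keys)
key.converter=org.apache.kafka.connect.storage.StringConverter
```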

Cheers!

Paultagoras avatar Dec 01 '23 19:12 Paultagoras

If I'm not mistaken, this can be achieved with Kafka Connect SMTs. Here is one example:

https://github.com/an0r0c/kafka-connect-transform-tojsonstring
https://www.confluent.io/hub/an0r0c/kafka-connect-transform-record2jsonstring

mlivirov avatar Dec 06 '23 20:12 mlivirov

Unfortunately, the SMT you linked requires a schema for the input data, and its output is also not optimal for ClickHouse. I really think a generic "passthrough" converter would be a nice thing to have (tm) for the ClickHouse connector.

AdisNezirovic avatar Dec 06 '23 21:12 AdisNezirovic

Do you happen to have a sample of the data you'd be looking to consume? And the table definition?

Paultagoras avatar Dec 06 '23 21:12 Paultagoras

I second what @AdisNezirovic said, it would be nice to have the client agnostic to the schema.

Over at clickhouse-java I've opened an issue about ingesting Cap'n Proto data into ClickHouse. It describes the insertion process, which is basically passing raw chunks of data to ClickHouse while setting the format and the name of a schema that is present on the server.

ClickHouseRequest.Mutation request = client
        .read(server)
        .write()
        .table(TABLE_NAME)
        .format(ClickHouseFormat.CapnProto)
        .set("format_schema", "schema.capnp:Message");
The schema can now be set with the clickhouseSettings option; however, there is no option to set a binary format such as Cap'n Proto or Protobuf and just send raw bytes to ClickHouse.

ladislavmacoun avatar Dec 07 '23 08:12 ladislavmacoun

Sample data (a variable-length, variably-typed JSON array; the first field is a discriminant):

[0, 123456, "a str", 0.1, 123, 1.2, 1.3]
[1, 7890, "b str", 0.2, 456, 9.0, 8.1, "coment", 123]

Table definition:

CREATE TABLE events (
   data String
) ...

We use the Null table engine, since the structure is extracted from the data in a materialized view with untuple/JSONExtract.
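A sketch of that setup could look like the following. The destination column names and tuple layout here are illustrative, inferred from the first four fields of the sample rows above:

```sql
-- Landing table: rows pass through to the MV, nothing is stored (Null engine)
CREATE TABLE events (
    data String
) ENGINE = Null;

-- Typed destination table
CREATE TABLE events_parsed (
    kind  UInt8,
    id    UInt64,
    label String,
    ratio Float64
) ENGINE = MergeTree ORDER BY (kind, id);

-- MV extracts structure on insert; named tuple elements become column names
CREATE MATERIALIZED VIEW events_mv TO events_parsed AS
SELECT untuple(JSONExtract(data, 'Tuple(kind UInt8, id UInt64, label String, ratio Float64)'))
FROM events;
```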

AdisNezirovic avatar Dec 07 '23 10:12 AdisNezirovic

As we now support the StringConverter, I'm closing this issue out, but if you see it not working, please let us know!

Paultagoras avatar Jun 05 '24 06:06 Paultagoras