clickhouse-kafka-connect
Support StringConverter && ByteArray value converter
I'd like to have the option to pass data through from Kafka to ClickHouse by specifying the String or ByteArray converter. The data would go to a table with a single String column, and ClickHouse could then reinterpret the data as necessary. In my specific case, the data is a variable-size JSON array of primitive types (i.e. a tuple), and ClickHouse can use a materialized view to extract the structure (I'm being creative with untuple(JSONExtract(data, 'Tuple(...)'))).
I've played around a little, and with something like the code below I'm able to get data through, with a hardcoded data column name:
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

// Record, RecordConvertor, Data, SchemaType, OffsetContainer and
// StructToJsonMap come from the connector's internal packages.
public class ByteArrayRecordConvertor implements RecordConvertor {
    @Override
    public Record convert(SinkRecord sinkRecord) {
        String topic = sinkRecord.topic();
        int partition = sinkRecord.kafkaPartition().intValue();
        long offset = sinkRecord.kafkaOffset();
        // Wrap the raw record value in a single-field struct with a
        // hardcoded "data" column name.
        Schema schema = SchemaBuilder.struct().name("StringData")
                .field("data", Schema.STRING_SCHEMA).build();
        Struct struct = new Struct(schema).put("data", sinkRecord.value().toString());
        Map<String, Data> data = StructToJsonMap.toJsonMap(struct);
        return new Record(SchemaType.SCHEMA, new OffsetContainer(topic, partition, offset),
                struct.schema().fields(), data, sinkRecord);
    }
}
The alternative, which I'm currently using in my code, is to collect/batch data and insert it into ClickHouse via HTTP with 'INSERT INTO tbl FORMAT LineAsString' (which works just fine without knowing the column name), but I'd really like to have the other goodies available from the ClickHouse connector and be able to deploy the code to AWS MSK or similar.
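For reference, a minimal sketch of that batching approach using the JDK's built-in HTTP client (the host, port, table name, and error handling are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class LineAsStringInserter {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Each element of 'lines' is one raw JSON string; with LineAsString,
    // ClickHouse inserts each line verbatim into the single String column.
    static void insertBatch(List<String> lines) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                // Query is pre-encoded (%20 for spaces) to avoid encoder quirks.
                .uri(URI.create("http://localhost:8123/?query=" +
                        "INSERT%20INTO%20tbl%20FORMAT%20LineAsString"))
                .POST(HttpRequest.BodyPublishers.ofString(String.join("\n", lines)))
                .build();
        HttpResponse<String> response =
                CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new RuntimeException("Insert failed: " + response.body());
        }
    }
}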
I've briefly considered the Confluent HTTP Sink Connector, but it is not freely available.
Even some CSV support would be helpful, but I wasn't able to find anything specific in the code or docs. The current code really expects either a full schema or "normal" JSON.
This would be useful for inserting binary-encoded formats such as Cap'n Proto and Protobuf.
I'll definitely take a look at this, @AdisNezirovic and @ladislavmacoun !
Re: CSV - we actually do support CSVs now (it was added recently). Check out https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#string-support for the properties to enable that (assuming your files are in that format, of course).
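For reference, the converter side of pass-through is the standard Kafka Connect setting below; the connector-specific properties for string/CSV handling are the ones described in the linked docs.

# Standard Kafka Connect converter configuration: hand the record value
# to the sink as a plain string instead of parsed JSON/Avro.
value.converter=org.apache.kafka.connect.storage.StringConverter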
Cheers!
If I am not mistaken, this can be achieved with Kafka Connect SMTs. Here is one example:
https://github.com/an0r0c/kafka-connect-transform-tojsonstring https://www.confluent.io/hub/an0r0c/kafka-connect-transform-record2jsonstring
Unfortunately, the SMT you linked requires a schema for the input data, and its output is also not optimal for ClickHouse. I really think a generic "passthrough" converter would be a nice thing to have (tm) for the ClickHouse connector.
Do you happen to have a sample of the data you'd be looking to consume? And the table definition?
I second what @AdisNezirovic said; it would be nice to have the client agnostic to the schema.
Over at clickhouse-java I've opened an issue about ingesting Cap'n Proto data into ClickHouse. It describes the insertion process, which is basically passing raw chunks of data to ClickHouse and setting the format and the schema name, where the schema is present on the server.
ClickHouseRequest.Mutation request = client
        .read(server)
        .write()
        .table(TABLE_NAME)
        .format(ClickHouseFormat.CapnProto)
        .set("format_schema", "schema.capnp:Message");
The schema can now be set with the clickhouseSettings option; however, there is no option to set the binary format (for example, to CapnProto or Protobuf) and just send raw bytes to ClickHouse.
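To complete the snippet above, sending the payload would then look roughly like this (a sketch against the clickhouse-java client API shown above; capnpBytes is a placeholder for an already-serialized message):

// Stream the raw Cap'n Proto bytes as the request body and wait for
// the server to acknowledge the insert.
request.data(new java.io.ByteArrayInputStream(capnpBytes))
        .executeAndWait();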
Sample data (a variable-length/variable-type JSON array, where the first field is a discriminant):
[0, 123456, "a str", 0.1, 123, 1.2, 1.3]
[1, 7890, "b str", 0.2, 456, 9.0, 8.1, "coment", 123]
Table definition:
CREATE TABLE events (
data String
) ...
We use the Null table engine, since the data structure is extracted with ClickHouse untuple/JSONExtract in a materialized view, e.g. as sketched below.
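For illustration, a minimal sketch of the extraction side (the field names and types in the Tuple are invented to roughly match the sample rows above):

-- Parse each raw JSON line from 'events' positionally into typed columns.
CREATE MATERIALIZED VIEW events_mv
ENGINE = MergeTree ORDER BY tuple()
AS SELECT
    untuple(JSONExtract(data, 'Tuple(kind UInt8, id UInt64, label String, value Float64)'))
FROM events;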
As we support the StringConverter, I'm closing this issue out, but if you're seeing that not working, please let us know!