clickhouse-kafka-connect
Add support for Tuples & Variants
Summary
Related issue: https://github.com/ClickHouse/clickhouse-kafka-connect/issues/286
I have implemented support for the Tuple & Variant data types by adding the describe_include_subcolumns = 1 setting to the DESCRIBE TABLE query.
As a result, I was able to get rid of the recursive Map parsing (since ClickHouse now returns separate rows for the nested types) and unify it with the way Tuples are parsed.
Essentially, the Column class is no longer responsible for parsing complex data types (except Array, since its elements are not included in the DESCRIBE TABLE output). To fully parse a complex data type, the context of the preceding columns has to be present, which is why the Table class is now responsible for parsing those.
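To make the idea concrete, here is a minimal sketch (the table and column names are made up for illustration) of what the setting changes: with describe_include_subcolumns = 1, ClickHouse emits one extra row per subcolumn, so Tuple elements and Map keys/values arrive as separate rows that the Table class can fold back into their parent column.

```sql
-- Hypothetical table, for illustration only.
CREATE TABLE example
(
    t Tuple(a String, b UInt32),
    m Map(String, UInt64)
)
ENGINE = MergeTree
ORDER BY tuple();

-- With the setting enabled, DESCRIBE additionally returns one row per
-- subcolumn (e.g. t.a, t.b, m.keys, m.values), marked via is_subcolumn.
DESCRIBE TABLE example
SETTINGS describe_include_subcolumns = 1;
```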
I have added TableTest, feel free to inspect it first to get an idea of how it works.
Regarding the Variant type, I needed it to support Avro's union type. In the Kafka Connect ecosystem, a union is represented as a Struct whose fields are named after the lowercase version of the member type.
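For illustration, here is what the ClickHouse side of that could look like (hypothetical table; on releases where Variant is still experimental, it has to be enabled first). The connector would then presumably match the Struct's lowercase field names against the Variant's member types.

```sql
-- Variant was experimental at the time of this PR; enable it if needed.
SET allow_experimental_variant_type = 1;

-- Hypothetical column that could back an Avro union of string and long.
CREATE TABLE union_example
(
    v Variant(String, Int64)
)
ENGINE = MergeTree
ORDER BY tuple();
```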
Please note that I've added the Lombok library: the telescoping-constructor problem had become unwieldy in the Column class, so I refactored it to use the builder pattern.
Checklist
Delete items not relevant to your PR:
- [x] Unit and integration tests covering the common scenarios were added
- [x] A human-readable description of the changes was provided to include in CHANGELOG
- [x] For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials
  - https://github.com/ClickHouse/clickhouse-docs/pull/2202
For anyone interested, I've released the zip here.
The following tests are failing due to https://github.com/ClickHouse/clickhouse-kafka-connect/issues/367:
- ClickHouseSinkTaskWithSchemaTest.schemaWithDefaultsTest
- ClickHouseSinkTaskWithSchemaTest.schemaWithDefaultsAndNullableTest
- ClickHouseSinkTaskTest.testDBTopicSplit
@jirislav Thanks for the contribution. I have disabled those tests in my latest merge to main. Can you align your branch with main and check again whether the tests are still failing?
@mzitnik Thank you! I have rebased my changes and adjusted the GitHub workflow definition to run cloud tests only when the secrets are defined.
All tests are passing except this one:
ClickHouseSinkTaskWithSchemaProxyTest > schemaWithDefaultsTest() FAILED
org.opentest4j.AssertionFailedError: expected: <1000> but was: <500>
at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
at app//com.clickhouse.kafka.connect.sink.ClickHouseSinkTaskWithSchemaProxyTest.schemaWithDefaultsTest(ClickHouseSinkTaskWithSchemaProxyTest.java:283)
I have no idea what's wrong here, I haven't touched anything related (I think).
When I inspect the actual table contents, the off16 column contains only even numbers (0, 2, 4 …). Can you think of anything that could be the issue here? It fails both on my local machine and using GitHub workflows.
> I have no idea what's wrong here, I haven't touched anything related (I think).
Ok, turns out it was actually my fault. During the refactor, the hasDefaults setting of the Table instance was lost. Now it's fixed and it's working again.
I'm glad you have this test.
All tests are passing now.
It turns out that adding support for the Nested type is really easy, so I just did it and released it to the public here (in case somebody doesn't want to wait for the merge):
- https://github.com/jirislav/clickhouse-kafka-connect/releases/tag/v1.1.0%2Btuple-variant-and-nested
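As a rough sketch of why this was straightforward (hypothetical table; the behaviour assumes the default flatten_nested = 1): DESCRIBE normally reports a Nested column as its flattened Array fields, which is the shape the parser already handles.

```sql
CREATE TABLE nested_example
(
    n Nested(id String, value UInt32)
)
ENGINE = MergeTree
ORDER BY tuple();

-- With the default flatten_nested = 1, DESCRIBE returns the flattened
-- columns n.id Array(String) and n.value Array(UInt32) rather than a
-- single column of type Nested(...).
DESCRIBE TABLE nested_example
SETTINGS describe_include_subcolumns = 1;
```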
I'm seeing this error:
INFO 2024-05-29T14:39:51.163593458Z java.lang.IllegalArgumentException: DESCRIBE TABLE is never supposed to return Nested type. It should always yield its Array fields directly.
at com.clickhouse.kafka.connect.sink.db.mapping.Column.extractColumn(Column.java:204)
at com.clickhouse.kafka.connect.sink.db.mapping.Column.extractColumn(Column.java:159)
at com.clickhouse.kafka.connect.sink.db.mapping.Column.extractColumn(Column.java:155)
at com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient.describeTable(ClickHouseHelperClient.java:220)
at com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient.extractTablesMapping(ClickHouseHelperClient.java:248)
at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.updateMapping(ClickHouseWriter.java:123)
at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.start(ClickHouseWriter.java:103)
at com.clickhouse.kafka.connect.sink.ProxySinkTask.
The logs don't include the table that caused it, so I don't have a repro yet.
@dermasmid This has to be specific to the table you're using. I ran the schemaWithNestedTupleMapArrayAndVariant test on the latest ClickHouse version 24.4 without any error.
Note that we don't have the context of the table being parsed within the Column class, so the only way to find out which table is causing this is to enable the DEBUG log level. You should see a preceding DESCRIBE TABLE statement pointing to the problematic table. Please run the DESCRIBE TABLE statement yourself and share its output here, along with the version of your ClickHouse installation.
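A sketch of that check (substitute your actual database and table names; describe_include_subcolumns = 1 is the setting the connector adds to its own query):

```sql
DESCRIBE TABLE your_database.your_table
SETTINGS describe_include_subcolumns = 1;
```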
Here's the debug log:
Extracting column business_units with type Nested(bu_id String, name Nullable(String), currency Nullable(String), status String, timezone Nullable(String), tw_account_id Nullable(String), integration_ids Array(Nullable(String)), created_at Nullable(DateTime), updated_at Nullable(DateTime)) (com.clickhouse.kafka.connect.sink.db.mapping.Column) [task-thread-sonic-clickhouse-0]
ClickHouse version: 24.4
Note: this column is not from the table I intend to write to, but because it's in the database I configured, it seems the connector is fetching it. When I moved my table of interest to a clean database, it worked without issue.
I opened issue #399.