clickhouse-kafka-connect

Add support for Tuples & Variants

Open jirislav opened this issue 1 year ago • 7 comments

Summary

Related issue: https://github.com/ClickHouse/clickhouse-kafka-connect/issues/286

I have implemented support for the Tuple & Variant data types by adding the describe_include_subcolumns = 1 setting to the DESCRIBE TABLE query.

As a result, I was able to get rid of the recursive Map parsing (since ClickHouse returns separate rows for the nested types) and unify it with the way Tuples are now parsed.

Essentially, the Column class is no longer responsible for parsing complex data types (except Array, since it is not included in the DESCRIBE TABLE output). To fully parse a complex data type, the context of the preceding columns has to be present, which is why the Table class is responsible for parsing those.
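To illustrate the idea, here is a simplified sketch (not the actual Table/Column implementation): with describe_include_subcolumns = 1, each subcolumn arrives as its own row whose name is dot-prefixed by its parent, so the parser can attach rows to a previously seen parent instead of recursively parsing the type string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the parsing idea (NOT the actual Table/Column code):
// with describe_include_subcolumns = 1, every subcolumn of a complex type is
// returned as its own row, dot-prefixed by its parent's name, so parsing only
// needs the context of previously seen rows.
public class DescribeSketch {

    /** Groups dot-prefixed subcolumn rows under their top-level parent. */
    public static Map<String, List<String>> group(List<String[]> rows) {
        Map<String, List<String>> byParent = new LinkedHashMap<>();
        for (String[] row : rows) {
            String name = row[0];
            String type = row[1];
            int dot = name.indexOf('.');
            if (dot < 0) {
                // Top-level column; its subcolumn rows follow later.
                byParent.put(name, new ArrayList<>());
            } else {
                // Subcolumn row: the parent was emitted earlier, so it is
                // already present in the map.
                byParent.get(name.substring(0, dot)).add(name + " " + type);
            }
        }
        return byParent;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[]{"t", "Tuple(a String, b Int32)"},
                new String[]{"t.a", "String"},
                new String[]{"t.b", "Int32"});
        System.out.println(group(rows));
    }
}
```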

I have added TableTest, feel free to inspect it first to get an idea of how it works.

Regarding the Variant type, I needed it to support Avro's union type. In the Kafka Connect ecosystem, a union is represented as a Struct whose fields are named after the lowercased type.
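As a hypothetical sketch of that naming convention (the comma split below is deliberately naive and would break on nested parameterized members such as Variant(Array(String), Int64)):

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch, not the connector's actual code: each member of a
// Variant maps to a Connect Struct field named after the lowercased type
// name, matching the Avro-union-as-Struct convention described above.
public class VariantFields {

    public static Map<String, String> memberFieldNames(String variantType) {
        // Strip the "Variant(" prefix and the trailing ")".
        String inner = variantType.substring("Variant(".length(), variantType.length() - 1);
        Map<String, String> fields = new LinkedHashMap<>();
        // Naive split: fine for simple members, wrong for nested parentheses.
        for (String member : inner.split(",\\s*")) {
            fields.put(member, member.toLowerCase(Locale.ROOT));
        }
        return fields;
    }

    public static void main(String[] args) {
        // A value of type Int64 would land in the Struct field "int64".
        System.out.println(memberFieldNames("Variant(String, Int64)"));
    }
}
```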

Please note that I've added the Lombok library: the telescoping-constructor problem had gotten out of hand in the Column class, so I refactored it to use the builder pattern.
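For readers unfamiliar with what Lombok's @Builder generates, here is a hand-rolled equivalent showing why it replaces telescoping constructors (the fields below are illustrative, not the actual Column fields):

```java
// Hand-rolled equivalent of what Lombok's @Builder generates. Instead of one
// constructor overload per field combination, callers set only the fields
// they care about and then call build().
public class ColumnSketch {
    private final String name;
    private final String type;
    private final boolean nullable;

    private ColumnSketch(Builder b) {
        this.name = b.name;
        this.type = b.type;
        this.nullable = b.nullable;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    public String toString() {
        return name + " " + type + (nullable ? " NULL" : "");
    }

    public static class Builder {
        private String name;
        private String type;
        private boolean nullable;

        public Builder name(String v) { this.name = v; return this; }
        public Builder type(String v) { this.type = v; return this; }
        public Builder nullable(boolean v) { this.nullable = v; return this; }

        public ColumnSketch build() { return new ColumnSketch(this); }
    }

    public static void main(String[] args) {
        // Only the relevant fields are set; no constructor explosion.
        System.out.println(ColumnSketch.builder().name("id").type("Int32").build());
    }
}
```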

Checklist

Delete items not relevant to your PR:

  • [x] Unit and integration tests covering the common scenarios were added
  • [x] A human-readable description of the changes was provided to include in CHANGELOG
  • [x] For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials
    • https://github.com/ClickHouse/clickhouse-docs/pull/2202

jirislav avatar Apr 08 '24 09:04 jirislav

CLA assistant check
All committers have signed the CLA.

CLAassistant avatar Apr 08 '24 09:04 CLAassistant

For anyone interested, I've released the zip here.

jirislav avatar Apr 08 '24 09:04 jirislav

The following tests are failing due to https://github.com/ClickHouse/clickhouse-kafka-connect/issues/367:

  • ClickHouseSinkTaskWithSchemaTest.schemaWithDefaultsTest
  • ClickHouseSinkTaskWithSchemaTest.schemaWithDefaultsAndNullableTest
  • ClickHouseSinkTaskTest.testDBTopicSplit

jirislav avatar Apr 08 '24 11:04 jirislav

@jirislav Thanks for the contribution. I have disabled those tests in my latest merge to main. Can you align your branch with main and check whether the tests are still failing?

mzitnik avatar Apr 08 '24 14:04 mzitnik

@mzitnik Thank you! I have rebased my changes and adjusted the GitHub workflow definition to run cloud tests only when the secrets are defined.

All tests are passing except this one:

ClickHouseSinkTaskWithSchemaProxyTest > schemaWithDefaultsTest() FAILED
    org.opentest4j.AssertionFailedError: expected: <1000> but was: <500>
        at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
        at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
        at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
        at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
        at app//com.clickhouse.kafka.connect.sink.ClickHouseSinkTaskWithSchemaProxyTest.schemaWithDefaultsTest(ClickHouseSinkTaskWithSchemaProxyTest.java:283)

I have no idea what's wrong here, I haven't touched anything related (I think).

When I inspect the actual table contents, the off16 column contains only even numbers (0, 2, 4 …). Can you think of anything that could be the issue here? It fails both on my local machine and using GitHub workflows.

jirislav avatar Apr 08 '24 19:04 jirislav

I have no idea what's wrong here, I haven't touched anything related (I think).

Ok, turns out it was actually my fault. During the refactor, the hasDefaults setting of the Table instance was lost. Now it's fixed and it's working again.

I'm glad you have this test.

All tests are passing now.

jirislav avatar Apr 08 '24 20:04 jirislav

It turns out that adding support for Nested type is really easy, so I just did it and released it to the public here (if somebody didn't want to wait for merge):

  • https://github.com/jirislav/clickhouse-kafka-connect/releases/tag/v1.1.0%2Btuple-variant-and-nested

jirislav avatar Apr 08 '24 23:04 jirislav

I'm seeing this error:

INFO 2024-05-29T14:39:51.163593458Z java.lang.IllegalArgumentException: DESCRIBE TABLE is never supposed to return Nested type. It should always yield its Array fields directly.
    at com.clickhouse.kafka.connect.sink.db.mapping.Column.extractColumn(Column.java:204)
    at com.clickhouse.kafka.connect.sink.db.mapping.Column.extractColumn(Column.java:159)
    at com.clickhouse.kafka.connect.sink.db.mapping.Column.extractColumn(Column.java:155)
    at com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient.describeTable(ClickHouseHelperClient.java:220)
    at com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient.extractTablesMapping(ClickHouseHelperClient.java:248)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.updateMapping(ClickHouseWriter.java:123)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.start(ClickHouseWriter.java:103)
    at com.clickhouse.kafka.connect.sink.ProxySinkTask.<init>(ProxySinkTask.java:61)
    at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.start(ClickHouseSinkTask.java:57)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

The logs don't include the table that caused it, so I don't have a repro yet.

dermasmid avatar May 29 '24 18:05 dermasmid

@dermasmid This has to be specific to the table you're using. I ran the schemaWithNestedTupleMapArrayAndVariant test on the latest ClickHouse version 24.4 without any error.

Note that we don't have the context of the table being parsed within the Column class, so the only way to find out which table is causing this is to enable the DEBUG log level. You should see a preceding DESCRIBE TABLE statement pointing to the problematic table. Please run that DESCRIBE TABLE statement yourself and share its output here, as well as the version of your ClickHouse installation.

jirislav avatar May 29 '24 19:05 jirislav

Here's the debug log:

Extracting column business_units with type Nested(bu_id String, name Nullable(String), currency Nullable(String), status String, timezone Nullable(String), tw_account_id Nullable(String), integration_ids Array(Nullable(String)), created_at Nullable(DateTime), updated_at Nullable(DateTime)) (com.clickhouse.kafka.connect.sink.db.mapping.Column) [task-thread-sonic-clickhouse-0]

ClickHouse version 24.4
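The exception fires whenever a raw Nested(...) type appears in DESCRIBE output. One plausible explanation, offered here as an assumption rather than a confirmed diagnosis: under ClickHouse's default flatten_nested = 1, a Nested column is rewritten into one per-field Array column (the shape the connector handles), whereas a table created with flatten_nested = 0 keeps the raw Nested type. A sketch of the flattened shape the parser expects:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch (assumption, not the connector's code) of what DESCRIBE TABLE is
// expected to yield for a Nested column under ClickHouse's default
// flatten_nested = 1: one "parent.field Array(T)" column per nested field.
// A raw Nested(...) type in the output instead triggers the exception above.
public class NestedFlatten {

    public static List<String> flatten(String parent, Map<String, String> fields) {
        List<String> columns = new ArrayList<>();
        for (Map.Entry<String, String> f : fields.entrySet()) {
            columns.add(parent + "." + f.getKey() + " Array(" + f.getValue() + ")");
        }
        return columns;
    }

    public static void main(String[] args) {
        // First two fields of the business_units column from the debug log.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("bu_id", "String");
        fields.put("name", "Nullable(String)");
        System.out.println(flatten("business_units", fields));
    }
}
```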

dermasmid avatar May 29 '24 19:05 dermasmid

Note: this column is not from the table I intend to write to, but because it's in the database I configured, it seems the connector is fetching it. When I moved my table of interest to a clean database, it worked without issue.

dermasmid avatar May 29 '24 19:05 dermasmid

I opened issue #399.

dermasmid avatar Jun 03 '24 11:06 dermasmid