
Allow to configure prefix for internal Kafka fields

Open ssheikin opened this issue 2 years ago • 5 comments

Description

When we process Kafka topics, we add a set of internal columns and fill them with data coming from Kafka (RecordSet), for example:

case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());

In the current implementation these columns have hardcoded names such as _partition_id, _message_corrupt, etc. A Kafka topic may itself have a field with the same name, in which case processing fails with a conflict (two columns with the same name, for example _key).

This PR makes it possible to configure the internal column names with a custom prefix, for example XX_my_prefix_XX_key, so that such a conflict is less likely than with the current default name _key. It does not resolve the problem entirely: it only postpones it until a user has a column name that happens to match the prefixed internal name.

The change is backward compatible: people who don't have colliding field names don't need to change their existing queries to work with the newer version. The drawback is that it can't anticipate a collision ahead of time. It does, however, allow administrators to use a unique prefix that is unlikely to ever exist, e.g. __kafka_message_metadata_.
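To make the collision and the effect of a custom prefix concrete, here is a minimal sketch. It is not the connector's actual code: the class `InternalColumnNames`, its methods, and the choice of three suffixes are hypothetical and only mirror the column names mentioned above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Trino.
class InternalColumnNames
{
    // Suffixes of the internal columns named in this description (default prefix is "_").
    static final List<String> SUFFIXES = List.of("partition_id", "message_corrupt", "key");

    // Builds the internal column names for a prefix, e.g. "_" gives "_key".
    static List<String> internalColumns(String prefix)
    {
        List<String> names = new ArrayList<>();
        for (String suffix : SUFFIXES) {
            names.add(prefix + suffix);
        }
        return names;
    }

    // Returns the topic fields whose names clash with an internal column name.
    static Set<String> collisions(String prefix, List<String> topicFields)
    {
        Set<String> clashes = new LinkedHashSet<>(topicFields);
        clashes.retainAll(internalColumns(prefix));
        return clashes;
    }

    public static void main(String[] args)
    {
        List<String> topicFields = List.of("_key", "payload");
        // The default prefix clashes with the topic's own "_key" field.
        System.out.println(collisions("_", topicFields)); // prints [_key]
        // A longer prefix avoids the clash for this topic.
        System.out.println(collisions("__kafka_message_metadata_", topicFields)); // prints []
    }
}
```

The second call returns an empty set only because this particular topic has no field starting with the longer prefix. A topic that did would collide again, which is the limitation described above.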

Thank you, @vlad-lyutenko and @hashhar for the description :)

Non-technical explanation

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(*) Release notes are required, with the following suggested text:

Workaround for encountering "multiple entries with same key" while querying Kafka

# Section
* Fix some things. ({issue}`issuenumber`)

ssheikin avatar Sep 20 '22 22:09 ssheikin

I tried to understand what is going on in this PR. Let me try to summarise it, and please correct me if I am wrong or have missed something.

When we process Kafka topics, we add a set of internal columns and fill them with data coming from Kafka (RecordSet), for example:

case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());

In the current implementation these columns have hardcoded names such as:

_partition_id, _message_corrupt, etc.

So it could happen that a Kafka topic itself has a field with the same name, and in that case we would get a conflict during processing (two columns with the same name, for example _key).

So the reason for this PR is to provide the ability to configure internal column names with a custom prefix like XX_my_prefix_XX_key, so that this conflict is less likely than with the current default name _key.

If my description is correct, this PR will not resolve the problem, but just postpone it until users have a more unusual column name that conflicts with our internal column.

But maybe I missed something, please correct me if I am wrong.

vlad-lyutenko avatar Sep 21 '22 12:09 vlad-lyutenko

@vlad-lyutenko Exactly correct understanding.

> this PR will not resolve the problem, but just postpone it until users have a more unusual column name that conflicts with our internal column.

True. This change has some benefits: it's backward compatible, so people who don't have colliding field names don't need to change their existing queries to work with the newer version.

The drawback is that this can't anticipate a collision ahead of time. It does, however, allow administrators to use a unique prefix unlikely to ever exist, e.g. __kafka_message_metadata_.

Is there some other solution you can think of? One idea I was considering is to expose all internal columns as a single ROW type with multiple fields instead of a separate column per field. That way you only have to worry about one collision.

hashhar avatar Sep 21 '22 12:09 hashhar

> One idea I was considering is to expose all internal columns as a single ROW type with multiple fields instead of a separate column per field. That way you only have to worry about one collision.

The probability of a collision is the same in O() notation, but the implementation is more complex. With 10 columns it's O(10 * n) = O(n).

ssheikin avatar Sep 21 '22 13:09 ssheikin

> Is there some other solution you can think of? One idea I was considering is to expose all internal columns as a single ROW type with multiple fields instead of a separate column per field. That way you only have to worry about one collision.

I think in this case we would lose backward compatibility for people who already have queries. I am thinking more about a solution that gives users a more detailed description of what happened and how to resolve the problem using a custom prefix. It looks like the current exception comes from somewhere in core/engine (not from the connector). So maybe we can add a check for column collisions inside the connector, with a more detailed description, so that we don't get issues/tickets from users in the future.
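A check of the kind suggested here could look roughly like the following sketch. It is an assumption about the shape of such a check, not the code in this PR: `ColumnCollisionCheck`, `mergeColumns`, and the wording of the message are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a connector-side collision check; not Trino's implementation.
class ColumnCollisionCheck
{
    // Merges topic fields and internal columns into one name -> origin map.
    // Fails with an actionable message instead of a generic duplicate-key error.
    static Map<String, String> mergeColumns(List<String> topicFields, List<String> internalColumns)
    {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String field : topicFields) {
            columns.put(field, "topic");
        }
        for (String internal : internalColumns) {
            // putIfAbsent returns the existing value when the name is already taken.
            if (columns.putIfAbsent(internal, "internal") != null) {
                throw new IllegalArgumentException(
                        "Internal Kafka column '" + internal + "' collides with a field of the same name "
                                + "in the topic definition; configure a different internal column prefix");
            }
        }
        return columns;
    }
}
```

Calling `mergeColumns(List.of("_key"), List.of("_key"))` throws with a message that names the offending column and points at the prefix setting, which is the kind of hint a user needs to resolve the problem without opening a ticket.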

vlad-lyutenko avatar Sep 21 '22 13:09 vlad-lyutenko

> I am thinking more about a solution that gives users a more detailed description of what happened and how to resolve the problem using a custom prefix.

This I agree with - see https://github.com/trinodb/trino/pull/14224#discussion_r976109333

hashhar avatar Sep 21 '22 18:09 hashhar

What if we generated the names of these meta-columns based on the table structure? For example, if the table has a column _timestamp_n, we could generate the meta-column name as _timestamp_(n+1). It would be backward compatible and would also avoid the conflict if it happens at any time in the future.
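One possible reading of this proposal, as a rough sketch: keep the default name when it is free, otherwise append the first unused numeric suffix. The class `MetaColumnNamer` and the method `uniqueName` are hypothetical names, and the exact suffix scheme is an assumption.

```java
import java.util.Set;

// Hypothetical sketch of the suffix-based naming idea; not part of this PR.
class MetaColumnNamer
{
    // Returns baseName if no topic field uses it, otherwise the first free
    // name of the form baseName_1, baseName_2, ...
    static String uniqueName(String baseName, Set<String> topicFields)
    {
        if (!topicFields.contains(baseName)) {
            return baseName;
        }
        int n = 1;
        while (topicFields.contains(baseName + "_" + n)) {
            n++;
        }
        return baseName + "_" + n;
    }
}
```

With topic fields `_timestamp` and `_timestamp_1`, `uniqueName("_timestamp", ...)` yields `_timestamp_2`. The name of the meta-column then depends on the topic's schema, so a query written against one topic may not work against another.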

Praveen2112 avatar Sep 22 '22 06:09 Praveen2112

> What if we generated the names of these meta-columns based on the table structure? For example, if the table has a column _timestamp_n, we could generate the meta-column name as _timestamp_(n+1). It would be backward compatible and would also avoid the conflict if it happens at any time in the future.

If these internal columns are used by end users (and according to the docs it's possible to query them), I think it's a no-go, as clients most probably rely on concrete names.

ssheikin avatar Sep 22 '22 08:09 ssheikin

I think the current solution is the best we can do for now while also not breaking user queries.

An alternative might be to expose these columns using prefixes which are invalid for the target system. In Hive that's $, but AFAIK in the Kafka connector there are no limitations on field names other than for Avro, which requires names to start with [A-Za-z_].
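The Avro restriction can be checked with a short sketch. It assumes the Avro naming rule that a name starts with [A-Za-z_] and continues with [A-Za-z0-9_]. The class `AvroNameCheck` is a hypothetical name and the code does not use the Avro library.

```java
import java.util.regex.Pattern;

// Hypothetical helper illustrating the Avro name rule; does not use the Avro library.
class AvroNameCheck
{
    // Avro names start with a letter or underscore, followed by letters, digits or underscores.
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean isValidAvroName(String name)
    {
        return AVRO_NAME.matcher(name).matches();
    }
}
```

Under this rule `_key` is a valid Avro field name and so can collide with an internal column, while a name such as `$key` is not valid in Avro and could never appear in an Avro schema. For the other message formats no such restriction exists, which is why an "invalid" prefix does not work as a general solution.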

hashhar avatar Sep 22 '22 09:09 hashhar

LGTM % remaining comments. Please let me know when addressed @ssheikin

hashhar avatar Sep 22 '22 09:09 hashhar

> It looks like the current exception comes from somewhere in core/engine (not from the connector). So maybe we can add a check for column collisions inside the connector, with a more detailed description, so that we don't get issues/tickets from users in the future.

The current exception comes from the connector. I've improved the exception message.

ssheikin avatar Sep 22 '22 13:09 ssheikin

@hashhar @vlad-lyutenko @Praveen2112 All comments are addressed. Please take a look one more time.

ssheikin avatar Sep 22 '22 13:09 ssheikin

> > It looks like the current exception comes from somewhere in core/engine (not from the connector). So maybe we can add a check for column collisions inside the connector, with a more detailed description, so that we don't get issues/tickets from users in the future.

> The current exception comes from the connector. I've improved the exception message.

OK, thanks, now I see it came from buildOrThrow of the immutable map.

vlad-lyutenko avatar Sep 22 '22 14:09 vlad-lyutenko

I am OK with the changes.

vlad-lyutenko avatar Sep 22 '22 14:09 vlad-lyutenko

ci / pt (default, suite-6-non-generic, ) (pull_request) https://github.com/trinodb/trino/actions/runs/3111929833/jobs/5045112147

tests               | 2022-09-22 23:00:02 INFO: FAILURE     /    io.trino.tests.product.kafka.TestKafkaPushdownSmokeTest.testCreateTimePushdown (Groups: profile_specific_tests, kafka) took 6.0 seconds
tests               | 2022-09-22 23:00:02 SEVERE: Failure cause:
tests               | java.lang.AssertionError: Not equal rows:
tests               | 0 - expected: 2|
tests               | 0 - actual:   3|
tests               | 	at io.trino.tests.product.kafka.TestKafkaPushdownSmokeTest.testCreateTimePushdown(TestKafkaPushdownSmokeTest.java:190)
tests               | 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
tests               | 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
tests               | 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
tests               | 	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
tests               | 	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
tests               | 	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
tests               | 	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
tests               | 	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
tests               | 	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
tests               | 	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
tests               | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
tests               | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
tests               | 	at java.base/java.lang.Thread.run(Thread.java:833)

ssheikin avatar Sep 23 '22 08:09 ssheikin

@hashhar I believe there are no outstanding questions left here. Please approve and merge.

ssheikin avatar Oct 03 '22 09:10 ssheikin

@wendigo please approve

ssheikin avatar Oct 11 '22 15:10 ssheikin

Restarting checks

ssheikin avatar Oct 13 '22 09:10 ssheikin