Allow to configure prefix for internal Kafka fields
Description
When we process Kafka topics, we add a bunch of additional columns for internal usage, containing useful metadata that we fill with data coming from Kafka (RecordSet), for example:
case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());
By default, in the current implementation these columns have hardcoded names like:
_partition_id
_message_corrupt
etc.
So it could happen that the Kafka topic itself has fields with the same names, and in that case we get a conflict during processing (two columns with the same name, for example _key).
So the purpose of this PR is to provide the ability to tune internal column names with a custom prefix like XX_my_prefix_XX_key, so that such a conflict becomes rarer than with the current default name _key.
This PR will not resolve the problem, but just postpones it until a user has a column name that conflicts with the prefixed internal column.
This change is backward compatible, so people who don't have colliding field names don't need to change their existing queries to work with the newer version.
The drawback is that this can't anticipate a collision ahead of time. It does, however, allow administrators to use a unique prefix unlikely to ever exist, e.g. __kafka_message_metadata_.
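To illustrate the mechanism, here is a minimal standalone sketch of how a configurable prefix could be prepended to the internal column names. The class and method names are hypothetical, not the actual Trino implementation; only the default column names come from the description above.

```java
import java.util.List;

// Sketch (not actual Trino code): internal column names get a configurable
// prefix prepended. An empty prefix preserves the historical names, which is
// what keeps the change backward compatible.
public class InternalColumnNames
{
    static final List<String> INTERNAL_COLUMNS =
            List.of("_partition_id", "_partition_offset", "_message_corrupt", "_key");

    static List<String> withPrefix(String prefix)
    {
        // Prepend the configured prefix to every internal column name,
        // e.g. prefix "XX_my_prefix_XX" turns "_key" into "XX_my_prefix_XX_key".
        return INTERNAL_COLUMNS.stream()
                .map(name -> prefix + name)
                .toList();
    }

    public static void main(String[] args)
    {
        System.out.println(withPrefix(""));
        System.out.println(withPrefix("XX_my_prefix_XX"));
    }
}
```

With an empty prefix the old names (`_key`, `_partition_id`, ...) are produced unchanged, so existing queries keep working.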
Thank you, @vlad-lyutenko and @hashhar for the description :)
Non-technical explanation
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:
Workaround for encountering "multiple entries with same key" while querying Kafka
I tried to understand what is going on in this PR; let me try to summarise it, and please correct me if I am wrong or missed something.
When we process Kafka topics, we add a bunch of additional columns for internal usage, containing useful metadata that we fill with data coming from Kafka (RecordSet), for example:
case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());
By default, in the current implementation these columns have hardcoded names like:
_partition_id
_message_corrupt
etc.
So it could happen that the Kafka topic itself has fields with the same names, and in that case we get a conflict during processing (two columns with the same name, for example _key).
So the purpose of this PR is to provide the ability to tune internal column names with a custom prefix like XX_my_prefix_XX_key, so that such a conflict becomes rarer than with the current default name _key.
If my description is correct and I understand correctly, this PR will not resolve the problem, but just postpone it until users have a more unusual column name that conflicts with our internal column.
But maybe I missed something, please correct me if I am wrong.
@vlad-lyutenko Exactly correct understanding.
this PR will not resolve the problem, but just postpone it until users have a more unusual column name that conflicts with our internal column.
True. This change has some benefits: it's backward compatible, so people who don't have colliding field names don't need to change their existing queries to work with the newer version.
The drawback is that this can't anticipate a collision ahead of time. It does, however, allow administrators to use a unique prefix unlikely to ever exist, e.g. __kafka_message_metadata_.
Is there some other solution you can think of? One idea I was considering was to expose all internal columns as a ROW type with multiple fields instead of a separate column per field - that way you only have to worry about one collision.
One idea I was considering was to expose all internal columns as a ROW type with multiple fields instead of a separate column per field - that way you only have to worry about one collision.
The probability in O() notation is the same, but it's a more complex implementation. With 10 columns it's O(10 * n) = O(n).
Is there some other solution you can think of? One idea I was considering was to expose all internal columns as a ROW type with multiple fields instead of a separate column per field - that way you only have to worry about one collision.
I think in this case we would lose backward compatibility for people who already have some queries. I'm thinking more about a solution that gives users a more detailed description of what happens and how to resolve the problem using a custom prefix. The current exception seems to come from somewhere in the core/engine (not from the connector), so maybe we can add such a check for column collisions inside the connector, with a more detailed description, so we don't get open issues/tickets from users in the future.
I'm thinking more about a solution that gives users a more detailed description of what happens and how to resolve the problem using a custom prefix.
This I agree with - see https://github.com/trinodb/trino/pull/14224#discussion_r976109333
What if we could generate the names of these meta-columns based on the table structure? For example, if we have a column _timestamp_n, we could generate the meta-column name as _timestamp_(n+1). It would be backward compatible and would also avoid the conflict if it ever happens in the future.
What if we could generate the names of these meta-columns based on the table structure? For example, if we have a column _timestamp_n, we could generate the meta-column name as _timestamp_(n+1). It would be backward compatible and would also avoid the conflict if it ever happens in the future.
If these internal columns are used by end users (and according to the docs it's possible to query them), I think it's a no-go, as clients most probably rely on the concrete names.
I think the current solution is the best we can do for now while also not breaking user queries.
An alternative might be to expose these columns using prefixes which are invalid for the target system - in Hive that's $ - but AFAIK in the Kafka connector there are no limitations on field names other than for Avro, which requires names to start with [A-Za-z_].
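The Avro constraint mentioned above can be checked mechanically: per the Avro specification, a name must start with [A-Za-z_] and contain only [A-Za-z0-9_]. This helper is illustrative only, not Trino code:

```java
import java.util.regex.Pattern;

// Sketch: validate that a configured prefix would produce Avro-legal field
// names. Per the Avro specification, a name starts with [A-Za-z_] and is
// followed by [A-Za-z0-9_] characters.
public class AvroNameCheck
{
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean isValidAvroName(String name)
    {
        return AVRO_NAME.matcher(name).matches();
    }

    public static void main(String[] args)
    {
        // An underscore-prefixed internal column name is legal in Avro,
        // while a Hive-style "$" prefix would not be.
        System.out.println(isValidAvroName("__kafka_message_metadata__key"));
        System.out.println(isValidAvroName("$key"));
    }
}
```

This is why a `$`-style prefix (valid for Hive metadata columns) cannot simply be reused for Kafka topics that carry Avro schemas.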
LGTM % remaining comments. Please let me know when addressed @ssheikin
The current exception seems to come from somewhere in the core/engine (not from the connector), so maybe we can add such a check for column collisions inside the connector, with a more detailed description, so we don't get open issues/tickets from users in the future.
The current exception comes from the connector. I've improved the exception message.
@hashhar @vlad-lyutenko @Praveen2112 All comments are addressed. Please take a look one more time.
The current exception seems to come from somewhere in the core/engine (not from the connector), so maybe we can add such a check for column collisions inside the connector, with a more detailed description, so we don't get open issues/tickets from users in the future.
The current exception comes from the connector. I've improved the exception message.
Ok, thanks, now I see it comes from buildOrThrow of the immutable map.
I am ok with the changes.
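The "multiple entries with same key" failure mode discussed here can be reproduced with the JDK alone; this standalone sketch (not the connector's actual code) shows a collision check that fails fast with a descriptive message, similar in spirit to what Guava's ImmutableMap.Builder.buildOrThrow() does when the topic and internal column names clash:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicateColumnDemo
{
    // Merge topic fields and internal fields into one column map, failing
    // fast on a name clash with a message that tells the user how to fix it.
    static Map<String, String> mergeColumns(List<String> topicFields, List<String> internalFields)
    {
        Map<String, String> columns = new LinkedHashMap<>();
        topicFields.forEach(field -> columns.put(field, "topic"));
        for (String field : internalFields) {
            // putIfAbsent returns the previous value when the key exists,
            // which signals a collision with a topic field.
            if (columns.putIfAbsent(field, "internal") != null) {
                throw new IllegalArgumentException(
                        "Internal column name collides with topic field: " + field
                                + " (configure a custom internal column prefix to avoid this)");
            }
        }
        return columns;
    }

    public static void main(String[] args)
    {
        // No clash: a distinct prefix keeps the internal name unique.
        System.out.println(mergeColumns(
                List.of("id", "_key"),
                List.of("__kafka_message_metadata__key")));
        try {
            // Clash: the topic already defines "_key".
            mergeColumns(List.of("id", "_key"), List.of("_key", "_partition_id"));
        }
        catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check like this, performed in the connector, is what turns an opaque engine-level "multiple entries with same key" error into an actionable message.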
ci / pt (default, suite-6-non-generic, ) (pull_request) https://github.com/trinodb/trino/actions/runs/3111929833/jobs/5045112147
tests | 2022-09-22 23:00:02 INFO: FAILURE / io.trino.tests.product.kafka.TestKafkaPushdownSmokeTest.testCreateTimePushdown (Groups: profile_specific_tests, kafka) took 6.0 seconds
tests | 2022-09-22 23:00:02 SEVERE: Failure cause:
tests | java.lang.AssertionError: Not equal rows:
tests | 0 - expected: 2|
tests | 0 - actual: 3|
tests | at io.trino.tests.product.kafka.TestKafkaPushdownSmokeTest.testCreateTimePushdown(TestKafkaPushdownSmokeTest.java:190)
tests | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
tests | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
tests | at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
tests | at java.base/java.lang.reflect.Method.invoke(Method.java:568)
tests | at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
tests | at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
tests | at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
tests | at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
tests | at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
tests | at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
tests | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
tests | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
tests | at java.base/java.lang.Thread.run(Thread.java:833)
@hashhar I believe there are no outstanding questions left here. Please approve and merge.
@wendigo please approve
Restarting checks