Add limit pushdown for Kafka connectors, and further refactor and enhance predicate pushdown in Kafka connector

Open xxzhky opened this issue 1 year ago • 17 comments

Description

Add limit pushdown for Kafka connectors, and further refactor and enhance predicate pushdown in Kafka connector

Additional context and related issues

Add limit pushdown and enhance predicate pushdown in the Kafka connector.
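A minimal sketch of what limit pushdown can mean for a Kafka-like source. It assumes each split reads a half-open offset range [begin, end) of one partition. No split ever needs more than `limit` messages, so each split's end offset can be capped independently, while the engine still applies the final LIMIT across all splits. The `PartitionSplit` record and `applyLimit` methods are hypothetical illustrations, not code from this PR or from the Trino SPI.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of per-split limit pushdown for a Kafka-like source. Not code from this PR. */
final class LimitPushdownSketch {
    /** One split: offsets [beginOffset, endOffset) of a single partition (hypothetical type). */
    record PartitionSplit(int partition, long beginOffset, long endOffset) {}

    /** Caps a split so that it reads at most {@code limit} messages. */
    static PartitionSplit applyLimit(PartitionSplit split, long limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit must be non-negative: " + limit);
        }
        long available = split.endOffset() - split.beginOffset();
        // Deriving the new end from min(available, limit) keeps it inside the split and avoids overflow.
        long toRead = Math.min(available, limit);
        return new PartitionSplit(split.partition(), split.beginOffset(), split.beginOffset() + toRead);
    }

    /** Applies the cap to every split; the engine still enforces the final LIMIT across splits. */
    static List<PartitionSplit> applyLimit(List<PartitionSplit> splits, long limit) {
        List<PartitionSplit> capped = new ArrayList<>();
        for (PartitionSplit split : splits) {
            capped.add(applyLimit(split, limit));
        }
        return capped;
    }

    public static void main(String[] args) {
        List<PartitionSplit> splits = List.of(
                new PartitionSplit(0, 100, 5_000),
                new PartitionSplit(1, 0, 3));
        // LIMIT 10: partition 0 now reads [100, 110); partition 1 keeps [0, 3).
        System.out.println(applyLimit(splits, 10));
    }
}
```

This cap is only correct when every remaining predicate has been fully enforced by the connector; otherwise rows dropped by a filter after the scan could leave the query with fewer than `limit` results.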

Significant changes have been made to improve predicate pushdown support in the Kafka connector. The code has been refactored to better handle the intersection of the existing (old) domain with the newly pushed-down (new) domain.
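For illustration, here is a minimal sketch of intersecting an already pushed-down constraint with a newly offered one. It uses a hypothetical half-open `OffsetRange` as a simplified stand-in for a Trino `Domain`, and assumes the usual `applyFilter` convention of returning an empty result when the intersection adds nothing new, so the planner does not keep retrying the same pushdown. None of these names come from the PR.

```java
import java.util.Optional;

/** Hypothetical sketch of intersecting old and new offset constraints. Not code from this PR. */
final class DomainIntersectionSketch {
    /** Half-open range [low, high); a simplified stand-in for a Trino Domain. */
    record OffsetRange(long low, long high) {
        static final OffsetRange EMPTY = new OffsetRange(0, 0);

        boolean isEmpty() {
            return low >= high;
        }
    }

    /** Intersects two ranges, normalizing any empty result to EMPTY so equality checks stay meaningful. */
    static OffsetRange intersect(OffsetRange a, OffsetRange b) {
        OffsetRange merged = new OffsetRange(Math.max(a.low(), b.low()), Math.min(a.high(), b.high()));
        return merged.isEmpty() ? OffsetRange.EMPTY : merged;
    }

    /**
     * Returns the narrowed constraint, or empty when intersecting with the new constraint
     * changes nothing, signalling that no further pushdown progress was made.
     */
    static Optional<OffsetRange> pushDown(OffsetRange oldRange, OffsetRange newConstraint) {
        OffsetRange merged = intersect(oldRange, newConstraint);
        return merged.equals(oldRange) ? Optional.empty() : Optional.of(merged);
    }

    public static void main(String[] args) {
        OffsetRange old = new OffsetRange(0, 1_000);
        System.out.println(pushDown(old, new OffsetRange(500, 2_000))); // narrowed to [500, 1000)
        System.out.println(pushDown(old, new OffsetRange(0, 5_000)));   // no progress: Optional.empty
    }
}
```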

Additionally, the logic that overrides each partition's begin and end offsets with the pushed-down range has been enhanced.
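One way to picture the offset override, as a sketch only: assume the pushed-down range comes from an inclusive predicate on the `_partition_offset` hidden column (for example `BETWEEN 100 AND 199`). It is converted to a half-open range and clamped to each partition's actual begin and end offsets, and partitions whose clamped range is empty can be skipped. The `Offsets` record and `override` method are hypothetical and not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: overriding each partition's begin/end offsets with a pushed-down range. */
final class OffsetOverrideSketch {
    /** Half-open offset range [begin, end) of one partition (hypothetical type). */
    record Offsets(long begin, long end) {}

    /**
     * Converts an inclusive predicate such as {@code _partition_offset BETWEEN lowInclusive AND highInclusive}
     * into a half-open range and clamps it to the partition's actual offsets.
     * Returns empty when the partition has nothing to read.
     */
    static Optional<Offsets> override(Offsets partition, long lowInclusive, long highInclusive) {
        long begin = Math.max(partition.begin(), lowInclusive);
        // highInclusive + 1 would overflow for Long.MAX_VALUE, so saturate instead.
        long highExclusive = highInclusive == Long.MAX_VALUE ? Long.MAX_VALUE : highInclusive + 1;
        long end = Math.min(partition.end(), highExclusive);
        return begin < end ? Optional.of(new Offsets(begin, end)) : Optional.empty();
    }

    public static void main(String[] args) {
        Map<Integer, Offsets> partitions = new LinkedHashMap<>();
        partitions.put(0, new Offsets(0, 1_000));
        partitions.put(1, new Offsets(150, 160));
        partitions.put(2, new Offsets(500, 900));
        // Predicate: _partition_offset BETWEEN 100 AND 199; partition 2 ends up with nothing to read.
        partitions.forEach((id, offsets) ->
                System.out.println(id + " -> " + override(offsets, 100, 199)));
    }
}
```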

Lastly, an option to enable or disable predicate pushdown at runtime through a session property has been introduced. Predicate pushdown is enabled by default; it can be disabled with the kafka.predicate-force-push-down-enabled configuration property or the kafka.predicate_force_push_down_enabled session property.
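A sketch of how such a toggle typically resolves, assuming that a session value, when set, overrides the catalog configuration and that pushdown defaults to enabled, as described above. The configuration property name is quoted from the description; the `isPredicatePushdownEnabled` helper and the map-based config lookup are hypothetical. In Trino, a catalog session property is set with its catalog prefix, for example `SET SESSION kafka.predicate_force_push_down_enabled = false`.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: resolving the predicate pushdown toggle. Not code from this PR. */
final class PushdownToggleSketch {
    // Catalog configuration property name, as quoted in the PR description.
    static final String CONFIG_PROPERTY = "kafka.predicate-force-push-down-enabled";

    /**
     * Returns whether predicate pushdown applies for a query.
     *
     * @param catalogConfig catalog properties (hypothetical map-based stand-in for a config class)
     * @param sessionValue  value of the session property, if the user set one
     */
    static boolean isPredicatePushdownEnabled(Map<String, String> catalogConfig, Optional<Boolean> sessionValue) {
        // Pushdown is on unless the catalog config disables it.
        boolean configDefault = Boolean.parseBoolean(catalogConfig.getOrDefault(CONFIG_PROPERTY, "true"));
        // An explicit session value takes precedence over the catalog-level default.
        return sessionValue.orElse(configDefault);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(CONFIG_PROPERTY, "false");
        System.out.println(isPredicatePushdownEnabled(Map.of(), Optional.empty())); // true: default
        System.out.println(isPredicatePushdownEnabled(config, Optional.empty()));   // false: disabled in config
        System.out.println(isPredicatePushdownEnabled(config, Optional.of(true)));  // true: session override
    }
}
```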

To make your review easier, I tried my best to add extensive comments across multiple files in the Kafka connector, improving the readability and understandability of the code.

Release notes

(1) Add limit pushdown to the Kafka connector. (2) Enhance predicate pushdown in the Kafka connector.

xxzhky, Dec 17 '23

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot], Jan 10 '24

Hello, why has it gone without activity?

Replied message (01/11/2024 01:22), subject: Re: [trinodb/trino] Add limit pushdown for Kafka connectors, and further refactor and enhance predicate pushdown in Kafka connector (PR #20146)

xxzhky, Jan 11 '24

Could @wendigo, @findepi, @elonazoulay, or anybody else with more knowledge about the Kafka connector please help @xxzhky with a review to move this PR forward?

Also, @xxzhky, can you confirm that the build passes locally and that the CI failures are spurious, or otherwise fix these issues?

mosabua, Jan 11 '24

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot], Feb 02 '24

@xxzhky Could you rebase and fix any build issues?

mosabua, Feb 02 '24

@mosabua Yes, I confirm that the build passes locally. However, some checks fail unexpectedly for me, e.g. ci / maven-checks.

xxzhky, Feb 04 '24

I've rebased this PR locally and looked into the dependency scope failure. Here's the fix:

commit d896b0cf12d7b4c16818467166f225dbe38546fc
Author: Mateusz "Serafin" Gajewski <[email protected]>
Date:   Sun Feb 4 11:19:06 2024 +0100

    Fix

diff --git a/plugin/trino-kafka/pom.xml b/plugin/trino-kafka/pom.xml
index ce1fd85e77..6099a921f3 100644
--- a/plugin/trino-kafka/pom.xml
+++ b/plugin/trino-kafka/pom.xml
@@ -274,6 +274,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parser</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-record-decoder</artifactId>
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java
index 3f98d51064..47e7a597c2 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java
@@ -29,6 +29,7 @@ import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
 import io.trino.sql.planner.ConnectorExpressionTranslator;
+import io.trino.sql.planner.IrTypeAnalyzer;
 import io.trino.sql.planner.LiteralEncoder;
 import io.trino.sql.planner.Symbol;
 import io.trino.sql.planner.TypeProvider;
@@ -69,7 +70,6 @@ import static io.trino.spi.type.VarcharType.createVarcharType;
 import static io.trino.sql.analyzer.TypeSignatureProvider.fromTypes;
 import static io.trino.sql.analyzer.TypeSignatureTranslator.toSqlType;
 import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT;
-import static io.trino.sql.planner.TypeAnalyzer.createTestingTypeAnalyzer;
 import static io.trino.sql.tree.ComparisonExpression.Operator.EQUAL;
 import static io.trino.sql.tree.ComparisonExpression.Operator.GREATER_THAN;
 import static io.trino.sql.tree.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
@@ -743,7 +743,7 @@ public class TestConstraintExtractor
                         TypeProvider.viewOf(symbolTypes.entrySet().stream()
                                 .collect(toImmutableMap(entry -> new Symbol(entry.getKey()), Map.Entry::getValue))),
                         PLANNER_CONTEXT,
-                        createTestingTypeAnalyzer(PLANNER_CONTEXT))
+                        new IrTypeAnalyzer(PLANNER_CONTEXT))
                 .orElseThrow(() -> new RuntimeException("Translation to ConnectorExpression failed for: " + expression));
     }

wendigo, Feb 04 '24

@wendigo OK, thanks very much. Could you help me apply the fix?

xxzhky, Feb 05 '24

FYI, I attached a screenshot showing that the build passes locally.

xxzhky, Feb 05 '24

@xxzhky The failure is in the dependency scope check, not in the tests. Tests pass just fine on CI.

wendigo, Feb 05 '24

@xxzhky The diff in the comment above is the fix. Just apply it locally, amend the commit, and force push; then CI should pass.

mosabua, Feb 05 '24

@mosabua @wendigo All checks pass now.

xxzhky, Feb 10 '24

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot], Mar 04 '24

@mosabua @wendigo Why has it gone without activity again?

xxzhky, Mar 05 '24

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot], Apr 15 '24

@bitsondatadev @colebow @mosabua I have been very busy recently; I will handle it in the near future.

xxzhky, Apr 23 '24

Sounds good, @xxzhky. Adding the stale-ignore label.

mosabua, Apr 23 '24

@Praveen2112 I've resolved a lot of conflicts over the past few days; it's very hard to resolve all of them.

xxzhky, May 30 '24

@bitsondatadev @colebow @mosabua Is there anyone available to review it?

xxzhky, Jun 03 '24

I'll try to review it this week.

Praveen2112, Jun 03 '24

@Praveen2112 Would you have time to review it soon?

xxzhky, Jul 05 '24

@bitsondatadev @colebow @mosabua Is anyone available to review? Thanks.

xxzhky, Jul 13 '24

@xxzhky Is it possible to split the PR into 2-3 commits: one each for the refactors, the enhancements, and the limit pushdown? It would make the review process easier, and it would also make debugging easier in the future.

Praveen2112, Jul 15 '24