flink-http-connector

Regression using Flink 1.20 relating to timestamp precision

Open davidradl opened this issue 9 months ago • 5 comments

I have a test case with the following source table:

 CREATE TABLE `source_1`
 (
     `customerId`                   STRING,
     `param_string_date_time`       TIMESTAMP_LTZ(9),
     `ts` TIMESTAMP(3),
     WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
 )
 WITH (
     'connector' = 'filesystem',
     'path' = '/Users/davidradley/temp/regressionissue.txt',
     'format' = 'json'
 );

with the file contents:

{ "orderId": 1, "customerId": "1", "param_string_date_time": "2022-11-20 00:00:00.000Z", "ts": "2020-04-15 08:05:00.000" }
{ "orderId": 2, "customerId": "2", "param_string_date_time": "2022-11-21 00:00:00.000Z", "ts": "2020-04-15 08:07:00.000" }
{ "orderId": 3, "customerId": "3", "param_string_date_time": "2022-11-22 00:00:00.000Z", "ts": "2020-04-15 08:09:00.000" }
{ "orderId": 4, "customerId": "4", "param_string_date_time": "2022-11-23 00:00:00.000Z", "ts": "2020-04-15 08:11:00.000" }

I then create a view

CREATE TEMPORARY VIEW `api_1_source_1__API` AS
SELECT *, PROCTIME() AS `proc_time`
FROM `source_1`; 

and a lookup table

CREATE TEMPORARY TABLE `api_1_lookup__API`
(
    `customerId`           STRING,
    `param_string_date_time` TIMESTAMP_LTZ(6)
)
WITH (
    'connector' = 'rest-lookup',
    'url' = 'http://localhost:8089/api1',
    'format' = 'json',
    'asyncPolling' = 'false',
    'lookup-method' = 'GET',
    'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',
    'gid.connector.http.source.lookup.header.Origin' = '*',
    'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff',
    'gid.connector.http.source.lookup.request.timeout' = '30',
    'gid.connector.http.source.lookup.request.thread-pool.size' = '8',
    'gid.connector.http.source.lookup.response.thread-pool.size' = '4'
);

and then run a lookup join:

SELECT
    `api_1_source_1__API`.`customerId` AS `customerId`,
    `api_1_lookup__API`.`param_string_date_time`  AS `param_string_date_time`
FROM `api_1_source_1__API`
JOIN `api_1_lookup__API` FOR SYSTEM_TIME AS OF `api_1_source_1__API`.`proc_time` ON
    `api_1_lookup__API`.`customerId`=`api_1_source_1__API`.`customerId` AND
    `api_1_lookup__API`.`param_string_date_time`=`api_1_source_1__API`.`param_string_date_time`;

With Flink 1.19.1, the HTTP request that is sent has 2 query parameters: customerId and param_string_date_time.
With Flink 1.20, the HTTP request that is sent has only 1 query parameter: customerId.

This is caused by a Flink PR that introduces logic to do comparisons at the maximum precision. The implication is that the table planner ends up seeing TIMESTAMP(6) and TIMESTAMP(9) as the 2 types in CommonPhysicalLookupJoin.analyzeLookupKeys. Because the types differ, it decides that param_string_date_time is not a lookup key and treats the comparison as a join condition instead.
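One way to check this on each version is to print the optimized plan for the lookup query and compare how the LookupJoin node describes the two equality predicates. This is only a sketch of that check; the statement is the query above wrapped in EXPLAIN, and no plan output is reproduced here. From the analysis above, the expectation is that on 1.20 only customerId is reported as a lookup key and the timestamp comparison shows up as a separate condition.

```sql
-- Sketch: inspect how the planner classifies the two predicates.
-- Run against the same source_1 table, view, and lookup table defined above.
EXPLAIN PLAN FOR
SELECT
    `api_1_source_1__API`.`customerId` AS `customerId`,
    `api_1_lookup__API`.`param_string_date_time` AS `param_string_date_time`
FROM `api_1_source_1__API`
JOIN `api_1_lookup__API` FOR SYSTEM_TIME AS OF `api_1_source_1__API`.`proc_time` ON
    `api_1_lookup__API`.`customerId` = `api_1_source_1__API`.`customerId` AND
    `api_1_lookup__API`.`param_string_date_time` = `api_1_source_1__API`.`param_string_date_time`;
```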

Unfortunately, the HTTP connector does not currently look at join conditions, so the param_string_date_time condition is lost.

JDBC connectors doing a lookup do support join conditions, so the lookup join succeeds there.
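If the precision mismatch is indeed the trigger, a possible workaround is to declare param_string_date_time with the same precision on both sides, so that no widening to the maximum precision is needed. The sketch below is untested and rests on that assumption. It changes only the lookup table's column from TIMESTAMP_LTZ(6) to TIMESTAMP_LTZ(9) to match source_1; lowering the source column to 6 would be the symmetric alternative. The header and thread-pool options are omitted for brevity.

```sql
-- Untested sketch: the same lookup table as above, with the timestamp
-- precision changed from 6 to 9 so that it matches `source_1`.
CREATE TEMPORARY TABLE `api_1_lookup__API`
(
    `customerId`             STRING,
    `param_string_date_time` TIMESTAMP_LTZ(9)
)
WITH (
    'connector' = 'rest-lookup',
    'url' = 'http://localhost:8089/api1',
    'format' = 'json',
    'asyncPolling' = 'false',
    'lookup-method' = 'GET'
);
```

This would only sidestep the problem for this test case. It does not help when the two precisions genuinely have to differ.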

Some thoughts / observations:

  • The HTTP lookup connector could look for equality join conditions, using logic similar to the JDBC connector's, and then go ahead with each such equality join condition as if it were a lookup key.
  • Alternatively, the lookup connector could be more explicit about which conditions should be treated as lookup keys and which as join conditions. This would allow non-equality join conditions to be handled in the connector as a new capability.
  • Unfortunately, applyFilters comes from the SupportsFilterPushDown interface, which is only driven for scan sources.

@grzegorz8 @kzarzycki WDYT?

davidradl, Feb 03 '25 17:02