[Bug]: apache.calcite CannotPlanException: All the inputs have relevant nodes, however the cost is still infinite.
What happened?
I use the SqlTransform component in an Apache Beam pipeline running on Dataflow. If I add one more variable to the SQL query, I get the error:
```
RuntimeError: org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query .... Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL. All the inputs have relevant nodes, however the cost is still infinite.
```
This query works:
```python
windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M
FROM PCOLLECTION
WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""
```
while this query does not:
```python
windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M,
AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
FROM PCOLLECTION
WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""
```
The two queries differ by only a single line:
```sql
AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
```
Pipeline definition:
```python
with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)

    # Preprocess train data
    step = 'train'
    # Read raw train data from BQ
    raw_train_dataset = read_from_bq(pipeline, step, data_size)
    rows_train_dataset = raw_train_dataset[0] | 'Convert to Rows' >> beam.ParDo(ConvertToRow(data_types))
    # Apply the SQL transform
    filtered_rows = rows_train_dataset | SqlTransform(windowing_query)
```
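For context, `ConvertToRow` is a custom DoFn (its definition is not included in this report) that maps each BigQuery result to a typed `beam.Row` so that SqlTransform can infer a schema. A simplified sketch of what such a DoFn might look like, assuming `data_types` is a dict mapping field names to Python types:

```python
import apache_beam as beam

class ConvertToRow(beam.DoFn):
    # Simplified sketch; the real DoFn is not shown in this report.
    # Assumes data_types maps field names to Python types, e.g.
    # {'DATE_STR': str, 'SUBS_ID': str, 'NUM_1': float, ...}.
    def __init__(self, data_types):
        self.data_types = data_types

    def process(self, element):
        # Cast each field so SqlTransform sees a consistent schema.
        yield beam.Row(**{name: typ(element[name])
                          for name, typ in self.data_types.items()})
```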
Why does SqlTransform not accept more than one rolling-average computation?
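A possible workaround, assuming the planner only fails when the arithmetic projection and the windowed aggregates appear in the same query, is to split the work across two chained SqlTransforms (untested sketch):

```python
# Untested sketch: compute the rolling averages first, then add the
# arithmetic projection in a second query over the intermediate rows.
avg_query = """SELECT DATE_STR, SUBS_ID, NUM_1, NUM_2, NUM_3,
AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M,
AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
FROM PCOLLECTION
WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""

total_query = """SELECT DATE_STR, SUBS_ID,
(NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
NUM_1_sliding_3M, NUM_2_sliding_3M
FROM PCOLLECTION"""

filtered_rows = (rows_train_dataset
                 | 'Rolling averages' >> SqlTransform(avg_query)
                 | 'Add total' >> SqlTransform(total_query))
```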
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [X] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
I tested different queries locally, and here are the results. I suspect that the total column and the avg1/avg2 columns are somehow related and together lead to an error in certain cases.
Successful:
```sql
SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)

SELECT AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)

SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
```
With error:
```sql
SELECT (f_int + f_int) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)

SELECT (f_int + f_int) as total, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)

SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
```
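A minimal local harness for reproducing these results might look like the following sketch (DirectRunner by default; the row values are made up for illustration):

```python
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# One of the failing queries from the list above.
query = """SELECT (f_int + f_int) as total,
AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1
FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)"""

with beam.Pipeline() as p:
    # beam.Row elements give the PCollection a schema,
    # which SqlTransform requires.
    _ = (p
         | beam.Create([
             beam.Row(f_int=1, f_int2=10, f_double=1.0),
             beam.Row(f_int=2, f_int2=20, f_double=2.0),
             beam.Row(f_int=3, f_int2=30, f_double=3.0),
         ])
         | SqlTransform(query)
         | beam.Map(print))
```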