[Bug]: apache.calcite CannotPlanException: All the inputs have relevant nodes, however the cost is still infinite.

Open crbl1122 opened this issue 1 year ago • 1 comments

What happened?

I use the SqlTransform component in an Apache Beam pipeline running on Dataflow. When I add one more column to the SQL query, I get this error:

RuntimeError: org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query .... Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL. All the inputs have relevant nodes, however the cost is still infinite.

This query works:

windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
                    AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M
                    FROM PCOLLECTION 
                    WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""

This query fails:

windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
                    AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M,
                    AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
                    FROM PCOLLECTION 
                    WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""

The difference between these two queries is only one line: AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
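
For reference, in standard SQL `ROWS 2 PRECEDING` is shorthand for a frame covering the two previous rows plus the current row, so each `AVG(...) OVER (w ROWS 2 PRECEDING)` is a three-row sliding average per `SUBS_ID`, ordered by `DATE_STR`. The following is a minimal pure-Python sketch of what the failing query is expected to compute. It is not Beam code and not a fix; the helper name `sliding_avg` and the sample rows are made up for illustration, and SQL numeric typing (for example AVG over integers) may differ from Python's float division.

```python
from collections import defaultdict, deque


def sliding_avg(rows, value_key, partition_key="SUBS_ID",
                order_key="DATE_STR", preceding=2):
    """Average value_key over the current row and up to `preceding`
    earlier rows, separately for each partition (hypothetical helper)."""
    by_partition = defaultdict(list)
    for row in rows:
        by_partition[row[partition_key]].append(row)

    out = []
    for part_rows in by_partition.values():
        # Frame = current row + N preceding rows, so it holds at most N + 1 values.
        window = deque(maxlen=preceding + 1)
        for row in sorted(part_rows, key=lambda r: r[order_key]):
            window.append(row[value_key])
            out.append({**row, f"{value_key}_sliding_3M": sum(window) / len(window)})
    return out


# Illustrative sample data (not taken from the real table).
rows = [
    {"DATE_STR": "2024-01", "SUBS_ID": "a", "NUM_1": 3.0, "NUM_2": 10.0},
    {"DATE_STR": "2024-02", "SUBS_ID": "a", "NUM_1": 6.0, "NUM_2": 20.0},
    {"DATE_STR": "2024-03", "SUBS_ID": "a", "NUM_1": 9.0, "NUM_2": 30.0},
    {"DATE_STR": "2024-04", "SUBS_ID": "a", "NUM_1": 12.0, "NUM_2": 40.0},
]

# Two sliding averages over the same window, as in the failing query.
with_num1 = sliding_avg(rows, "NUM_1")
with_both = sliding_avg(with_num1, "NUM_2")
```

Both averages use the same window definition, so there is nothing in the query semantics that should make the second one harder to compute than the first.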

Pipeline definition:

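import logging

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# runner, pipeline_options, data_size, data_types, read_from_bq and
# ConvertToRow are defined elsewhere in the project.
with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    # Set the level first so the INFO message below is actually emitted
    logging.getLogger().setLevel(logging.INFO)
    logging.info(f'pipeline_options: {pipeline_options}')

    # Preprocess train data
    step = 'train'
    # Read raw train data from BQ
    raw_train_dataset = read_from_bq(pipeline, step, data_size)
    rows_train_dataset = raw_train_dataset[0] | 'Convert to Rows' >> beam.ParDo(ConvertToRow(data_types))

    # Apply the SQL transform
    filtered_rows = rows_train_dataset | SqlTransform(windowing_query)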

Why does SqlTransform not accept more than one rolling average computation?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [X] Component: Python SDK
  • [X] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

crbl1122 • Mar 01 '24 08:03

I tested different queries locally; the results are below. My guess is that the total column and the avg1/avg2 columns are somehow related, and certain combinations of them lead to the error.

Successful:

  1. SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  2. SELECT AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  3. SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)

With error:

  1. SELECT (f_int + f_int) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  2. SELECT (f_int + f_int) as total, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  3. SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
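
To make the pattern easier to compare and rerun, the six queries above can be generated from their varying parts. This is only a sketch: `build_query` and `CASES` are hypothetical names, and the outcomes are copied from the lists above rather than produced by the code. Each generated string could then be passed to `SqlTransform` to re-check the result.

```python
def build_query(avg_cols, total_col=None):
    """Build one test query from the column used in `total` (optional)
    and the columns used in the windowed AVG expressions."""
    select = []
    if total_col is not None:
        select.append(f"({total_col} + {total_col}) as total")
    for i, col in enumerate(avg_cols, start=1):
        select.append(f"AVG({col}) OVER (w ROWS 2 PRECEDING) as avg{i}")
    return ("SELECT " + ", ".join(select)
            + " FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)")


# (column in `total`, columns in AVG, outcome as reported above)
CASES = [
    ("f_int2", ["f_int"], "ok"),
    (None, ["f_int", "f_int2"], "ok"),
    ("f_int2", ["f_int", "f_int"], "ok"),
    ("f_int", ["f_int"], "error"),
    ("f_int", ["f_int2"], "error"),
    ("f_int2", ["f_int", "f_int2"], "error"),
]

queries = [(build_query(avgs, total), outcome) for total, avgs, outcome in CASES]

for query, outcome in queries:
    print(f"[{outcome}] {query}")
```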

Amar3tto • Jun 18 '24 13:06