
[ENH] Support casting timedeltas to plain numerics


Related to #411

I'm trying to cast a timedelta (the difference between two timestamp columns) to an integer:

import pandas as pd
from dask_sql import Context

c = Context()

df = pd.DataFrame({'dt0': ['2022-03-01 12:00:00'], 'dt1': ['2022-03-01 13:00:00']})
df['dt0'] = df['dt0'].astype('datetime64[s]')
df['dt1'] = df['dt1'].astype('datetime64[s]')

c.create_table('test_dt', df)
c.sql("""
SELECT CAST((dt1 - dt0) SECOND AS INTEGER)
FROM test_dt
""").compute()

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:818, in RexCallPlugin.convert(self, rex, dc, context)
    817 try:
--> 818     operation = self.OPERATION_MAPPING[operator_name]
    819 except KeyError:

KeyError: 'reinterpret'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:821, in RexCallPlugin.convert(self, rex, dc, context)
    820 try:
--> 821     operation = context.schema[schema_name].functions[operator_name]
    822 except KeyError:  # pragma: no cover

KeyError: 'reinterpret'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
Input In [122], in <module>
----> 1 get_ipython().run_cell_magic('sql', '', '\nselect cast(td as int) from (\nselect (dt1-dt0) SECOND td\nfrom test_dt\n) a\n')

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/IPython/core/interactiveshell.py:2257, in InteractiveShell.run_cell_magic(self, magic_name, line, cell)
   2255 with self.builtin_trap:
   2256     args = (magic_arg_s, cell)
-> 2257     result = fn(*args, **kwargs)
   2258 return result

Input In [8], in sql(line, cell, local_ns)
     12 sql_statement = cell.format(**local_ns)
     13 t0 = time.time()
---> 14 res = c.sql(sql_statement)
     15 if (
     16     "CREATE OR REPLACE TABLE" in sql_statement
     17     or "CREATE OR REPLACE VIEW" in sql_statement
     18 ):
     19     table = sql_statement.split("CREATE OR REPLACE")[1]

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/context.py:460, in Context.sql(self, sql, return_futures, dataframes, gpu)
    456         self.create_table(df_name, df, gpu=gpu)
    458 rel, select_names, _ = self._get_ral(sql)
--> 460 dc = RelConverter.convert(rel, context=self)
    462 if dc is None:
    463     return

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rel/convert.py:60, in RelConverter.convert(cls, rel, context)
     54     raise NotImplementedError(
     55         f"No conversion for class {class_name} available (yet)."
     56     )
     57 logger.debug(
     58     f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59 )
---> 60 df = plugin_instance.convert(rel, context=context)
     61 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rel/logical/project.py:55, in DaskProjectPlugin.convert(self, rel, context)
     53 else:
     54     random_name = new_temporary_column(df)
---> 55     new_columns[random_name] = RexConverter.convert(
     56         expr, dc, context=context
     57     )
     58     logger.debug(f"Adding a new column {key} out of {expr}")
     59     new_mappings[key] = random_name

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:66, in RexConverter.convert(cls, rex, dc, context)
     58     raise NotImplementedError(
     59         f"No conversion for class {class_name} available (yet)."
     60     )
     62 logger.debug(
     63     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     64 )
---> 66 df = plugin_instance.convert(rex, dc, context=context)
     67 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     68 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:809, in RexCallPlugin.convert(self, rex, dc, context)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
--> 809     operands = [
    810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:810, in <listcomp>(.0)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
    809     operands = [
--> 810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:66, in RexConverter.convert(cls, rex, dc, context)
     58     raise NotImplementedError(
     59         f"No conversion for class {class_name} available (yet)."
     60     )
     62 logger.debug(
     63     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     64 )
---> 66 df = plugin_instance.convert(rex, dc, context=context)
     67 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     68 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:809, in RexCallPlugin.convert(self, rex, dc, context)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
--> 809     operands = [
    810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:810, in <listcomp>(.0)
    802 def convert(
    803     self,
    804     rex: "org.apache.calcite.rex.RexNode",
   (...)
    807 ) -> SeriesOrScalar:
    808     # Prepare the operands by turning the RexNodes into python expressions
    809     operands = [
--> 810         RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
    811     ]
    813     # Now use the operator name in the mapping
    814     schema_name, operator_name = context.fqn(rex.getOperator().getNameAsId())

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/convert.py:66, in RexConverter.convert(cls, rex, dc, context)
     58     raise NotImplementedError(
     59         f"No conversion for class {class_name} available (yet)."
     60     )
     62 logger.debug(
     63     f"Processing REX {rex} using {plugin_instance.__class__.__name__}..."
     64 )
---> 66 df = plugin_instance.convert(rex, dc, context=context)
     67 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     68 return df

File ~/conda/envs/dsql-2-28/lib/python3.9/site-packages/dask_sql/physical/rex/core/call.py:823, in RexCallPlugin.convert(self, rex, dc, context)
    821         operation = context.schema[schema_name].functions[operator_name]
    822     except KeyError:  # pragma: no cover
--> 823         raise NotImplementedError(f"{operator_name} not (yet) implemented")
    825 logger.debug(
    826     f"Executing {operator_name} on {[str(LoggableDataFrame(df)) for df in operands]}"
    827 )
    829 kwargs = {}

NotImplementedError: reinterpret not (yet) implemented
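
In case it helps triage: the cast of the interval apparently gets planned as a "reinterpret" call, which has no entry in RexCallPlugin.OPERATION_MAPPING. A minimal, hypothetical sketch of the semantics such a mapping might need, assuming the operand arrives as a timedelta series and that the reinterpreted value of a day-time interval is its millisecond count:

import pandas as pd

def reinterpret(series):
    # Hypothetical helper: expose the interval's underlying numeric value.
    # For a day-time interval that would be milliseconds, so divide the
    # timedelta by 1 ms; non-timedelta operands pass through unchanged.
    if pd.api.types.is_timedelta64_dtype(series):
        return series // pd.Timedelta(milliseconds=1)
    return series

This is only a sketch of the behavior, not a proposal for where the hook belongs in call.py.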

cc @ayushdg, @jdye64

randerzander · Mar 02 '22