Register pivot
Could someone point me to where I could see an example for registering a new BigQuery operator to ibis?
More specifically, I would like to add the new pivot operator.
I imagine it to be called like this
table = con.table('table')
pivot = table.pivot(index='family', column={'type': ['fire', 'water']}, values=['sum', 'strength'])
And I would expect it to produce something like
SELECT * FROM
(SELECT * FROM `table`)
PIVOT(SUM(strength) FOR type IN ('fire', 'water', ))
I had a quick look at compiler.py but it isn't clear to me how to achieve this.
I had another look and it seems to me like I would need to overwrite the SelectBuilder and maybe even the UnionBuilder classes. If I wanted to achieve this outside this repo?
The ideal flow is that we add the operator to https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations.py (plus add the pivot method to the table class(es). Then we register the implementation here: https://github.com/ibis-project/ibis-bigquery/blob/abe966b6ed8567fb26bb4f4c7f711f87921b9819/ibis_bigquery/compiler.py#L481-L484 (assuming this is a new operation in Ibis 2.x).
It's certainly possible to monkeypatch the base ibis classes here to have BigQuery-specific functionality, but I'd like to make sure we stay consistent with the rest of the ibis community.
Hm I think in this case it's very bigquery specific. Most databases don't have a pivot implementation?
I tried to register this for our project. This is what I have so far (probably not quite correct but not the issue right now).
import ibis.expr.rules as rlz
from ibis.expr.operations import Arg, TableNode
from ibis.expr.types import TableExpr
from ibis_bigquery import BigQueryExprTranslator, BigQueryTable
from ibis.expr.schema import HasSchema
compiles = BigQueryExprTranslator.compiles
class Pivot(TableNode, HasSchema):
table = Arg(TableExpr)
aggregation = Arg(rlz.scalar(rlz.string))
input_column = Arg(rlz.noop)
pivot_columns = Arg(rlz.array_of(rlz.string))
def pivot(table, aggregation, input_column, pivot_columns):
return Pivot(table, aggregation, input_column, pivot_columns).to_expr()
TableExpr.pivot = pivot
@compiles(Pivot)
def _pivot(t, expr):
table, aggregation, input_column, pivot_columns = expr.op().args
return (f"SELECT * FROM ({t.translate(table)})"
f"PIVOT({t.translate(aggregation)} FOR {t.translate(input_column)} IN {t.translate(pivot_columns)})")
Unfortunately, when I try to call table.pivot(...).compile() I am getting an error originating in base_sqlalchemy:
.
.
.
~/Projects/gyana/.venv/lib/python3.8/site-packages/ibis/backends/base_sqlalchemy/compiler.py in _collect_elements(self)
373
374 if isinstance(root_op, ops.TableNode):
--> 375 self._collect(source_expr, toplevel=True)
376 if self.table_set is None:
377 raise com.InternalError('no table set')
~/Projects/gyana/.venv/lib/python3.8/site-packages/ibis/backends/base_sqlalchemy/compiler.py in _collect(self, expr, toplevel)
399 self._collect_Join(expr, toplevel=toplevel)
400 else:
--> 401 raise NotImplementedError(type(op))
402
403 self.op_memo.add(op)
NotImplementedError: <class '__main__.Pivot'>
I can't figure out what I am doing wrong. It looks to me like it's not registering the compile function . I can call table.pivot(...) without an error but if I call compile it fails. Do I need to do something different when registering something on a TableExpr rather than a StringValue (the only thing I have gotten to work so far).