butterfree
butterfree copied to clipboard
AnalysisException: Undefined function: 'fat'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0
I am trying to read features from a CSV file and a Kafka topic, but I get the error above:
from butterfree.extract import Source
from butterfree.extract.readers import FileReader
from butterfree.extract.readers import KafkaReader
# Reproduction script: join Kafka events with a CSV of drink nutrition data.
# schema_kafka, schema_file and spark_client are defined elsewhere (not shown).

# Batch (stream=False) reader over the transactions topic; its id, "events",
# is the name the query below uses to refer to it.
kafka_reader = KafkaReader(
    id="events",
    topic="queue.transactions",
    value_schema=schema_kafka,
    connection_string="kafka:29092",
    stream=False,
)

# CSV reader; its id, "nutrients", is the name the query below joins against.
# NOTE(review): the AnalysisException is raised from selectExpr while a reader
# is being built. Presumably schema_file contains a column name such as
# "Fat (g)" that Spark parses as a call to a function named `fat` -- confirm
# against schema_file and the CSV header.
nutrients_reader = FileReader(
    id="nutrients",
    path="starbucks-menu-nutrition-drinks.csv",
    format="csv",
    schema=schema_file,
)

readers = [kafka_reader, nutrients_reader]

# SQL executed over the readers' views: match each event to its nutrition row.
query = """
select
*
from
events
join nutrients
on events.product_name = nutrients.name
"""

source = Source(readers=readers, query=query)

# Builds every reader and then runs the query; this is the call that fails.
source_df = source.construct(spark_client)
Error
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-18-9b5f19e7b117> in <module>
----> 1 source_df = source.construct(spark_client)
/opt/conda/lib/python3.8/site-packages/butterfree/extract/source.py in construct(self, client, start_date, end_date)
82 """
83 for reader in self.readers:
---> 84 reader.build(
85 client=client, start_date=start_date, end_date=end_date
86 ) # create temporary views for each reader
/opt/conda/lib/python3.8/site-packages/butterfree/extract/readers/reader.py in build(self, client, columns, start_date, end_date)
103
104 """
--> 105 column_selection_df = self._select_columns(columns, client)
106 transformed_df = self._apply_transformations(column_selection_df)
107
/opt/conda/lib/python3.8/site-packages/butterfree/extract/readers/reader.py in _select_columns(self, columns, client)
119 ) -> DataFrame:
120 df = self.consume(client)
--> 121 return df.selectExpr(
122 *(
123 [
/usr/local/spark/python/pyspark/sql/dataframe.py in selectExpr(self, *expr)
1433 if len(expr) == 1 and isinstance(expr[0], list):
1434 expr = expr[0]
-> 1435 jdf = self._jdf.selectExpr(self._jseq(expr))
1436 return DataFrame(jdf, self.sql_ctx)
1437
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Undefined function: 'fat'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0