butterfree icon indicating copy to clipboard operation
butterfree copied to clipboard

AnalysisException: Undefined function: 'fat'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

Open manhtd98 opened this issue 1 year ago • 1 comment

I am trying to get features from a CSV file and Kafka, but I got the following error:

# Reproduction script: builds a butterfree Source from two readers (one Kafka
# topic, one CSV file), joins them with a SQL query and constructs the result.
# `schema_kafka`, `schema_file` and `spark_client` are not defined in this
# snippet; they must be defined earlier in the session.
from butterfree.extract import Source
from butterfree.extract.readers import FileReader
from butterfree.extract.readers import KafkaReader

# Reader for the "queue.transactions" topic; its id ("events") is the name the
# query below uses to refer to this reader's data.
kafka_reader = KafkaReader(
    id="events",
    topic="queue.transactions",
    value_schema=schema_kafka,
    connection_string="kafka:29092",
    stream=False  # presumably a batch (non-streaming) read -- TODO confirm
)

# The CSV reader is registered under the id "nutrients", which the query
# below also references.
readers = [
    kafka_reader,
    FileReader(id="nutrients", path="starbucks-menu-nutrition-drinks.csv", format="csv", schema=schema_file)
]

# Joins the two readers on the product name; the names `events` and
# `nutrients` match the reader ids declared above.
query = """
select
    *
from
    events
    join nutrients
        on events.product_name = nutrients.name
"""

source = Source(readers=readers, query=query)
# This is the call that raises in the traceback below: the failure happens in
# reader.build -> _select_columns -> df.selectExpr, with
# "Undefined function: 'fat'".
# NOTE(review): presumably one of the readers exposes a column whose name is
# being parsed as a SQL expression (e.g. a CSV header such as "Fat (g)" read
# as a call to fat(g)) -- verify the column names in the CSV / `schema_file`.
source_df = source.construct(spark_client)

Error

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-18-9b5f19e7b117> in <module>
----> 1 source_df = source.construct(spark_client)

/opt/conda/lib/python3.8/site-packages/butterfree/extract/source.py in construct(self, client, start_date, end_date)
     82         """
     83         for reader in self.readers:
---> 84             reader.build(
     85                 client=client, start_date=start_date, end_date=end_date
     86             )  # create temporary views for each reader

/opt/conda/lib/python3.8/site-packages/butterfree/extract/readers/reader.py in build(self, client, columns, start_date, end_date)
    103 
    104         """
--> 105         column_selection_df = self._select_columns(columns, client)
    106         transformed_df = self._apply_transformations(column_selection_df)
    107 

/opt/conda/lib/python3.8/site-packages/butterfree/extract/readers/reader.py in _select_columns(self, columns, client)
    119     ) -> DataFrame:
    120         df = self.consume(client)
--> 121         return df.selectExpr(
    122             *(
    123                 [

/usr/local/spark/python/pyspark/sql/dataframe.py in selectExpr(self, *expr)
   1433         if len(expr) == 1 and isinstance(expr[0], list):
   1434             expr = expr[0]
-> 1435         jdf = self._jdf.selectExpr(self._jseq(expr))
   1436         return DataFrame(jdf, self.sql_ctx)
   1437 

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Undefined function: 'fat'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0


manhtd98 avatar May 05 '23 14:05 manhtd98