
Run a pipeline without the need for decorators (data_loaders, transformers, data_exporters)

albertfreist opened this issue

I'm testing Mage to see if it can be a good fit for a project I'm working on. I already have a pipeline in notebooks that uses Spark Structured Streaming. I tried simply copying the notebooks into Mage and running them in a scratchpad, which works fine, but then it isn't possible to run them as a pipeline. I then tried using a data loader, a transformer, and a data exporter, but I get some strange errors.

The pipeline reads from a Kafka topic using Spark Structured Streaming, so at first I thought I could just wrap the Spark read call in a decorator and it would work. I created a streaming pipeline, but in that case I can't write the data loader in Spark and have to use the Kafka YAML file. So instead I tried a batch pipeline. Here is a super simple example:

from pyspark.sql import SparkSession

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

spark = SparkSession.builder.appName("foo").getOrCreate()


@data_loader
def load_data(*args, **kwargs):
    return (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9093")
            .option("subscribe", "example")
            .option("startingOffsets", "earliest")
            .load())


@test
def test_output(df, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'

But I get the following error:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[7], line 71
     68     else:
     69         return find(lambda val: val is not None, output)
---> 71 df = execute_custom_code()
     73 # Post processing code below (source: output_display.py)
     76 def __custom_output():

Cell In[7], line 55, in execute_custom_code()
     50     block.run_upstream_blocks()
     52 global_vars = {'env': 'dev', 'execution_date': datetime.datetime(2023, 1, 2, 21, 0, 21, 934826), 'event': {}} or dict()
---> 55 block_output = block.execute_sync(
     56     custom_code=code,
     57     global_vars=global_vars,
     58     analyze_outputs=True,
     59     update_status=True,
     60     test_execution=True,
     61 )
     62 if False:
     63     block.run_tests(custom_code=code, update_tests=False)

File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py:575, in Block.execute_sync(self, analyze_outputs, build_block_output_stdout, custom_code, execution_partition, global_vars, logger, run_all_blocks, test_execution, update_status, store_variables, verify_output, input_from_output, runtime_arguments, dynamic_block_index, dynamic_block_uuid, dynamic_upstream_block_uuids)
    568     if logger is not None:
    569         logger.exception(
    570             f'Failed to execute block {self.uuid}',
    571             block_type=self.type,
    572             block_uuid=self.uuid,
    573             error=err,
    574         )
--> 575     raise err
    576 finally:
    577     if update_status:

File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py:544, in Block.execute_sync(self, analyze_outputs, build_block_output_stdout, custom_code, execution_partition, global_vars, logger, run_all_blocks, test_execution, update_status, store_variables, verify_output, input_from_output, runtime_arguments, dynamic_block_index, dynamic_block_uuid, dynamic_upstream_block_uuids)
    542 if store_variables and self.pipeline.type != PipelineType.INTEGRATION:
    543     try:
--> 544         self.store_variables(
    545             variable_mapping,
    546             execution_partition=execution_partition,
    547             override_outputs=True,
    548             spark=(global_vars or dict()).get('spark'),
    549             dynamic_block_uuid=dynamic_block_uuid,
    550         )
    551     except ValueError as e:
    552         if str(e) == 'Circular reference detected':

File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py:1210, in Block.store_variables(self, variable_mapping, execution_partition, override, override_outputs, spark, dynamic_block_uuid)
   1208     if spark is not None and type(data) is pd.DataFrame:
   1209         data = spark.createDataFrame(data)
-> 1210     self.pipeline.variable_manager.add_variable(
   1211         self.pipeline.uuid,
   1212         uuid_to_use,
   1213         uuid,
   1214         data,
   1215         partition=execution_partition,
   1216     )
   1218 for uuid in removed_variables:
   1219     self.pipeline.variable_manager.delete_variable(
   1220         self.pipeline.uuid,
   1221         uuid_to_use,
   1222         uuid,
   1223     )

File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/variable_manager.py:72, in VariableManager.add_variable(self, pipeline_uuid, block_uuid, variable_uuid, data, partition, variable_type)
     70 variable.delete()
     71 variable.variable_type = variable_type
---> 72 variable.write_data(data)

File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/variable.py:153, in Variable.write_data(self, data)
    151     self.__write_parquet(data)
    152 elif self.variable_type == VariableType.SPARK_DATAFRAME:
--> 153     self.__write_spark_parquet(data)
    154 elif self.variable_type == VariableType.GEO_DATAFRAME:
    155     self.__write_geo_dataframe(data)

File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/variable.py:279, in Variable.__write_spark_parquet(self, data)
    277 def __write_spark_parquet(self, data) -> None:
    278     (
--> 279         data.write
    280         .option('header', 'True')
    281         .mode('overwrite')
    282         .csv(self.variable_path)
    283     )

File /usr/local/lib/python3.10/site-packages/pyspark/sql/dataframe.py:338, in DataFrame.write(self)
    326 @property
    327 def write(self) -> DataFrameWriter:
    328     """
    329     Interface for saving the content of the non-streaming :class:`DataFrame` out into external
    330     storage.
   (...)
    336     :class:`DataFrameWriter`
    337     """
--> 338     return DataFrameWriter(self)

File /usr/local/lib/python3.10/site-packages/pyspark/sql/readwriter.py:731, in DataFrameWriter.__init__(self, df)
    729 self._df = df
    730 self._spark = df.sparkSession
--> 731 self._jwrite = df._jdf.write()

File /usr/local/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/lib/python3.10/site-packages/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

AnalysisException: 'write' can not be called on streaming Dataset/DataFrame
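The last line of the traceback is the real clue: after the block runs, Mage's `store_variables()` tries to persist the block output by calling `df.write`, but PySpark only allows `.write` on batch DataFrames; a streaming DataFrame has to be written with `df.writeStream` and a query that is explicitly started. Below is a minimal, Spark-free sketch of that guard; `FakeDataFrame` and this `AnalysisException` are hypothetical stand-ins for illustration, not pyspark internals (in real pyspark the JVM side raises the error when a `DataFrameWriter` is constructed for a streaming Dataset).

```python
class AnalysisException(Exception):
    """Stand-in for pyspark.sql.utils.AnalysisException."""

class FakeDataFrame:
    def __init__(self, is_streaming):
        # pyspark exposes the same flag as DataFrame.isStreaming
        self.isStreaming = is_streaming

    @property
    def write(self):
        # Streaming frames cannot be written with the batch writer
        if self.isStreaming:
            raise AnalysisException(
                "'write' can not be called on streaming Dataset/DataFrame"
            )
        return "DataFrameWriter"  # placeholder for a real writer object

batch_df = FakeDataFrame(is_streaming=False)
stream_df = FakeDataFrame(is_streaming=True)

batch_df.write  # fine: a batch DataFrame can be persisted
try:
    stream_df.write  # fails, just like Mage's store_variables() does above
except AnalysisException as err:
    print(err)  # → 'write' can not be called on streaming Dataset/DataFrame
```

So the decorator itself is not the problem; returning a streaming DataFrame from a batch block is, because the batch executor assumes the output can be materialized.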

Any idea why this isn't working?

Will it be possible in the future to run a pipeline without the data loader, transformer and data exporter?

albertfreist avatar Jan 02 '23 21:01 albertfreist

👀 👀

tommydangerous avatar Jan 02 '23 21:01 tommydangerous

Hi @albertfreist, for streaming pipelines we support this in a different way (you don't need to write a data loader block). When you create a pipeline, you can select the "Streaming" type. We currently support Kafka as the source and let you write the transformation code in Python. You can check out this guide: https://docs.mage.ai/guides/streaming-pipeline#create-a-new-pipeline. Spark Structured Streaming is not supported yet, but it's on our immediate roadmap.
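For reference, in the streaming pipeline the Kafka source is configured with YAML rather than a Python data loader. A minimal sketch of such a config, with placeholder values taken from the question (the exact field names may differ between Mage versions, so check the linked guide for the current schema):

```yaml
# Hypothetical example; see the Mage streaming-pipeline guide for the schema
connector_type: kafka
bootstrap_server: "localhost:9093"     # placeholder broker address
topic: example                         # placeholder topic name
consumer_group: unique_consumer_group  # placeholder consumer group
```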

wangxiaoyou1993 avatar Jan 05 '23 19:01 wangxiaoyou1993

Hi @albertfreist! We've added your request to our roadmap, and it's currently planned for release by the end of April. You can see our current roadmap here: https://airtable.com/shrJctaGt4tvU27uF/tbl5bIlFkk2KhCpL4

Thank you so much for your patience; we'll keep you updated and let you know when the feature is available! If this is an urgent matter, please let us know so we can discuss further.

thomaschung408 avatar Feb 17 '23 22:02 thomaschung408