Run a pipeline without the need for decorators (data_loaders, transformers, data_exporters)
I'm testing Mage to see if it can be a good fit for a project I'm working on. I already have a pipeline in notebooks, using Spark Structured Streaming. I tried to just copy the notebooks to Mage and use a scratchpad, and it works fine, but then it's not possible to run it as a pipeline. I then tried to use the data loader, transformer, and data exporter blocks, but I get some strange errors.
The pipeline reads from a Kafka topic using Spark Structured Streaming, so at first I thought I could just wrap the Spark read method in a decorator and it would work. So I created a streaming pipeline, but then I can't write the data loader in Spark and have to use the Kafka YAML configuration file instead. So instead I tried to use a batch pipeline. Here is a super simple example:
from pyspark.sql import SparkSession

# When running inside Mage, the `data_loader` and `test` decorators are
# injected into the notebook's globals; fall back to importing them
# explicitly so the file also works as a standalone script.
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

# Shared Spark session used by the loader block below.
# getOrCreate() reuses an existing session if one is already active.
spark = SparkSession.builder.appName("foo").getOrCreate()
@data_loader
def load_data(*args, **kwargs):
    """Open a Spark Structured Streaming reader on the Kafka topic.

    Returns a *streaming* DataFrame subscribed to the ``example`` topic,
    reading from the earliest available offsets. Note that streaming
    DataFrames cannot be written with the batch ``.write`` API.
    """
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9093")
        .option("subscribe", "example")
        .option("startingOffsets", "earliest")
    )
    return kafka_stream.load()
@test
def test_output(df, *args) -> None:
    """Check that the upstream block produced a non-None output.

    Mage invokes this after the block runs, passing the block's
    output as ``df``.
    """
    assert df is not None, 'The output is undefined'
But I get the following error:
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
Cell In[7], line 71
68 else:
69 return find(lambda val: val is not None, output)
---> 71 df = execute_custom_code()
73 # Post processing code below (source: output_display.py)
76 def __custom_output():
Cell In[7], line 55, in execute_custom_code()
50 block.run_upstream_blocks()
52 global_vars = {'env': 'dev', 'execution_date': datetime.datetime(2023, 1, 2, 21, 0, 21, 934826), 'event': {}} or dict()
---> 55 block_output = block.execute_sync(
56 custom_code=code,
57 global_vars=global_vars,
58 analyze_outputs=True,
59 update_status=True,
60 test_execution=True,
61 )
62 if False:
63 block.run_tests(custom_code=code, update_tests=False)
File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py:575, in Block.execute_sync(self, analyze_outputs, build_block_output_stdout, custom_code, execution_partition, global_vars, logger, run_all_blocks, test_execution, update_status, store_variables, verify_output, input_from_output, runtime_arguments, dynamic_block_index, dynamic_block_uuid, dynamic_upstream_block_uuids)
568 if logger is not None:
569 logger.exception(
570 f'Failed to execute block {self.uuid}',
571 block_type=self.type,
572 block_uuid=self.uuid,
573 error=err,
574 )
--> 575 raise err
576 finally:
577 if update_status:
File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py:544, in Block.execute_sync(self, analyze_outputs, build_block_output_stdout, custom_code, execution_partition, global_vars, logger, run_all_blocks, test_execution, update_status, store_variables, verify_output, input_from_output, runtime_arguments, dynamic_block_index, dynamic_block_uuid, dynamic_upstream_block_uuids)
542 if store_variables and self.pipeline.type != PipelineType.INTEGRATION:
543 try:
--> 544 self.store_variables(
545 variable_mapping,
546 execution_partition=execution_partition,
547 override_outputs=True,
548 spark=(global_vars or dict()).get('spark'),
549 dynamic_block_uuid=dynamic_block_uuid,
550 )
551 except ValueError as e:
552 if str(e) == 'Circular reference detected':
File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py:1210, in Block.store_variables(self, variable_mapping, execution_partition, override, override_outputs, spark, dynamic_block_uuid)
1208 if spark is not None and type(data) is pd.DataFrame:
1209 data = spark.createDataFrame(data)
-> 1210 self.pipeline.variable_manager.add_variable(
1211 self.pipeline.uuid,
1212 uuid_to_use,
1213 uuid,
1214 data,
1215 partition=execution_partition,
1216 )
1218 for uuid in removed_variables:
1219 self.pipeline.variable_manager.delete_variable(
1220 self.pipeline.uuid,
1221 uuid_to_use,
1222 uuid,
1223 )
File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/variable_manager.py:72, in VariableManager.add_variable(self, pipeline_uuid, block_uuid, variable_uuid, data, partition, variable_type)
70 variable.delete()
71 variable.variable_type = variable_type
---> 72 variable.write_data(data)
File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/variable.py:153, in Variable.write_data(self, data)
151 self.__write_parquet(data)
152 elif self.variable_type == VariableType.SPARK_DATAFRAME:
--> 153 self.__write_spark_parquet(data)
154 elif self.variable_type == VariableType.GEO_DATAFRAME:
155 self.__write_geo_dataframe(data)
File /usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/variable.py:279, in Variable.__write_spark_parquet(self, data)
277 def __write_spark_parquet(self, data) -> None:
278 (
--> 279 data.write
280 .option('header', 'True')
281 .mode('overwrite')
282 .csv(self.variable_path)
283 )
File /usr/local/lib/python3.10/site-packages/pyspark/sql/dataframe.py:338, in DataFrame.write(self)
326 @property
327 def write(self) -> DataFrameWriter:
328 """
329 Interface for saving the content of the non-streaming :class:`DataFrame` out into external
330 storage.
(...)
336 :class:`DataFrameWriter`
337 """
--> 338 return DataFrameWriter(self)
File /usr/local/lib/python3.10/site-packages/pyspark/sql/readwriter.py:731, in DataFrameWriter.__init__(self, df)
729 self._df = df
730 self._spark = df.sparkSession
--> 731 self._jwrite = df._jdf.write()
File /usr/local/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /usr/local/lib/python3.10/site-packages/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
192 converted = convert_exception(e.java_exception)
193 if not isinstance(converted, UnknownException):
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: 'write' can not be called on streaming Dataset/DataFrame
Any idea why this isn't working?
Will it be possible in the future to run a pipeline without the data loader, transformer and data exporter?
👀 👀
Hi @albertfreist, for streaming pipelines, we support this in a different way (you don't need to write a data loader block). When you create a pipeline, you can select the "Streaming" type. We currently support Kafka as the source and allow you to write transformation code in Python. You can check out this guide: https://docs.mage.ai/guides/streaming-pipeline#create-a-new-pipeline "Spark Structured Streaming" is not supported yet, but it's on our immediate roadmap.
Hi @albertfreist! We've added your request to our roadmap, and it is currently planned to be released by the end of April. You can see our current roadmap here: https://airtable.com/shrJctaGt4tvU27uF/tbl5bIlFkk2KhCpL4
Thank you so much for your patience and we'll keep you updated and let you know when the feature is available! If this is an urgent matter, please let us know so we can discuss further.