feat(spark): can we support connectors better?
Is your feature request related to a problem?
No
What is the motivation behind your request?
In streaming engines, we need to connect to sources and sinks. Flink provides a relatively straightforward API so that all the connector configurations can be passed in as table properties, and everything can be written in SQL (including the definition of watermarks, connector configurations, etc). Because of that, we're able to have a relatively clean API in Ibis to define connectors in the Flink backend.
On the other hand, this is not the case in Spark. There is a lot of code you need to write in a Spark streaming job, including manually parsing the Kafka messages, from binary to string, then to your expected schema. E.g.,
```python
## to set up the source
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StructField,
    StructType,
    TimestampType,
)

schema = StructType(
    [
        StructField('createTime', TimestampType(), True),
        StructField('orderId', LongType(), True),
        StructField('payAmount', DoubleType(), True),
        StructField('payPlatform', IntegerType(), True),
        StructField('provinceId', IntegerType(), True),
    ]
)

streaming_df = (
    session.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "payment_msg")
    .option("startingOffsets", "earliest")
    .load()
    # Kafka values arrive as binary; cast to string, parse the JSON, and flatten
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select("parsed_value.*")
    .withWatermark("createTime", "10 seconds")
)
streaming_df.createOrReplaceTempView("streaming_df")

## this is where ibis code would go

## to set up the sink
result_df = (
    window_agg.writeStream
    .outputMode("append")
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start()
)
```
Obviously, this is connector-specific, but Spark also only supports a handful of connectors out of the box (for any additional ones, the user can implement their own logic).
Spark also does not support defining watermarks in SQL, so the watermark definition has to be written in the Spark DataFrame API. The motivation is to improve the UX so that the user 1) doesn't need to write a bunch of boilerplate code and 2) doesn't need to know where the PySpark code stops and the Ibis code begins.
Describe the solution you'd like
Support connectors better in Spark. This still needs more design around how exactly we'd do it and to what extent we can, but, e.g., a starting point could be to wrap all the connector options in a kwarg.
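For illustration only (the `read_kafka` method, `options` kwarg, and `schema` parameter below are hypothetical, not an existing Ibis API), the idea is roughly:

```python
# hypothetical sketch: all connector options passed through a single kwarg, so the
# user never touches the PySpark reader/writer builders directly
t = con.read_kafka(
    options={
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "payment_msg",
        "startingOffsets": "earliest",
    },
    schema=schema,  # used to parse the Kafka message payload into columns
)
```

The proposals further down expand on this.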
What version of ibis are you running?
main
What backend(s) are you using, if any?
Spark
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Existing Ibis API
In Flink and RisingWave (RW), we allow the user to pass all the configurations into `create_table`/`create_source`/`create_sink`, and they get compiled into a SQL query:
```sql
-- RW
CREATE SOURCE s1 (
    product VARCHAR,
    user VARCHAR,
    price DOUBLE PRECISION,
    order_time TIMESTAMP,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    topic = 'test_topic',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Flink
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( ... );
```
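For comparison, the Ibis call on the Flink backend that compiles into SQL like the example above looks roughly like this (a sketch of the existing Flink backend API; exact parameter names may differ slightly):

```python
import ibis

flink_schema = ibis.schema(
    {"user": "int64", "product": "string", "order_time": "timestamp"}
)

# rough sketch of the Flink backend's create_table with connector properties
# and a watermark definition passed in directly
orders = flink_con.create_table(
    "Orders",
    schema=flink_schema,
    tbl_properties={
        "connector": "kafka",
        "topic": "test_topic",
        "properties.bootstrap.servers": "message_queue:29092",
        "format": "json",
    },
    watermark=ibis.watermark(
        time_col="order_time", allowed_delay=ibis.interval(seconds=5)
    ),
)
```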
The PySpark backend currently has the following methods:
`read_csv`, `read_delta`, `read_json`, `read_parquet`
In Flink, streaming/batch is configured in the environment settings (managed by the user). In Spark, streaming/batch is configured on the source/sink.
A concrete example:
- Flink: if the user creates a streaming environment and then uses `read_parquet` to read a Parquet file/directory, it is read as a streaming source.
- Spark: if the user uses `read_parquet` to read a Parquet file/directory, it's not clear whether it should be read as a batch source or a streaming source.
Proposals
We can deprioritize the sources and sinks used for testing purposes for now so we only need to support:
- File source (text, csv, json, orc, parquet)
- Kafka source
- File sink
- Kafka sink
- Foreach/ForeachBatch sink
How to distinguish between batch vs streaming:
- Match Flink behavior and force users to configure batch vs streaming on a session level (a sketch of how this could work internally follows the pros/cons below).
```python
session = SparkSession.builder.getOrCreate()  # user manages the session object
batch_con = ibis.pyspark.connect(session, mode="batch")
streaming_con = ibis.pyspark.connect(session, mode="streaming")
```
Pros:
- Really straightforward implementation, really simple API
- Unified UX across streaming backends
Cons:
- User needs to create separate connections if they have both batch and streaming workloads
- May be unintuitive if the user comes from the Spark API
- There are some streaming connectors that are not supported in batch, and we would still need to add new APIs for them
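A minimal sketch of how the connection-level mode could drive the existing `read_*` methods internally (hypothetical backend code; `self._mode` and the Ibis-table wrapping are assumptions):

```python
# hypothetical backend internals: the same read_parquet surface, routed by the
# mode chosen at connect time; not actual Ibis code
def read_parquet(self, path, schema=None, **options):
    if self._mode == "streaming":
        reader = self._session.readStream
        if schema is not None:
            # file-based streaming sources generally need an explicit schema
            reader = reader.schema(schema)
        df = reader.options(**options).parquet(path)
    else:
        df = self._session.read.options(**options).parquet(path)
    # the real method would register/wrap `df` as an Ibis table expression
    return df
```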
- Introduce a `mode` parameter to distinguish between batch and streaming sources/sinks.
```python
session = SparkSession.builder.getOrCreate()
con = ibis.pyspark.connect(session)

# read parquet as a batch source
con.read_parquet(path="some/parquet/path", mode="batch", ...)

# read csv as a streaming source
con.read_csv(path="some/csv/path", mode="streaming", ...)

# write parquet as a batch sink
con.to_parquet(path="some/parquet/path", mode="batch", ...)
```
In streaming mode, we need to allow the user to define watermarks. This could be done as a parameter on the `read_X` APIs.
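A sketch of how that might look, assuming we reuse the `ibis.watermark()` helper that the Flink backend already accepts (whether the PySpark backend would take the same shape is an open design question):

```python
# hypothetical: watermark defined at read time on a streaming source
t = con.read_csv(
    path="some/csv/path",
    mode="streaming",
    watermark=ibis.watermark(
        time_col="createTime", allowed_delay=ibis.interval(seconds=10)
    ),
)
# under the hood this would translate to PySpark's
# df.withWatermark("createTime", "10 seconds")
```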
Pros:
- Pretty intuitive
- Matches how sources and sinks are managed in PySpark
Cons:
- Significantly more APIs to maintain and not very scalable, especially if Spark adds/drops connector support upstream
New connectors
- Kafka source
- Kafka sink
- There are some connectors that are mostly used for testing so maybe we don't need to support them atm
- I'm not sure if it makes sense to support ORC/text separately
- Foreach/ForeachBatch are essentially UDFs... We could provide a wrapper around the method for consistency but there's not much we can do other than that.
[NOTE] We need to manage temporary views for the user (either implicitly or explicitly) in order to run SQL queries in Spark streaming.
- Implicit: when a source is created, a temporary view is also created on top of it (roughly sketched after this list).
- Explicit: the user needs to define a source, and then create a view, before executing any query.
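A rough sketch of the implicit variant (hypothetical internals; the `read_kafka` wrapper and name generation are illustrative only):

```python
import uuid

# hypothetical: register a temp view whenever a streaming source is created,
# so the SQL that Ibis compiles has a relation to refer to
def read_kafka(self, options, name=None):
    df = self._session.readStream.format("kafka").options(**options).load()
    name = name or f"kafka_source_{uuid.uuid4().hex[:8]}"
    df.createOrReplaceTempView(name)
    return self.table(name)  # Ibis expressions now compile against the view
```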
- Introduce new `read_X`/`to_X` APIs
Kafka source:
```python
# 1. you can't read kafka as a batch source, so we don't need the `mode` parameter here
# 2. provide an option to parse the data for the user, because spark doesn't parse the data automatically
con.read_kafka(
    options={"kafka.bootstrap.servers": "host1:port1,host2:port2", ...},
    schema=schema,
    auto_format=True,
)
```
Kafka sink:
```python
# provide an option to format the data for the user before writing into the sink,
# because spark doesn't do the conversion to json automatically
con.to_kafka(
    options={"kafka.bootstrap.servers": "host1:port1,host2:port2", ...},
    schema=schema,
    auto_format=True,
)
```
Note that you need to actually call `.start()` to start the streaming query; it runs in the background (non-blocking), but it will continuously print logs to the console/notebook output.
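Concretely, what `auto_format=True` would take care of on the sink side is roughly the following, since Spark's Kafka sink expects a serialized `value` column (the PySpark calls are real; `window_agg` is the aggregated streaming DataFrame from the earlier example):

```python
# serialize each row to JSON so the Kafka sink gets the `value` column it expects
formatted = window_agg.selectExpr("to_json(struct(*)) AS value")

query = (
    formatted.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .option("checkpointLocation", "checkpoint")
    .start()  # non-blocking; returns a StreamingQuery handle
)
```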
- Introduce a generic file system source/sink
Basically, expose a generic method that takes csv/parquet/json/etc. as a `format` parameter.
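For example (the method names and parameters here are purely illustrative, not existing APIs):

```python
# hypothetical generic file-based source/sink API
t = con.read_files(
    format="csv",
    path="some/csv/path",
    mode="streaming",
    schema=schema,
    options={"header": "true"},
)
con.to_files(expr, format="parquet", path="some/output/path", mode="streaming")
```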
- Introduce new `create_source` and `create_sink` APIs on the PySpark backend. Pretty similar to what we do in the Flink/RW backends; we're just passing the kwargs we receive to the underlying PySpark method.
The difference is that you cannot define sinks independently of the query in Spark. In Flink you can do something like: create source table, create sink table, define a transformation, then insert the transformation into the sink table that you already created.
Sources and sinks are assumed to be streaming, so we don't need to distinguish between batch and streaming.
```python
t = con.create_source(
    format="kafka",
    options={"kafka.bootstrap.servers": "host1:port1,host2:port2", ...},
    watermark=...,
)
expr = ...
con.create_sink(expr, format="kafka", options={...})
```
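A minimal sketch of what `create_sink` could do under the hood (the wrapper is hypothetical; the PySpark calls are real):

```python
# hypothetical internals: compile the Ibis expression, then start the streaming write
def create_sink(self, expr, format, options):
    df = self._session.sql(self.compile(expr))  # resolves against registered temp views
    return (
        df.writeStream.format(format)
        .options(**options)
        .start()  # non-blocking; returns a StreamingQuery handle
    )
```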
Managing streaming queries
I'm not sure if this is something we should get into, but PySpark does provide methods to interact with a running streaming query.
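For reference, the handle returned by `.start()` (a `StreamingQuery`) and the session's query manager expose things like:

```python
# real PySpark APIs on the StreamingQuery handle / StreamingQueryManager
query.status                         # current status of the query
query.lastProgress                   # most recent progress report
query.awaitTermination(timeout=60)   # block until the query stops, or 60 seconds pass
query.stop()                         # stop the streaming query

session.streams.active               # all active streaming queries on this session
```

Whether Ibis should wrap these or just hand the handle back to the user is part of the open design question.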
Close as done.