feat(pyspark): implement native CSV/Parquet export
@mark-druffel needs Spark-native export functionality. We had put off implementing the fast path while trying to figure out whether we could unify output options, but we should probably add the fast path now until we settle on a better alternative.
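For context, a minimal sketch of what such a fast path can look like, assuming the backend's `compile` yields a pyspark `DataFrame` (as the pre-SQLGlot pyspark backend's did); the function names here are hypothetical, not the PR's actual diff:

```python
import pyspark.sql


# Illustrative sketch only, not the PR's actual change. It assumes the
# pyspark backend can turn an ibis expression into a pyspark DataFrame
# and then delegates to Spark's own writers. Note that Spark's writers
# produce a *directory* of part files, not a single file -- the
# portability concern raised in the next comment.
def to_parquet_native(backend, expr, path: str, **kwargs) -> None:
    df: pyspark.sql.DataFrame = backend.compile(expr.as_table())
    df.write.parquet(path, **kwargs)


def to_csv_native(backend, expr, path: str, **kwargs) -> None:
    df: pyspark.sql.DataFrame = backend.compile(expr.as_table())
    df.write.csv(path, header=True, **kwargs)
```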
~~(Still in draft, because I need to replace the tests I xfailed with Spark-specific tests.)~~ Ready!
Apologies for the drive-by comment (I'm on leave, so don't block merging this on me). IMO we do want to figure out a consistent story around the artifacts produced by `to_parquet`/`to_csv` across backends. Having one backend produce a directory and another produce just a single file feels odd IMO and defeats the portable nature of ibis code. See #8584 for an issue outlining the problem (IMO options 4 or 5 are best). I'd rather we do this correctly now than have to deprecate and change the behavior later.
@deepyaman Thanks again for opening this PR! We were able to write a workaround for our use case, so this isn't a rush for us, although it would be nice so that we don't have to keep maintaining the workaround...
On a separate note, while implementing our workaround I tried to use `compile` as you did in your code. I'm guessing I'm just using it improperly, or maybe it's a version issue... I tried it a few different ways and always hit the same error:

```
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
```

I'm just sharing this in case it's an actual issue. I'm using pyspark 3.3.4 and ibis 8.0.0. Below is a reproducible example.
```python
import ibis
from ibis import _
from pyspark.sql import SparkSession

ibis.options.interactive = False

filepath = "penguins.parquet"
table_name = "penguins"
ibis.examples.penguins.fetch().to_parquet(path=filepath)

spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(spark)
df = ispark.read_parquet(filepath, table_name=table_name)
expr = (
    df.filter(_.bill_length_mm < 37)
    .filter(_.bill_depth_mm > 17)
    .group_by(by=[_.sex, _.species])
    .agg(
        n=_.count(),
        avg_bill_length_mm=_.bill_length_mm.mean(),
        avg_bill_depth_mm=_.bill_depth_mm.mean(),
    )
    .order_by(-_.n)
).as_table()
df = ispark._session.sql(ispark.compile(expr))
```
Error:

```
AttributeError                            Traceback (most recent call last)
Cell In[31], line 23
     11 df = ispark.read_parquet(filepath, table_name=table_name)
     12 expr = (
     13     df.filter(_.bill_length_mm < 37)
     14     .filter(_.bill_depth_mm > 17)
   (...)
     21     .order_by(-_.n)
     22 ).as_table()
---> 23 df = ispark._session.sql(ispark.compile(expr))

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/pyspark/sql/session.py:1034, in SparkSession.sql(self, sqlQuery, **kwargs)
   1032     sqlQuery = formatter.format(sqlQuery, **kwargs)
   1033 try:
-> 1034     return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1035 finally:
   1036     if len(kwargs) > 0:

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1313, in JavaMember.__call__(self, *args)
   1312 def __call__(self, *args):
-> 1313     args_command, temp_args = self._build_args(*args)
   1315     command = proto.CALL_COMMAND_NAME +\
   1316         self.command_header +\
   1317         args_command +\
   1318         proto.END_COMMAND_PART
   1320     answer = self.gateway_client.send_command(command)

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1283, in JavaMember._build_args(self, *args)
   1279 new_args = args
   1280 temp_args = []
   1282 args_command = "".join(
-> 1283     [get_command_part(arg, self.pool) for arg in new_args])
   1285 return args_command, temp_args

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1283, in <listcomp>(.0)
   1279 new_args = args
   1280 temp_args = []
   1282 args_command = "".join(
-> 1283     [get_command_part(arg, self.pool) for arg in new_args])
   1285 return args_command, temp_args

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/protocol.py:298, in get_command_part(parameter, python_proxy_pool)
    296     command_part += ";" + interface
    297 else:
--> 298     command_part = REFERENCE_TYPE + parameter._get_object_id()
    300 command_part += "\n"
    302 return command_part

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/pyspark/sql/dataframe.py:1988, in DataFrame.__getattr__(self, name)
   1978 """Returns the :class:`Column` denoted by ``name``.
   1979
   1980 .. versionadded:: 1.3.0
   (...)
   1985 [Row(age=2), Row(age=5)]
   1986 """
   1987 if name not in self.columns:
-> 1988     raise AttributeError(
   1989         "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
   1990     )
   1991 jc = self._jdf.apply(name)
   1992 return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'
```
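(Editorial aside: the traceback is consistent with `compile` returning a pyspark `DataFrame` rather than a SQL string; `spark.sql` forwards its argument to Py4J, which tries to resolve a JVM object id via `_get_object_id` and fails. A hedged way to check, sketched under that assumption:)

```python
# Hedged diagnostic, assuming ibis 8.0's pyspark backend compiles
# expressions to a pyspark DataFrame rather than SQL text.
compiled = ispark.compile(expr)
print(type(compiled))  # expected: pyspark.sql.dataframe.DataFrame

if isinstance(compiled, str):
    # Only round-trip through Spark SQL if compile produced SQL text.
    result = ispark._session.sql(compiled)
else:
    result = compiled  # already an executable pyspark DataFrame

result.show()
```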
@deepyaman Is this PR still viable? @chloeh13q implemented the various `to_*_dir` methods recently, so maybe this can be closed?
I'll double-check tomorrow!
@cpcloud @chloeh13q I think most things are covered, but the changes to `to_delta` should still be relevant. I can update this PR (including the title) to include just that, if it makes sense.
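(For illustration, a hedged sketch of what a Spark-native `to_delta` can look like; the function name is hypothetical, not the PR's literal diff, and it assumes delta-spark is configured on the session:)

```python
import pyspark.sql


# Hypothetical sketch, not the PR's actual change: Spark writes Delta
# tables through its generic DataFrameWriter using the "delta" format,
# which requires the delta-spark package to be set up on the session.
def to_delta_native(backend, expr, path: str, mode: str = "error", **kwargs) -> None:
    df: pyspark.sql.DataFrame = backend.compile(expr.as_table())
    df.write.format("delta").mode(mode).save(path, **kwargs)
```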
@deepyaman Yeah, let's wrap up the `to_delta` bits and then merge this!
Pared it down to just that change.