
feat(pyspark): implement native CSV/Parquet export

Open deepyaman opened this issue 1 year ago • 3 comments

@mark-druffel needs Spark-native export functionality. We had put off implementing the fast path while trying to figure out whether we could unify output options across backends, but we should probably add it now, until we settle on a better alternative.

deepyaman avatar Apr 02 '24 00:04 deepyaman

~(Still in draft, because I need to replace the tests I xfailed with Spark-specific tests.)~ Ready!

deepyaman avatar Apr 02 '24 01:04 deepyaman

Apologies for the drive-by comment (I'm on leave, so don't block merging this on me). IMO we do want to figure out a consistent story for the artifacts produced by to_parquet/to_csv across backends. Having one backend produce a directory and another produce a single file feels odd and defeats the portable nature of ibis code. See #8584 for an issue outlining the problem (IMO options 4 or 5 are best). I'd rather we do this correctly now than have to deprecate and change the behavior later.

jcrist avatar Apr 03 '24 14:04 jcrist

@deepyaman Thanks again for opening this PR! We were able to write a workaround for our use case, so this isn't a rush for us, although it would be nice not to have to keep maintaining the workaround.

On a separate note, while implementing our workaround I tried to use compile as you did in your code. I'm guessing I'm using it improperly, or maybe it's a version issue. I tried it a few different ways and always got the same error:

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

I'm just sharing this in case it's an actual issue. I'm using pyspark 3.3.4 and ibis 8.0.0. Below is a reproducible example.

import ibis
from ibis import _
from pyspark.sql import SparkSession
ibis.options.interactive = False

filepath = "penguins.parquet"
table_name = "penguins"
ibis.examples.penguins.fetch().to_parquet(path = filepath)
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(spark)
df = ispark.read_parquet(filepath, table_name = table_name)
expr = (df
 .filter(_.bill_length_mm < 37)
 .filter(_.bill_depth_mm > 17)
 .group_by(by = [_.sex, _.species])
 .agg(
     n = _.count(),
     avg_bill_length_mm = _.bill_length_mm.mean(),
     avg_bill_depth_mm = _.bill_depth_mm.mean()
 )
 .order_by(-_.n)
).as_table()
df = ispark._session.sql(ispark.compile(expr))

Error

AttributeError                            Traceback (most recent call last)
Cell In[31], line 23
     11 df = ispark.read_parquet(filepath, table_name = table_name)
     12 expr = (df
     13  .filter(_.bill_length_mm < 37)
     14  .filter(_.bill_depth_mm > 17)
   (...)
     21  .order_by(-_.n)
     22 ).as_table()
---> 23 df = ispark._session.sql(ispark.compile(expr))

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/pyspark/sql/session.py:1034, in SparkSession.sql(self, sqlQuery, **kwargs)
   1032     sqlQuery = formatter.format(sqlQuery, **kwargs)
   1033 try:
-> 1034     return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1035 finally:
   1036     if len(kwargs) > 0:

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1313, in JavaMember.__call__(self, *args)
   1312 def __call__(self, *args):
-> 1313     args_command, temp_args = self._build_args(*args)
   1315     command = proto.CALL_COMMAND_NAME +\
   1316         self.command_header +\
   1317         args_command +\
   1318         proto.END_COMMAND_PART
   1320     answer = self.gateway_client.send_command(command)

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1283, in JavaMember._build_args(self, *args)
   1279     new_args = args
   1280     temp_args = []
   1282 args_command = "".join(
-> 1283     [get_command_part(arg, self.pool) for arg in new_args])
   1285 return args_command, temp_args

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1283, in <listcomp>(.0)
   1279     new_args = args
   1280     temp_args = []
   1282 args_command = "".join(
-> 1283     [get_command_part(arg, self.pool) for arg in new_args])
   1285 return args_command, temp_args

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/protocol.py:298, in get_command_part(parameter, python_proxy_pool)
    296         command_part += ";" + interface
    297 else:
--> 298     command_part = REFERENCE_TYPE + parameter._get_object_id()
    300 command_part += "\n"
    302 return command_part

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/pyspark/sql/dataframe.py:1988, in DataFrame.__getattr__(self, name)
   1978 """Returns the :class:`Column` denoted by ``name``.
   1979 
   1980 .. versionadded:: 1.3.0
   (...)
   1985 [Row(age=2), Row(age=5)]
   1986 """
   1987 if name not in self.columns:
-> 1988     raise AttributeError(
   1989         "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
   1990     )
   1991 jc = self._jdf.apply(name)
   1992 return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

mark-druffel avatar Apr 11 '24 22:04 mark-druffel
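[Editorial note: a plausible cause of the error above (an assumption on our part, not confirmed in the thread) is that in ibis 8.x the PySpark backend's compile() returned a pyspark DataFrame rather than SQL text, so spark.sql(...) received an object py4j could not serialize. The sketch below illustrates a defensive check that works either way; FakeSession and FakeDataFrame are stand-ins for the real pyspark objects so the logic can run anywhere.]

```python
class FakeDataFrame:
    """Stand-in for pyspark.sql.DataFrame."""


class FakeSession:
    """Stand-in for pyspark.sql.SparkSession."""

    def sql(self, query):
        if not isinstance(query, str):
            # Mirrors the py4j failure seen in the traceback: a non-string
            # argument eventually hits parameter._get_object_id().
            raise AttributeError(
                f"'{type(query).__name__}' object has no attribute '_get_object_id'"
            )
        return FakeDataFrame()


def run_compiled(session, compiled):
    # Only SQL text goes through session.sql(); if compile() already
    # produced a DataFrame, use it directly.
    if isinstance(compiled, str):
        return session.sql(compiled)
    return compiled


session = FakeSession()
assert isinstance(run_compiled(session, "SELECT 1"), FakeDataFrame)
assert isinstance(run_compiled(session, FakeDataFrame()), FakeDataFrame)
```

With the real objects, the same shape check would let the repro above work on both ibis 8.x (DataFrame result) and later releases (SQL string result).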

@deepyaman Is this PR still viable? @chloeh13q implemented the various to_*_dir methods recently, so maybe this can be closed?

cpcloud avatar Jul 15 '24 19:07 cpcloud

@deepyaman Is this PR still viable? @chloeh13q implemented the various to_*_dir methods recently, so maybe this can be closed?

I'll double-check tomorrow!

deepyaman avatar Jul 16 '24 06:07 deepyaman

@cpcloud @chloeh13q I think most things are covered, but the changes to to_delta should still be relevant. I can update this PR (including the title) to include just that change, if that makes sense.

deepyaman avatar Jul 18 '24 05:07 deepyaman

@deepyaman Yeah, let's wrap up the to_delta bits and then merge this!

cpcloud avatar Jul 18 '24 12:07 cpcloud

@deepyaman Yeah, let's wrap up the to_delta bits and then merge this!

Pared it down to just that change.

deepyaman avatar Jul 18 '24 13:07 deepyaman