
[SPARK-39821][PYTHON][PS] Fix error when using DatetimeIndex

Open bzhaoopenstack opened this issue 2 years ago • 11 comments

Pandas now disallows conversions between datetime and timedelta types, as well as conversions from any datetimelike type to float.

This raises an error in PySpark as soon as we simply display a DatetimeIndex, so we need to avoid calling astype with the unit-less datetime64 dtype.

By the way, pandas-on-Spark states that it does not support the DatetimeTZDtype, so let's skip only the datetime64 type in the base __repr__ function of Index.
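
For reference, the pandas-side behavior can be reproduced without Spark at all; a minimal sketch (pandas only, the exact error message varies by pandas version):

import pandas as pd

s = pd.Series(pd.to_datetime(["1970-01-01", "1970-01-01", "1970-01-01"]))

s.astype("datetime64[ns]")  # OK on all versions: the target dtype carries a unit

# On the pandas master branch (and later 2.x releases), casting to the
# unit-less "datetime64" dtype raises TypeError; pandas 1.3.x/1.4.x
# accepted this cast.
s.astype("datetime64")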

What changes were proposed in this pull request?

Skip the datetime64 type when executing astype for the pandas conversion in the __repr__ function; a rough sketch of the idea follows.
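
A minimal sketch of the intent, using a hypothetical helper name (not the actual diff in this PR): guard the astype call so that a unit-less datetime64 target is skipped and the series is returned unchanged.

import numpy as np
import pandas as pd

def _astype_skip_unitless_datetime(series: pd.Series, target_dtype) -> pd.Series:
    # Hypothetical helper: recent pandas raises TypeError when casting a
    # DatetimeArray to the unit-less "datetime64" dtype, so skip the cast
    # and return the series unchanged in that case.
    t = np.dtype(target_dtype)
    if str(t) == "datetime64":  # generic datetime64 with no unit attached
        return series
    return series.astype(t, copy=False)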

Why are the changes needed?

Improve the experience of Spark Python developers.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Using Python version 3.8.13 (default, Jun 29 2022 11:50:19)
Spark context Web UI available at http://172.25.179.45:4042
Spark context available as 'sc' (master = local[*], app id = local-1658283215853).
SparkSession available as 'spark'.
>>> from pyspark import pandas as ps
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
>>> ps.DatetimeIndex(['1970-01-01', '1970-01-01', '1970-01-01'])
/home/spark/spark/python/pyspark/pandas/internal.py:1573: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  fields = [
/home/spark/spark/python/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
/home/spark/.pyenv/versions/3.8.13/lib/python3.8/site-packages/_pydevd_bundle/pydevd_utils.py:601: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for item in s.iteritems():
/home/spark/.pyenv/versions/3.8.13/lib/python3.8/site-packages/_pydevd_bundle/pydevd_utils.py:601: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for item in s.iteritems():
/home/spark/.pyenv/versions/3.8.13/lib/python3.8/site-packages/_pydevd_bundle/pydevd_utils.py:601: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for item in s.iteritems():
/home/spark/.pyenv/versions/3.8.13/lib/python3.8/site-packages/_pydevd_bundle/pydevd_utils.py:601: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for item in s.iteritems():
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/spark/spark/python/pyspark/pandas/indexes/base.py", line 2770, in __repr__
    pindex = self._psdf._get_or_create_repr_pandas_cache(max_display_count).index
  File "/home/spark/spark/python/pyspark/pandas/frame.py", line 12780, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "/home/spark/spark/python/pyspark/pandas/frame.py", line 12775, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "/home/spark/spark/python/pyspark/pandas/utils.py", line 589, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "/home/spark/spark/python/pyspark/pandas/internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "/home/spark/spark/python/pyspark/sql/pandas/conversion.py", line 248, in toPandas
    series = series.astype(t, copy=False)
  File "/home/spark/upstream/pandas/pandas/core/generic.py", line 6095, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 386, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 308, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/spark/upstream/pandas/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimes.py", line 631, in astype
    return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py", line 504, in astype
    raise TypeError(msg)
TypeError: Cannot cast DatetimeArray to dtype datetime64


bzhaoopenstack avatar Jul 20 '22 06:07 bzhaoopenstack

It looks like there are other test cases that need to be fixed. This is from my testing on master without any change:

spark@DESKTOP-U0I7MO9:~/spark$ python/run-tests --testnames 'pyspark.sql.tests.test_dataframe'
Running PySpark tests. Output is in /home/spark/spark/python/unit-tests.log
Will test against the following Python executables: ['/home/spark/.pyenv/versions/3.8.13/bin/python3']
Will test the following Python tests: ['pyspark.sql.tests.test_dataframe']
/home/spark/.pyenv/versions/3.8.13/bin/python3 python_implementation is CPython
/home/spark/.pyenv/versions/3.8.13/bin/python3 version is: Python 3.8.13
Starting test(/home/spark/.pyenv/versions/3.8.13/bin/python3): pyspark.sql.tests.test_dataframe (temp output: /tmp/home_spark_.pyenv_versions_3.8.13_bin_python3__pyspark.sql.tests.test_dataframe__3gog72u3.log)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                
Running tests...
----------------------------------------------------------------------
  test_cache (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.535s)
  test_column_iterator (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.005s)
  test_create_dataframe_from_array_of_long (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.341s)
  test_create_dataframe_from_pandas_with_day_time_interval (pyspark.sql.tests.test_dataframe.DataFrameTests) ... /home/spark/spark/python/pyspark/sql/pandas/conversion.py:474: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
/home/spark/spark/python/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
OK (0.156s)
  test_create_dataframe_from_pandas_with_dst (pyspark.sql.tests.test_dataframe.DataFrameTests) ... /home/spark/spark/python/pyspark/sql/pandas/conversion.py:474: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
/home/spark/spark/python/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
ERROR (0.140s)
  test_create_dataframe_from_pandas_with_timestamp (pyspark.sql.tests.test_dataframe.DataFrameTests) ... /home/spark/spark/python/pyspark/sql/pandas/conversion.py:474: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
/home/spark/spark/python/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
OK (0.120s)
  test_create_dataframe_required_pandas_not_found (pyspark.sql.tests.test_dataframe.DataFrameTests) ... SKIP (0.000s)
  test_create_nan_decimal_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.207s)
  test_df_show (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.940s)
  test_drop_duplicates (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.007s)
  test_dropna (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.602s)
  test_duplicated_column_names (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.195s)
  test_extended_hint_types (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.114s)
  test_fillna (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.543s)
  test_freqItems (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.288s)
  test_generic_hints (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.127s)
  test_help_command (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.372s)
  test_input_files (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.143s)
  test_invalid_join_method (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.050s)
  test_join_without_on (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.152s)
  test_observe (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.425s)
  test_observe_str (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (10.161s)
  test_pandas_api (pyspark.sql.tests.test_dataframe.DataFrameTests) ... /home/spark/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
/home/spark/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
OK (0.697s)
  test_range (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.235s)
  test_repartitionByRange_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.960s)
  test_replace (pyspark.sql.tests.test_dataframe.DataFrameTests) ... /home/spark/spark/python/pyspark/sql/dataframe.py:2791: UserWarning: to_replace is a dict and value is not None. value will be ignored.
  warnings.warn("to_replace is a dict and value is not None. value will be ignored.")
OK (2.918s)
  test_repr_behaviors (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.835s)
  test_require_cross (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.556s)
  test_same_semantics_error (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.023s)
  test_sample (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.042s)
  test_toDF_with_schema_string (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (1.424s)
  test_to_local_iterator (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.535s)
  test_to_local_iterator_not_fully_consumed (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (2.608s)
  test_to_local_iterator_prefetch (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.262s)
  test_to_pandas (pyspark.sql.tests.test_dataframe.DataFrameTests) ... ERROR (0.093s)
  test_to_pandas_avoid_astype (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.094s)
  test_to_pandas_from_empty_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests) ... ERROR (0.396s)
  test_to_pandas_from_mixed_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests) ... ERROR (0.167s)
  test_to_pandas_from_null_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.078s)
  test_to_pandas_on_cross_join (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.162s)
  test_to_pandas_required_pandas_not_found (pyspark.sql.tests.test_dataframe.DataFrameTests) ... SKIP (0.000s)
  test_to_pandas_with_duplicated_column_names (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.051s)
  test_with_column_with_existing_name (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.085s)
  test_with_columns (pyspark.sql.tests.test_dataframe.DataFrameTests) ... OK (0.254s)
  test_query_execution_listener_on_collect (pyspark.sql.tests.test_dataframe.QueryExecutionListenerTests) ... OK (0.051s)
  test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.test_dataframe.QueryExecutionListenerTests) ... OK (0.043s)

======================================================================
ERROR [0.140s]: test_create_dataframe_from_pandas_with_dst (pyspark.sql.tests.test_dataframe.DataFrameTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/tests/test_dataframe.py", line 1008, in test_create_dataframe_from_pandas_with_dst
    assert_frame_equal(pdf, df.toPandas())
  File "/home/spark/spark/python/pyspark/sql/pandas/conversion.py", line 248, in toPandas
    series = series.astype(t, copy=False)
  File "/home/spark/upstream/pandas/pandas/core/generic.py", line 6095, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 386, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 308, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/spark/upstream/pandas/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimes.py", line 631, in astype
    return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py", line 504, in astype
    raise TypeError(msg)
TypeError: Cannot cast DatetimeArray to dtype datetime64

======================================================================
ERROR [0.093s]: test_to_pandas (pyspark.sql.tests.test_dataframe.DataFrameTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/tests/test_dataframe.py", line 797, in test_to_pandas
    pdf = self._to_pandas()
  File "/home/spark/spark/python/pyspark/sql/tests/test_dataframe.py", line 791, in _to_pandas
    return df.toPandas()
  File "/home/spark/spark/python/pyspark/sql/pandas/conversion.py", line 248, in toPandas
    series = series.astype(t, copy=False)
  File "/home/spark/upstream/pandas/pandas/core/generic.py", line 6095, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 386, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 308, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/spark/upstream/pandas/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimes.py", line 631, in astype
    return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py", line 504, in astype
    raise TypeError(msg)
TypeError: Cannot cast DatetimeArray to dtype datetime64

======================================================================
ERROR [0.396s]: test_to_pandas_from_empty_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/tests/test_dataframe.py", line 886, in test_to_pandas_from_empty_dataframe
    dtypes_when_nonempty_df = self.spark.sql(sql).toPandas().dtypes
  File "/home/spark/spark/python/pyspark/sql/pandas/conversion.py", line 248, in toPandas
    series = series.astype(t, copy=False)
  File "/home/spark/upstream/pandas/pandas/core/generic.py", line 6095, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 386, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 308, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/spark/upstream/pandas/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimes.py", line 631, in astype
    return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py", line 504, in astype
    raise TypeError(msg)
TypeError: Cannot cast DatetimeArray to dtype datetime64

======================================================================
ERROR [0.167s]: test_to_pandas_from_mixed_dataframe (pyspark.sql.tests.test_dataframe.DataFrameTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/tests/test_dataframe.py", line 952, in test_to_pandas_from_mixed_dataframe
    pdf_with_some_nulls = self.spark.sql(sql).toPandas()
  File "/home/spark/spark/python/pyspark/sql/pandas/conversion.py", line 248, in toPandas
    series = series.astype(t, copy=False)
  File "/home/spark/upstream/pandas/pandas/core/generic.py", line 6095, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 386, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 308, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/spark/upstream/pandas/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimes.py", line 631, in astype
    return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py", line 504, in astype
    raise TypeError(msg)
TypeError: Cannot cast DatetimeArray to dtype datetime64

----------------------------------------------------------------------
Ran 46 tests in 41.235s

FAILED (errors=4, skipped=2)

Generating XML reports...
+---+
| _1|
+---+
|foo|
+---+

+---+
| _1|
+---+
|foo|
+---+

-RECORD 0--
 _1  | f   

+---+
| _1|
+---+
|  f|
+---+

+---+
| _1|
+---+
|  f|
+---+


Had test failures in pyspark.sql.tests.test_dataframe with /home/spark/.pyenv/versions/3.8.13/bin/python3; see logs.
spark@DESKTOP-U0I7MO9:~/spark$ 

bzhaoopenstack avatar Jul 20 '22 08:07 bzhaoopenstack

cc @zhengruifeng @xinrong-meng @itholic FYI

HyukjinKwon avatar Jul 20 '22 08:07 HyukjinKwon

Can one of the admins verify this patch?

AmplabJenkins avatar Jul 21 '22 00:07 AmplabJenkins

@bzhaoopenstack Thanks for reporting this!

By the way, I cannot reproduce the case https://issues.apache.org/jira/browse/SPARK-39821 with the master branch:

Python 3.9.12 (main, Apr  5 2022, 01:52:34)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.4.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/21 17:26:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0-SNAPSHOT
      /_/

Using Python version 3.9.12 (main, Apr  5 2022 01:52:34)
Spark context Web UI available at http://10.0.0.13:4040
Spark context available as 'sc' (master = local[*], app id = local-1658395592665).
SparkSession available as 'spark'.

In [1]: from pyspark import pandas as ps

In [2]: a = ps.DatetimeIndex(['1970-01-01', '1970-01-01', '1970-01-01'])

In [3]: a
DatetimeIndex(['1970-01-01', '1970-01-01', '1970-01-01'], dtype='datetime64[ns]', freq=None)

In [4]: a.__repr__()
Out[4]: "DatetimeIndex(['1970-01-01', '1970-01-01', '1970-01-01'], dtype='datetime64[ns]', freq=None)"

So should this fix target 3.2.x instead?

zhengruifeng avatar Jul 21 '22 09:07 zhengruifeng

I also tested Spark 3.3.0 with Python 3.9.12, and it's fine.

Could you help figure out whether this repr issue exists only with Spark 3.2.x or Python 3.8.x?

zhengruifeng avatar Jul 21 '22 09:07 zhengruifeng

I also tested Spark 3.3.0 with Python 3.9.12, and it's fine.

Could you help figure out whether this repr issue exists only with Spark 3.2.x or Python 3.8.x?

Thanks Zheng. I tested with the Spark master and pandas master branches; that's all I know so far. I think that's why we are confused about this test result.

bzhaoopenstack avatar Jul 21 '22 12:07 bzhaoopenstack

Is it dependent on the pandas version being used? See also https://github.com/apache/spark/blob/master/dev/infra/Dockerfile

HyukjinKwon avatar Jul 22 '22 00:07 HyukjinKwon

Is it dependent on the pandas version being used? See also https://github.com/apache/spark/blob/master/dev/infra/Dockerfile

Hi, I tested with pandas 1.3.x and 1.4.x: everything is OK and no error is raised. But with the pandas master branch, the error is indeed still raised, and in my environment it goes down a different code path.

Below is the good case, pandas 1.3.x and 1.4.x:

>>> from pyspark import pandas as ps
/home/spark/upstream/pandas/pandas/compat/__init__.py:124: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
  warnings.warn(msg)
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
>>> a = ps.DatetimeIndex(['1970-01-01', '1970-01-01', '1970-01-01'])
> /home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py(390)astype()
-> if is_object_dtype(dtype):
(Pdb) c
>>> a
> /home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py(390)astype()   
-> if is_object_dtype(dtype):
(Pdb) l
385  	        #   3. DatetimeArray.astype handles datetime -> period
386  	        dtype = pandas_dtype(dtype)
387  	        import pdb;
388  	        pdb.set_trace()
389  	
390  ->	        if is_object_dtype(dtype):
391  	            return self._box_values(self.asi8.ravel()).reshape(self.shape)
392  	        elif is_string_dtype(dtype) and not is_categorical_dtype(dtype):
393  	            if is_extension_array_dtype(dtype):
394  	                arr_cls = dtype.construct_array_type()
395  	                return arr_cls._from_sequence(self, dtype=dtype, copy=copy)
(Pdb) dtype
dtype('O')
(Pdb) is_object_dtype(dtype)
True
(Pdb) w
  <stdin>(1)<module>()
  /home/spark/spark/python/pyspark/pandas/indexes/base.py(2770)__repr__()
-> pindex = self._psdf._get_or_create_repr_pandas_cache(max_display_count).index
  /home/spark/spark/python/pyspark/pandas/frame.py(12780)_get_or_create_repr_pandas_cache()
-> self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  /home/spark/spark/python/pyspark/pandas/frame.py(12775)_to_internal_pandas()
-> return self._internal.to_pandas_frame
  /home/spark/spark/python/pyspark/pandas/utils.py(589)wrapped_lazy_property()
-> setattr(self, attr_name, fn(self))
  /home/spark/spark/python/pyspark/pandas/internal.py(1056)to_pandas_frame()
-> pdf = sdf.toPandas()
  /home/spark/spark/python/pyspark/sql/pandas/conversion.py(271)toPandas()
-> df[field.name] = _check_series_convert_timestamps_local_tz(
  /home/spark/spark/python/pyspark/sql/pandas/types.py(382)_check_series_convert_timestamps_local_tz()
-> return _check_series_convert_timestamps_localize(s, None, timezone)
  /home/spark/spark/python/pyspark/sql/pandas/types.py(353)_check_series_convert_timestamps_localize()
-> s.apply(
  /home/spark/upstream/pandas/pandas/core/series.py(4357)apply()
-> return SeriesApply(self, func, convert_dtype, args, kwargs).apply()
  /home/spark/upstream/pandas/pandas/core/apply.py(1043)apply()
-> return self.apply_standard()
  /home/spark/upstream/pandas/pandas/core/apply.py(1092)apply_standard()
-> values = obj.astype(object)._values
  /home/spark/upstream/pandas/pandas/core/generic.py(5815)astype()
-> new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  /home/spark/upstream/pandas/pandas/core/internals/managers.py(418)astype()
-> return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  /home/spark/upstream/pandas/pandas/core/internals/managers.py(327)apply()
-> applied = getattr(b, f)(**kwargs)
  /home/spark/upstream/pandas/pandas/core/internals/blocks.py(591)astype()
-> new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  /home/spark/upstream/pandas/pandas/core/dtypes/cast.py(1309)astype_array_safe()
-> new_values = astype_array(values, dtype, copy=copy)
  /home/spark/upstream/pandas/pandas/core/dtypes/cast.py(1254)astype_array()
-> values = values.astype(dtype, copy=copy)
  /home/spark/upstream/pandas/pandas/core/arrays/datetimes.py(646)astype()
-> return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
> /home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py(390)astype()
-> if is_object_dtype(dtype):
(Pdb) n

pandas main (master) branch:

>>> from pyspark import pandas as ps
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
>>> a = ps.DatetimeIndex(['1970-01-01', '1970-01-01', '1970-01-01'])
/home/spark/spark/python/pyspark/pandas/internal.py:1573: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  fields = [
> /home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py(430)astype()
-> dtype = pandas_dtype(dtype)
(Pdb) dtype
dtype('O')
(Pdb) c
/home/spark/spark/python/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
>>> a
> /home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py(430)astype()   
-> dtype = pandas_dtype(dtype)
(Pdb) dtype
dtype('<M8')
(Pdb) c
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/spark/spark/python/pyspark/pandas/indexes/base.py", line 2770, in __repr__
    pindex = self._psdf._get_or_create_repr_pandas_cache(max_display_count).index
  File "/home/spark/spark/python/pyspark/pandas/frame.py", line 12780, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "/home/spark/spark/python/pyspark/pandas/frame.py", line 12775, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "/home/spark/spark/python/pyspark/pandas/utils.py", line 589, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "/home/spark/spark/python/pyspark/pandas/internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "/home/spark/spark/python/pyspark/sql/pandas/conversion.py", line 248, in toPandas
    series = series.astype(t, copy=False)
  File "/home/spark/upstream/pandas/pandas/core/generic.py", line 6095, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 386, in astype
    return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/internals/managers.py", line 308, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/spark/upstream/pandas/pandas/core/internals/blocks.py", line 526, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 299, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/dtypes/astype.py", line 227, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimes.py", line 631, in astype
    return dtl.DatetimeLikeArrayMixin.astype(self, dtype, copy)
  File "/home/spark/upstream/pandas/pandas/core/arrays/datetimelike.py", line 430, in astype
    dtype = pandas_dtype(dtype)
TypeError: Cannot cast DatetimeArray to dtype datetime64
>>> 


I will debug further to locate the root cause.

bzhaoopenstack avatar Jul 22 '22 03:07 bzhaoopenstack

https://github.com/pandas-dev/pandas/commit/67e8c4c3761ab1da4b0a341a472c0fe2ea393e8b

This is the associated commit from upstream pandas. I'm analyzing the history.

bzhaoopenstack avatar Jul 22 '22 07:07 bzhaoopenstack

I have opened an issue with the pandas community. Let's wait for their response.

bzhaoopenstack avatar Jul 25 '22 03:07 bzhaoopenstack

According to the pandas community, the new behavior is intended and expected: astype on a DatetimeArray now supports only a limited set of target dtypes. We should adapt to this on the PySpark side if we plan to upgrade to future pandas versions.
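
For illustration, a pandas-only sketch of casts that a tz-naive datetime Series still accepts on recent pandas versus casts that now raise (the authoritative list is in the pandas documentation):

import pandas as pd

s = pd.Series(pd.to_datetime(["1970-01-01", "1970-01-02"]))

s.astype("datetime64[ns]")  # OK: same datetimelike family, explicit unit
s.astype(object)            # OK: boxes each value as a pd.Timestamp
s.astype(str)               # OK: string representation

# Disallowed on recent pandas (TypeError):
# s.astype("datetime64")    # unit-less datetime64
# s.astype(float)           # datetimelike to float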

bzhaoopenstack avatar Aug 01 '22 12:08 bzhaoopenstack

qq: Now that pandas 1.5.1 is released, is there any update related to this PR?

itholic avatar Oct 31 '22 15:10 itholic

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Feb 09 '23 00:02 github-actions[bot]

With the release of pandas 2.0, I think this PR should be re-opened, right?

I can recreate the issue originally described with:

Python 3.9.16 (main, May  3 2023, 09:54:39) 
[GCC 10.2.1 20210110] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyspark
>>> pyspark.__version__
'3.4.0'
>>> import pandas
>>> pandas.__version__
'2.0.1'
>>> import pyspark.pandas as ps
>>> ps.DatetimeIndex(["1970-01-01", "1970-01-02", "1970-01-03"])
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/18 21:07:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/18 21:07:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pyspark/pandas/indexes/base.py", line 2705, in __repr__
    pindex = self._psdf._get_or_create_repr_pandas_cache(max_display_count).index
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pyspark/pandas/frame.py", line 13347, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pyspark/pandas/frame.py", line 13342, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pyspark/pandas/utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pyspark/pandas/internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py", line 251, in toPandas
    if (t is not None and not all([is_timedelta64_dtype(t),is_datetime64_dtype(t)])) or should_check_timedelta:
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/generic.py", line 6324, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/internals/managers.py", line 451, in astype
    return self.apply(
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/internals/managers.py", line 352, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/internals/blocks.py", line 511, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/dtypes/astype.py", line 242, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/dtypes/astype.py", line 184, in astype_array
    values = values.astype(dtype, copy=copy)
  File "/home/ubuntu/.local/lib/python3.9/site-packages/pandas/core/arrays/datetimes.py", line 694, in astype
    raise TypeError(
TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

hengoren avatar May 22 '23 12:05 hengoren