PyArrow type error: Unsupported type: timestamp[ns]
Apache Iceberg version
0.6.0 (latest release)
Please describe the bug 🐞
Given a table like so:
In [36]: table
Out[36]:
matches(
...
14: player_last_session: optional timestamptz,
...
30: subject_last_session: optional timestamptz,
),
partition by: [run_date, player_agg_cluster_name, initiating_at],
sort order: [],
snapshot: Operation.APPEND: id=6595288807809068528, schema_id=0
I get the following error:
In [25]: table.scan().to_arrow()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[25], line 1
----> 1 table.scan().to_arrow()
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/table/__init__.py:1418, in DataScan.to_arrow(self)
1415 def to_arrow(self) -> pa.Table:
1416 from pyiceberg.io.pyarrow import project_table
-> 1418 return project_table(
1419 self.plan_files(),
1420 self.table,
1421 self.row_filter,
1422 self.projection(),
1423 case_sensitive=self.case_sensitive,
1424 limit=self.limit,
1425 )
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:1114, in project_table(tasks, table, row_filter, projected_schema, case_sensitive, limit)
1111 if limit is not None:
1112 _ = [f.cancel() for f in futures if not f.done()]
-> 1114 tables = [f.result() for f in completed_futures if f.result()]
1116 if len(tables) < 1:
1117 return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:1114, in <listcomp>(.0)
1111 if limit is not None:
1112 _ = [f.cancel() for f in futures if not f.done()]
-> 1114 tables = [f.result() for f in completed_futures if f.result()]
1116 if len(tables) < 1:
1117 return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))
File ~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:449, in Future.result(self, timeout)
447 raise CancelledError()
448 elif self._state == FINISHED:
--> 449 return self.__get_result()
451 self._condition.wait(timeout)
453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File ~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
399 if self._exception:
400 try:
--> 401 raise self._exception
402 finally:
403 # Break a reference cycle with the exception in self._exception
404 self = None
File ~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:957, in _task_to_table(fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, row_counts, limit, name_mapping)
954 if metadata := physical_schema.metadata:
955 schema_raw = metadata.get(ICEBERG_SCHEMA)
956 file_schema = (
--> 957 Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
958 )
960 pyarrow_filter = None
961 if bound_row_filter is not AlwaysTrue():
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:655, in pyarrow_to_schema(schema, name_mapping)
651 else:
652 raise ValueError(
653 "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
654 )
--> 655 return visit_pyarrow(schema, visitor)
File ~/.pyenv/versions/3.11.7/lib/python3.11/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
905 if not args:
906 raise TypeError(f'{funcname} requires at least '
907 '1 positional argument')
--> 909 return dispatch(args[0].__class__)(*args, **kw)
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:676, in _(obj, visitor)
674 @visit_pyarrow.register(pa.Schema)
675 def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
--> 676 return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))
File ~/.pyenv/versions/3.11.7/lib/python3.11/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
905 if not args:
906 raise TypeError(f'{funcname} requires at least '
907 '1 positional argument')
--> 909 return dispatch(args[0].__class__)(*args, **kw)
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:685, in _(obj, visitor)
683 for field in obj:
684 visitor.before_field(field)
--> 685 result = visit_pyarrow(field.type, visitor)
686 results.append(visitor.field(field, result))
687 visitor.after_field(field)
File ~/.pyenv/versions/3.11.7/lib/python3.11/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
905 if not args:
906 raise TypeError(f'{funcname} requires at least '
907 '1 positional argument')
--> 909 return dispatch(args[0].__class__)(*args, **kw)
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:718, in _(obj, visitor)
716 if pa.types.is_nested(obj):
717 raise TypeError(f"Expected primitive type, got: {type(obj)}")
--> 718 return visitor.primitive(obj)
File ~/.pyenv/versions/3.11.7/envs/ml/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:891, in _ConvertToIceberg.primitive(self, primitive)
888 primitive = cast(pa.FixedSizeBinaryType, primitive)
889 return FixedType(primitive.byte_width)
--> 891 raise TypeError(f"Unsupported type: {primitive}")
TypeError: Unsupported type: timestamp[ns]
After some debugging, at this line I find
ipdb> physical_schema
player_last_session: timestamp[ns]
...
subject_last_session: timestamp[ns]
I imagine the fix is to do something like this on this line, but currently those overrides are not exposed. Am I on the right track?
I believe that this issue is somewhat similar to #520
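A minimal sketch that reproduces the failing conversion in isolation (it uses the internal _ConvertToIceberg visitor from the traceback, so this is not public API and may differ across versions):

import pyarrow as pa
from pyiceberg.io.pyarrow import visit_pyarrow, _ConvertToIceberg

# Any field with nanosecond precision reaches the primitive() branch that
# raises above; the rest of the schema never matters.
schema = pa.schema([pa.field("player_last_session", pa.timestamp("ns"))])
visit_pyarrow(schema, _ConvertToIceberg())
# TypeError: Unsupported type: timestamp[ns]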
@dev-goyal Thanks for raising this. It looks like that's a timestamp with nanosecond precision. Support for nanosecond timestamps is currently being added in the latest specification. Can I ask how you wrote the Parquet file?
Thanks @Fokko, makes sense! I was able to simply reduce precision on my end so it's not a big deal, but I figured it couldn't hurt to raise this.
I wrote the data using dbt into an Iceberg table (with Athena/Trino as the engine), sourced from a CDC topic, hence the nanosecond precision; the columns are originally represented as Timestamp.
Hey @dev-goyal, do you mind posting a snippet of your example above? I think this is very similar to #520
In #520, the Iceberg table is created from a PyArrow schema. Internally, Iceberg converts the schema and "downcasts" certain types (large_string -> string, nanosecond timestamp -> microsecond timestamp). So when the PyArrow data (still carrying the original schema) is saved to the Iceberg table, there is a schema mismatch: the Iceberg schema has been downcast/translated while the PyArrow data schema is unchanged (see the sketch below).
#523 should help solve this
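A minimal sketch of that mismatch (catalog and table identifiers are hypothetical):

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # hypothetical catalog
data = pa.table({"name": pa.array(["a"], type=pa.large_string())})

# create_table converts the PyArrow schema to an Iceberg schema and
# downcasts large_string -> string along the way...
tbl = catalog.create_table("db.t", schema=data.schema)

# ...but the Arrow data itself still carries large_string, so before #523
# appending it failed with a schema mismatch.
tbl.append(data)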
I'm facing a similar issue in my code.
Tested using main@7fcdb8d25dfa2498ba98a2b8e8d2b327d85fa7c9 (the commit after "Minor fixes, #523 followup" (#563) and "Cast data to Iceberg Table's pyarrow schema" (#523)).
In my case I'm creating a new table from this arrow schema:
ds: timestamp[ns]
yhat: double
yhat_lower: double
yhat_upper: double
-- schema metadata --
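A sketch of the failing call (catalog and table names are hypothetical):

import pyarrow as pa
from pyiceberg.catalog import load_catalog

arrow_schema = pa.schema([
    pa.field("ds", pa.timestamp("ns")),
    pa.field("yhat", pa.float64()),
    pa.field("yhat_lower", pa.float64()),
    pa.field("yhat_upper", pa.float64()),
])
load_catalog("default").create_table("db.forecast", schema=arrow_schema)
# TypeError: Unsupported type: timestamp[ns]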
This is the full stacktrace:
Traceback (most recent call last):
File "/bpln/cba723bb/82eddf43/pip/runtime/s3write/invoke.py", line 62, in invoke
self._pyiceberg_write_model(model)
File "/bpln/cba723bb/82eddf43/pip/runtime/s3write/invoke.py", line 137, in _pyiceberg_write_model
pyiceberg_table = catalog.create_table(
^^^^^^^^^^^^^^^^^^^^^
File "/tmp/runtime/shared/pyiceberg_patch.py", line 105, in create_table
iceberg_schema = self._convert_schema_if_needed(schema)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/catalog/__init__.py", line 559, in _convert_schema_if_needed
schema: Schema = visit_pyarrow(schema, _ConvertToIcebergWithoutIDs()) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/functools.py", line 909, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 682, in _
return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/functools.py", line 909, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 691, in _
result = visit_pyarrow(field.type, visitor)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/functools.py", line 909, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 726, in _
return visitor.primitive(obj)
^^^^^^^^^^^^^^^^^^^^^^
File "/bpln/cba723bb/82eddf43/pip/pyiceberg/pyiceberg/io/pyarrow.py", line 899, in primitive
raise TypeError(f"Unsupported type: {primitive}")
TypeError: Unsupported type: timestamp[ns]
So, timestamp_ns and timestamptz_ns have been added in v3 of the Iceberg spec; PyIceberg currently supports v1 and v2.
In my case, the column was generated by this user-written snippet:
df['ds'] = pd.to_datetime(df["pickup_datetime"]).dt.date
Unfortunately, I cannot control what a user writes or how they produce the table.
What's the recommended solution for downcasting unsupported column types into something less precise, without raising an error?
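For readers who do control the producing code, one option is to downcast at the source, before the frame ever becomes a pyarrow timestamp[ns] column; a sketch assuming pandas >= 2.0 (which supports non-nanosecond datetime64 dtypes):

import pandas as pd

# Coerce the parsed datetimes to microsecond precision up front, so the
# resulting Arrow column is timestamp[us] rather than timestamp[ns].
df["ds"] = pd.to_datetime(df["pickup_datetime"]).astype("datetime64[us]")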
Ciao @bigluck. Thanks for jumping in here. Until V3 is finalized, we can add a flag to cast nanosecond to microsecond precision. Would that work for you?
@Fokko it sounds good to me! :)
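(A configuration property for exactly this downcast was subsequently added to PyIceberg; the snippet below assumes the property name as later documented, so verify it against your release's configuration docs:)

import os

# Opt in to downcasting nanosecond timestamps to microseconds on write.
os.environ["PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE"] = "True"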
For anyone else who stumbles across this, you can downcast the offending columns before writing (where tbl is a PyArrow Table; null-typed columns are handled too, since the null type is also unsupported):

import pyarrow as pa

# Columns with a timestamp type (any unit) and columns with the null type,
# both of which the Iceberg schema conversion rejects.
timestamp_fields = [field.name for field in tbl.schema if pa.types.is_timestamp(field.type)]
null_fields = [field.name for field in tbl.schema if pa.types.is_null(field.type)]

fields = []
for field in tbl.schema:
    if field.name in timestamp_fields:
        # Downcast to microsecond precision, preserving the timezone (if any)
        # and the field's nullability.
        fields.append(pa.field(field.name, pa.timestamp("us", tz=field.type.tz), nullable=field.nullable))
    elif field.name in null_fields:
        fields.append(pa.field(field.name, pa.string()))
    else:
        fields.append(field)

tbl = tbl.cast(pa.schema(fields))
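In practice you run that cast right before creating or writing the table, e.g. (catalog and identifier names hypothetical):

iceberg_table = catalog.create_table("db.events", schema=tbl.schema)  # schema now converts cleanly
iceberg_table.append(tbl)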