delta-rs
Datafusion table provider: issues with timestamp types
In a recent attempt to write a where clause on a timestamp column in one of my favorite delta tables, I was not able to express that condition in delta-rs / datafusion. I believe this is due to a mismatch between the logical field type recorded in the delta log (and delivered to datafusion by delta-rs) and datafusion's expectation of finding the actual physical column type in the schema.
Concretely, given a delta table with a column that, according to (java) parquet tools, has the following type:
```
optional int64 endOfDsegTime (TIMESTAMP_MILLIS);
```
and according to delta-rs, the type:
```
SchemaField {
    name: "endOfDsegTime",
    type: primitive(
        "timestamp",
    ),
    nullable: true,
    metadata: {},
},
```
When trying to use that field in a datafusion query with `endOfDsegTime >= TO_TIMESTAMP_MILLIS('2021-03-19T00:00:00')`, I see:
Error during planning: 'Timestamp(Nanosecond, None) >= Timestamp(Millisecond, None)' can't be evaluated because there isn't a common type to coerce the types to
so datafusion seems to think that endOfDsegTime is in nanosecond precision, which it isn't.
If I rewrite the where clause to `endOfDsegTime >= TO_TIMESTAMP('2021-03-19T00:00:00')`, datafusion accepts the where clause but then fails somewhere deep in arrow compute territory:
Cannot evaluate binary expression GtEq with types Timestamp(Millisecond, None) and Timestamp(Nanosecond, None)
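To make the mismatch concrete, here is a minimal pyarrow sketch (illustrative only, not the code path delta-rs or datafusion actually takes) showing that millisecond and nanosecond timestamps are distinct arrow types, so one side of the comparison has to be cast to a common unit before the kernel can run:

```python
import datetime

import pyarrow as pa
import pyarrow.compute as pc

# The column as stored in this table: millisecond precision.
col_ms = pa.array([datetime.datetime(2021, 3, 20)], type=pa.timestamp("ms"))
# The literal as produced by TO_TIMESTAMP: nanosecond precision.
lit_ns = pa.scalar(datetime.datetime(2021, 3, 19), type=pa.timestamp("ns"))

print(col_ms.type, lit_ns.type)  # timestamp[ms] timestamp[ns]

# The units are different arrow data types; an explicit cast to a common
# unit is needed before the comparison can be evaluated.
result = pc.greater_equal(col_ms.cast(pa.timestamp("ns")), lit_ns)
print(result.to_pylist())  # [True]
```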
I rewrote the table to int96 / nanoseconds; in that scenario, the second where clause works. I also noticed that according to delta-rs (and also _delta_log/0.json), the schema is identical between these two versions of the table - delta does not distinguish between the different physical parquet types; only in parquet does the actual concrete type manifest.
Notes:
- not sure how spark does it, but there "everything just works" irrespective of the column type in parquet
- reading either of the two delta tables using datafusion::datasource::parquet also just works, because the schema derived from the datasource is based on the parquet file, not on the metadata in the delta log

I'm not sure this can be solved solely on the delta side - it seems that there's no way to distinguish the different timestamp types from the metadata alone.
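For illustration, here is a small Python sketch (the file paths are hypothetical) of where the two views of the type come from: the parquet footer carries the concrete physical/logical type, while the delta log only records the generic "timestamp" primitive with no unit:

```python
import json

import pyarrow.parquet as pq

# Physical view: the footer of one data file records the concrete type,
# e.g. INT64 annotated as TIMESTAMP_MILLIS, or plain INT96.
pf = pq.ParquetFile("my_table/part-00000.parquet")  # hypothetical data file
print(pf.schema)                                    # raw parquet schema with physical types
print(pf.schema_arrow.field("endOfDsegTime").type)  # e.g. timestamp[ms]

# Logical view: the metaData action in the delta log only says "timestamp".
with open("my_table/_delta_log/00000000000000000000.json") as f:  # first commit
    for line in f:
        action = json.loads(line)
        if "metaData" in action:
            fields = json.loads(action["metaData"]["schemaString"])["fields"]
            print([(fld["name"], fld["type"]) for fld in fields])
            # e.g. [('endOfDsegTime', 'timestamp'), ...]
```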
There are two problems here. The first one is that datafusion is not doing automatic type casting for your queries, which it really should. So please file an upstream ticket for that :)
The bigger problem is, like you said, that our datafusion/arrow integration is not implemented correctly. We have the schema hardcoded to int96 nanosecond because that's what spark writes out by default: https://github.com/delta-io/delta-rs/commit/3d3a32ca1f8966c4f8129521092d0aa47fdaaf91.
However, I think the deltalake reader implementation should do the conversion to millisecond when reading them into memory so it's conforming to the deltatable metadata. @fvaleye also asked a similar question in the scala repo a long time ago: https://github.com/delta-io/delta/issues/643. I confirmed with @zsxwing last week that individual implementations should be able to support parquet files with different physical timestamp types.
From the delta protocol, https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types:
timestamp | Microsecond precision timestamp without a timezone
From my personal delta-rs / spark interop experience, microsecond is the correct precision for timestamps and works with spark.
Microsecond is the right type to use for the delta table's schema, but we need to update our pyarrow and datafusion integrations to work with parquet files that are written with both nanosecond and microsecond timestamp types.
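A rough pyarrow-side sketch of what that normalization could look like (illustrative only; the helper name is made up, and the actual fix belongs in the Rust reader / datafusion integration), casting whatever unit a data file happens to use to the microsecond unit the delta schema mandates:

```python
import pyarrow as pa
import pyarrow.parquet as pq

def read_as_delta_schema(path: str) -> pa.Table:
    """Read one parquet data file and coerce every timestamp column to
    microsecond precision, matching the Delta protocol's timestamp type."""
    table = pq.read_table(path)
    fields = []
    for field in table.schema:
        if pa.types.is_timestamp(field.type):
            # Keep the timezone (if any); only normalize the unit.
            fields.append(pa.field(field.name,
                                   pa.timestamp("us", field.type.tz),
                                   nullable=field.nullable))
        else:
            fields.append(field)
    # safe=False tolerates truncating nanoseconds down to microseconds.
    return table.cast(pa.schema(fields), safe=False)
```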
I'm doing a POC with datafusion-ext. I loaded a Delta Lake table with the NYC Taxi data using PySpark and tried to query it with datafusion, and ran into the timestamp issue discussed here.
I tried to store the column in a different timestamp format, but Spark converts it into microseconds.
"Error: ArrowError(ExternalError(Execution("Failed to map column projection for field tpep_dropoff_datetime. Incompatible data types Timestamp(Nanosecond, None) and Timestamp(Microsecond, None)")))"
thanks @gopinathcs for the report.
Currently I am working on some updates to the datafusion APIs - #852. This also includes updates to the latest datafusion and arrow.
That being said, the time representation in spark and arrow has been an issue, and I can likely take a closer look at this once the PR above is done, as it is quite a significant change to the datafusion APIs (not the TableProvider). If you have any feedback, we'd be happy to hear it.
thanks @roeap. We are planning to use delta-rs and datafusion with caching in our platform. I will continue with my exploration and let you know of any feedback.
If spark is writing the timestamp in microsecond units, then it should work, see https://github.com/delta-io/delta-rs/blob/5ce4a40662e3490cab9956eacf76bfd7fff889af/rust/src/delta_arrow.rs#L131. @gopinathcs are you sure your pyspark code is not producing timestamps in int96 instead?
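One quick way to check that from Python (a sketch; the file path is hypothetical) is to look at the physical types in the footer of one of the data files, since Spark's default int96 encoding is only visible there and not in the delta log:

```python
import pyarrow.parquet as pq

# Hypothetical path to one data file written by pyspark.
data_file = "nyc_taxi_delta/part-00000.snappy.parquet"

print(pq.ParquetFile(data_file).schema)
# If tpep_dropoff_datetime shows up as INT96 rather than
# INT64 (TIMESTAMP(MICROS, ...)), spark used its default int96 encoding.
# That can be changed before writing the table, e.g.:
#   spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
```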
OK, this is what I did in the POC: I loaded the table in Arrow, cast it to microsecond timestamps, and wrote it to Parquet. Then I read the parquet into Spark and created a Delta table. Then I tried to run a "Select * from" query using datafusion-ext.
Let me recheck again. Thanks @houqp for pointer.
@houqp @roeap Here is the table schema from datafusion's "Show columns From demo"; it seems both Delta Lake and datafusion show it as microseconds.

I get the same error if I run the query "Select * from demo limit 5".
Error: ArrowError(ExternalError(Execution("Failed to map column projection for field tpep_dropoff_datetime. Incompatible data types Timestamp(Nanosecond, None) and Timestamp(Microsecond, None)")))
Same error converting from a pyarrow table with the following schema:
```
id_articulo: int64
id_color: int64
es_imagen: decimal128(1, 0)
id_coleccion: int64
fecha_alta: timestamp[ms]
usuario_alta: string
fecha_baja: timestamp[ms]
usuario_baja: string
fecha_modificacion: timestamp[ms]
usuario_modificacion: string
```
deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Timestamp(Millisecond, None)
The Delta Lake format only supports microsecond-precision timestamps, so it should work if you convert your columns to that. In the future, we might be able to try automatically casting timestamps for you (but that would fail if values are out of the representable bounds).
Yes, I finally managed to change the schema to use pa.timestamp('us') because the field was in nanoseconds, and then we must cast the table: table.cast(target_schema=schema_cast, safe=False). Thanks!
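For anyone landing here later, a minimal sketch of that fix (the output path is made up; column names follow the schema above, and it assumes the Python deltalake writer):

```python
import pyarrow as pa
from deltalake import write_deltalake

# Source table with a nanosecond timestamp column, as pyarrow often produces.
table = pa.table({
    "id_articulo": pa.array([1, 2], type=pa.int64()),
    "fecha_alta": pa.array([0, 1_000_000_000], type=pa.timestamp("ns")),
})

# Target schema where every timestamp column uses microseconds, the only
# precision the Delta Lake format accepts.
target = pa.schema([
    pa.field(f.name, pa.timestamp("us", f.type.tz), nullable=f.nullable)
    if pa.types.is_timestamp(f.type) else f
    for f in table.schema
])

# safe=False tolerates the ns -> us truncation.
write_deltalake("./articulos_delta", table.cast(target, safe=False))
```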