[SPARK-33863][PYTHON] Respect session timezone in udf workers

Open gaogaotiantian opened this issue 1 month ago • 16 comments

What changes were proposed in this pull request?

Respect spark.sql.session.timeZone in UDF workers.

This is discussed in #52980, but we decided to move it to a separate PR. There are still open questions left:

  1. It seems like this method can't pick up changes made through spark.conf.set. I believe this is trivial for people who are familiar with the configs, so I did not investigate too much.
  2. pandas/arrow UDFs actually read this config, but it seems like it's only passed for those kinds of UDFs, and the message has no structure.

Why are the changes needed?

Relying on the timezone of the local machine does not make any sense.

Does this PR introduce any user-facing change?

Yes. The UDF behavior regarding timestamps and timezones will change.

How was this patch tested?

Manually

Was this patch authored or co-authored using generative AI tooling?

No

gaogaotiantian avatar Nov 21 '25 21:11 gaogaotiantian

@cloud-fan , @ueshin , @zhengruifeng we've discussed this but did not reach a conclusion. I have a draft here and a few questions. We probably need to further discuss the implementation and its implications.

gaogaotiantian avatar Nov 21 '25 22:11 gaogaotiantian

Can we add a test that sets spark.sql.session.timeZone to a value different from the CI machine's local timezone?

cloud-fan avatar Nov 25 '25 02:11 cloud-fan

Okay, I've rethought this, and I think our TimestampType is completely wrong. I don't believe we can patch it without breaking backward compatibility.

The key point of TimestampType is that it's timezone aware - it's impossible to make it correct if we support naive timestamps (unless we are able to read session config inside the conversion function).

It's not possible to determine which timezone to use when we get a naive datetime and try to convert it to a timezone-aware data type.

The only way I think we can go is to require a timezone on datetime.datetime objects if users want to convert them to a TimestampType, and raise an error if there is none.

If we can get the current session timezone when we try to convert to TimestampType, that would work too, but I don't think that's feasible. It also breaks OOP principles.

I don't want to fix this so that it's correct in some cases but wrong in others. We should make a decision about whether we want to fix it (with the risk of breaking user code). If not, that's okay - we can have a timestamp system that sometimes works.

gaogaotiantian avatar Nov 25 '25 23:11 gaogaotiantian

unless we are able to read session config inside the conversion function

Where do we do the conversion? At least for the UDF case, the conversion should all happen within an active query that belongs to a session?

cloud-fan avatar Nov 26 '25 06:11 cloud-fan

Where do we do the conversion? At least for the UDF case, the conversion should all happen within an active query that belongs to a session?

We have to do it everywhere.

df = spark.createDataFrame([(datetime.datetime(1990, 8, 10, 0, 0),)], ["ts"])

Here we are trying to create a TimestampType with a naive datetime - how could we determine the timezone info? It's not correct to assume it belongs to any timezone.

There are two correct ways to do this:

  1. For every single conversion, we know the session local timezone and assume the naive datetime is in that timezone.
  2. We throw an error when users try to convert a naive timestamp to TimestampType and suggest that they use TimestampNTZType instead (sketched below).
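
Roughly, option 2 would amount to a check like the following (the helper name and error message are made up for illustration, not existing PySpark API):

import datetime

def check_timestamp_value(dt: datetime.datetime) -> None:
    # Option 2 in a nutshell: refuse to guess a timezone for naive datetimes
    # when the target type is TimestampType.
    if dt.tzinfo is None:
        raise TypeError(
            "naive datetime cannot be stored as TimestampType; "
            "attach a tzinfo or use TimestampNTZType"
        )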

gaogaotiantian avatar Nov 26 '25 08:11 gaogaotiantian

@gaogaotiantian The key of Spark TimestampType is that it's an absolute timestamp. The session timezone only matters when we render the timestamp without timezone (e.g. df.show, or cast to string, or functions that get year/month/.../second fields from timestamp).

For the case of df = spark.createDataFrame([(datetime.datetime(1990, 8, 10, 0, 0),)], ["ts"]), we use a specific session spark to create the dataframe, and apparently we should respect its session timezone. We should convert datetime.datetime(1990, 8, 10, 0, 0) to an absolute timestamp by attaching the session timezone to it. Moreover, we can have a mix of python datetime.datetime objects which have different timezones or no timezone, and it's OK because we can still convert them to absolute timestamps.

A similar example is reading a JDBC table that contains a column with the standard TIMESTAMP WITH TIME ZONE type. Each value can have a different timezone, but it's still OK to read it as Spark TimestampType, because they can all be converted to absolute timestamps.

Under the hood, TimestampType is stored as an int64 in memory, which is the number of microseconds from the UTC epoch (1970-01-01 00:00:00Z).
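
A rough illustration of that model in plain Python - the helper below is only a sketch, assuming the session timezone is available as an IANA name, and is not the actual Spark conversion path:

import datetime
from zoneinfo import ZoneInfo

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

def to_absolute_micros(dt: datetime.datetime, session_tz: str) -> int:
    # Naive values are interpreted in the session timezone; aware values keep their own.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=ZoneInfo(session_tz))
    # Internal TimestampType representation: microseconds from the UTC epoch.
    return (dt - EPOCH) // datetime.timedelta(microseconds=1)

# A mix of aware and naive inputs all lands on well-defined absolute timestamps.
to_absolute_micros(datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc), "UTC")
to_absolute_micros(datetime.datetime(1990, 8, 10, 0, 0), "America/Los_Angeles")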

cloud-fan avatar Nov 26 '25 13:11 cloud-fan

@cloud-fan , I understand that TimestampType under the hood is just a UTC epoch timestamp. To convert to a UTC timestamp, we have to assume a timezone for naive timestamps - and I don't believe we are doing that.

spark.conf.set("spark.sql.session.timeZone", "UTC")
df = spark.createDataFrame([
    (datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc),),
    (datetime.datetime(1990, 8, 10, 0, 0),)
], ["ts"])
df.show()

The two values above end up different - because we do not respect the session timezone when converting them. Notice that no UDF is involved at this point.

https://github.com/apache/spark/blob/0908ec5272e7b790761f64b46553c4567ad96d01/python/pyspark/sql/types.py#L441

We don't check the timezone info in toInternal and fromInternal. (I don't know if there are other tricks, like changing the system timezone, but the result is different.)

We can fix that with some hacks, if we really want to - that's the option 1 I mentioned above. Again, we need to do it everywhere.

However, that is not the full picture. We have an even worse issue with datetime.datetime - yes, internally we can convert it to an epoch timestamp, but the user might want to play with it in Python.

@udf(returnType=BooleanType())
def greater(ts):
    return ts > datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)

The code above will raise an error, because we convert the TimestampType to a naive datetime - even though we claim that TimestampType is timezone aware. It's illegal to compare a naive timestamp with an aware timestamp in Python (you can do an == check, but it will always return False).
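
For reference, this is plain Python behavior, independent of Spark:

import datetime

naive = datetime.datetime(1990, 8, 10, 0, 0)
aware = datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)

naive == aware   # False: mixed naive/aware equality never matches
naive > aware    # TypeError: can't compare offset-naive and offset-aware datetimes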

I also found an issue that is probably related to DST.

@udf(returnType=BooleanType())
def same(ts):
    return ts == datetime.datetime(1990, 8, 10, 0, 0)

df = spark.createDataFrame([
    (datetime.datetime(1990, 8, 10, 0, 0),)
], ["ts"])

df.select(same("ts")).show()

Even this returns False - there's a one-hour difference, probably due to some missing DST handling.

Back to my point - our TimestampType in Python is just broken - it will disappoint users when they try to do any manipulation with it. We can't mix naive and aware timestamps because Python does not support it.

This is why I propose my second option - it's a bit aggressive, but we can make it right - to always map TimestampType with aware datetime and TimestampNTZType with naive datetime. I believe that's the only way we can make it completely correct.

However, there is a risk that some existing user code could break. If that's a concern, we can just leave this broken - it still works on some occasions.

gaogaotiantian avatar Nov 26 '25 19:11 gaogaotiantian

Ah, this is tough. I agree with "always map TimestampType with aware datetime", but it can be a breaking change for Python UDFs, as it's not only a data change but also a type change (it's illegal to compare a naive timestamp with an aware timestamp in Python).

How about arrow/pandas? Do they also rely on datetime objects?

cloud-fan avatar Nov 28 '25 06:11 cloud-fan

Yeah, this could be a breaking change, but it is the correct way to go. Mapping TimestampType to a naive datetime object is technically not "safer" - it still can't be compared with an aware timestamp. It's not like a naive timestamp has better compatibility - you have to choose one or the other.

I don't have the best knowledge of pandas, but it seems like they have similar concerns - https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html

I mean, we can't really make it work properly if we mix them up. I can think of a few ways to make it less painful:

  1. If the user uses a naive datetime and tries to convert it to a TimestampType explicitly, we use UTC for the naive timestamp instead of raising an error (configurable).
  2. When we infer types, we infer based on whether the datetime has a timezone - do not automatically point to TimestampType (sketched below).
  3. Provide a flag to keep the original behavior - name it something like keep_the_wrong_timestamp_behavior. If users are not ready, they need to explicitly set that flag.
  4. Generate warnings when users try to mix these things up.
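
For point 2, a minimal sketch of tzinfo-based inference (illustrative only, not the current inference code):

import datetime
from pyspark.sql.types import TimestampNTZType, TimestampType

def infer_timestamp_type(value: datetime.datetime):
    # Infer based on whether the value carries timezone info,
    # instead of always defaulting to TimestampType.
    if value.tzinfo is not None:
        return TimestampType()
    return TimestampNTZType()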

I agree this could be disruptive, but otherwise we can't make it right - that's the problem. It's a big mess internally, and we simply can't make it better while keeping backward compatibility.

gaogaotiantian avatar Nov 28 '25 07:11 gaogaotiantian

I thought about it and I have an alternative proposal. We can add a conf to enable a "strict mode" for timestamps, where we always pair aware timestamps with TimestampType and naive timestamps with TimestampNTZType. This is off by default, but strict and correct when enabled.

We have to hook this logic into the type conversion, and I think the least intrusive way is to set it as a class variable of TimestampType - updated on conf change or something similar.

In this way, when users ask about the weird timestamp behavior, we can at least say: well, the default config will never work properly, but you can try the strict mode. We also have a chance to gradually switch to strict mode in the future.

gaogaotiantian avatar Dec 07 '25 18:12 gaogaotiantian

Let's clearly define this "strict mode". For the input side, Spark can be more lenient and allow a mix of timezone-aware and naive datetime objects by using the session timezone. For the output side, "strict mode" will make Spark always produce timezone-aware datetime objects for TimestampType?
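
A minimal sketch of that output-side behavior, assuming the session timezone is known to the worker (the function is hypothetical, for illustration only):

import datetime
from zoneinfo import ZoneInfo

def from_internal_strict(micros: int, session_tz: str) -> datetime.datetime:
    # Always return an aware datetime for TimestampType, rendered in the
    # session timezone instead of the worker machine's local timezone.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (epoch + datetime.timedelta(microseconds=micros)).astimezone(ZoneInfo(session_tz))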

cloud-fan avatar Dec 08 '25 01:12 cloud-fan

For the input side, Spark can be more lenient and allow a mix of timezone-aware and naive datetime objects by using the session timezone.

I think it's better to enforce the match; otherwise the user could potentially create a naive timestamp, send it to a UDF, then get an aware timestamp back because we "output" an aware timestamp from TimestampType. Heuristics make this too difficult to get correct.

gaogaotiantian avatar Dec 08 '25 02:12 gaogaotiantian

OK, so Spark should fail if naive and aware datetimes are mixed? It's another breaking change though...

cloud-fan avatar Dec 08 '25 02:12 cloud-fan

Well, Python will fail when naive and aware datetimes are mixed - there's no way for Spark to avoid it. It is a breaking change, I agree; that's why I propose to put it under an optional flag. The reason we should have this is what I've mentioned above - we simply can't make it right when we mix them. Say the user creates a datetime then changes the session config, vs. sets the session config then creates a datetime - should the behavior be the same? We won't even be able to distinguish the two cases. The only way to make it work properly, with easy-to-explain rules, is to strictly not mix them.

The existing code might work in some simple cases, and we can keep it, but when users try to do something complicated with their timestamps, they will have issues. We should at least provide one way for them to always get correct and consistent results.

gaogaotiantian avatar Dec 08 '25 02:12 gaogaotiantian

Let’s open a PR so we can discuss the details more closely.

cloud-fan avatar Dec 08 '25 02:12 cloud-fan

Okay, sure. This won't be a trivial change, but I can draft a PR. The tricky part is where to enforce this. I have a few thoughts in mind:

  1. Check the conf directly in fromInternal and toInternal - but this would have to access the Spark session conf during data type conversion, which is a new pattern and couples the data types too tightly to the existence of a session.
  2. Pass it as an argument to fromInternal and toInternal - still a new pattern, and it would require changing every container type.
  3. (Might go this way) Add a class-level switch variable to TimestampType and TimestampNTZType and set it when we configure the session (probably via a hook somewhere) - we need to pass the confs to the workers as well to make this work properly (sketched below).
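
A rough sketch of option 3 - the class below is an illustrative stand-in for pyspark.sql.types.TimestampType, not the actual code, and the flag name is a placeholder:

import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

class TimestampTypeSketch:
    # A class-level switch means fromInternal/toInternal never need a session object;
    # a conf-change hook on the driver (and the worker setup path) would flip it.
    strict_mode = False

    def toInternal(self, dt):
        if dt is None:
            return None
        if TimestampTypeSketch.strict_mode and dt.tzinfo is None:
            raise TypeError("TimestampType requires an aware datetime in strict mode")
        if dt.tzinfo is None:
            # Roughly the current behavior: interpret the naive value as local time.
            return int(dt.timestamp() * 1_000_000)
        # Microseconds since the UTC epoch, the internal TimestampType representation.
        return (dt - EPOCH) // datetime.timedelta(microseconds=1)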

gaogaotiantian avatar Dec 08 '25 02:12 gaogaotiantian