On Spark, MySQL, and Timezones

Proper time handling in data can be hard. On the surface it seems like an easy problem, and in many cases there are straightforward solutions which work most of the time. But "most of the time" is not good enough.

I saw a post on timezone handling in Python on Hacker News and was reminded of the many different Python libraries there are for handling timestamps with timezones. In the Java world there are plenty of projects which still rely on the old Joda-Time, although the newer java.time packages introduced in Java 8 make that dependency unnecessary in most cases. That doesn't mean we don't have to watch out for common date, time, and timezone issues in the JVM world.

One such example is Spark SQL. Spark is older than the java.time API and also needs to integrate closely with JDBC. It is therefore important to double-check all time-handling code in Spark. There are many StackOverflow posts about handling time data in Spark, some very useful, others leading to more problems. In the past I have had to deal with such problems and always found a solution which worked well enough in a lot of tests for that particular project.

This week I was once again faced with such a problem, and I wanted to note down (today-I-learned style) some of the unexpected results I found. I did this in the hope of finding a more structured way of handling dates in Spark. I did not find such a solution, but I ended up with a reference to fall back on when working with this particular combination of technologies in the future.

Technologies used

First I want to go over the software used and some important links. There is Spark itself; I am using a relatively recent version: 3.0. Luckily, Databricks (the creators of Spark) have published a blog post about using timestamps in Spark. It already notes that a full timestamp with timezone is not supported in Spark, and that a timestamp without timezone can be handled by using a timestamp together with a UTC session timezone in Spark. There is also information available for timestamps in the Databricks workspace, where they go into a bit more detail about how they rely on the JVM's handling of time and repeat SQL's timestamp definitions.

The database from which I am extracting data is a MySQL database (Docker mysql:8.0.21 to be exact). MySQL TIMESTAMP columns rely on the session timezone for reading and writing values, and the values are always stored in UTC. A tutorial can be found here, and documentation here.

Finally, the JDBC driver used by Spark in this case is mysql-connector-java:8.0.23.

The setting

I load some generated data into MySQL using a Python script (see appendix). I either leave the session timezone at its default, set the connection explicitly to UTC, or set it explicitly to UTC+01:00.

For retrieving the data in Spark I use a JDBC read:

sparkSession.read
    .format("jdbc")
    ....  // Settings
    .load()
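
The elided settings are the usual Spark JDBC options. As a minimal sketch of what that block looks like here, with the connection URL, table name, and credentials as placeholders for my local Docker setup:

val dataFrame = sparkSession.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/tztest")  // placeholder URL for the local Docker MySQL
    .option("driver", "com.mysql.cj.jdbc.Driver")         // driver class from mysql-connector-java 8.x
    .option("dbtable", "ts_table2")                        // placeholder table name, as used in the examples
    .option("user", "<username>")
    .option("password", "<password>")
    .load()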

For filtering the selection I use one of three methods. The main one is the following, which I think is quite common.

val startDateTime = "2020-02-02 00:00:00"
val endDateTime = "2020-02-03 00:00:00"
dataFrame.where(
  ($"event_time" >= lit(startDateTime)) and ($"event_time" < lit(endDateTime))
)

One potential alternative is using java.sql.Timestamp values.

import java.sql.Timestamp

dataFrame.where(
  ($"event_time" >= Timestamp.valueOf(startDateTime)) and
    ($"event_time" < Timestamp.valueOf(endDateTime))
)

Finally, we can also change the load query.

// Inside the jdbc load settings
sparkSession.read
    .format("jdbc")
    ....  // other settings
    .option("query", "SELECT * FROM ts_table2 WHERE event_time >= '2020-02-02 00:00:00+01:00' AND event_time < '2020-02-03 00:00:00+01:00'")
    .load()

In this example we only care about event_time (a timestamp column) and event_value, an increasing unique integer. I write the data out to CSV files, but I have also verified some of the results by writing to Parquet files and reading them with Pandas + PyArrow's parquet reader.
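
The write step itself is just Spark's default CSV writer, roughly as sketched below; the output path is a placeholder and the filter is the one shown above.

dataFrame
    .where(($"event_time" >= lit(startDateTime)) and ($"event_time" < lit(endDateTime)))
    .write
    .csv("/tmp/tztest-output")  // placeholder output path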

(Un-)surprising outcomes

First, it is better to explicitly set the session timezone if you rely on timezones in MySQL. But in case you did not, and you wanted to use MySQL's system timezone, it can be surprising that Spark will add a timezone based on its own settings.

If I leave everything at its default, I get 2020-02-02T00:00:00.000+01:00,1440 as the first row. Yet if I set spark.sql.session.timeZone=UTC and -Duser.timezone=UTC, I get 2020-02-02T00:00:00.000Z,1440 back. These are fundamentally two different points in time. This means my Spark timezone settings influence the very data I will have in my output. Note that these are also the results if I explicitly set the session timezone in the Python script to UTC.
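
For reference, this is roughly how the two knobs are set: spark.sql.session.timeZone goes into the Spark configuration, while -Duser.timezone has to reach the JVM itself (locally as a launch flag, on a cluster e.g. via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions). A minimal local sketch:

import org.apache.spark.sql.SparkSession

// Session timezone used by Spark when converting timestamps to and from strings.
val sparkSession = SparkSession.builder()
    .master("local[*]")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()

// The JVM default timezone is set separately, e.g.
//   spark-submit --driver-java-options "-Duser.timezone=UTC" ...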

Ok, so I set the timezone in my Python script to +01:00 (the current offset in Germany). Now, when I query the database, it matters what I set the session timezone to. If I apply the filter in SQL in the same (+01:00) session, I get 2020-02-02 00:00:00,1440 back as the first row. If I set my session timezone to UTC instead, I get the same row back, shifted back by one hour: 2020-02-01 23:00:00,1440.

Now if I use Spark to load this data without changing the settings, I get 2020-02-02T00:00:00.000+01:00,1500 back. The 1500 shows that the actual row is the one at 2020-02-02T00:00:00.000Z, but that the +01:00 timezone was added after loading in the data filtered at UTC time. I get this same result if I explicitly set the user/session timezone to Europe/Berlin.

Loading with spark.sql.session.timeZone=UTC and -Duser.timezone=UTC showed the data correctly as 2020-02-02T00:00:00.000Z,1500 (but filtered in UTC of course). In this case it doesn't matter which type of filter I apply, and I can even filter in the load query with a filter of event_time >= '2020-02-02 00:00:00Z'.

Curiously, if I set spark.sql.session.timeZone=Europe/Berlin and -Duser.timezone=UTC, I get the correct value in my current timezone: 2020-02-02T00:00:00.000+01:00,1440. But I'm not sure if this is behavior I can rely on across Spark database and file sources.

Another weird result appeared when I left my Spark and JVM settings at their defaults but tried to filter in the query using a timezone offset. The query filter was event_time >= '2020-02-02 00:00:00+01:00'. The first row in the output looked as follows: 2020-02-01T23:00:00.000+01:00,1440. The 1440 shows that the right row was retrieved, but somehow the event_time is no longer correct.

What about Datetime

MySQL also has support for Datetime columns. These explicitly do not have a timezone, and setting a session timezone does not influence them at all.

In my tests the filter was always correctly applied to these datetime values, but the actual values were represented as timestamps in Spark. This means that if I load them with default settings a value looks like 2020-02-02T00:00:00.000+01:00, and if I load them with the user/session timezone set to UTC it looks like 2020-02-02T00:00:00.000Z, which is kind of a shame.
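
One possible workaround, not part of these tests and only a sketch that assumes the user and session timezones agree, is to format the column back into a plain string before writing:

import org.apache.spark.sql.functions.date_format

// Render the DATETIME-backed timestamp column as a wall-clock string again;
// this only round-trips cleanly if the JVM and Spark session timezones match.
val withPlainDatetime = dataFrame.withColumn(
  "event_time_str",
  date_format($"event_time", "yyyy-MM-dd HH:mm:ss")
)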

Parquet sources

I also quickly tested writing some Parquet files with Pandas and PyArrow and then filtering those with Spark. Here everything worked as expected. When I set Spark & the JVM to UTC, the filter was correctly applied in UTC time, and when set to Europe/Berlin it was correctly applied and represented in the data with a +01:00 offset.
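
On the Spark side the Parquet check is just the same filter against a file source; a minimal sketch, with the path as a placeholder:

val parquetFrame = sparkSession.read.parquet("/tmp/tztest-input.parquet")  // placeholder path
parquetFrame.where(
  ($"event_time" >= lit(startDateTime)) and ($"event_time" < lit(endDateTime))
)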

Conclusion

In a roundabout way this led me to a conclusion I have read a couple of times before: it's best to let Spark work with time data in UTC. Using UTC for all dates might help in making dates comparable, but it is sadly no panacea. If on the operational side it makes more sense to work with a custom (non-UTC, non-unixtime) way of storing timezone data, then it needs to be solved in a bespoke way during processing in Spark.

Appendix

MySQL Data Loader

from datetime import datetime

import numpy as np
import mysql.connector


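# Configuration: generation window, target table, and whether/how to pin the connection timezone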
start_date = "2020-02-01 00:00"
end_date = "2020-02-04 00:00"
table_name = "dt_table1"
set_connection_timezone = False
set_to_utc = False
print(start_date, end_date, table_name, str(set_connection_timezone), str(set_to_utc))

conn = mysql.connector.connect(
    host="localhost",
    database="tztest",
    port=3306,
    user="<username>",
    password="<password>",
)

if set_connection_timezone:
    if set_to_utc:
        conn.time_zone = "+00:00"
    else:
        conn.time_zone = "+01:00"
print("Timezone: " + str(conn.time_zone))

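# Generate one row per minute in the configured window, with an increasing counter as the value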
minute_range = np.arange(start_date, end_date, dtype="datetime64[m]")
vals = np.arange(len(minute_range))


query = "INSERT INTO {} (event_time,event_count) VALUES(%s,%s)".format(table_name)
cursor = conn.cursor()

if set_connection_timezone:
    if set_to_utc:
        init_command="SET SESSION time_zone='+00:00'"
    else:
        init_command="SET SESSION time_zone='+01:00'"
    cursor.execute(init_command)
    print("Session timezone set")

insert_list = []
i = 0


def execute_inserts():
    cursor.executemany(query, insert_list)
    conn.commit()


for (ts, val) in zip(minute_range, vals):
    dt = ts.astype(datetime).strftime("%Y-%m-%d %H:%M:%S")
    insert_list.append((dt, int(val)))
    i += 1
    if i % 30 == 0:
        execute_inserts()
        insert_list = []
        print(".", end="", flush=True)
execute_inserts()
cursor.close()
conn.close()