Proper time handling in data can be hard. On the surface it seems like an easy problem and in many cases there are straightforward solutions which work most of the time. But really "most of the time" is not enough.
I saw a post on timezone handling in Python on Hacker News and was reminded of the different Python libraries there are for handling timestamps with timezones. In the Java world there are plenty of projects which still rely on the old Joda Time, although the newer java.time packages introduced in Java 8 make that dependency unnecessary in most cases. That doesn't mean we don't have to watch out for common date, time, and timezone issues in the JVM world.
One such example is when using Spark SQL. Spark is older than the java.time API and also needs to integrate fully with JDBC, so it is important to double-check all time-handling code in Spark. There are many StackOverflow posts about handling time data in Spark, some very useful, others leading to more problems. In the past I have had to deal with such problems and always found a solution which worked well enough in a lot of tests for that particular project.
This week I was once again faced with such a problem, and I wanted to note down (today-I-learned style) some of the unexpected results I found. I did this in the hope of finding a more structured way of handling dates in Spark. I did not find such a solution, but I ended up with a reference to fall back on when working with this particular combination of technologies in the future.
Technologies used
First I want to go over the software used and some important links. There is Spark itself; I am using a relatively recent version: Spark 3.0. Luckily, Databricks (creators of Spark) have published a blog post about using timestamps in Spark. It already notes that a full timestamp-with-timezone type is not supported in Spark, and that timestamp-without-timezone can be handled by using timestamps with a UTC session timezone in Spark. There is also information available about timestamps in the Databricks workspace, where they go into a bit more detail about how they rely on the JVM's handling of time and restate SQL's timestamp definitions.
The database from which I am extracting data is a MySQL database (Docker mysql:8.0.21 to be exact). MySQL TIMESTAMP values seem to rely on the session timezone for conversion and are always stored in UTC. A tutorial can be found here, and documentation here.
Finally, the JDBC driver used by Spark in this case is mysql-connector-java:8.0.23.
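As an aside: if you need to pin the MySQL session timezone for the connection Spark opens over this driver, one knob worth knowing about is Spark's sessionInitStatement JDBC option, which runs a SQL statement right after the connection is established. A minimal sketch, not something I used in the experiments below:
sparkSession.read
  .format("jdbc")
  // ... url, table and credential settings as usual
  .option("sessionInitStatement", "SET SESSION time_zone = '+00:00'")
  .load()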
The setting
I load some generated data into MySQL using a Python script (see appendix). I either leave the session timezone setting at its default, set the connection specifically to UTC, or set it specifically to UTC+01:00.
For retrieving the data in Spark I use:
sparkSession.read
  .format("jdbc")
  .... // Settings
  .load()
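For reference, a minimal sketch of what those settings could look like here; the connection details and driver class are placeholders rather than the exact values from my setup:
val dataFrame = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/tztest") // placeholder connection URL
  .option("dbtable", "ts_table2")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()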
For filtering the selection I use one of three methods. The main one, which I think is quite common, filters on string literals:
val startDateTime = "2020-02-02 00:00:00"
val endDateTime = "2020-02-03 00:00:00"
dataFrame.where(
  ($"event_time" >= lit(startDateTime)) and ($"event_time" < lit(endDateTime))
)
One potential alternative is using java.sql.Timestamp values:
import java.sql.Timestamp

dataFrame.where(
  ($"event_time" >= Timestamp.valueOf(startDateTime)) and
    ($"event_time" < Timestamp.valueOf(endDateTime))
)
Finally, we can also change the load query:
// Inside the jdbc load settings
sparkSession.read
  .format("jdbc")
  .... // other settings
  .option("query", "SELECT * FROM ts_table2 WHERE event_time >= '2020-02-02 00:00:00+01:00' AND event_time < '2020-02-03 00:00:00+01:00'")
  .load()
In this example we only care about event_time (a timestamp column) and event_value (an increasing unique integer).
I write the data out to CSV files, but I have also tested some of the results by writing to Parquet files and reading them with the Pandas + PyArrow parquet reader.
(Un-)surprising outcomes
First, it is better to explicitly set the session timezone if you rely on timezones in MySQL. But in case you did not, and you wanted to use MySQL's SYSTEM timezone, it could be surprising that Spark will add a timezone based on the settings used by Spark.
If I leave everything at the defaults, I get 2020-02-02T00:00:00.000+01:00,1440 as the first row. Yet if I set spark.sql.session.timeZone=UTC and -Duser.timezone=UTC, I get 2020-02-02T00:00:00.000Z,1440 back.
These are fundamentally two different points in time. This means my Spark timezone settings influence the very data I will have in my output. Note that these are also the results if I explicitly set the session timezone in the Python script to UTC.
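For context, this is roughly how such a UTC configuration can be applied; a sketch assuming spark-submit is used, your launch mechanism may differ:
// JVM default timezone for the driver (and executors, if needed):
//   spark-submit \
//     --driver-java-options "-Duser.timezone=UTC" \
//     --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC" \
//     ...
// Spark SQL session timezone, set when building the session:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession
  .builder()
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()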
OK, so I set the timezone in my Python script to +01:00 (the current offset in Germany). Now when I query the database, it matters what I set my session timezone to. If I apply the filter in SQL in the same session, I get 2020-02-02 00:00:00,1440 back as the first row. If I set my session timezone to UTC, I get the same row back, shifted back one hour: 2020-02-01 23:00:00,1440.
Now, if I use Spark to load this data without changing the settings, I get 2020-02-02T00:00:00.000+01:00,1500 back as the first row. The 1500 shows that the actual row retrieved is the one at 2020-02-02T00:00:00.000Z, but that the +01:00 offset was added after loading the data, which was filtered at UTC time. I get the same result if I explicitly set the user/session timezone to Europe/Berlin.
Loading with spark.sql.session.timeZone=UTC and -Duser.timezone=UTC showed the data correctly as 2020-02-02T00:00:00.000Z,1500 (but filtered in UTC, of course). In this case it doesn't matter which type of filter I apply, and I can even filter in the load query with a filter of event_time >= '2020-02-02 00:00:00Z'.
Curiously, if I set spark.sql.session.timeZone=Europe/Berlin and -Duser.timezone=UTC, I get the correct value in my current timezone: 2020-02-02T00:00:00.000+01:00,1440. But I'm not sure whether this is behavior I can rely on across Spark database and file sources.
Another weird result appeared when I left my Spark and JVM settings at their defaults but tried to filter in the query using a timezone offset. The query filter was event_time >= '2020-02-02 00:00:00+01:00'. The first row in the output looked as follows: 2020-02-01T23:00:00.000+01:00,1440. The 1440 shows that the right row was retrieved, but somehow the event_time is not correct anymore.
What about Datetime
MySQL also has support for Datetime columns. These explicitly do not have a timezone, and setting a session timezone does not influence them at all.
In my tests the filter was always correctly applied to these datetime values, but the actual values were represented as timestamps in Spark. This means that if I load them with default settings, a value looks like 2020-02-02T00:00:00.000+01:00, and if I load them with the user/session timezone set to UTC, it looks like 2020-02-02T00:00:00.000Z, which is kind of a shame.
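If the bare, timezone-free rendering is what you want downstream, one possible workaround (a sketch, not something I used in these tests; note the rendered string still depends on the session timezone in effect at load time) is to format the loaded value back into a plain string:
import org.apache.spark.sql.functions.date_format
import sparkSession.implicits._ // for the $"..." column syntax

// Render the loaded value as a plain "yyyy-MM-dd HH:mm:ss" string,
// dropping the offset Spark attached when loading the DATETIME column.
val withPlainDatetime = dataFrame.withColumn(
  "event_time_str",
  date_format($"event_time", "yyyy-MM-dd HH:mm:ss"))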
Parquet sources
I also quickly tested writing some Parquet files with Pandas and PyArrow, and then filtering those with Spark. Here everything worked as expected. When I set Spark & JVM to UTC, the filter was correctly applied in UTC time, and when set to Europe/Berlin, it was correctly applied and represented in the data in +01:00.
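On the Spark side, this Parquet test boils down to a plain file-source read with the same kind of filter as before; a minimal sketch, reusing the startDateTime and endDateTime values from above (the path is a placeholder):
import org.apache.spark.sql.functions.lit
import sparkSession.implicits._ // for the $"..." column syntax

val parquetDf = sparkSession.read.parquet("/data/ts_table.parquet") // placeholder path
val filtered = parquetDf.where(
  ($"event_time" >= lit(startDateTime)) and ($"event_time" < lit(endDateTime))
)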
Conclusion
In a roundabout way this led me to a conclusion I have read a couple of times before: it is best to let Spark work with time data in UTC. Using UTC for all dates might help in making dates comparable, but sadly it is no panacea. If on the operational side it makes more sense to work with a custom (non-UTC, non-unixtime) way of storing timezone data, then it needs to be handled in a bespoke way during processing in Spark.
Appendix
MySQL Data Loader
import random
from datetime import datetime, timedelta

import numpy as np
import mysql.connector

# Configuration: time range to generate, target table, and session timezone handling.
start_date = "2020-02-01 00:00"
end_date = "2020-02-04 00:00"
table_name = "dt_table1"
set_connection_timezone = False
set_to_utc = False

print(start_date, end_date, table_name, str(set_connection_timezone), str(set_to_utc))

conn = mysql.connector.connect(
    host="localhost",
    database="tztest",
    port=3306,
    user="<username>",
    password="<password>",
)

if set_connection_timezone:
    if set_to_utc:
        conn.time_zone = "+00:00"
    else:
        conn.time_zone = "+01:00"
    print("Timezone: " + str(conn.time_zone))

# One row per minute in the configured range, with an increasing counter value.
minute_range = np.arange(start_date, end_date, dtype="datetime64[m]")
vals = np.arange(len(minute_range))

query = "INSERT INTO {} (event_time,event_count) VALUES(%s,%s)".format(table_name)
cursor = conn.cursor()

if set_connection_timezone:
    if set_to_utc:
        init_command = "SET SESSION time_zone='+00:00'"
    else:
        init_command = "SET SESSION time_zone='+01:00'"
    cursor.execute(init_command)
    print("Session timezone set")

insert_list = []
i = 0


def execute_inserts():
    cursor.executemany(query, insert_list)
    conn.commit()


# Insert the rows in batches of 30.
for (ts, val) in zip(minute_range, vals):
    dt = ts.astype(datetime).strftime("%Y-%m-%d %H:%M:%S")
    insert_list.append((dt, int(val)))
    i += 1
    if i % 30 == 0:
        execute_inserts()
        insert_list = []
        print(".", end="", flush=True)

execute_inserts()
cursor.close()
conn.close()