If you work with data (especially data engineering) and are on LinkedIn, odds are you know who Zach Wilson is.
He's arguably the world's second most famous data engineer, losing only to Luigi Mangione. However, Zach is known for data, not for crime, so maybe he should be first on the fame list.
Anywaysss
Zach has worked at Facebook, Netflix, and Airbnb. The sheer volume of data his pipelines have processed makes his advice on scaling automatically valuable.
One of his pieces of advice is actually a GitHub repo with a technique he calls “cumulative table design.” This method aggregates data in a super efficient way, so of course, we're going to learn it together.
The premise
Let's say you need an aggregate metric that always looks 30 days into the past. The wasteful way of calculating this is to scan 30 days of data every day. Zach points to this as the inefficient approach:
SELECT
    COUNT(DISTINCT user_id) AS num_monthly_active_users,
    COUNT(like) AS num_likes_30d,
    [...]
FROM events
WHERE event_date BETWEEN DATE_SUB('2022-01-01', 30) AND '2022-01-01'
His advice is: instead of scanning 30 days, why don't we store the past 29 days already aggregated and only scan a single day's worth of data from the events?
To do that, we're going to create a table with aggregated and nested columns. For each dimension we're analyzing, we're going to create (1) an array column and (2) an aggregate column with the sum of the array values.
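As a minimal sketch of what that table could look like, assuming DuckDB and a single "likes" dimension over a 30-day window (all names here are illustrative, not from Zach's repo):

import duckdb

conn = duckdb.connect()  # in-memory database, just for illustration

# For the dimension we track, we keep (1) an array with one entry per day
# and (2) an aggregate column holding the sum of that array
conn.execute("""
    CREATE TABLE user_likes_cumulated (
        user_id       BIGINT,
        likes_array   INTEGER[],  -- position 1 = most recent day, up to 30 entries
        num_likes_30d INTEGER,    -- sum of likes_array
        snapshot_date DATE
    )
""")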
The pipeline will have this logic: every day, take the previous day's snapshot, prepend the new day's value to each array (dropping the oldest entry once the array is full), and recompute the sums.
Let's code
You can follow along with my code in this GitHub repo I created. I used Poetry and DuckDB, which will make your life easier if you want to code along with me.
In order to really learn the concepts, I changed the domain of the original project. I used this dataset from Kaggle, containing results from international football matches.
(This helped me because it forced me to understand the logic and apply it to a different domain.)
Moving on, here's the metric I want to build: for every national team, how many wins and goals did it record in the past 7, 30, and 90 days?
Preparing raw data
My raw data is going to be processed by this script. Note that I'm using two files, but only one of them is going to be compatible with the cumulative design. More on that later.
In this script I apply the DDL to create the table, then insert the data into it, with a generous date filter.
conn.execute("""
    INSERT INTO matches_silver
    SELECT
        -- deterministic match ID derived from date + teams
        md5(concat_ws('_', date, home_team, away_team)) AS match_id,
        date AS match_date,
        home_team,
        away_team,
        home_score,
        away_score
    FROM '../data/bronze/results.csv'
    WHERE date > '1960-01-01'  -- the generous filter
""")
Preparing the day X table: the daily aggregate
The second step contains the logic to aggregate the events per day using the dimensions you choose. In our case, we chose goals and wins.
So our SQL logic for this table is this:
INSERT INTO team_won_daily
-- one row per team per match: first the home side...
SELECT
    home_team AS team,
    IF(home_score > away_score, 1, 0) AS did_win,
    home_score AS goals_in_match,
    match_date
FROM matches_silver
GROUP BY home_team, home_score, away_score, match_date
UNION
-- ...then the away side
SELECT
    away_team AS team,
    IF(away_score > home_score, 1, 0) AS did_win,
    away_score AS goals_in_match,
    match_date
FROM matches_silver
GROUP BY away_team, home_score, away_score, match_date
I created this UNION logic to address the fact that each row of the source data contains two entities: one match holds results for two countries. A 3-1 home win for Brazil over Argentina, for instance, becomes two rows: (Brazil, did_win=1, goals_in_match=3) and (Argentina, did_win=0, goals_in_match=1).
This step shows a small difference in the way I'm coding this: Zach's repo assumes a day-after-day logic, while I'm doing it in more of a batch/bulk way (which is going to teach us a thing or two about how to backfill this).
The final boss: the cumulated table
In this step, to successfully accumulate the data, we're going to compare day X of the daily aggregate table with day X-1 of the cumulated table.
The intuition of this step is: the cumulated table holds array columns covering the past 90 days, from which the 7, 30, and 90-day metrics are sliced. So, if today's data is day X, we compare it with the accumulated state of day X-1 and update it accordingly.
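Before the SQL, here's a toy illustration of the array mechanics in plain Python (a 5-day window instead of 90, just to keep it readable):

MAX_DAYS = 5  # the real pipeline uses 90

# Yesterday's snapshot: index 0 = most recent day, array already full
yesterday_wins = [1, 0, 1, 1, 0]
todays_win = 1

# Prepend today's value and truncate, so the oldest day falls off
wins_array = ([todays_win] + yesterday_wins)[:MAX_DAYS]

print(wins_array)           # [1, 1, 0, 1, 1]
print(sum(wins_array))      # rolling "wins in the window" metric
print(sum(wins_array[:3]))  # a shorter window is just a slice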
Here's the SQL for this phase:
INSERT INTO team_won_cumulated
WITH existing AS (
    -- yesterday's snapshot (day X-1)
    SELECT * FROM team_won_cumulated
    WHERE snapshot_date = '{existing_day_str}'
),
to_add AS (
    -- today's daily aggregate (day X)
    SELECT * FROM team_won_daily
    WHERE match_date = '{day_to_add_str}'
),
combined AS (
    SELECT
        COALESCE(y.team, t.team) AS team,
        -- prepend today's value; once the array is full,
        -- keep only the newest 89 old entries so it stays at 90
        COALESCE(
            CASE WHEN array_length(y.wins_array) < 90 THEN
                [COALESCE(t.did_win, 0)] || y.wins_array
            ELSE
                [COALESCE(t.did_win, 0)] || list_slice(y.wins_array, 1, 89)
            END,
            [t.did_win]  -- team seen for the first time: start a fresh array
        ) AS wins_array,
        COALESCE(
            CASE WHEN array_length(y.goals_array) < 90 THEN
                [COALESCE(t.goals_in_match, 0)] || y.goals_array
            ELSE
                [COALESCE(t.goals_in_match, 0)] || list_slice(y.goals_array, 1, 89)
            END,
            [t.goals_in_match]
        ) AS goals_array,
        DATE '{day_to_add_str}' AS snapshot_date
    FROM existing y
    FULL OUTER JOIN to_add t ON y.team = t.team
)
-- building the final table from the CTEs
SELECT
    team,
    wins_array,
    goals_array,
    wins_array[1] AS won_last_match,
    list_sum(list_slice(wins_array, 1, 7))  AS num_wins_7d,
    list_sum(list_slice(goals_array, 1, 7)) AS num_goals_7d,
    list_sum(list_slice(wins_array, 1, 30)) AS num_wins_30d,
    list_sum(list_slice(goals_array, 1, 30)) AS num_goals_30d,
    list_sum(wins_array)  AS num_wins_90d,
    list_sum(goals_array) AS num_goals_90d,
    snapshot_date
FROM combined;
Note that there are many COALESCE operations. They handle the case where a new team appears in the table: we (a) take the team name from the daily aggregate and (b) start a fresh array with a single value. They also cover the opposite case: a team that didn't play today has NULL daily values, so COALESCE(t.did_win, 0) pads its array with a zero instead.
This is Zach's explanation for these operations. Note that his code for this step differs from mine because I had to convert the functions to DuckDB syntax.
I'll give you a preview of what the end table looks like. Here's the query to peek at it:
SELECT *
FROM team_won_cumulated
WHERE team = 'Brazil'
  AND snapshot_date BETWEEN '2024-10-10' AND '2024-10-15'
Thoughts on backfilling
As Zach said, the main drawback is that backfilling is a pain in the ass, since you have to do it sequentially: day X can only be computed after day X-1 exists.
One way this could look is in this file, where I create a range of 90 days and compare days i and i-1 to feed the cumulated table.
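As a minimal sketch of such a loop, assuming the cumulation INSERT from earlier is saved with its {existing_day_str} and {day_to_add_str} placeholders (the file path and start date below are made up for the example):

from datetime import date, timedelta

import duckdb

conn = duckdb.connect("football.db")  # assumed database file

# The INSERT INTO team_won_cumulated statement from earlier, stored with
# its {existing_day_str} and {day_to_add_str} placeholders (path is made up)
cumulate_sql = open("sql/cumulate.sql").read()

start = date(2024, 7, 15)  # arbitrary example start date
for i in range(90):
    day_to_add = start + timedelta(days=i)
    existing_day = day_to_add - timedelta(days=1)
    # day i depends on day i-1's snapshot, so this loop must run sequentially
    conn.execute(cumulate_sql.format(
        existing_day_str=existing_day.isoformat(),
        day_to_add_str=day_to_add.isoformat(),
    ))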
Why couldn't I cumulate the goals table?
You'll see in my repo that I tried accumulating goals metrics as well. But this technique assumes the source data registers both kinds of records: ones where the desired event happened and ones where it didn't.
This is why the matches aggregation works: in some records Brazil will win, and in others it will not.
However, the goals table only registers the scores. It doesn't have goals=0 type of records.
One way we could make it work is to create a unique ID for matches and join goals with matches on match_id. This way, we can assess whether a player scored or not in a given game.
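I haven't built this, but a hedged sketch of that join could look like the query below (goals_silver and its columns are my guesses, and the player is just an example):

conn.execute("""
    -- One row per match for a given player, with an explicit 0 when they
    -- didn't score: exactly the kind of record the cumulation needs
    SELECT
        m.match_id,
        m.match_date,
        COUNT(g.match_id) AS goals_scored  -- 0 when no goal rows matched
    FROM matches_silver m
    LEFT JOIN goals_silver g               -- hypothetical goal-events table
        ON g.match_id = m.match_id
        AND g.scorer = 'Neymar'            -- example player
    GROUP BY m.match_id, m.match_date
""")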
Maybe you can do this as homework, lol.
Hope you liked this!
I had a great time learning this solution, and I can't wait to use it in production.