Druid Data Cookbook: Cumulative Sums in Druid SQL
An often asked feature that is currently not present in Druid SQL are window functions. With these, you can create aggregations on levels other than what you are
GROUPing by, and you can relate rows within the same table to one another without resorting to a costly cartesian join. Until these capabilities are added, let’s look at a few use cases and find out how to model them in Druid SQL!
Many thanks to Gian Merlino and John Kowtko for inspiring the solutions I am summarizing in this blog.
The dataset for today’s tutorial looks like this:
ts,customer,revenue 2022-01-01,alice,10.50 2022-01-01,bob,11.50 2022-01-02,alice,12.50 2022-01-02,bob,13.50 2022-01-02,bob,14.00 2022-01-03,alice,14.50 2022-01-03,bob,15.50
Ingest it using the Druid console, using standard batch ingestion with a Paste source. I name the resulting Druid datasource
First, I want a report that shows, for each customer, how their cumulative revenue builds up over time, day by day. As a preparation, let’s aggregate our data to the level of granularity that we will need in the report:
SELECT DATE_TRUNC('DAY', "__time") AS date_day, customer, SUM(revenue) AS rev_daily FROM cume_data GROUP BY 1, 2
This is a common pattern with Druid: try to push groupings and filters down to the individual subqueries such that the final join can work on relatively small result sets.
Now, for each customer and date, join the current date against all dates up to and including that date, aggregating over current date and customer. Since the date condition is not a simple equals condition, it has to go into the
WITH cte AS ( SELECT DATE_TRUNC('DAY', "__time") AS date_day, customer, SUM(revenue) AS rev_daily FROM cume_data GROUP BY 1, 2 ) SELECT cte.date_day, cte.customer, SUM(t2.rev_daily) FROM cte INNER JOIN cte t2 ON cte.customer = t2.customer WHERE t2.date_day <= cte.date_day GROUP BY 1, 2 ORDER BY 1, 2
This way, I have built the equivalent of a
SUM() OVER (ROWS UNBOUNDED PRECEDING AND CURRENT ROW) aggregation.
I want a report that shows me the difference in spending to the previous day, per customer. The pattern is going to be similar to the previous one. Because I have, for each customer, a date that has no predecessor, I have to use a
LEFT JOIN clause though:
WITH cte AS ( SELECT DATE_TRUNC('DAY', "__time") AS date_day, customer, SUM(revenue) AS rev_daily FROM cume_data GROUP BY 1, 2 ) SELECT t2.date_day, t2.customer, t2.rev_daily - t1.rev_daily AS delta FROM cte t2 LEFT JOIN cte t1 ON t1.customer = t2.customer AND t2.date_day = t1.date_day + INTERVAL '1' DAY
For each customer, I join each day’s data row to the previous day’s row and compute the difference. I have just emulated a
Daily Contribution per Customer
Finally, I want to know the relative contribution of each customer to each day’s revenue. This is where, in many SQL dialects, you would use a window function like
SUM() OVER(PARTITION BY customer). We can emulate this by using grouping sets. Specifically, I am going to use
GROUP BY ROLLUP(), which treats the dimension list that I pass as a hierarchy.
Here’s the base query:
SELECT CASE WHEN "__time" IS NULL THEN NULL ELSE __time END AS date_day, customer, SUM(revenue) AS rev_daily, GROUPING(CASE WHEN "__time" IS NULL THEN NULL ELSE __time END, customer) AS rollup_bits FROM cume_data GROUP BY ROLLUP (1, 2)
Here’s the result:
Three things are worth noticing:
ROLLUPaggregation creates a grouping hierarchy. We group by all columns in the list, then by all but the last, then by all but the last two, and so forth up to the grand total. Each dimension that is left out gets a NULL in the respective place. (You can get all combinations with
CUBE, or an explicit list with
- In order to know which level a result row is aggregated to, you can use the special
GROUPING()aggregator. The result of
GROUPING()is a bitmask that has a
1bit for every level that has been rolled up - the most fine grained result rows give a
0value, the grand total is all ones. We will pick the rollup per day on one side of the join (this one will have a
1), and the detail rows (they have a
0) on the other side.
- The timestamp in Druid cannot be NULL. If you try to use it as part of a grouping sets aggregation, Druid complains about not being able to convert a
TIMESTAMP(3) NOT NULLto a
TIMESTAMP(3). I work around this with a somewhat unwieldy
CASEexpression that can (in theory) yield a NULL value.
With this, I am ready to assemble the final query:
WITH cte AS ( SELECT CASE WHEN "__time" IS NULL THEN NULL ELSE __time END AS date_day, customer, SUM(revenue) AS rev_daily, GROUPING(CASE WHEN "__time" IS NULL THEN NULL ELSE __time END, customer) AS rollup_bits FROM cume_data GROUP BY ROLLUP (1, 2) ) SELECT t1.date_day, t2.customer, 100.0 * t2.rev_daily / t1.rev_daily AS contribution_percent FROM cte t1 INNER JOIN cte t2 ON t1.date_day = t2.date_day WHERE t1.rollup_bits = 1 AND t2.rollup_bits = 0
Try it out!
While window aggregation functions are not explicitly available in Druid at the time of this writing, there are numerous ways to get to the desired results.
- Many queries that use window aggregation functions can be modeled in Druid using self joins.
- Make sure to push down aggregations and filters in order to keep the result sets manageable.
- Grouping set aggregations can be helpful when you need multiple aggregation levels within the same query.
“This image is taken from Page 500 of Praktisches Kochbuch für die gewöhnliche und feinere Küche” by Medical Heritage Library, Inc. is licensed under CC BY-NC-SA 2.0 .