ClickHouse Data Cookbook: Aggregating Latest Values
Let’s pick up the thread from one of my earlier blog posts, where I used a very simple data set to show how Druid can aggregate data upon ingestion. Today, I will show how much easier the same task is with ClickHouse.
You can use any version of ClickHouse for this exercise. For instance, you can run ClickHouse locally on your laptop. Or check out the installation docs for other options.
In today’s tutorial, you will
- create a detail table for stock ticker data using schema inference
- create an aggregate table to store the latest price value per ticker symbol and time interval
- create a materialized view to automatically populate the aggregate whenever new rows are added to the detail table
- insert some data and query the aggregate table
- insert some more data and observe how the query result changes.
Creating the sample data set
First, create a text file with these data:
timestamp,symbol,price
2022-08-10 10:01:00,AAAA,25.90
2022-08-10 10:11:00,AAAA,26.90
2022-08-10 11:55:00,AAAA,28.10
Save this file as ticker1.csv.
Likewise, create a file ticker2.csv with these data:
timestamp,symbol,price
2022-08-10 10:50:00,AAAA,23.90
2022-08-10 11:20:00,AAAA,22.10
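Before creating any tables, you can sanity-check the files and the schema ClickHouse infers from them with the file() table function. (This assumes the files are in the server’s user_files directory, or that you are running clickhouse-local from the directory where you saved them.)
DESCRIBE file('ticker1.csv');
SELECT * FROM file('ticker1.csv');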
Setting up the schema
Create a table using the first file as a template. We use the file only to infer the table schema; we don’t want to insert any data just yet. ClickHouse allows you to use a CREATE TABLE ... AS SELECT statement with the EMPTY clause to do just that. This will be a MergeTree table that stores the detail rows.
CREATE TABLE ticker_data
ENGINE = MergeTree
ORDER BY (symbol, timestamp) EMPTY
AS SELECT *
FROM file('ticker1.csv')
SETTINGS schema_inference_make_columns_nullable = 0
Worth mentioning is the last line, which makes the inferred columns non-nullable. When inferring a schema from a CSV file, ClickHouse makes all columns nullable by default. But columns that participate in the sort key cannot be nullable (this can be overridden, but that is probably not a good idea). Since there are no NULL values in the data anyway, the simplest way to manage this is to force all columns to be non-nullable.
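If you want to double-check the result of the schema inference, SHOW CREATE TABLE displays the inferred columns; with the sample data, timestamp should come out as DateTime, symbol as String, and price as Float64 (the exact inferred types may vary slightly between ClickHouse versions).
SHOW CREATE TABLE ticker_data;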
Now define the aggregate table. This will be an AggregatingMergeTree table, where some of the aggregation magic happens inside the table engine:
CREATE TABLE ticker_data_agg
(
`timestampHour` DateTime,
`symbol` String,
`price` AggregateFunction(argMax, Float64, DateTime)
)
ENGINE = AggregatingMergeTree
ORDER BY (symbol, timestampHour)
We will aggregate the data by hour; the timestamp will be truncated, as we will see in a moment.
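For reference, toStartOfHour is the truncation function we will use in the materialized view below; it rounds a timestamp down to the top of the hour:
SELECT toStartOfHour(toDateTime('2022-08-10 10:11:00')); -- returns 2022-08-10 10:00:00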
Data aggregation is defined in three steps:
- In the aggregate table definition:
  - the dimensions go into the order key
  - the measures, rather than being plain values, are declared as AggregateFunction types, which store intermediate state information. The AggregateFunction type constructor is given the aggregate function to be used and the data types it operates on. In our case, this is argMax, which returns the value of its first argument for the row where the second argument reaches its maximum. The second argument will be the timestamp.
- When populating the aggregate table:
  - the dimensions go into the GROUP BY clause
  - the measures use -State combinators; in our case, this will be argMaxState.
- Query the aggregated measures with -Merge combinators, here: argMaxMerge. (A minimal standalone sketch of this round trip follows the list.)
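To see the -State/-Merge round trip in isolation, here is a minimal, table-free sketch using the numbers() table function (not part of the ticker example): the inner query produces an intermediate state, and the outer query merges and finalizes it.
SELECT argMaxMerge(st) AS result -- merges and finalizes the state: returns 104
FROM
(
    SELECT argMaxState(number + 100, number) AS st -- intermediate binary state
    FROM numbers(5) -- rows with number = 0..4
);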
Now let’s define a materialized view that will aggregate the ticker data:
CREATE MATERIALIZED VIEW ticker_data_mv TO ticker_data_agg
AS SELECT
toStartOfHour(timestamp) AS timestampHour,
symbol,
argMaxState(price, timestamp) AS price
FROM ticker_data
GROUP BY
timestampHour,
symbol
This will truncate the timestamp to the desired granularity (hour) and perform the aggregation.
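One caveat worth knowing: a materialized view in ClickHouse acts as an insert trigger, so it only processes rows inserted after it has been created. Our detail table is still empty, so this does not matter here. If ticker_data already contained rows, you could backfill the aggregate with a one-off insert along these lines (a sketch, not needed in this walkthrough):
INSERT INTO ticker_data_agg
SELECT
    toStartOfHour(timestamp) AS timestampHour,
    symbol,
    argMaxState(price, timestamp) AS price
FROM ticker_data
GROUP BY timestampHour, symbol;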
Populating the tables
Insert the first set of data:
INSERT INTO ticker_data SELECT * FROM file('ticker1.csv');
and query the aggregate at the configured time granularity:
:) SELECT timestampHour, symbol, argMaxMerge(price) AS latestPrice FROM ticker_data_agg GROUP BY timestampHour, symbol ORDER BY timestampHour, symbol;
SELECT
timestampHour,
symbol,
argMaxMerge(price) AS latestPrice
FROM ticker_data_agg
GROUP BY
timestampHour,
symbol
ORDER BY
timestampHour ASC,
symbol ASC
Query id: b832dc41-a3db-4463-87a4-fe829bc120d0
┌───────timestampHour─┬─symbol─┬─latestPrice─┐
1. │ 2022-08-10 10:00:00 │ AAAA │ 26.9 │
2. │ 2022-08-10 11:00:00 │ AAAA │ 28.1 │
└─────────────────────┴────────┴─────────────┘
2 rows in set. Elapsed: 0.012 sec.
and at a coarser granularity:
:) SELECT toStartOfDay(timestampHour) AS timestampDay, symbol, argMaxMerge(price) AS latestPrice FROM ticker_data_agg GROUP BY timestampDay, symbol ORDER BY timestampDay, symbol;
SELECT
toStartOfDay(timestampHour) AS timestampDay,
symbol,
argMaxMerge(price) AS latestPrice
FROM ticker_data_agg
GROUP BY
timestampDay,
symbol
ORDER BY
timestampDay ASC,
symbol ASC
Query id: bb33cc74-73fe-4d35-8009-136ca73b6579
┌────────timestampDay─┬─symbol─┬─latestPrice─┐
1. │ 2022-08-10 00:00:00 │ AAAA │ 28.1 │
└─────────────────────┴────────┴─────────────┘
1 row in set. Elapsed: 0.008 sec.
Adding more data
Insert the second batch:
:) INSERT INTO ticker_data SELECT * FROM file('ticker2.csv');
If you query the data now with GROUP BY and argMaxMerge, you get the updated rows:
:) SELECT timestampHour, symbol, argMaxMerge(price) AS latestPrice FROM ticker_data_agg GROUP BY timestampHour, symbol ORDER BY timestampHour, symbol;
SELECT
timestampHour,
symbol,
argMaxMerge(price) AS latestPrice
FROM ticker_data_agg
GROUP BY
timestampHour,
symbol
ORDER BY
timestampHour ASC,
symbol ASC
Query id: 2638d26a-ecdb-484f-9de7-3e8ca26e3724
┌───────timestampHour─┬─symbol─┬─latestPrice─┐
1. │ 2022-08-10 10:00:00 │ AAAA │ 23.9 │
2. │ 2022-08-10 11:00:00 │ AAAA │ 28.1 │
└─────────────────────┴────────┴─────────────┘
2 rows in set. Elapsed: 0.020 sec.
Note how the 11am entry has not changed, because the added data had an earlier timestamp than the data already present in the table. Thus, late-arriving data is handled correctly and automatically.
Also, there are now actually four rows in the aggregate table (at least until a background merge happens). A SELECT * statement shows this. It also shows that the aggregated measures are stored in a binary format that is output as garbled characters.
:) SELECT * FROM ticker_data_agg;
SELECT *
FROM ticker_data_agg
Query id: 27981039-a429-4399-a98c-fd2620b4b1b8
┌───────timestampHour─┬─symbol─┬─price──────┐
1. │ 2022-08-10 10:00:00 │ AAAA │ fffff?7@8q?b │
2. │ 2022-08-10 11:00:00 │ AAAA │ ?????6@@x?b │
└─────────────────────┴────────┴────────────┘
┌───────timestampHour─┬─symbol─┬─price─────┐
3. │ 2022-08-10 10:00:00 │ AAAA │ fffff?:@h?b │
4. │ 2022-08-10 11:00:00 │ AAAA │ ?????<@t??b │
└─────────────────────┴────────┴───────────┘
4 rows in set. Elapsed: 0.010 sec.
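If you want a human-readable, per-row view of those states without grouping, the finalizeAggregation function turns each stored state into its final value (you would still get four rows here, one per stored state):
SELECT
    timestampHour,
    symbol,
    finalizeAggregation(price) AS latestPrice
FROM ticker_data_agg;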
You can (but probably shouldn’t) force a background merge like so:
:) OPTIMIZE TABLE ticker_data_agg;
OPTIMIZE TABLE ticker_data_agg
Query id: 294b0d27-d826-4f72-899f-06b9ff80f159
Ok.
0 rows in set. Elapsed: 0.015 sec.
:) SELECT * FROM ticker_data_agg;
SELECT *
FROM ticker_data_agg
Query id: 2f5c65ed-b3ef-4fa1-88cd-76ebdd194072
┌───────timestampHour─┬─symbol─┬─price──────┐
1. │ 2022-08-10 10:00:00 │ AAAA │ fffff?7@8q?b │
2. │ 2022-08-10 11:00:00 │ AAAA │ ?????<@t??b │
└─────────────────────┴────────┴────────────┘
2 rows in set. Elapsed: 0.006 sec.
Now, there are only two rows left.
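As an aside, a plain OPTIMIZE only schedules an unscheduled merge and may leave some parts untouched; if rows remain unmerged, you can force a full merge with the FINAL keyword:
OPTIMIZE TABLE ticker_data_agg FINAL;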
Conclusion
- ClickHouse’s AggregatingMergeTree table engine allows aggregating data on the fly. Data must be merged at query time (with -Merge combinators), and is compacted asynchronously in the background.
- Each standard aggregate function has -State and -Merge combinators that let it store intermediate aggregation state inside the aggregate table.
- Using materialized views, you can populate a detail table and an aggregate table simultaneously.