Data Cookbook Kitchen

Let’s pick up one of my earlier blog posts. In that post, I took a very simple data set to show how Druid can aggregate data upon ingestion. Today, I will show how much easier this is with ClickHouse.

You can use any version of ClickHouse for this exercise. For instance, you can run ClickHouse locally on your laptop. Or check out the installation docs for other options.

In today’s tutorial, you will

  • create a detail table for stock ticker data using schema inference
  • create an aggregate table to store the latest price value per ticker symbol and time interval
  • create a materialized view to automatically populate the aggregate whenever new rows are added to the detail table
  • insert some data and query the aggregate table
  • insert some more data and observe how the query result changes.

Creating the sample data set

First, create a text file with these data:

timestamp,symbol,price
2022-08-10 10:01:00,AAAA,25.90
2022-08-10 10:11:00,AAAA,26.90
2022-08-10 11:55:00,AAAA,28.10

Save this file as ticker1.csv.

Likewise, create a file ticker2.csv with these data:

timestamp,symbol,price
2022-08-10 10:50:00,AAAA,23.90
2022-08-10 11:20:00,AAAA,22.10
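
If you would rather script this step, the two files can be written with a few lines of Python. This is only a minimal sketch: the function name write_sample_files and the target directory are my own choices, and the rows are copied from the listings above. Keep in mind that, as far as I know, a ClickHouse server resolves file() paths relative to its user_files directory, while clickhouse-local reads relative to the working directory, so you may need to adjust the target.

```python
from pathlib import Path

# Rows copied verbatim from the two listings above.
SAMPLE_FILES = {
    "ticker1.csv": (
        "timestamp,symbol,price\n"
        "2022-08-10 10:01:00,AAAA,25.90\n"
        "2022-08-10 10:11:00,AAAA,26.90\n"
        "2022-08-10 11:55:00,AAAA,28.10\n"
    ),
    "ticker2.csv": (
        "timestamp,symbol,price\n"
        "2022-08-10 10:50:00,AAAA,23.90\n"
        "2022-08-10 11:20:00,AAAA,22.10\n"
    ),
}


def write_sample_files(target_dir="."):
    """Write both CSV files into target_dir and return their paths."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    paths = []
    for name, content in SAMPLE_FILES.items():
        path = target / name
        path.write_text(content, encoding="utf-8")
        paths.append(path)
    return paths


if __name__ == "__main__":
    for path in write_sample_files():
        print(f"wrote {path}")
```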

Setting up the schema

Create a table using the first file as a template. We use the file only to infer the table schema; we don’t want to insert any data just yet. ClickHouse lets you do just that with CREATE TABLE ... AS SELECT and the EMPTY clause. This will be a MergeTree table that stores the detail rows.

CREATE TABLE ticker_data
ENGINE = MergeTree
ORDER BY (symbol, timestamp) EMPTY
AS SELECT *
FROM file('ticker1.csv')
SETTINGS schema_inference_make_columns_nullable = 0

Note the last line, which makes the inferred columns non-nullable. When inferring a schema from a CSV file, ClickHouse makes all columns nullable by default. But columns that participate in the sort key cannot be nullable (this can be overridden, but that is probably not a good idea). Since there are no NULL values in the data anyway, the simplest approach is to make all columns non-nullable.

Now define the aggregate table. This will be an AggregatingMergeTree table, where some of the aggregation magic happens inside the table engine:

CREATE TABLE ticker_data_agg
(
    `timestampHour` DateTime,
    `symbol` String,
    `price` AggregateFunction(argMax, Float64, Timestamp)
)
ENGINE = AggregatingMergeTree
ORDER BY (symbol, timestampHour)

We will aggregate the data by hour: the timestamp will be truncated, as we will see in a moment.

Data aggregation is defined in three steps:

  1. In the aggregate table definition
    • the dimensions go into the order key
    • the measures are not stored as plain values but are defined as AggregateFunction types, which store intermediate state information. The AggregateFunction constructor takes the aggregate function to be used and the data types it operates on. In our case, this is argMax, which returns the value of its first argument for the row where the second argument is at its maximum. The second argument will be the timestamp.
  2. When populating the aggregate table
    • the dimensions go into the group by clause
    • measures use -State aggregators; in our case, this is argMaxState.
  3. Query the aggregated measures with -Merge aggregator functions, here: argMaxMerge.
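
To make the three steps more concrete, here is a toy Python model of what an argMax state does. It only illustrates the idea and is not ClickHouse's actual implementation or storage format: the class ArgMaxState and the two helper functions are hypothetical names, and ties between equal timestamps are not modeled.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ArgMaxState:
    """Toy intermediate state: the best value so far and the key it belongs to."""
    value: float
    key: datetime


def arg_max_state(pairs):
    """Mimics argMaxState(value, key): fold (value, key) pairs into one state."""
    best = None
    for value, key in pairs:
        if best is None or key > best.key:
            best = ArgMaxState(value, key)
    return best


def arg_max_merge(states):
    """Mimics argMaxMerge(state): combine partial states, return the final value."""
    return max(states, key=lambda s: s.key).value


if __name__ == "__main__":
    ts = datetime.fromisoformat
    first = arg_max_state([(25.90, ts("2022-08-10 10:01:00")),
                           (26.90, ts("2022-08-10 10:11:00"))])
    second = arg_max_state([(23.90, ts("2022-08-10 10:50:00"))])
    print(arg_max_merge([first]))          # 26.9
    print(arg_max_merge([first, second]))  # 23.9
```

The point is that a state carries the timestamp along with the price, which is why partial results can be combined later without looking at the detail rows again.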

Now let’s define a materialized view that will aggregate the ticker data:

CREATE MATERIALIZED VIEW ticker_data_mv TO ticker_data_agg
AS SELECT
    toStartOfHour(timestamp) AS timestampHour,
    symbol,
    argMaxState(price, timestamp) AS price
FROM ticker_data
GROUP BY
    timestampHour,
    symbol

This will truncate the timestamp to the desired granularity (hour) and perform the aggregation.
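
A materialized view in ClickHouse acts like an insert trigger: the SELECT runs over each block of newly inserted rows, not over the whole detail table. The Python sketch below imitates that for a single block. The names to_start_of_hour and aggregate_block are made up for this illustration, and a state is modeled as a plain (price, timestamp) tuple.

```python
from datetime import datetime


def to_start_of_hour(ts):
    """Truncate a datetime to the hour, like toStartOfHour()."""
    return ts.replace(minute=0, second=0, microsecond=0)


def aggregate_block(rows):
    """Group one inserted block by (hour, symbol), keeping the latest price.

    rows: iterable of (timestamp, symbol, price) tuples.
    Returns {(hour, symbol): (price, timestamp)}, one toy state per group.
    """
    states = {}
    for ts, symbol, price in rows:
        key = (to_start_of_hour(ts), symbol)
        if key not in states or ts > states[key][1]:
            states[key] = (price, ts)
    return states


if __name__ == "__main__":
    ts = datetime.fromisoformat
    block = [  # the rows of ticker1.csv
        (ts("2022-08-10 10:01:00"), "AAAA", 25.90),
        (ts("2022-08-10 10:11:00"), "AAAA", 26.90),
        (ts("2022-08-10 11:55:00"), "AAAA", 28.10),
    ]
    for (hour, symbol), (price, seen_at) in sorted(aggregate_block(block).items()):
        print(hour, symbol, price, seen_at)
```

Because each insert is aggregated on its own, the aggregate table can hold several partial states for the same hour and symbol until they are merged.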

Populating the tables

Insert the first set of data:

INSERT INTO ticker_data SELECT * FROM file('ticker1.csv');

and query the aggregate at the configured time granularity:

:) SELECT timestampHour, symbol, argMaxMerge(price) AS latestPrice FROM ticker_data_agg GROUP BY timestampHour, symbol ORDER BY timestampHour, symbol;

SELECT
    timestampHour,
    symbol,
    argMaxMerge(price) AS latestPrice
FROM ticker_data_agg
GROUP BY
    timestampHour,
    symbol
ORDER BY
    timestampHour ASC,
    symbol ASC

Query id: b832dc41-a3db-4463-87a4-fe829bc120d0

   ┌───────timestampHour─┬─symbol─┬─latestPrice─┐
1. │ 2022-08-10 10:00:00 │ AAAA   │        26.9 │
2. │ 2022-08-10 11:00:00 │ AAAA   │        28.1 │
   └─────────────────────┴────────┴─────────────┘

2 rows in set. Elapsed: 0.012 sec.

and at a coarser granularity:

:) SELECT toStartOfDay(timestampHour) AS timestampDay, symbol, argMaxMerge(price) AS latestPrice FROM ticker_data_agg GROUP BY timestampDay, symbol ORDER BY timestampDay, symbol;

SELECT
    toStartOfDay(timestampHour) AS timestampDay,
    symbol,
    argMaxMerge(price) AS latestPrice
FROM ticker_data_agg
GROUP BY
    timestampDay,
    symbol
ORDER BY
    timestampDay ASC,
    symbol ASC

Query id: bb33cc74-73fe-4d35-8009-136ca73b6579

   ┌────────timestampDay─┬─symbol─┬─latestPrice─┐
1. │ 2022-08-10 00:00:00 │ AAAA   │        28.1 │
   └─────────────────────┴────────┴─────────────┘

1 row in set. Elapsed: 0.008 sec.
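
The coarser query works because states can be merged again at any level above the stored granularity. In a toy Python model where a state is a (price, timestamp) tuple, the day-level result is just another merge over the hourly states. The helper name rollup_to_day is hypothetical, and the hourly states are written out by hand from the first CSV file.

```python
from datetime import datetime


def rollup_to_day(hourly_states):
    """Merge hourly toy states into one state per (day, symbol).

    hourly_states: {(hour, symbol): (price, timestamp)}
    """
    daily = {}
    for (hour, symbol), state in hourly_states.items():
        key = (hour.replace(hour=0), symbol)  # like toStartOfDay(timestampHour)
        # Merging two argMax states keeps the one with the later timestamp.
        if key not in daily or state[1] > daily[key][1]:
            daily[key] = state
    return daily


if __name__ == "__main__":
    ts = datetime.fromisoformat
    hourly = {  # what the aggregate holds after loading ticker1.csv
        (ts("2022-08-10 10:00:00"), "AAAA"): (26.90, ts("2022-08-10 10:11:00")),
        (ts("2022-08-10 11:00:00"), "AAAA"): (28.10, ts("2022-08-10 11:55:00")),
    }
    for (day, symbol), (price, _) in rollup_to_day(hourly).items():
        print(day, symbol, price)  # 2022-08-10 00:00:00 AAAA 28.1
```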

Adding more data

Insert the second batch:

:) INSERT INTO ticker_data SELECT * FROM file('ticker2.csv');

If you query the data now with GROUP BY and argMaxMerge, you get the updated rows:

:) SELECT timestampHour, symbol, argMaxMerge(price) AS latestPrice FROM ticker_data_agg GROUP BY timestampHour, symbol ORDER BY timestampHour, symbol;

SELECT
    timestampHour,
    symbol,
    argMaxMerge(price) AS latestPrice
FROM ticker_data_agg
GROUP BY
    timestampHour,
    symbol
ORDER BY
    timestampHour ASC,
    symbol ASC

Query id: 2638d26a-ecdb-484f-9de7-3e8ca26e3724

   ┌───────timestampHour─┬─symbol─┬─latestPrice─┐
1. │ 2022-08-10 10:00:00 │ AAAA   │        23.9 │
2. │ 2022-08-10 11:00:00 │ AAAA   │        28.1 │
   └─────────────────────┴────────┴─────────────┘

2 rows in set. Elapsed: 0.020 sec.

Note how the 11am entry has not changed: the added row for that hour has an earlier timestamp (11:20) than the row already present in the table (11:55). The 10am entry, by contrast, now shows the price from 10:50. Thus, late-arriving data is automatically handled correctly.
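
The reason late data works out is that every state remembers its own timestamp, so the order in which batches arrive does not matter. Here is a small Python sketch of the query-time merge, with each insert modeled as a separate part holding (price, timestamp) tuples. The helper query_latest is a hypothetical name, and the part contents are derived by hand from the two CSV files.

```python
from datetime import datetime

ts = datetime.fromisoformat

# One dict per insert: {(hour, symbol): (price, timestamp)}.
PART_1 = {  # from ticker1.csv
    (ts("2022-08-10 10:00:00"), "AAAA"): (26.90, ts("2022-08-10 10:11:00")),
    (ts("2022-08-10 11:00:00"), "AAAA"): (28.10, ts("2022-08-10 11:55:00")),
}
PART_2 = {  # from ticker2.csv
    (ts("2022-08-10 10:00:00"), "AAAA"): (23.90, ts("2022-08-10 10:50:00")),
    (ts("2022-08-10 11:00:00"), "AAAA"): (22.10, ts("2022-08-10 11:20:00")),
}


def query_latest(parts):
    """Imitate GROUP BY + argMaxMerge across all parts of the aggregate table."""
    merged = {}
    for part in parts:
        for key, state in part.items():
            if key not in merged or state[1] > merged[key][1]:
                merged[key] = state
    return {key: price for key, (price, _) in merged.items()}


if __name__ == "__main__":
    for (hour, symbol), price in sorted(query_latest([PART_1, PART_2]).items()):
        print(hour, symbol, price)
    # The result is the same if the batches arrive in the opposite order.
    assert query_latest([PART_1, PART_2]) == query_latest([PART_2, PART_1])
```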

Also, now there are actually four rows in the aggregate table (at least until a background merge happens). A SELECT * statement shows this. It also shows that the aggregated measures are actually stored in a binary format that is output as garbled characters.

:) SELECT * FROM ticker_data_agg;

SELECT *
FROM ticker_data_agg

Query id: 27981039-a429-4399-a98c-fd2620b4b1b8

   ┌───────timestampHour─┬─symbol─┬─price──────┐
1. │ 2022-08-10 10:00:00 │ AAAA   │ fffff?7@8q?b │
2. │ 2022-08-10 11:00:00 │ AAAA   │ ?????6@@x?b      │
   └─────────────────────┴────────┴────────────┘
   ┌───────timestampHour─┬─symbol─┬─price─────┐
3. │ 2022-08-10 10:00:00 │ AAAA   │ fffff?:@h?b │
4. │ 2022-08-10 11:00:00 │ AAAA   │ ?????<@t??b      │
   └─────────────────────┴────────┴───────────┘

4 rows in set. Elapsed: 0.010 sec.

You can (but probably shouldn’t) force a background merge like so:

:) OPTIMIZE TABLE ticker_data_agg;

OPTIMIZE TABLE ticker_data_agg

Query id: 294b0d27-d826-4f72-899f-06b9ff80f159

Ok.

0 rows in set. Elapsed: 0.015 sec. 

:) SELECT * FROM ticker_data_agg;

SELECT *
FROM ticker_data_agg

Query id: 2f5c65ed-b3ef-4fa1-88cd-76ebdd194072

   ┌───────timestampHour─┬─symbol─┬─price──────┐
1. │ 2022-08-10 10:00:00 │ AAAA   │ fffff?7@8q?b │
2. │ 2022-08-10 11:00:00 │ AAAA   │ ?????<@t??b       │
   └─────────────────────┴────────┴────────────┘

2 rows in set. Elapsed: 0.006 sec. 

Now, there are only two rows left.

Conclusion

  • ClickHouse’s AggregatingMergeTree table engine allows aggregating data on the fly. Because compaction happens asynchronously in the background, queries should always merge the stored states themselves, using GROUP BY and the -Merge functions.
  • Standard aggregate functions come with combinators such as -State and -Merge that let you store intermediate aggregation state in the aggregate table and combine it later.
  • Using materialized views, you can populate a detail and aggregate table simultaneously.

“This image is taken from Page 377 of Praktisches Kochbuch für die gewöhnliche und feinere Küche” by Medical Heritage Library, Inc. is licensed under CC BY-NC-SA 2.0.