This document covers the answers to the following questions:
- How data is stored on disk for MergeTree engine family tables
- What are
parts
,granules
andmarks
- How and why choosing the correct
ORDER BY
andPARTITION BY
in table definitions affects query performance - How to use
EXPLAIN
to understand what ClickHouse is doing - Difference between
PREWHERE
andWHERE
- Data compression
Introduction to MergeTree
Why is ClickHouse so fast? states:
ClickHouse was initially built as a prototype to do just a single task well: to filter and aggregate data as fast as possible.
Rather than force all possible tasks to be solved by singular tools, ClickHouse provides specialized "engines" that each solve specific problems.
MergeTree engine family tables are intended for ingesting large amounts of data, storing that data efficiently, and running analytical queries on it.
How MergeTree stores data
Consider the following (simplified) table for storing sensor events:
CREATE TABLE sensor_values (timestamp DateTime,site_id UInt32,event VARCHAR,uuid UUID,metric_value Int32)ENGINE = MergeTree()ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)SETTINGS index_granularity = 8192
Data for this table would be stored in parts, each part a separate directory on disk. Data for a given part is always sorted by the order set in ORDER BY
statement and compressed.
Parts can be Wide
or Compact
depending on its size. We'll be mostly dealing with Wide
parts as part of day-to-day operations.
Wide
parts are large and store each column in a separate binary data file, which are sorted and compressed.
ClickHouse also stores a sparse index for the part. A collection of rows with size equal to the index_granularity
setting is called a granule. For every granule, the primary
index stores a mark containing the value of the ORDER BY
statement as well as a pointer to where that mark is located in each data file.
💡 For better performance when running queries, it is not recommended to set
index_granularity
too low. The default value for engines in theMergeTree
family is 8192. An implication of this is that accessing data by primary key (in this case theORDER BY
clause is equivalent to the primary key) will not read just one row, but rather up toindex_granularity
number of rows. This is acceptable given ClickHouse is meant to perform well with aggregations, rather than point lookups.
Diving deeper into data-on-disk for a Wide part
This assumes you're using a docker-based ClickHouse installation and have clickhouse-client
running
Seeding data
INSERT INTO sensor_valuesSELECT *FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)LIMIT 200000000
Looking at part data
system.parts
table contains a lot of metadata about every part.
To find out what type each part is, its size, and where on disk it's located, you can run the following query:
SELECTname,part_type,rows,marks,formatReadableSize(bytes_on_disk),formatReadableSize(data_compressed_bytes),formatReadableSize(data_uncompressed_bytes),formatReadableSize(marks_bytes),pathFROM system.partsWHERE active and table = 'sensor_values'FORMAT Vertical
The result might look something like this:
Row 1:──────name: all_12_17_1part_type: Widerows: 6291270marks: 769formatReadableSize(bytes_on_disk): 476.07 MiBformatReadableSize(data_compressed_bytes): 475.92 MiBformatReadableSize(data_uncompressed_bytes): 474.00 MiBformatReadableSize(marks_bytes): 90.12 KiBpath: /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
Inspecting data on disk
⟩ docker exec -it posthog_clickhouse_1 ls -lhS /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/total 477M-rw-r----- 1 clickhouse clickhouse 308M Nov 2 07:33 event.bin-rw-r----- 1 clickhouse clickhouse 97M Nov 2 07:33 uuid.bin-rw-r----- 1 clickhouse clickhouse 25M Nov 2 07:33 metric_value.bin-rw-r----- 1 clickhouse clickhouse 25M Nov 2 07:33 timestamp.bin-rw-r----- 1 clickhouse clickhouse 25M Nov 2 07:33 site_id.bin-rw-r----- 1 clickhouse clickhouse 58K Nov 2 07:33 primary.idx-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 event.mrk2-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 metric_value.mrk2-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 site_id.mrk2-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 timestamp.mrk2-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 uuid.mrk2-rw-r----- 1 clickhouse clickhouse 494 Nov 2 07:33 checksums.txt-rw-r----- 1 clickhouse clickhouse 123 Nov 2 07:33 columns.txt-rw-r----- 1 clickhouse clickhouse 10 Nov 2 07:33 default_compression_codec.txt-rw-r----- 1 clickhouse clickhouse 7 Nov 2 07:33 count.txt
What are these files?
- For every column, there's a
{column_name}.bin
file, containing the compressed (LZ4 compression by default) data for that column. These take up most of the space. - For every column, there's a
{column_name}.mrk2
file, contains an index with data to locate each granule in{column_name}.bin
file primary.idx
contains information on ORDER BY column values for each granule. This is loaded into memory during queries.checksums.txt
,columns.txt
,default_compression_codec.txt
andcount.txt
contain metadata about this part.
You can read more on the exact structure of these files and how they're used in ClickHouse Index Design documentation.
What does the Merge stand for?
In every system, data must be ingested and kept up-to-date somehow. When data is inserted into MergeTree tables, each insert creates one or multiple parts for the data inserted.
As having a lot of small files would be disadvantageous for many reasons from query performance to storage, ClickHouse regularly merges small parts together until they reach a maximum size.
The merge combines the two parts into a new one. This is similar to how merge sort works and atomically replaces the two source parts.
Merges can be monitored using the system.merges
table.
Query execution
Aggregation supported by ORDER BY
Our sensor_values
table is set up in a way that queries similar to the following are really fast to execute.
SELECTtoStartOfDay(timestamp),event,sum(metric_value) as total_metric_valueFROM sensor_valuesWHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'GROUP BY toStartOfDay(timestamp), eventORDER BY total_metric_value DESCLIMIT 20
Executing this reports:
20 rows in set. Elapsed: 0.042 sec. Processed 90.11 thousand rows, 3.54 MB (2.13 million rows/s., 83.60 MB/s.)
Why can it be fast? Because ClickHouse:
- leverages the table
ORDER BY
clause (ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
) to skip reading a lot of data - is fast and efficient about I/O and aggregation
Let's dig into how the primary index for this query is used by using EXPLAIN
.
EXPLAIN indexes=1, header=1 SELECTtoStartOfDay(timestamp),event,sum(metric_value) as total_metric_valueFROM sensor_valuesWHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'GROUP BY toStartOfDay(timestamp), eventORDER BY total_metric_value DESCLIMIT 20FORMAT LineAsString
Show full `EXPLAIN` output
Expression (Projection)Header: toStartOfDay(timestamp) DateTimeevent Stringtotal_metric_value Int64Limit (preliminary LIMIT (without OFFSET))Header: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64Sorting (Sorting for ORDER BY)Header: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64Expression (Before ORDER BY)Header: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64AggregatingHeader: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64Expression (Before GROUP BY)Header: event Stringmetric_value Int32toStartOfDay(timestamp) DateTimeFilter (WHERE)Header: timestamp DateTimeevent Stringmetric_value Int32SettingQuotaAndLimits (Set limits and quota after reading from storage)Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8timestamp DateTimesite_id UInt32event Stringmetric_value Int32ReadFromMergeTreeHeader: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8timestamp DateTimesite_id UInt32event Stringmetric_value Int32Indexes:PrimaryKeyKeys:site_idtoStartOfDay(timestamp)Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))Parts: 2/2Granules: 11/24415
The full output of explain is obtuse, but the most important part is also the most deeply nested one:
ReadFromMergeTreeHeader: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8timestamp DateTimesite_id UInt32event Stringmetric_value Int32Indexes:PrimaryKeyKeys:site_idtoStartOfDay(timestamp)Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))Parts: 2/2Granules: 11/24415
At the start of the query, ClickHouse loaded the primary index of each part into memory. From this output, we know that the query first used the primary key to filter based on site_id
and timestamp
values stored in the index.
This allowed it to know that only 11 out of 24415 granules (0.05%) contained any relevant data.
From there it read those 11 granules (11 * 8192 rows) worth of data from timestamp
, side_id
, event
and metric_value
columns and did the
rest of filtering and aggregation on that data alone.
See this documentation for a guide on how to choose ORDER BY
.
"Point queries" not supported by ORDER BY
Consider this query:
SELECT * FROM sensor_values WHERE uuid = '69028f26-768f-afef-1816-521b22d281ca'
Executing this query reports:
1 row in set. Elapsed: 0.703 sec. Processed 200.00 million rows, 3.20 GB (304.43 million rows/s., 4.87 GB/s.)
While the overall execution time of this query is not bad thanks to fast I/O, it needed to read 2200x the amount of data from disk. As the dataset size or column sizes increase, this performance would get dramatically worse.
Why is this query slower? Because our ORDER BY
does not support fast filtering by uuid
and ClickHouse needs to read the whole
table to find a single record and read all columns.
ClickHouse provides some ways to make this faster (e.g. Projections) but in general these require extra disk space or have other trade-offs.
Thus, it's important to make sure the ClickHouse schema is aligned with queries that are being executed.
PARTITION BY
Another tool to make queries faster is PARTITION BY
. Consider the updated table definition:
CREATE TABLE sensor_values (timestamp DateTime,site_id UInt32,event VARCHAR,uuid UUID,metric_value Int32)ENGINE = MergeTree()PARTITION BY intDiv(toYear(timestamp), 10)ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)SETTINGS index_granularity = 8192
Here, ClickHouse would generate one partition per 10 years of data, allowing to skip reading even the primary index in some cases.
In the underlying data, each part would belong to a single partition and only parts within a partition would get merged.
One additional benefit of partitioning by a derivate of timestamp is that if most queries touch recent data, you can also set up rules to automatically move older parts and partitions to cheaper storage or drop them entirely.
Query analysis
Let's use an identical query as before to explain with the new dataset:
SELECTtoStartOfDay(timestamp),event,sum(metric_value) as total_metric_valueFROM sensor_valuesWHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'GROUP BY toStartOfDay(timestamp), eventORDER BY total_metric_value DESCLIMIT 20
Show full `EXPLAIN` output
Expression (Projection)Header: toStartOfDay(timestamp) DateTimeevent Stringtotal_metric_value Int64Limit (preliminary LIMIT (without OFFSET))Header: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64Sorting (Sorting for ORDER BY)Header: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64Expression (Before ORDER BY)Header: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64AggregatingHeader: toStartOfDay(timestamp) DateTimeevent Stringsum(metric_value) Int64Expression (Before GROUP BY)Header: event Stringmetric_value Int32toStartOfDay(timestamp) DateTimeFilter (WHERE)Header: timestamp DateTimeevent Stringmetric_value Int32SettingQuotaAndLimits (Set limits and quota after reading from storage)Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8timestamp DateTimesite_id UInt32event Stringmetric_value Int32ReadFromMergeTreeHeader: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8timestamp DateTimesite_id UInt32event Stringmetric_value Int32Indexes:MinMaxKeys:timestampCondition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))Parts: 2/14Granules: 3589/24421PartitionKeys:intDiv(toYear(timestamp), 10)Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))Parts: 2/2Granules: 3589/3589PrimaryKeyKeys:site_idtoStartOfDay(timestamp)Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))Parts: 2/2Granules: 12/3589
The relevant part of EXPLAIN is again nested deep within:
ReadFromMergeTreeHeader: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8timestamp DateTimesite_id UInt32event Stringmetric_value Int32Indexes:MinMaxKeys:timestampCondition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))Parts: 2/14Granules: 3589/24421PartitionKeys:intDiv(toYear(timestamp), 10)Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))Parts: 2/2Granules: 3589/3589PrimaryKeyKeys:site_idtoStartOfDay(timestamp)Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))Parts: 2/2Granules: 12/3589
What this tells us is that ClickHouse:
- First leverages an internal MinMax index on
timestamp
to whittle down the number of parts to 2/14 and granules to 3589/24421 - Then it tries to filter via the partition key but this doesn't narrow things down further
- Then, it loads and leverages the Primary key as before to narrow data down to 12 granules.
- Lastly reads, filters and aggregates data in those 12 granules
The benefit here is that it could skip reading the primary key index for most of the parts that did not contain relevant data. If and how much this speeds up the query however depends on the size of the dataset.
Choosing a good PARTITION BY
Use partitions wisely - each INSERT should ideally only touch 1-2 partitions and too many partitions will cause issues around replication or prove useless for filtering.
Loading the primary index/marks file might not be the bottleneck you expect, so be sure to benchmark different schemas against each other.
See the following Altinity documentation for more guidance:
- How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table
- How much is too much? ClickHouse limitations
Other notes on MergeTree
Data is expensive to update
Updating data in ClickHouse is expensive and analogous to a schema migration.
For example, to update an event's properties, ClickHouse frequently needs to:
- Scan all the data to find what parts contain the relevant data. This isn't often covered by
ORDER BY
and thus quite expensive. - Rewrite the whole part (including any columns) - this could be potentially up to 150GB of data rewritten for a single update.
This makes things operationally hard. We mitigate this by:
- Writing duplicated rows for new data, using other table engines (e.g. ReplacingMergeTree) and accounting for this duplication in our queries.
- Batching up GDPR or other data deletions and doing them on a schedule rather than immediately.
No query planner
ClickHouse doesn't have a query planner in the sense PostgreSQL or other databases do.
On the one hand, you often end up fighting the query planner in other databases. If we know how ClickHouse works internally and can develop that into intuition for how SQL is executed, we're well-equipped to deal with performance issues as they arise.
On the other, this means that we'll need to be careful writing SQL as small changes can have huge performance implications.
Examples:
- For best performance, ClickHouse requires you "push" predicates in WHERE clauses into sub-queries rather than filtering at the outermost query.
- In the
sensor_values
queries above, the execution plan would have been slightly more optimal if the filter condition ontoYear(timestamp)
rather thantimestamp
.
One notable exception to "no query planner" is that ClickHouse often pushes predicates from WHERE
into PREWHERE
. Filters in PREWHERE
are executed first and ClickHouse
moves columns it thinks are "cheaper" or "more selective" into it. However putting the wrong column (e.g. a fat column containing JSON) in PREWHERE
can cause performance to tank.
Read more on PREWHERE
in the ClickHouse docs.
Data compression
Compression means that if subsequent column values of a given column are often similar or identical, the data compresses really well. At PostHog we frequently see uncompressed / compressed ratios of 20x-40x for JSON columns and 300x-2000x for sparse small columns.
Compression ratios have direct impact on query performance: I/O is often the bottleneck, meaning that highly compressed data can be read faster from disk at the cost of more CPU work for decompression.
By default columns are compressed by the LZ4
algorithm. We've found good success using ZSTD(3)
for storing JSON columns - see
benchmarks for more information.
Another tip is to use ClickHouse's LowCardinality
data type modifier on schemas where a given column will store values with low cardinality i.e. the total number of values is low. An example of this would be "country name".
Weak JOIN support
ClickHouse excels at aggregating data from a single table at a time. If you however have a query with JOINs or subqueries, the right-hand-side of the JOIN would be loaded into memory first. Thus, you should always have the bigger table on the left side of left-hand-side!
This means that at scale JOINs can kill performance. Read more on the effect of removing JOINs from our events database here:
Suggested reading
- ClickHouse MergeTree docs
- Why is ClickHouse so fast?
- Overview of ClickHouse Architecture
- ClickHouse Index Design
- How much is too much? ClickHouse limitations
- How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table
Next in the ClickHouse manual: Data replication