Test Tubes

In addition to segmenting data by time, Druid lets you introduce secondary partitioning within each time chunk. There are several strategies:

  • dynamic partitioning
  • hash partitioning
  • single dim partitioning
  • new in Druid 0.22: range (or multi dim) partitioning.

I am going to take a look at each of these in the next few posts.

Let’s use the Wikipedia sample data to do some experiments. You can use the Druid quickstart for this tutorial.

Dynamic Partitioning

Generally speaking, dynamic partitioning does little to organize or reorder the data. You get one or more partitions per input data blob for batch ingestion, and one or more partitions per Kafka partition for realtime Kafka ingestion. Often, the resulting segment files are smaller than desired. The upside of this strategy is that ingestion is faster than with any other strategy, because it does not need to buffer or shuffle data. The downside is that it does nothing to optimize query performance.

For the first experiment, follow the quickstart tutorial step by step, with one exception: set the maximum number of rows per segment to 14,000, because we want more than one partition:

Setting the partition size
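If you are editing the ingestion spec directly rather than using the data loader wizard, this setting corresponds (roughly) to a dynamic partitionsSpec inside the tuningConfig. A minimal sketch, showing only the relevant fields of the parallel batch task:

{
  "type": "index_parallel",
  "partitionsSpec": {
    "type": "dynamic",
    "maxRowsPerSegment": 14000
  }
}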

Also, name the datasource wikipedia-dynamic-1 (we will create more versions of this). Run the ingestion; it will finish within a few seconds. Now let’s look at what we have:

Segment list

Because the sample has roughly 40,000 rows of data, Druid has created three partitions. Let’s take a closer look at the partitions.

In the quickstart setup, the segments live in var/druid/segments/, and here’s the structure Druid has created for the new datasource:

wikipedia-dynamic-1
└── 2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z
    └── 2022-01-05T16:03:35.016Z
        ├── 0
        │   └── index.zip
        ├── 1
        │   └── index.zip
        └── 2
            └── index.zip

So, we have datasource-name / time chunk / version timestamp / partition number. Each leaf file is a zip archive with the segment data; the format is described here.

Druid comes with a dump-segment tool, which has a number of interesting options. But in order to use it, we need to unzip the index.zip files first. Let’s dump the data and look at the distribution of one dimension: I am going to pick the channel dimension, because it is well populated and has a limited number of values that occur frequently.

#!/bin/bash

# path settings for default quickstart install

DRUID_HOME="${HOME}/apache-druid-0.22.1"
DRUID_DATADIR="${DRUID_HOME}/var/druid/segments"
DRUID_DATASOURCE="wikipedia-dynamic-1"
DUMP_TMPDIR="${HOME}/tmpwork"

TOPN=5

# list all segment files, relative to the segment cache directory
FILES=$(cd "${DRUID_DATADIR}" && find "${DRUID_DATASOURCE}" -type f)

for f in $FILES; do
  FDIR=$(dirname "$f")
  # path layout: datasource / time chunk / version / partition number / index.zip
  PARTN=$(cut -d "/" -f 4 <<<"$f")
  ODIR="${DUMP_TMPDIR}/${FDIR}"
  mkdir -p "$ODIR"
  unzip -q -o "${DRUID_DATADIR}/${f}" -d "$ODIR"

  # dump the segment rows as JSON lines
  java -classpath "${DRUID_HOME}/lib/*" -Ddruid.extensions.loadList="[]" org.apache.druid.cli.Main \
    tools dump-segment \
    --directory "$ODIR" \
    --out "$ODIR/output.txt"

  # top channel values per partition
  echo "Partition ${PARTN}:"
  jq -r ".channel" "$ODIR/output.txt" | sort | uniq -c | sort -nr | head -n "$TOPN"

done

And here is the output that I get:

Partition 0:
4476 #vi.wikipedia
4061 #en.wikipedia
 650 #zh.wikipedia
 638 #de.wikipedia
 514 #it.wikipedia
Partition 1:
3854 #en.wikipedia
3152 #vi.wikipedia
1080 #de.wikipedia
 921 #fr.wikipedia
 597 #it.wikipedia
Partition 2:
3634 #en.wikipedia
2119 #vi.wikipedia
 872 #uz.wikipedia
 805 #de.wikipedia
 730 #fr.wikipedia

The most frequent values are scattered across all the partitions! Consequently, even if a query filters on a single channel value, it has to read all segments. And for a group-by query, all the groups have to be assembled on the query node. This partitioning strategy does not make queries any faster.
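To make this concrete, here is a sketch of a native query that filters on a single channel (the aggregator name and the granularity are arbitrary choices for this example):

{
  "queryType": "timeseries",
  "dataSource": "wikipedia-dynamic-1",
  "intervals": ["2015-09-12/2015-09-13"],
  "granularity": "hour",
  "filter": { "type": "selector", "dimension": "channel", "value": "#en.wikipedia" },
  "aggregations": [ { "type": "count", "name": "edits" } ]
}

With dynamic partitioning, rows for #en.wikipedia live in all three segments, so none of them can be skipped.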

Hash Partitioning

Now, let’s try something different.

Hash partitioning computes a hash key based on a set of one or more dimension values, and assigns the partition based on that key. What are the consequences?

Set the partitioning to hashed, and enter channel for the dimensions. Also set the target rows to 14,000, so we get three partitions again.

Hash Partitioning
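In spec form, this corresponds (again, roughly) to a hashed partitionsSpec. Hash partitioning also needs the perfect rollup ingestion mode, which is why forceGuaranteedRollup shows up in this sketch of the tuningConfig:

{
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": {
    "type": "hashed",
    "partitionDimensions": ["channel"],
    "targetRowsPerSegment": 14000
  }
}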

For good measure, name this datasource wikipedia-hashed. Running the same analysis as above (with DRUID_DATASOURCE pointed at the new datasource) gives this result:

Partition 0:
11549 #en.wikipedia
2099 #fr.wikipedia
1126 #zh.wikipedia
 472 #pt.wikipedia
 445 #nl.wikipedia
Partition 1:
2523 #de.wikipedia
1386 #ru.wikipedia
1256 #es.wikipedia
 533 #ko.wikipedia
 478 #ca.wikipedia
Partition 2:
9747 #vi.wikipedia
1383 #it.wikipedia
 983 #uz.wikipedia
 749 #ja.wikipedia
 565 #pl.wikipedia

We got segments of roughly the same size, and each value of channel ends up in only one segment. However, as a rule, similar values of the partition key do not end up in the same segment. Thus, range queries will not benefit. Queries with a single value filter can be faster, though. Also, TopN queries will give more accurate results than with dynamic partitioning.

Conclusion

So, why would you even use dynamic partitioning? There are two reasons, or maybe three:

  • If you are ingesting realtime data, you need to use dynamic partitioning.
  • Also, you can always add (append) data to an existing time chunk with dynamic partitioning.
  • The third reason: you do not need to reshuffle data between partitions, so ingestion is faster and less resource intensive than with other partitioning schemes.

Hash partitioning helps ensure a proper distribution of the data, but has limited query performance benefits.

For query optimization, we have to look at the remaining partitioning schemes. Stay tuned!


“Remedy Tea Test tubes” by storyvillegirl is licensed under CC BY-SA 2.0, cropped and edited by me