Data Cookbook Kitchen

One of the most flexible ways to get data from Kafka into ClickHouse is the Kafka table engine. It can connect to Kafka in various ways and supports multiple data formats. However, when you use Confluent’s Schema Registry (or a compatible service) to enforce a data contract, support is limited to the Avro format.

What if you use JSON with Schema Registry instead? In that case, the Confluent serializer prepends a binary metadata header to the JSON data, and ClickHouse cannot easily ingest the resulting mixed format.

ClickHouse 25.11 introduced a new setting for the Kafka table engine, kafka_schema_registry_skip_bytes. ClickHouse still does not interpret the Schema Registry header, but the setting lets you skip it so that the rest of the message parses as plain JSON. Let’s see how that looks in practice, and what else we can do around Schema Registry!
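To make the mixed format concrete, here is a minimal Python sketch of the framing. It assumes the Confluent wire format of one zero magic byte followed by a 4-byte big-endian schema ID. The helper names `frame` and `skip_header`, the sample record, and the schema ID are made up for illustration, and this is not a description of what ClickHouse does internally.

```python
import json
import struct

HEADER_LEN = 5  # 1 magic byte + 4-byte schema ID (assumed wire format)


def frame(schema_id: int, record: dict) -> bytes:
    """Frame a record the way a Schema Registry aware JSON serializer would."""
    header = struct.pack(">BI", 0, schema_id)  # magic byte 0, big-endian ID
    return header + json.dumps(record).encode("utf-8")


def skip_header(message: bytes, skip_bytes: int = HEADER_LEN) -> dict:
    """Drop the binary header and parse the remainder as plain JSON."""
    return json.loads(message[skip_bytes:])


message = frame(100014, {"userid": 7, "status": "200"})
print(message[:HEADER_LEN].hex())  # the binary header as hex
print(skip_header(message))        # the JSON payload without the header
```

Parsing the whole message as JSON would fail on the leading binary bytes, which is why the skip is needed before a format such as JSONEachRow can read the payload.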

Kafka Engine Setup

As the documentation states, the header is 5 bytes long (a magic byte followed by a 4-byte schema ID), so setting kafka_schema_registry_skip_bytes = 5 is enough.

CREATE OR REPLACE TABLE json_sr_queue (
    ip String,
    userid Int32,
    remote_user String,
    time String,
    _time Int64,
    request String,
    status String,
    bytes UInt64,
    referrer String,
    agent String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = '<Kafka broker>', 
    kafka_security_protocol = 'sasl_ssl', 
    kafka_sasl_mechanism = 'PLAIN', 
    kafka_sasl_username = '<Kafka API key>', 
    kafka_sasl_password = '<Kafka API secret>', 
    kafka_topic_list = 'hellmar_json_sr', 
    kafka_group_name = 'group1', 
    kafka_format = 'JSONEachRow',
    -- Skip the 5-byte Schema Registry header before parsing each message as JSON
    kafka_schema_registry_skip_bytes = 5;

CREATE OR REPLACE TABLE json_sr_landing (
    ip String,
    userid Int32,
    remote_user String,
    time String,
    _time Int64,
    request String,
    status String,
    bytes UInt64,
    referrer String,
    agent String
)
ENGINE = MergeTree ORDER BY userid;

DROP VIEW IF EXISTS json_sr_mv;
CREATE MATERIALIZED VIEW json_sr_mv TO json_sr_landing AS SELECT * FROM json_sr_queue;

Read the Schema ID

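We can also read only the metadata header by consuming each message as a raw blob. The schema ID is stored in big-endian byte order, while reinterpretAsUInt32 reads its argument as little-endian, so we swap the bytes with byteSwap.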

CREATE OR REPLACE TABLE json_sr_readschema_queue (
    blob String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = '<Kafka broker>', 
    kafka_security_protocol = 'sasl_ssl', 
    kafka_sasl_mechanism = 'PLAIN', 
    kafka_sasl_username = '<Kafka API key>', 
    kafka_sasl_password = '<Kafka API secret>', 
    kafka_topic_list = 'hellmar_json_sr', 
    kafka_group_name = 'group2', 
    kafka_format = 'RawBLOB';

CREATE OR REPLACE TABLE json_sr_readschema (
    magic UInt8,
    schema_id UInt32
)
ENGINE = MergeTree
ORDER BY schema_id;

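-- Use a separate view name so the json_sr_mv view from the previous section is not dropped
DROP VIEW IF EXISTS json_sr_readschema_mv;
CREATE MATERIALIZED VIEW json_sr_readschema_mv TO json_sr_readschema AS
SELECT
    reinterpretAsUInt8(left(blob, 1)) AS magic,                           -- byte 1: magic byte
    byteSwap(reinterpretAsUInt32(substring(blob, 2, 4))) AS schema_id     -- bytes 2 to 5: big-endian schema ID
FROM json_sr_readschema_queue;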

SELECT magic, schema_id FROM json_sr_readschema LIMIT 1;
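The byte order handling in the view can be checked outside ClickHouse. The Python sketch below assumes the same layout (a magic byte, then a big-endian schema ID) and shows why a little-endian read of the four ID bytes needs a swap. `parse_header` is a hypothetical helper and the sample bytes are made up.

```python
import struct


def parse_header(blob: bytes) -> tuple:
    """Return (magic, schema_id) from the first 5 bytes of a message."""
    if len(blob) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">BI", blob[:5])  # big-endian read
    return magic, schema_id


# Made-up message: header for schema ID 100014, then a JSON payload.
sample = bytes.fromhex("00000186ae") + b'{"userid": 7}'

# Reading the ID bytes as little-endian gives the wrong number ...
wrong = int.from_bytes(sample[1:5], "little")
# ... and swapping the byte order recovers the real schema ID.
swapped = int.from_bytes(wrong.to_bytes(4, "little"), "big")

print(parse_header(sample))
print(wrong, swapped)
```

The last two assignments mirror the reinterpretAsUInt32 and byteSwap steps: the first read produces a large, meaningless number, and the swap turns it into the ID the registry knows.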

Query the Schema Registry

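We can also query the Schema Registry directly, for instance by schema ID. The example below uses the url table function with HTTP Basic authentication and keeps only the schema whose ID is 100014.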

CREATE TABLE json_schema(
    schema String
)
ENGINE = MergeTree()
ORDER BY tuple();

INSERT INTO json_schema
SELECT json.schema AS schema
FROM url('https://<SR endpoint>/schemas',
    'JSONAsObject',
    headers('Authorization'=concat('Basic ', base64Encode('<SR API key>:<SR API secret>')))
)
WHERE json.id = 100014;

SELECT * FROM json_schema;
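For comparison, a single schema can also be fetched by ID from outside ClickHouse. The Python sketch below assumes the registry exposes the Confluent REST route `GET /schemas/ids/<id>`, which returns a JSON object with a `schema` field; compatible services may behave differently. `build_request` and `fetch_schema` are hypothetical helper names, the host name is a placeholder, and the credentials are the same placeholders as in the query above.

```python
import base64
import json
import urllib.request


def build_request(endpoint: str, schema_id: int, key: str, secret: str) -> urllib.request.Request:
    """Prepare a Basic-auth GET request for one schema ID (no network access yet)."""
    token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("ascii")
    url = f"{endpoint.rstrip('/')}/schemas/ids/{schema_id}"
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


def fetch_schema(endpoint: str, schema_id: int, key: str, secret: str) -> str:
    """Send the request and return the schema definition as a string."""
    with urllib.request.urlopen(build_request(endpoint, schema_id, key, secret)) as resp:
        return json.load(resp)["schema"]


# Placeholder endpoint and credentials; replace them before calling fetch_schema.
req = build_request("https://sr.example.invalid", 100014, "<SR API key>", "<SR API secret>")
print(req.full_url)
```

Only `build_request` runs here, since `fetch_schema` needs a reachable registry. The Authorization header is built the same way as the concat and base64Encode expression in the ClickHouse query.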

“This image is taken from Page 377 of Praktisches Kochbuch für die gewöhnliche und feinere Küche” by Medical Heritage Library, Inc. is licensed under CC BY-NC-SA 2.0.