Pizza

Druid 24.0 brought the ability to ingest and store nested (JSON) columns directly. In many cases, this eliminates the need to flatten JSON data during ingestion.

Inspired by real-world experience, in this post I show how to use JSON columns and the Kafka timestamp in the same ingestion spec, and how column flattening and nested columns complement each other to enable this use case.

This tutorial uses Aiven’s pizza simulator. I assume that you are running Kafka locally in a single-broker configuration, and that you are producing data to a topic named pizza.

The tutorial uses the Druid 24.0 quickstart.

The Data

The data generator simulates pizza orders. Here’s a sample record:

{
  "id": 14492,
  "shop": "Mammamia Pizza",
  "name": "Juan Gonzalez",
  "phoneNumber": "5474461494",
  "address": "462 Williams Tunnel Apt. 424\nNicoletown, WY 02258",
  "pizzas": [
    {
      "pizzaName": "Mari & Monti",
      "additionalToppings": [
        "🍅 tomato",
        "🧅 onion"
      ]
    },
    {
      "pizzaName": "Diavola",
      "additionalToppings": []
    },
    {
      "pizzaName": "Salami",
      "additionalToppings": [
        "🧄 garlic",
        "🍅 tomato",
        "🧅 onion",
        "🥚 egg"
      ]
    },
    {
      "pizzaName": "Margherita",
      "additionalToppings": [
        "🍅 tomato",
        "🍓 strawberry",
        "🍍 pineapple",
        "🐟 tuna"
      ]
    }
  ],
  "timestamp": 1669202722766
}

So each order record has an id field, some information about the customer, and an array of pizzas ordered. Each pizza has a pizzaName and an array of additionalToppings.

Ingesting the JSON data

Let’s first do an ordinary ingestion using the ingest wizard. Note how the pizzas field does not appear in the field list on the Parse data step:

[Screenshot: Parse Kafka fields]

Don’t worry, we will fix that in a moment. Proceed to Configure schema, accepting the defaults.

Add a new dimension:

[Screenshot: Add dimension]

and choose the pizzas field and type json:

[Screenshot: Configure JSON dimension]

In the following steps, choose daily segments and name the datasource pizza-01.

Here is the ingestion spec:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "pizza",
      "inputFormat": {
        "type": "json"
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "pizza-01",
      "timestampSpec": {
        "column": "timestamp",
        "format": "millis"
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "long",
            "name": "id"
          },
          "shop",
          "name",
          "phoneNumber",
          "address",
          {
            "type": "json",
            "name": "pizzas"
          }
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}

Let’s query the data:

[Screenshot: Query the simple data]

The pizzas column has been ingested correctly. The tree symbol next to the column name shows that it is a nested JSON column that you can query further with JSON functions.
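
For example, the following query (a minimal sketch against the pizza-01 datasource created above) extracts the first pizza of each order using the JSON_VALUE and JSON_QUERY functions:

SELECT
  "shop",
  -- scalar value extracted from inside the nested column
  JSON_VALUE("pizzas", '$[0].pizzaName') AS first_pizza,
  -- nested array, returned as JSON
  JSON_QUERY("pizzas", '$[0].additionalToppings') AS first_pizza_toppings
FROM "pizza-01"
LIMIT 10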

Adding the Kafka timestamp

Instead of the timestamp field inside the data, we would now like to use the built-in Kafka timestamp. This means we have to replace the inputFormat object. A Kafka inputFormat object can read all the data from a Kafka message, including:

  • the timestamp
  • the key
  • Kafka header fields.

It also needs nested input format objects that define how to parse the key and the value of the Kafka message. The value format is json; the key is a single string, modeled as a degenerate csv format with just one column.

Edit the ingestion spec in the JSON editor. Find the inputFormat section and replace it with this snippet:

      "inputFormat": {
        "type": "kafka",
        "headerLabelPrefix": "kafka.header.",
        "timestampColumnName": "kafka.timestamp",
        "keyColumnName": "kafka.key",
        "headerFormat": {
          "type": "string"
        },
        "keyFormat": {
          "type": "csv",
          "columns": [
            "k"
          ]
        },
        "valueFormat": {
          "type": "json"
        }
      }

Also replace the timestampSpec with this:

      "timestampSpec": {
        "column": "kafka.timestamp",
        "format": "millis"
      }

Name the datasource for this experiment pizza-02. Submit the spec.

[Screenshot: Kafka ingestion, first attempt]

Oh no! The JSON data has not been picked up.
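
A quick check against the new datasource makes the problem visible (a sketch, using the pizza-02 spec above): the rows are there, but the pizzas column comes back null instead of the nested objects:

SELECT
  __time,
  "shop",
  -- declared in the spec, but every value is null
  "pizzas"
FROM "pizza-02"
LIMIT 5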

Fixing field discovery using flattenSpec

Here is where an explicit flattenSpec comes to the rescue. It is inserted into the value’s input format and uses the "root" field type to tell Druid to pick up whatever is in the field:

      "inputFormat": {
        "type": "kafka",
        "headerLabelPrefix": "kafka.header.",
        "timestampColumnName": "kafka.timestamp",
        "keyColumnName": "kafka.key",
        "headerFormat": {
          "type": "string"
        },
        "keyFormat": {
          "type": "csv",
          "columns": [
            "k"
          ]
        },
        "valueFormat": {
          "type": "json",
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              { "name": "pizzas", "type": "root" }
            ]
          }
        }
      }

Name this datasource pizza-03 and submit the spec. This time, the JSON data has been ingested correctly!

[Screenshot: Final query]
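
To double-check, a query along these lines (again a sketch, using the column names from the spec below) shows the Kafka timestamp as __time right next to the nested pizzas data:

SELECT
  -- __time is now populated from the Kafka message timestamp
  __time,
  "shop",
  JSON_VALUE("pizzas", '$[0].pizzaName') AS first_pizza
FROM "pizza-03"
LIMIT 5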

Here is the final ingestion spec:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "pizza",
      "inputFormat": {
        "type": "kafka",
        "headerLabelPrefix": "kafka.header.",
        "timestampColumnName": "kafka.timestamp",
        "keyColumnName": "kafka.key",
        "headerFormat": {
          "type": "string"
        },
        "keyFormat": {
          "type": "csv",
          "columns": [
            "k"
          ]
        },
        "valueFormat": {
          "type": "json",
          "flattenSpec": {
            "useFieldDiscovery": true,
            "fields": [
              {
                "name": "pizzas",
                "type": "root"
              }
            ]
          }
        }
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "pizza-03",
      "timestampSpec": {
        "column": "kafka.timestamp",
        "format": "millis"
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "long",
            "name": "id"
          },
          "shop",
          "name",
          "phoneNumber",
          "address",
          {
            "type": "json",
            "name": "pizzas"
          }
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}

Conclusion

  • Nested columns allow structured JSON data inside Druid datasources.
  • This can replace flattenSpec in some use cases.
  • However, with a kafka inputFormat, you need to define each nested column field explicitly in the flattenSpec.

"Pizza" by Katrin Gilger is licensed under CC BY-SA 2.0.