📢 Upcoming webinar | Migrating from Rockset to ClickHouse made easy Register now →

Unlocking the power duo: Kafka and ClickHouse for lightning-fast data processing

Written by: Andrei Tserakhau, DoubleCloud Tech Lead

What we wanted to solve

Let’s delve into a real-world scenario:
Imagine you are tasked with aggregating vast amounts of data generated by multiple point-of-sale systems. This data needs to be processed in real-time and displayed on a top-level analytical dashboard to provide comprehensive insights.

In the realm of data processing, speed is paramount. ClickHouse stands out as the king of speed —
it never slows down and is exceptionally fast. Its effectiveness in concurrent processing and affordability makes it a top choice for building rapid data insights.

This brings us to a naive solution:

For each POS, we add a terminal code which inserts data into ClickHouse.

💡 Simple?
Yes.

💡 Will it work?
No.

Why it’s hard to write into Clickhouse

This naive solution is an entry ticket to your first Clickhouse deadly sin: here’s an entire list of sins.
This error will often be experienced when inserting data and will be present in ClickHouse logs or in a response to an INSERT request. To understand this error, users need to have a basic understanding of the concept of a part in ClickHouse:

Managed Service for ClickHouse

Fully managed service from the creators of the world’s 1st managed ClickHouse. Backups, 24/7 monitoring, auto-scaling, and updates.

Let’s not get too deep in the technical details; let’s simply acknowledge this fact. Writing to ClickHouse thrives on controlled speed and parallelism. ClickHouse loves it when your ingestion process looks like this:

In an ideal scenario, one significant supplier would insert ALL data at any speed, but with controlled parallelism and buffering. This aligns perfectly with ClickHouse’s preferences.

That’s why, in practice, it’s common to introduce a buffer before ClickHouse:

This is where we introduce Kafka, the beacon of data buffering solutions. With its seamless ability to act as a buffer, Kafka swoops in as the ultimate sidekick to complement ClickHouse’s might. So, after iterating, our solution looks as follows:

We incorporate some code to write data to Kafka from the POS systems, and then set up a delivery from Kafka to ClickHouse. There is still some magic on this diagram, as the delivery from Kafka to ClickHouse is a challenge in itself, but we will cover this later.

💡 Powerful?
Yes.

💡 Scalable?
Definitely.

💡 Simple?
Not really.

How to take delivery data between Kafka and ClickHouse

The critical stages in delivering data from Kafka to ClickHouse involve reading Kafka topics, transforming data into ClickHouse-compatible formats, and writing this formatted data into ClickHouse tables. The trade-off here lies in deciding where to perform each stage.

Each stage consumes some resources:

Reading Stage: This initial phase consumes CPU and network bandwidth to pull in data from Kafka topics.
Transformation Process: Transforming the data requires CPU and memory usage. It’s a straightforward resource utilization phase, where computational power reshapes the data to fit ClickHouse’s specifications.
Writing Stage: The final act involves writing data into ClickHouse tables, which also requires CPU power and network bandwidth. It’s a routine process, ensuring the data finds its place in ClickHouse’s storage with allocated resources.

Each integration method comes with its own tradeoffs, so you should choose wisely.
Let’s explore different options for implementing the connection between Kafka and ClickHouse:

ClickHouse Kafka Engine

Utilize Kafka’s built-in ClickHouse engine to write data directly into ClickHouse tables. This is how it looks from up high level:

Managed Service for Apache Kafka

Fully managed, secure, and highly available service for distributed delivery, storage, and real-time data processing.

Let’s imagine that our POS terminals generate JSON data with a new line delimiter.


{"user_ts": "SOME_DATE", "id": 123, "message": "SOME_TEXT"}            
{"user_ts": "SOME_DATE", "id": 1234, "message": "SOME_TEXT"}

Let’s implement this Kafka Engine:

First, we need to create a wrapper around the topic inside ClickHouse via Kafka Engine:
example kafka_stream_engine.sql

   -- Clickhouse queue wrapper
    CREATE TABLE demo_events_queue ON CLUSTER '{cluster}' (
   -- JSON content schema
    user_ts String, 
    id UInt64, 
    message String
    ) ENGINE = Kafka SETTINGS kafka_broker_list = 'KAFKA_HOST:9091', 
      kafka_topic_list = 'TOPIC_NAME', 
      kafka_group_name = 'uniq_group_id', 
      kafka_format = 'JSONEachRow';

In this query, we set up three main things:

  1. Data schema: a table with 3 columns.
  2. Data format: JSON Each Row.
  3. Kafka host + Kafka topic.
    Next, we need to specify the target table that will host the resulting data:
    /example_projects/clickstream/kafka_stream_engine.sql#L12-L23

   -- Table to store data
      CREATE TABLE demo_events_table ON CLUSTER '{cluster}' (
              topic String,
              offset UInt64,
              partition UInt64,
              timestamp DateTime64,
              user_ts DateTime64,
              id UInt64,
              message String
      ) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/demo_events_table', '{replica}')
       PARTITION BY toYYYYMM(timestamp)
       ORDER BY (topic, partition, offset);

This table saves the same data in ReplicatedMergeTree, but with some extra columns. These columns would be fetched from KafkaEngine metadata.

/example_projects/clickstream/kafka_stream_engine.sql#L25-L34

  -- Delivery pipeline
     CREATE MATERIALIZED VIEW readings_queue_mv TO demo_events_table AS
     SELECT
       -- kafka engine virtual column
         _topic as topic,
         _offset as offset,
         _partition as partition,
         _timestamp as timestamp,
         -- example of complex date parsing
         toDateTime64(parseDateTimeBestEffort(user_ts), 6, 'UTC') as user_ts,
         id,
        message
      FROM demo_events_queue;

As a final step, create a materialized view that connects the KafkaEngine table with the target table.
All of these steps come together to produce the final result:

 SELECT count(*)
 FROM demo_events_table
   Query id: f2637cee-67a6-4598-b160-b5791566d2d8

    ┌─count()─┐
    │    6502 │
    └─────────┘

    1 row in set. Elapsed: 0.336 sec.

In this option, all three stages occur inside ClickHouse. This is suitable for smaller workloads but may lead to unreliable performance on a large scale. Additionally, ClickHouse tends to prioritize query workloads over non-query workloads when facing resource deficits, potentially creating additional delivery latencies under high loads.

While the utilization of KafkaEngine is robust, it introduces unresolved challenges:

  • Offset Management: In the event of malformed data entering Kafka, ClickHouse may become unresponsive until a manual offset deletion by an administrator, which is an inherently labor-intensive task.

  • Limited Observability: Monitoring poses a challenge as all operations occur within ClickHouse, necessitating reliance on ClickHouse logs as the sole means of gaining insights into system activities.

  • Scalability Concerns: Handling parsing and reading within the ClickHouse cluster itself may impede the seamless scaling of read and write operations during periods of heightened demand, potentially leading to CPU and I/O concurrency issues.

Inside Kafka Connect

On the flip side, Kafka Connect flips the script, shifting complexity from ClickHouse to Kafka.

It’s a game of strategy, deciding where to house the intricacies of data management. We place the reader/parsing/writing efforts inside Kafka Connect, which will be hosted by Kafka itself.

The pros/cons here are pretty much the same, just shifting the extra load from storage to the buffer side. You can see an example of how to connect here.

External Writer

For those willing to invest, the External Writer option emerges as the crème de la crème, promising unparalleled performance at the cost of a few extra coins. An oversimplified solution may look like:

Let’s attempt to establish this delivery using DoubleCloud Data Transfer.

The resource model consists of two endpoints (source and destination) and a transfer. To create them, we will utilize the Terraform provider.

The crucial part in the configuration is the parser rule for the Source endpoint:
/example_projects/clickstream/transfer.tf#L16-L43

    parser {
    json {
    schema {
      fields {
       field {
         name     = "user_ts"
         type     = "datetime"
         key      = false
         required = false
      }
      field {
          name     = "id"
          type     = "uint64"
          key      = false
          required = false
      }
      field {
         name     = "message"
         type     = "utf8"
         key      = false
         required = false
      }
     }
   }
      null_keys_allowed = false
      add_rest_column   = true
     }
   }

This parser is akin to what we specify inside ClickHouse via DDL.

Once we create the source, adding a target database is a straightforward process:

/example_projects/clickstream/transfer.tf#L54-L63

          clickhouse_target {
             clickhouse_cleanup_policy = "DROP"
             connection {
             address {
                 cluster_id = doublecloud_clickhouse_cluster.target-clickhouse.id
              }
             database = "default"
             user     = "admin"
                }
              }        

Finally, link them together into a transfer:
/example_projects/clickstream/transfer.tf#L67-L75

      resource "doublecloud_transfer" "clickstream-transfer" {
               name       = "clickstream-transfer"
               project_id = var.project_id
               source     = doublecloud_transfer_endpoint.clickstream-source[count.index].id
               target     = doublecloud_transfer_endpoint.clickstream-target[count.index].id
               type       = "INCREMENT_ONLY"
                activated  = true
        }

And there you go! Your delivery is up and running.

No-code ELT tool: Data Transfer

A cloud agnostic service for aggregating, collecting, and migrating data from various sources.

We at DoubleCloud have designed our own robust EL (t) engine, Transfer, with a pivotal feature, Queue Engine -> ClickHouse delivery. In developing this delivery mechanism, we have proactively addressed the persistent challenges:

  • Automated Offset Management: We have implemented automatic unparsed tables, streamlining the handling of corrupt data and eliminating the need for manual intervention in offset management.

  • Enhanced Observability: To overcome the limited visibility inherent in ClickHouse, we have developed dedicated dashboards and alerts that provide real-time insights into specific delivery metrics. This includes comprehensive monitoring of data lag, delivered rows, and delivered bytes.

  • Dynamic Scalability: Transfer deploys delivery jobs externally, within Kubernetes, EC2, or GCP instances, allowing for independent scaling separate from the ClickHouse cluster. This ensures optimal scalability to meet varying demands without compromising performance.

Additionally, Transfer offers out-of-the-box support for:

  • Automatic schema evolution: Backward-compatible schema changes are automatically synchronized for the target storage.

  • Automatic dead-letter queue: Any corrupt data is handled by Transfer and organized into the DLQ ClickHouse table.

External writer via ClickPipes

ClickPipes is a managed integration platform that makes ingesting data from a diverse set of sources as simple as a few clicks. Designed for the most demanding workloads, ClickPipes’s robust and scalable architecture ensures consistent performance and reliability.

I won’t copy the full instructions on how to set up this writer, but you can find a comprehensive guide here.
This writer is similar to any DoubleCloud Transfer, but without automatic scheme evolution.

Summary

But how does one choose the right path in this labyrinth of possibilities?

Cue the Graph is your trusty compass in this landscape of choices. Visualizing the pros and cons, the trade-offs and benefits, a graph becomes your guiding star, illuminating the ideal route for your specific needs.

In this comprehensive guide, we’ll explore each aspect of the Kafka-ClickHouse love, delving into the intricacies, highlighting the pitfalls, and providing a roadmap for making that crucial decision. Get ready to unravel the secrets behind this dynamic duo, as we navigate the fast-paced world of data delivery and processing.

To explore the power of Kafka + Clickhouse, feel free to explore DoubleCloud stack, we have a nice Terraform examples.

Get started with DoubleCloud

Sign in to save this post