Send data from Apache Kafka® to Managed Service for ClickHouse® in DoubleCloud

  1. Prepare your workspace

  2. Configure Managed Service for Apache Kafka®

    1. Create an Apache Kafka® cluster

    2. Create a topic

  3. Create a Managed ClickHouse® cluster and configure settings

  4. Set up integration to store and process data from Apache Kafka® topic

    1. Access the cluster

    2. Create tables to receive and process messages from the topic

  5. Run the data pipeline

    1. Create a consumer and a producer

    2. Send data to the Apache Kafka® topic

    3. Query the data in your ReplicatedMergeTree table

Prepare your workspace

Before you start this tutorial, install the software you'll need later:

  1. Install the ClickHouse® client.

  2. Install the kafkacat (kcat) client to manage Apache Kafka®:

    Pull the kcat image available on Docker Hub. This tutorial uses version 1.7.1, but you can use the latest one:

    docker pull edenhill/kcat:1.7.1
    

    Alternatively, install kafkacat from your Linux distribution's repository:

    sudo apt install kafkacat
    
  3. Install jq to process JSON data:

    Pull the jq image available on Docker Hub:

    docker pull stedolan/jq
    

    Alternatively, install jq from your Linux distribution's repository:

    sudo apt install jq -y
    
  4. Take a look at the JSON file that contains 100 objects with sample hits data. We'll access it later programmatically.
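
    For a quick preview, you can fetch the first object from the file, shown here with locally installed curl and jq. This is an optional check and assumes the file is a single JSON array:

    curl -s https://doublecloud-docs.s3.eu-central-1.amazonaws.com/data-sets/hits_sample.json | jq '.[0]'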

Configure Managed Service for Apache Kafka®

Create an Apache Kafka® cluster

  1. Go to the Clusters overview page in the console.

  2. Click Create cluster in the upper-right corner of the page.

  3. Select Apache Kafka®.

  4. Choose a provider and a region. Try to create your Apache Kafka® and ClickHouse® clusters in the same region.

    1. Under Resources:

      • Select the s2-c2-m4 preset for CPU, RAM capacity, and storage space to create a cluster with minimal configuration.

      • Select the number of zones and brokers. The number of hosts is the number of zones multiplied by the number of brokers. This tutorial creates a cluster with two zones and two brokers.

    2. Under Basic settings:

      • Enter the cluster Name, for example, kafka-tutorial-cluster.

      • Select the version of Apache Kafka® for your cluster from the Version drop-down list. For most clusters, we recommend using the latest version.

    3. Click Submit.

Your cluster will appear with the Creating status on the Clusters page in the console. Setting everything up may take some time. When the cluster is ready, it changes its state to Alive.

Create a topic

When you've created a cluster, create a topic in it:

  1. On the cluster's page, go to the Topics tab.

  2. Click Create.

  3. Under Topic Settings, specify the connection properties:

    • Cleanup policy - Delete. This policy deletes log segments when their retention time or log size reaches the limit.

    • Compression Type - Uncompressed. We don't need compression for our use case. Let's disable it.

    • Retention Bytes - 1048576 (1 MB).

    • Retention Ms - 600000 (10 minutes).

  4. Specify the Basic Settings:

    • Name

      A topic's name. Let's call it hits_topic.

    • Partitions

      The number of the topic's partitions. Keep 1 to create the simplest topic.

    • Replication factor

      Specifies the number of copies of a topic in a cluster. This parameter's value should not exceed the number of brokers in the cluster. Let's keep the default setting - 1.

  5. Click Submit.
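
Optionally, you can verify the topic from your machine with kcat's metadata listing mode. This is a sketch, assuming the same broker address and SASL credentials that the consumer and producer use later in this tutorial:

kafkacat -L \
         -b <Cluster FQDN from the Overview page>:9091 \
         -t hits_topic \
         -X security.protocol=SASL_SSL \
         -X sasl.mechanisms=SCRAM-SHA-512 \
         -X sasl.username="<username>" \
         -X sasl.password="<your cluster password>"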

Create a Managed ClickHouse® cluster and configure settings

  1. Go to the Clusters overview page in the console.

  2. Click Create cluster in the upper-right corner of the page.

    1. Select ClickHouse®.

    2. Choose a provider and a region. Try to create your Apache Kafka® and ClickHouse® clusters in the same region.

    3. Under Resources:

      • Select the s2-c2-m4 preset for CPU, RAM capacity, and storage space to create a cluster with minimal configuration.

      • Choose a number of replicas. Let's keep it as is with a single replica.

      • Select a number of shards. Keep a single shard.

    4. Under Basic settings:

      • Enter the cluster Name, in this tutorial: clickhouse-tutorial-cluster.

      • From the Version drop-down list, select the ClickHouse® version the Managed ClickHouse® cluster will use. For most clusters, we recommend using the latest version.

    5. Under Advanced → Cluster settings → clickhouseConfig → kafka, specify the details of the connection to your previously created Apache Kafka® cluster:

      • securityProtocol: SASL_SSL

      • saslMechanism: SCRAM-SHA-512

      • saslUsername: Your Apache Kafka® cluster username

      • saslPassword: Your Apache Kafka® cluster password

      Warning

      If you apply these settings to an already existing cluster, restart the cluster to apply the changes.

    6. Click Submit.

Your cluster will appear with the Creating status on the Clusters page in the console. Setting everything up may take some time. When the cluster is ready, it changes its state to Alive.

Tip

The DoubleCloud service creates the superuser admin and its password automatically. You can find both the User and the Password in the Overview tab on the cluster information page.

To create users for other roles, see Manage ClickHouse® users.

Set up integration to store and process data from Apache Kafka® topic

Access the ClickHouse® cluster

Reading data from Apache Kafka® topics requires creating two different tables and a materialized view. The reason is that you can't query the Kafka table directly: it operates only as a consumer. To convert the data to the required format, create a materialized view that reads from the Kafka table. Finally, create a table with an engine that allows storing and processing data.

The table structure is the following:

CREATE TABLE [db.]table_name ON CLUSTER default
(
    name1 [type1],
    name2 [type2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]

Required parameters:

kafka_broker_list

A comma-separated list of brokers.

kafka_topic_list

A comma-separated list of Kafka topics.

kafka_group_name

A group of Kafka consumers. Reading offsets are tracked separately for each group. If you don't want messages to be duplicated in the ClickHouse® cluster, use the same group name everywhere.

kafka_format

A message format.

Refer to the ClickHouse® documentation to see the full parameter list.

Create tables to receive and process messages from the topic

  1. Run the following command in your terminal to connect to your cluster. It contains the string from the Native interface field on the cluster Overview page:

    docker run --network host --rm -it clickhouse/<Native interface connection string>
    
    The complete Docker command structure
    docker run --network host --rm -it \
                clickhouse/clickhouse-client \
                --host <FQDN of your cluster> \
                --secure \
                --user <cluster user name> \
                --password <cluster user password> \
                --port 9440
    Alternatively, if you installed the ClickHouse® client locally, run the <Native interface connection string> from the Overview page directly in your terminal.
  2. Create a table with the Kafka engine to receive messages from topics.

    Communication between different DoubleCloud services is usually done with private FQDN addresses and a special port. You can find them under the Connection strings → Private tab on the overview page.

    CREATE TABLE topic_messages ON CLUSTER default
       (
          Hit_ID Int32, 
          Date Date, 
          Time_Spent Float32, 
          Cookie_Enabled Bool, 
          Region_ID Int32, 
          Gender String, 
          Browser String, 
          Traffic_Source String, 
          Technology String
       )
       ENGINE = Kafka('<Public FQDN of your Kafka cluster>:9091', 'hits_topic', 'group1', 'JSONEachRow');
    
  3. Create a ReplicatedMergeTree table to permanently store and process data. After you create this table, it will be empty - the data will come from the materialized view.

    CREATE TABLE hits_storage ON CLUSTER default
       (
          Hit_ID Int32, 
          Date Date, 
          Time_Spent Float32, 
          Cookie_Enabled Bool, 
          Region_ID Int32, 
          Gender String, 
          Browser String, 
          Traffic_Source String, 
          Technology String
       )
       ENGINE = ReplicatedMergeTree()
       ORDER BY (Hit_ID, Date);
    
  4. Create a materialized view to copy message data from the Kafka table to the MergeTree table. The materialized view collects data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using the SELECT statement.

    CREATE MATERIALIZED VIEW materialized_messages TO hits_storage
    AS SELECT * FROM topic_messages;
    
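
Optionally, you can confirm that all three objects (topic_messages, hits_storage, and materialized_messages) were created by listing the tables in the current database:

SHOW TABLES;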

Run the data pipeline

The pipeline consists of the following entities:

  • The Apache Kafka® producer

  • A ClickHouse® table with the Kafka engine as a consumer

  • A materialized view

  • A table to which the materialized view copies data.

To send data through this pipeline, access the JSON file from our Amazon S3 bucket and send the data from this file to the Apache Kafka® topic.

Create a consumer and a producer

  1. (Optional) Run an Apache Kafka® consumer to see whether the data is successfully received by the Apache Kafka® cluster. The data will appear in your ClickHouse® cluster anyway, but a running consumer makes it easier to track the process and debug it if needed.

    docker run --network host --rm -it \
                edenhill/kcat:1.7.1 -C \
                -b <Cluster FQDN from the Overview page>:9091 \
                -t hits_topic \
                -X security.protocol=SASL_SSL \
                -X sasl.mechanisms=SCRAM-SHA-512 \
                -X sasl.username="<username>" \
                -X sasl.password="<your cluster password>"
    

    If you installed kafkacat from your Linux distribution's repository, run it directly:

    kafkacat -C \
             -b <Cluster FQDN from the Overview page>:9091 \
             -t hits_topic \
             -X security.protocol=SASL_SSL \
             -X sasl.mechanisms=SCRAM-SHA-512 \
             -X sasl.username="<username>" \
             -X sasl.password="<your cluster password>"
    

    Now, when you run the producer in a separate terminal, the received messages will also be displayed in the consumer terminal.

  2. Execute the following command in a separate terminal instance to create a producer and push the data. This command also processes the JSON file into the format required for sending to the topic:

    curl https://doublecloud-docs.s3.eu-central-1.amazonaws.com/data-sets/hits_sample.json | docker run -i stedolan/jq -rc | docker run --name kcat --rm -i edenhill/kcat:1.7.1 \
          -P \
          -b <Cluster FQDN from the Overview page>:9091 \
          -t hits_topic \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<username>" \
          -X sasl.password="<password>"
    
    If you use locally installed tools instead of Docker:

    curl https://doublecloud-docs.s3.eu-central-1.amazonaws.com/data-sets/hits_sample.json | jq -rc | kafkacat \
          -P \
          -b <Cluster FQDN from the Overview page>:9091 \
          -t hits_topic \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<username>" \
          -X sasl.password="<password>"
    

If you've completed all the steps successfully and have an open consumer, its terminal will show the uploaded data:

},
{
   "Hit_ID": 40668,
   "Date": "2017-09-09",
   "Time_Spent": "730.875",
   "Cookie_Enabled": 0,
   "Redion_ID": 11,
   "Gender": "Female",
   "Browser": "Chrome",
   "Traffic_Source": "Social network",
   "Technology": "PC (Windows)"
}
]
% Reached end of topic hits_topic [0] at offset 1102

Query the data in your ReplicatedMergeTree table

Open the terminal with the ClickHouse® connection. If the terminal isn't open, connect to the Managed ClickHouse® cluster as shown in the previous step.

After that, send the following SELECT query:

SELECT * FROM hits_storage LIMIT 5;

The output should be the following:

┌─Hit_ID─┬───────Date─┬─Time_Spent─┬─Cookie_Enabled─┬─Region_ID─┬─Gender──┬─Browser────────┬─Traffic_Source──┬─Technology───────────┐
│  14230 │ 2017-01-30 │  265.70175 │ true           │         2 │ Female  │ Firefox        │ Direct          │ PC (Windows)         │
│  14877 │ 2017-04-12 │  317.82758 │ false          │       229 │ Female  │ Firefox        │ Direct          │ PC (Windows)         │
│  14892 │ 2017-07-29 │   191.0125 │ true           │        55 │ Female  │ Safari         │ Recommendations │ Smartphone (Android) │
│  15071 │ 2017-06-11 │  148.58064 │ true           │       159 │ Female  │ Chrome         │ Ad traffic      │ PC (Windows)         │
│  15110 │ 2016-09-02 │  289.48334 │ true           │       169 │ Female  │ Chrome         │ Search engine   │ Smartphone (IOS)     │
└────────┴────────────┴────────────┴────────────────┴───────────┴─────────┴────────────────┴─────────────────┴──────────────────────┘
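
The sample file contains 100 objects, so if every message made it through the pipeline exactly once, a count query should also return 100:

SELECT count() FROM hits_storage;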

This is the end of the tutorial for the Apache Kafka® and ClickHouse® integration in DoubleCloud. Take a look at the articles listed below to learn more about both services and other combinations.

See also

Managed Service for Apache Kafka®

Managed Service for ClickHouse®