Use Managed Service for Apache Kafka® API

To get started with the service:

Before you start

Your primary tool for interacting with DoubleCloud is the console. Set it up and configure it before moving on.

  1. Go to the console.

  2. Log in to DoubleCloud if you already have an account, or sign up if you're opening the console for the first time.

    Warning

    The steps below show how to set up kcat from Docker and kafkacat on DEB-based Linux, but you can use other tools of your choice.

    For other connection options, see Connect to an Apache Kafka® cluster.

    Pull the kcat image available on Docker Hub. This command uses version 1.7.1, but you can use the latest one:

    docker pull edenhill/kcat:1.7.1
    

    Install kcat from your package repository:

    sudo apt install kafkacat
    
  3. Create a service account:

    1. Go to the Service accounts tab of the Members page in the console.

    2. Name your service account.

    3. From the drop-down menu, select the Admin user role - you'll need both read and write access.

    4. Click Submit. You'll see your new service account appear on the list.

  4. Issue an API key for your service account:

    1. Go to the Service accounts tab of the Members page in the console.

    2. Open the information page of the service account for which you want to create an API key.

    3. Under API keys, click Create key to create your account's first Secret key.

    4. Click Download file with keys. You'll use it to authenticate API requests.

  5. Install the DoubleCloud API Python SDK.
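
    As a sketch, initializing the SDK with the key file you downloaded might look like this. The pip package name, the doublecloud.SDK constructor with a service_account_key argument, and the key file name are assumptions modeled on the SDK repository examples - check the repository for the exact details:

    # Install the SDK first, for example: pip install doublecloud
    import json

    import doublecloud

    # Load the API key file downloaded in the previous step;
    # the file name here is a placeholder.
    with open("authorized_key.json") as f:
        service_account_key = json.load(f)

    # The sdk object is passed to the functions in the sections below.
    sdk = doublecloud.SDK(service_account_key=service_account_key)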

Create your cluster

To create an Apache Kafka® cluster, use the ClusterService create method. The following parameters are required to create a functional cluster:

  • project_id - the ID of your project. You can get this value on your project's information page.

  • cloud_type - aws.

  • region_id - for this quickstart, use eu-central-1.

  • name - quickstart-cluster.

  • resources - specify the following from the doublecloud.kafka.v1.Cluster model:

    • resource_preset_id - for this quickstart, specify s2-c2-m4.

    • disk_size - 34359738368 bytes (32 GB).

    • broker_count - 1.

    • zone_count - 3.

  • You can also enable schema registry for your cluster: use the schema_registry_config object within the ClusterService Create method.

Kafka cluster request in JSON format
{
   "project_id": "<your Project ID>",
   "cloud_type": "aws",
   "region_id": "eu-central-1",
   "name": "quickstart-cluster",
   "resources": {
      "kafka": {
            "resource_preset_id": "s2-c2-m4",
            "disk_size": "34359738368",
            "broker_count": 1,
            "zone_count": 3
      }
   }
}
Kafka cluster request with the Python SDK
import logging

from google.protobuf.wrappers_pb2 import Int64Value

from doublecloud.kafka.v1.cluster_pb2 import ClusterResources
from doublecloud.kafka.v1.cluster_service_pb2 import CreateClusterRequest
from doublecloud.kafka.v1.cluster_service_pb2_grpc import ClusterServiceStub
from doublecloud.kafka.v1.config_pb2 import SchemaRegistryConfig

def create_cluster(sdk, project_id, region_id, name, network_id):
    cluster_service = sdk.client(ClusterServiceStub)
    operation = cluster_service.Create(
        CreateClusterRequest(
            project_id=project_id,
            cloud_type="aws",
            region_id=region_id,  # "eu-central-1" in this quickstart
            name=name,  # "quickstart-cluster" in this quickstart
            resources=ClusterResources(
                kafka=ClusterResources.Kafka(
                    resource_preset_id="s2-c2-m4",
                    disk_size=Int64Value(value=34359738368),  # 32 GB
                    broker_count=Int64Value(value=1),
                    zone_count=Int64Value(value=3),
                )
            ),
            network_id=network_id,
            schema_registry_config=SchemaRegistryConfig(enabled=True),  # enable schema registry for the cluster
        )
    )
    logging.info("Creating initiated")
    return operation
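
A call to this function might look as follows. The sdk object comes from the SDK initialization step above; the project and network IDs are placeholders to replace with your own values:

operation = create_cluster(
    sdk,
    project_id="<your Project ID>",
    region_id="eu-central-1",
    name="quickstart-cluster",
    network_id="<your Network ID>",
)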

Note

The DoubleCloud service creates the superuser admin and its password automatically. You can find both the User and the Password in the Overview tab on the cluster information page.

Create a topic

After you've created a cluster, you also need to create a topic for messages:

Use the TopicService create method and pass the following parameters; a sketch follows the list below:

  • cluster_id - the ID of the cluster in which you want to create a topic. To find the cluster ID, get a list of clusters in the project.

  • topic_spec - let's configure the required topic specifications:

    • name - specify the topic name, first-topic.

    • partitions - set the minimum number of partitions for this quickstart, 1.

    • replication_factor - go for the basic option here as well, specify 1.

    • topic_config_3 - use the doublecloud.kafka.v1.TopicConfig3 model to set further topic configuration for Apache Kafka® version 3 and above:

      • cleanup_policy - set the cleanup policy for the topic, in this case CLEANUP_POLICY_DELETE.

      • compression_type - we don't need compression for this tutorial, specify COMPRESSION_TYPE_UNCOMPRESSED.

      • retention_bytes - 1048576 (1 MB).

      • retention_ms - 600000 (10 minutes).
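
Below is a minimal sketch of such a request, modeled on the cluster example above. The module paths, message names, and enum locations (topic_pb2, topic_service_pb2, CreateTopicRequest, TopicSpec) are assumptions based on that pattern - check the Python SDK repository for the exact names:

import logging

from google.protobuf.wrappers_pb2 import Int64Value

# Assumed module paths, mirroring the cluster service imports above
from doublecloud.kafka.v1.topic_pb2 import TopicConfig3, TopicSpec
from doublecloud.kafka.v1.topic_service_pb2 import CreateTopicRequest
from doublecloud.kafka.v1.topic_service_pb2_grpc import TopicServiceStub

def create_topic(sdk, cluster_id):
    topic_service = sdk.client(TopicServiceStub)
    operation = topic_service.Create(
        CreateTopicRequest(
            cluster_id=cluster_id,
            topic_spec=TopicSpec(
                name="first-topic",
                partitions=Int64Value(value=1),
                replication_factor=Int64Value(value=1),
                topic_config_3=TopicConfig3(
                    # Enum values may live at module level instead of on
                    # TopicConfig3; adjust to the generated code.
                    cleanup_policy=TopicConfig3.CLEANUP_POLICY_DELETE,
                    compression_type=TopicConfig3.COMPRESSION_TYPE_UNCOMPRESSED,
                    retention_bytes=Int64Value(value=1048576),  # 1 MB
                    retention_ms=Int64Value(value=600000),  # 10 minutes
                ),
            ),
        )
    )
    logging.info("Topic creation initiated")
    return operation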

Connect to your cluster

Once you have a cluster with a topic in it, connect to the cluster and transfer data from a producer to a consumer:

  1. Run a command that creates a consumer, using the connection string from the Overview tab on your cluster information page as the broker address. With Docker, the command has the following structure:

    docker run --name kcat --rm -i -t edenhill/kcat:1.7.1 \
          -C \
          -b <broker FQDN>:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="admin" \
          -X sasl.password="<cluster password>" \
          -Z
    
    With kafkacat on DEB-based Linux:

    kafkacat -C \
          -b <broker FQDN>:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="admin" \
          -X sasl.password="<cluster password>" \
          -Z
    

    You will see the following status message:

    % Reached end of topic first-topic [0] at offset 0
    
  2. Execute the following command in a separate terminal instance to create a producer and push the data. With Docker (note the container name must differ from the running consumer's):

    curl https://doublecloud-docs.s3.eu-central-1.amazonaws.com/data-sets/hits_sample.json | docker run --name kcat-producer --rm -i edenhill/kcat:1.7.1 \
          -P \
          -b <broker FQDN>:9091 \
          -t <topic name> \
          -k key \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="admin" \
          -X sasl.password="<cluster password>"
    
    With kafkacat:

    curl https://doublecloud-docs.s3.eu-central-1.amazonaws.com/data-sets/hits_sample.json | kafkacat \
          -P \
          -b <broker FQDN>:9091 \
          -t <topic name> \
          -k key \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="admin" \
          -X sasl.password="<cluster password>"
    
  3. If you've completed all the steps successfully, the terminal with the consumer will show the uploaded data:

     {
          "Hit_ID": 40668,
          "Date": "2017-09-09",
          "Time_Spent": "730.875",
          "Cookie_Enabled": 0,
          "Redion_ID": 11,
          "Gender": "Female",
          "Browser": "Chrome",
          "Traffic_Source": "Social network",
          "Technology": "PC (Windows)"
     }
    % Reached end of topic first-topic [0] at offset 1102
    

Now you have an Apache Kafka® cluster with a working consumer and producer. See the links below to continue exploring:

Download the complete code example

You can download the full code listing for all the steps above from our Python SDK GitHub repository.

See also