Connect to an Apache Kafka® cluster

Note

You can connect to your cluster hosts only over an SSL connection.

Connect with CLI-based tools

We tested the connections in the following environments:

  • Ubuntu 20.04 LTS:

  • Windows 10 Enterprise 1909:

    • PowerShell: 7.2.4, OpenJDK: 17.0.3, Apache Kafka®: 3.0

DoubleCloud Managed Service for Apache Kafka® ports

You can connect to DoubleCloud Managed Service for Apache Kafka® clusters via the following ports:

All the above ports are SSL-encrypted.

Bash

  1. Install the kcat (formerly kafkacat) utility.

    sudo apt update && sudo apt install kafkacat
    
  2. Run the command below to create a consumer and receive messages from a topic. For the username and password, use your cluster's credentials. Replace the placeholders with the values from your cluster's Overview tab:

    kafkacat -C \
          -b <broker host domain name>:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<cluster username>" \
          -X sasl.password="<cluster password>" \
          -Z
    
  3. In a separate terminal, run the following command to create a producer and send a text message. Replace the placeholders with the values from your cluster's Overview tab:

    echo "test message" | kafkacat -P \
          -b <broker host domain name>:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<cluster username>" \
          -X sasl.password="<cluster password>"
    

    Warning

    The command above assumes that your topic uses the DELETE cleanup policy. If the topic uses the COMPACT or COMPACT_AND_DELETE policy, specify keys for your messages as follows: "test_key:test message", and add the -K: parameter to the commands above.

  4. If you've completed all the steps successfully, the terminal with the consumer will display "test message".
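The consumer and producer commands above differ only in the mode flag and, for compacted topics, in the key delimiter. As an illustration, the following Python sketch assembles both argument lists from the same connection details. The kcat_command function and its keyed parameter are hypothetical names introduced here; the flags, port 9091, and the kafkacat binary name are taken from the commands above.

```python
import shlex


def kcat_command(mode, broker, topic, username, password, keyed=False):
    """Build a kcat argument list for a consumer ("C") or a producer ("P")."""
    if mode not in ("C", "P"):
        raise ValueError("mode must be 'C' (consumer) or 'P' (producer)")
    args = [
        "kafkacat", f"-{mode}",
        "-b", f"{broker}:9091",
        "-t", topic,
        "-X", "security.protocol=SASL_SSL",
        "-X", "sasl.mechanisms=SCRAM-SHA-512",
        "-X", f"sasl.username={username}",
        "-X", f"sasl.password={password}",
    ]
    if keyed:
        # -K sets the delimiter that separates the key from the value,
        # which is needed for COMPACT and COMPACT_AND_DELETE topics.
        args += ["-K", ":"]
    if mode == "C":
        # The consumer command above ends with -Z; the producer command does not.
        args.append("-Z")
    return args


# Placeholder values for illustration only.
cmd = kcat_command("P", "<broker host domain name>", "<topic name>",
                   "<cluster username>", "<cluster password>", keyed=True)
print(shlex.join(cmd))
```

A list built this way can be passed to subprocess.run, which avoids quoting problems when the password contains shell metacharacters.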

Docker

  1. (Optional) Start Docker if needed:

    sudo service docker start
    
  2. Pull the edenhill/kcat Docker image of the version you need:

    docker pull edenhill/kcat:<version>
    
  3. Run kcat in a container to create a consumer. For the broker address, use one of the connection strings from the Overview tab on your cluster information page. The command has the following structure:

    docker run --name kcat --rm -i -t edenhill/kcat:<version> -C \
               -b <broker FQDN>:9091 \
               -t <topic name> \
               -X security.protocol=SASL_SSL \
               -X sasl.mechanisms=SCRAM-SHA-512 \
               -X sasl.username="<cluster username>" \
               -X sasl.password="<cluster password>" \
               -Z
    
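  4. In a separate terminal, execute the following command to create a producer and push a text string. The container runs without a pseudo-terminal (no -t flag) because the message is piped to its standard input, and under a different name than the running consumer container:

    echo "test message" | docker run --name kcat-producer --rm -i edenhill/kcat:<version> -P \
             -b <broker FQDN>:9091 \
             -t <topic name> \
             -k key \
             -X security.protocol=SASL_SSL \
             -X sasl.mechanisms=SCRAM-SHA-512 \
             -X sasl.username="<username>" \
             -X sasl.password="<password>"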
    

    Warning

    The command above assumes that your topic uses the DELETE cleanup policy. If the topic uses the COMPACT or COMPACT_AND_DELETE policy, specify keys for your messages as follows: "test_key:test message", and add the -K: parameter to the commands above.

  5. After you've completed all the above steps, the terminal with the consumer will show "test message".

PowerShell

On Windows:

  1. Install the latest available version of Microsoft OpenJDK.

  2. Download and unpack the archive with the Apache Kafka® binary files of the same version that your cluster uses.

    Tip

    Unpack the Apache Kafka® files to the disk's root directory, for example, C:\kafka_2.12-2.6.0\.

    If the path to the Apache Kafka® executable and batch files is too long, running them fails with the error: The input line is too long.

  3. Type the following command to create a consumer:

    <path to the directory with Apache Kafka® files>\bin\windows\kafka-console-consumer.bat `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --consumer-property security.protocol=SASL_SSL `
    --consumer-property sasl.mechanism=SCRAM-SHA-512 `
    --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  4. In a separate terminal, run the following command to create a producer and push a text message:

    echo "key:test message" | <path to the directory with Apache Kafka® files>\bin\windows\kafka-console-producer.bat `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --property parse.key=true `
    --property key.separator=":" `
    --producer-property acks=all `
    --producer-property security.protocol=SASL_SSL `
    --producer-property sasl.mechanism=SCRAM-SHA-512 `
    --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  5. If you've completed all the steps successfully, the terminal with the consumer will show "test message".

On Linux:

  1. Install the snapd package:

    Installation prerequisites for Linux Mint users

    To allow installation of snapd in Linux Mint, execute the following in your terminal:

    sudo rm /etc/apt/preferences.d/nosnap.pref
    
    sudo apt install snapd
    
  2. Install the latest version of PowerShell from Snap:

    sudo snap install powershell --classic
    
  3. Download and unpack the archive with the Apache Kafka® binary files of the same version that your cluster uses.

    Tip

    Unpack the Apache Kafka® files to your home directory, /home/<username>.

    If the path to the Apache Kafka® executable and batch files is too long, running them fails with the error: The input line is too long.

  4. Start a new PowerShell session:

    pwsh
    
  5. Type the following to create a consumer:

    .\<path to folder with unpacked Apache Kafka® files>\bin\kafka-console-consumer.sh `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --consumer-property security.protocol=SASL_SSL `
    --consumer-property sasl.mechanism=SCRAM-SHA-512 `
    --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  6. Open a new terminal instance, start another PowerShell session, and type the following to create a producer:

    echo "key:test message" | .\<path to folder with unpacked Apache Kafka® files>\bin\kafka-console-producer.sh `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --property parse.key=true `
    --property key.separator=":" `
    --producer-property acks=all `
    --producer-property security.protocol=SASL_SSL `
    --producer-property sasl.mechanism=SCRAM-SHA-512 `
    --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  7. After you've completed all the above steps, the terminal with the consumer will show the following output: test message.
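The console commands above pass the password on the command line, where it can end up in the shell history. The Kafka console tools in the versions mentioned above can also read client settings from a properties file, through the --consumer.config and --producer.config options. The following Python sketch generates such a file. The client_properties function name and the client.properties file name are assumptions made for illustration, and the sketch rejects credentials that would need escaping instead of trying to escape them.

```python
def client_properties(username, password):
    """Return the text of a Kafka client properties file for SASL_SSL with SCRAM-SHA-512."""
    for value in (username, password):
        # Quotes, backslashes, line breaks, and non-ASCII characters would need
        # escaping in the JAAS string or in the properties format.
        # This sketch refuses them rather than guessing the escaping rules.
        if not value.isascii() or any(ch in value for ch in '"\\\r\n'):
            raise ValueError("unsupported character in credentials")
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    lines = [
        "security.protocol=SASL_SSL",
        "sasl.mechanism=SCRAM-SHA-512",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"


# Print the file content; redirect the output to client.properties to save it.
print(client_properties("<username>", "<password>"), end="")
```

With the output saved as client.properties, the three security properties in the commands above could be replaced with --consumer.config client.properties for the consumer or --producer.config client.properties for the producer.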

Sample connection scripts

We tested the connections in the following environments:

  • Ubuntu 20.04 LTS:

    • Python: 3.8.10, pip: 20.0.2

    • NodeJS: 10.19.0, npm: 6.14.17

    • Go: 1.17.2

    • C#: 7.3, .NET SDK: 7.0.101

    • Java: 11.0.17, Maven: 3.6.3

Python

  1. Install the software and dependencies:

    sudo apt update && sudo apt install -y python3 python3-pip && pip3 install requests
    
  2. Install the kafka-python library:

    pip install kafka-python
    
  3. The following code creates a consumer:

    consumer.py

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
       '<topic name>',
       bootstrap_servers='<your cluster connection string>',
       security_protocol="SASL_SSL",
       sasl_mechanism="SCRAM-SHA-512",
       sasl_plain_username='<username>',
       sasl_plain_password='<password>')
    
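    for msg in consumer:
       # Messages produced without a key have msg.key set to None
       key = msg.key.decode("utf-8") if msg.key is not None else ""
       print(key + ":" + msg.value.decode("utf-8"))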
    
  4. The following code creates a producer and pushes a text message:

    producer.py

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
       bootstrap_servers='<your cluster connection string>',
       security_protocol="SASL_SSL",
       sasl_mechanism="SCRAM-SHA-512",
       sasl_plain_username='<username>',
       sasl_plain_password='<password>')
    
    producer.send('<topic name>', value=b'test message', key=b'key')
    producer.flush()
    producer.close()
    
  5. Run the applications:

    python3 consumer.py
    
    python3 producer.py
    
  6. After you've completed all the above steps, the consumer will show "key:test message".
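The scripts above hardcode the credentials. A possible variation, sketched below, reads them from environment variables and formats records so that messages without a key don't raise an error. KAFKA_USER and KAFKA_PASSWORD follow the names used in the NodeJS example; KAFKA_BROKERS and both function names are assumptions made for this sketch.

```python
import os


def kafka_client_config(env=os.environ):
    """Collect kafka-python connection keyword arguments from environment variables."""
    required = ("KAFKA_BROKERS", "KAFKA_USER", "KAFKA_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        # A comma-separated list of host:port pairs
        "bootstrap_servers": env["KAFKA_BROKERS"].split(","),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "SCRAM-SHA-512",
        "sasl_plain_username": env["KAFKA_USER"],
        "sasl_plain_password": env["KAFKA_PASSWORD"],
    }


def format_record(key, value):
    """Render a record as "key:value", tolerating a missing key or value."""
    key_text = key.decode("utf-8") if key is not None else ""
    value_text = value.decode("utf-8") if value is not None else ""
    return f"{key_text}:{value_text}"


print(format_record(b"key", b"test message"))  # prints key:test message
```

The returned dictionary can be unpacked into either client, for example KafkaConsumer('<topic name>', **kafka_client_config()) or KafkaProducer(**kafka_client_config()).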

NodeJS

  1. Install the software and dependencies:

    sudo apt update && sudo apt install -y nodejs npm && \
    npm install kafkajs
    
  2. The following code creates a producer and pushes a text message:

    producer.js

    const { Kafka } = require("kafkajs");
    
    const topic = "<topic name>";
    const message_count = 5;
    
    const kf = new Kafka({
       brokers: ["<your cluster connection string>"],
       clientId: "doublecloud-example-client",
       sasl: {
          mechanism: "scram-sha-512",
          username: process.env.KAFKA_USER,
          password: process.env.KAFKA_PASSWORD,
       },
       ssl: {
          rejectUnauthorized: true,
       },
    });
    
    const producer = kf.producer();
    
    async function run() {
       await producer.connect();
    
       const now = new Date();
       for (let i = 0; i < message_count; i++) {
          let message = {
                key: Buffer.from(`msg-key-${now.getTime() / 1000}-${i}`),
                value: Buffer.from(JSON.stringify({
                   "origin": "kafkajs",
                   "scheduled_at": now,
                }))
          };
    
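          try {
                // Wait for the broker to acknowledge the message before sending the next one
                const metadata = await producer.send({topic: topic, messages: [message]});
                const md = metadata.pop();
                console.log(`[Produced message] Key: ${message.key}, Offset: ${md.baseOffset}, Partition: ${md.partition}`);
          } catch (err) {
                console.log(`[Producer Error] ${err}`);
          }
       }
    
       // Close the connection so that the process can exit
       await producer.disconnect();
    }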
    
    run().catch((err) => {
       console.log("Unexpected Error: ", err);
    })
    
  3. The following code creates a consumer:

    consumer.js

    const { Kafka } = require("kafkajs");
    
    const topic = "<topic name>";
    const kf = new Kafka({
       brokers: ["<your cluster connection string>"],
       clientId: "doublecloud-example-client",
       sasl: {
          mechanism: "scram-sha-512",
          username: process.env.KAFKA_USER,
          password: process.env.KAFKA_PASSWORD,
       },
       ssl: {
          rejectUnauthorized: true,
       },
    });
    
    const consumer = kf.consumer({
       groupId: "doublecloud-example-group.v1",
    });
    
    async function run() {
       await consumer.connect()
       await consumer.subscribe({
          topic: topic,
          fromBeginning: true,
       })
    
       return consumer.run({
          autoCommit: true,
          eachMessage: (payload) => {
                const message = payload.message;
                console.log(`[Consumed Message] k: ${message.key.toString()}, v: ${message.value.toString()}`);
          }
       })
    }
    
    run().catch((err) => {
       console.log("Unexpected Error: ", err);
    })
    
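  4. Set the KAFKA_USER and KAFKA_PASSWORD environment variables to your cluster's username and password. Then, in separate terminals, run the applications: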

    node consumer.js
    
    node producer.js
    
  5. After you've completed all the above steps, the producer sends five messages with JSON values, and the consumer prints their keys and values.

Go

  1. Install the software and dependencies:

    sudo apt update && sudo apt install -y golang git
    
  2. Create a directory for the project:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Create the scram.go file with the code for running SCRAM. This code is the same for the producer application and the consumer application:

    scram.go

    package scram
    
    import (
       "crypto/sha256"
       "crypto/sha512"
    
       "github.com/xdg-go/scram"
    )
    
    var (
       SHA256 scram.HashGeneratorFcn = sha256.New
       SHA512 scram.HashGeneratorFcn = sha512.New
    )
    
    type XDGSCRAMClient struct {
       *scram.Client
       *scram.ClientConversation
       scram.HashGeneratorFcn
    }
    
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
       x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
       if err != nil {
          return err
       }
       x.ClientConversation = x.Client.NewConversation()
       return nil
    }
    
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
       response, err = x.ClientConversation.Step(challenge)
       return
    }
    
    func (x *XDGSCRAMClient) Done() bool {
       return x.ClientConversation.Done()
    }
    
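  4. Copy scram.go to a scram subdirectory of the producer application and the consumer application. The file declares its own scram package, so it can't share a directory with main.go:

    mkdir -p producer/scram consumer/scram && \
    cp scram.go producer/scram/scram.go && cp scram.go consumer/scram/scram.go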
    
  5. The following code creates a producer and pushes a text message:

    producer/main.go

    package main
    
    import (
       "fmt"
       "log"
       "time"
    
       "github.com/Shopify/sarama"
    
       "github.com/yadc-io/examples-kafka-go/scram"
    )
    
    const (
       clientID = "double-cloud-example.v1"
       topic    = "<topic name>"
    )
    
    func main() {
       cfg := sarama.NewConfig()
       cfg.ClientID = clientID
       cfg.Net.TLS.Enable = true
       cfg.Net.SASL.Enable = true
       cfg.Net.SASL.Handshake = true
       cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
       cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
          // See https://github.com/Shopify/sarama/blob/main/examples/sasl_scram_client/scram_client.go
          // This workaround should be addressed in https://github.com/Shopify/sarama/pull/2086
          return &scram.XDGSCRAMClient{HashGeneratorFcn: scram.SHA512}
       }
       cfg.Net.SASL.User = "<username>"
       cfg.Net.SASL.Password = "<password>"
    
       brokers := []string{"<your cluster connection string>"}
    
       cfg.Producer.Return.Successes = true
       p, err := sarama.NewSyncProducer(brokers, cfg)
       if err != nil {
          panic(err)
       }
    
       msgCount := 5
       now := time.Now()
       for i := 0; i < msgCount; i++ {
          part, offset, err := p.SendMessage(&sarama.ProducerMessage{
             Topic:     topic,
             Key:       sarama.StringEncoder(fmt.Sprintf("sarama-msg-key-%d-%d", now.Unix(), i)),
             Value:     sarama.StringEncoder(fmt.Sprintf("sarama-msg-body-%s-%d", now, i)),
             Headers:   nil,
             Metadata:  nil,
             Offset:    0,
             Partition: 0,
             Timestamp: time.Time{},
          })
    
          if err != nil {
             panic(err)
          }
    
          log.Printf("[Published message] Partition: %d, Offset: %d", part, offset)
       }
    }
    
  6. The following code creates a consumer:

    consumer/main.go

    package main
    
    import (
       "context"
       "log"
       "time"
    
       "github.com/Shopify/sarama"
    
       "github.com/yadc-io/examples-kafka-go/scram"
    )
    
    const (
       clientID = "double-cloud-example.v1"
       groupID  = "double-cloud-example-group.v1"
       topic    = "<topic name>"
    )
    
    func main() {
       cfg := sarama.NewConfig()
       cfg.ClientID = clientID
       cfg.Net.TLS.Enable = true
       cfg.Net.SASL.Enable = true
       cfg.Net.SASL.Handshake = true
       cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
       cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
          // See https://github.com/Shopify/sarama/blob/main/examples/sasl_scram_client/scram_client.go
          // This workaround should be addressed in https://github.com/Shopify/sarama/pull/2086
          return &scram.XDGSCRAMClient{HashGeneratorFcn: scram.SHA512}
       }
       cfg.Net.SASL.User = "<username>"
       cfg.Net.SASL.Password = "<password>"
    
       brokers := []string{"<your cluster connection string>"}
    
       cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
       c, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
       if err != nil {
          panic(err)
       }
       defer c.Close()
    
       timeout := 30 * time.Second
       ctx, cancel := context.WithTimeout(context.Background(), timeout)
       defer cancel()
    
       log.Printf("Consuming messages for %0.0f seconds", timeout.Seconds())
       if err = c.Consume(ctx, []string{topic}, &PrintHandler{}); err != nil {
          panic(err)
       }
    }
    
    type PrintHandler struct{}
    
    func (h *PrintHandler) Setup(sess sarama.ConsumerGroupSession) error {
       return nil
    }
    
    func (h *PrintHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
       return nil
    }
    
    func (h *PrintHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
       for {
          select {
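          case m, ok := <-claim.Messages():
             if !ok {
                // The channel is closed when the claim ends, for example during a rebalance
                return nil
             }
             log.Printf("[Message consumed] Key: %s, Value: %s", m.Key, m.Value)
             sess.MarkMessage(m, "")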
          case <-sess.Context().Done():
             return nil
          }
       }
    }
    
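  7. In separate terminals, open your consumer and producer folders and run the following commands to create a Go module, download the dependencies, and start the application. The module name matches the prefix of the scram import path in main.go:

    go mod init github.com/yadc-io/examples-kafka-go && \
    go mod tidy && \
    go run .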
    
  8. After completing all the above steps, the producer sends five messages, and the consumer logs their keys and values.
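The scram.go helper in the Go example hands the SCRAM exchange over to a library. For readers who want to see what the client side computes, here is a minimal Python sketch of the proof calculation described in RFC 5802, using SHA-512 as in the examples above. It's an illustration, not the code that any of the clients on this page run: the function names are hypothetical, password normalization (SASLprep) is omitted, and the salt, iteration count, and auth message are passed in rather than negotiated with a broker.

```python
import hashlib
import hmac


def scram_client_proof(password, salt, iterations, auth_message, hash_name="sha512"):
    """Compute the SCRAM ClientProof and StoredKey (RFC 5802, without SASLprep)."""
    # SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
    # ClientKey := HMAC(SaltedPassword, "Client Key")
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    # StoredKey := H(ClientKey); this is what the server keeps
    stored_key = hashlib.new(hash_name, client_key).digest()
    # ClientSignature := HMAC(StoredKey, AuthMessage)
    signature = hmac.new(stored_key, auth_message, hash_name).digest()
    # ClientProof := ClientKey XOR ClientSignature
    proof = bytes(a ^ b for a, b in zip(client_key, signature))
    return proof, stored_key


def server_accepts(proof, stored_key, auth_message, hash_name="sha512"):
    """Check a proof the way a server would: recover ClientKey and hash it."""
    signature = hmac.new(stored_key, auth_message, hash_name).digest()
    recovered_key = bytes(a ^ b for a, b in zip(proof, signature))
    return hmac.compare_digest(hashlib.new(hash_name, recovered_key).digest(), stored_key)


# Made-up values for illustration only.
auth = b"n=user,r=clientnonce"
proof, stored = scram_client_proof("secret", b"salt", 4096, auth)
print(server_accepts(proof, stored, auth))  # prints True
```

The password itself never crosses the wire: the server verifies the proof against the stored key only.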

C#

  1. Install the software and dependencies:

    wget https://packages.microsoft.com/config/ubuntu/20.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb && \
    sudo dpkg -i packages-microsoft-prod.deb && \
    sudo apt-get update && \
    sudo apt-get install -y apt-transport-https dotnet-sdk-7.0
    
  2. Create a directory for the project:

    cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
    
  3. Create a configuration file:

    App.csproj

    <Project Sdk="Microsoft.NET.Sdk">
    <PropertyGroup>
       <OutputType>Exe</OutputType>
       <TargetFramework>netcoreapp7.0</TargetFramework>
    </PropertyGroup>
    <ItemGroup>
       <PackageReference Include="Confluent.Kafka" Version="1.4.2" />
    </ItemGroup>
    </Project>
    
  4. Copy App.csproj to the directories of the producer application and consumer application:

    cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
    
  5. The following code creates a consumer:

    cs-project/consumer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace CCloud
    {
       class Program
       {
          public static void Main(string[] args)
          {
             string HOST = "<your cluster connection string>";
             string TOPIC = "<topic name>";
             string USER = "<username>";
             string PASS = "<password>";
    
             var consumerConfig = new ConsumerConfig(
                new Dictionary<string,string>{
                   {"bootstrap.servers", HOST},
                   {"security.protocol", "SASL_SSL"},
                   {"sasl.mechanisms", "SCRAM-SHA-512"},
                   {"sasl.username", USER},
                   {"sasl.password", PASS},
                   {"group.id", "demo"}
                }
             );
    
             var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
             consumer.Subscribe(TOPIC);
             try
             {
                while (true)
                {
                   var cr = consumer.Consume();
                   Console.WriteLine(cr.Message.Value);
                }
             }
             catch (OperationCanceledException)
             {
                // Ctrl-C pressed.
             }
             finally
             {
                consumer.Close();
             }
          }
       }
    }
    
  6. The following code creates a producer and pushes a text message:

    cs-project/producer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace App
    {
       class Program
       {
          public static void Main(string[] args)
          {
             int MSG_COUNT = 5;
    
             string HOST = "<your cluster connection string>";
             string TOPIC = "<topic name>";
             string USER = "<producer name>";
             string PASS = "<producer password>";
    
             var producerConfig = new ProducerConfig(
                new Dictionary<string,string>{
                   {"bootstrap.servers", HOST},
                   {"security.protocol", "SASL_SSL"},
                   {"sasl.mechanisms", "SCRAM-SHA-512"},
                   {"sasl.username", USER},
                   {"sasl.password", PASS}
                }
             );
    
             var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    
             for(int i=0; i<MSG_COUNT; i++)
             {
                producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                   (deliveryReport) =>
                   {
                      if (deliveryReport.Error.Code != ErrorCode.NoError)
                      {
                         Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                      }
                      else
                      {
                         Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                      }
                   });
             }
             producer.Flush(TimeSpan.FromSeconds(10));
          }
       }
    }
    
  7. In separate terminals, build and launch both applications:

    cd ~/cs-project/consumer && dotnet build && \
    dotnet run
    
    cd ~/cs-project/producer && dotnet build && \
    dotnet run
    
  8. After you've completed all the above steps, the producer sends five messages with the key "key" and the value "test message", and the consumer prints their values.

Java

  1. Install the dependencies:

    sudo apt update && sudo apt install -y default-jdk maven
    
  2. Create a folder for the Maven project:

    cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
    
  3. Create a configuration file for Maven:

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Current versions of Maven dependencies are available to view at:

  4. Copy pom.xml to the directories of the producer application and consumer application:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    
  5. The following code creates a producer and pushes a text message:

    producer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;
    
    public class App {
    
       public static void main(String[] args) {
    
          int MSG_COUNT = 5;
    
          String HOST = "<cluster connection string>";
          String TOPIC = "<topic name>";
          String USER = "<username>";
          String PASS = "<password>";
    
          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";
    
          String serializer = StringSerializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
    
          Producer<String, String> producer = new KafkaProducer<>(props);
    
          try {
             for (int i = 1; i <= MSG_COUNT; i++) {
                producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                System.out.println("Test message " + i);
             }
             producer.flush();
             producer.close();
          } catch (Exception ex) {
             System.out.println(ex);
             producer.close();
          }
       }
    }
    
  6. The following code creates a consumer:

    consumer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;
    
    public class App {
    
       public static void main(String[] args) {
    
          String HOST = "<cluster connection string>";
          String TOPIC = "<topic name>";
          String USER = "<username>";
          String PASS = "<password>";
    
          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";
    
          String deserializer = StringDeserializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
    
          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
    
          while(true) {
             ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
             }
          }
       }
    }
    
  7. Build the applications:

    cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
    
  8. Run the applications in separate terminals:

    java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
    
    java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
    
  9. After you've completed all the above steps, the producer sends five messages, and the consumer prints "key:test message" for each of them.

Connect with public and private connection strings

When you connect to a cluster through a peering connection from your VPC, use the cluster's private address instead of the public one.

To obtain a cluster's private connection string, go to the cluster overview page. Under Connection strings, switch to the Private tab.

You can also connect to a specific host in your cluster. The cluster and host connection strings are structured as follows:

  • Public address:

    rw.<cluster id>.at.double.cloud
    # or 
    <host name>.<cluster id>.at.double.cloud
    
  • Private address:

    rw.<cluster id>.private.at.double.cloud
    # or 
    <host name>.<cluster id>.private.at.double.cloud
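The four address forms above follow one pattern: a prefix (rw for the cluster, or a host name), the cluster ID, and a public or private domain suffix. The following Python sketch composes them. The connection_host function and its parameter names are hypothetical; only the address structure comes from the list above.

```python
def connection_host(cluster_id, host_name=None, private=False):
    """Compose a cluster or host address from the patterns listed above."""
    if not cluster_id:
        raise ValueError("cluster_id is required")
    # "rw" addresses the cluster as a whole; a host name addresses one host
    prefix = host_name if host_name else "rw"
    suffix = "private.at.double.cloud" if private else "at.double.cloud"
    return f"{prefix}.{cluster_id}.{suffix}"


# Made-up identifiers for illustration only.
print(connection_host("abc123"))  # prints rw.abc123.at.double.cloud
print(connection_host("abc123", host_name="host1", private=True))  # prints host1.abc123.private.at.double.cloud
```

Use the private form only from a network that is peered with the cluster.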