Connect to an Apache Kafka® cluster

Note

You can only connect to your cluster hosts over an SSL connection.

Connect with CLI-based tools

We tested the connections in the following environments:

  • Ubuntu 20.04 LTS

  • Windows 10 Enterprise 1909:

    • PowerShell: 7.2.4

    • OpenJDK: 17.0.3

    • Apache Kafka®: 3.0

Bash

  1. Install the kcat (formerly kafkacat) utility.

    sudo apt update && sudo apt install kafkacat
    
  2. Run the command below to create a consumer and receive messages from a topic. Replace <cluster username> and <cluster password> with your cluster's username and password; you can find them on your cluster's Overview tab.

    kafkacat -C \
          -b <broker host domain name>:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<cluster username>" \
          -X sasl.password="<cluster password>" \
          -Z
    
  3. Execute the following command in a separate terminal to create a producer and send a text message. Use the same cluster username and password from the Overview tab.

    echo "test message" | kafkacat -P \
          -b <broker host domain name>:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<cluster username>" \
          -X sasl.password="<cluster password>" \
    

    Warning

    The command above assumes that your topic's Cleanup policy is set to the DELETE type. If you set the COMPACT or COMPACT_AND_DELETE type, your messages must have keys: write them as "test_key:test message" and add the -K: parameter to the commands above, as sketched below.
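
    For example, a keyed producer call might look like this (a sketch based on the producer command above; only the message format and the -K: key delimiter option change):

    echo "test_key:test message" | kafkacat -P \
          -b <broker host domain name>:9091 \
          -t <topic name> \
          -K: \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<cluster username>" \
          -X sasl.password="<cluster password>"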

  4. If you've completed all the steps successfully, the terminal with the consumer will show "test message".

Docker

  1. (Optional) Start Docker if needed:

    sudo service docker start
    
  2. Pull the Edenhill kcat Docker container with a specified version:

    docker pull edenhill/kcat:<version>
    
  3. To create a consumer, run a container with a command that contains a connection string. You can use one of the Connection strings from the Overview tab on your cluster information page. The command has the following structure:

    docker run --name kcat --rm -i -t edenhill/kcat:<version> -C \
               -b <broker FQDN>:9091 \
               -t <topic name> \
               -X security.protocol=SASL_SSL \
               -X sasl.mechanisms=SCRAM-SHA-512 \
               -X sasl.username="<cluster username>" \
               -X sasl.password="<cluster password>" \
               -Z
    
  4. Execute the following command in a separate terminal to create a producer and push a text string:

    echo "test message" | docker run --name kcat --rm -i -t edenhill/kcat:<version> -P \
             -b <broker FQDN>:9091 \
             -t <topic name> \
             -k key \
             -X security.protocol=SASL_SSL \
             -X sasl.mechanisms=SCRAM-SHA-512 \
             -X sasl.username="<username>" \
             -X sasl.password="<password>" \
    

    Warning

    The command above assumes that your topic's Cleanup policy is set to the DELETE type. If you set the COMPACT or COMPACT_AND_DELETE type, your messages must have keys: write them as "test_key:test message" and add the -K: parameter to the commands above.

  5. If you've completed all the steps successfully, the terminal with the consumer will show "test message".

PowerShell

  1. Install the latest available version of Microsoft OpenJDK.

  2. Download and unpack the archive with Apache Kafka® binaries of the same version your cluster uses (see the example after the tip below).

    Tip

    Unpack the Apache Kafka® files to the disk's root directory, for example, C:\kafka_2.12-2.6.0\.

    If the path to the Apache Kafka® executable and batch files is too long, you'll get the following error when you try to run them: The input line is too long.
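
    For example, a sketch of this step using the curl.exe and tar utilities bundled with Windows 10, assuming version 2.6.0 (the same example version as in the tip above):

    curl.exe -L -o kafka_2.12-2.6.0.tgz https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz
    tar -xzf kafka_2.12-2.6.0.tgz -C C:\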

  3. Type the following command to create a consumer:

    <path to the directory with Apache Kafka® files>\bin\windows\kafka-console-consumer.bat `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --consumer-property security.protocol=SASL_SSL `
    --consumer-property sasl.mechanism=SCRAM-SHA-512 `
    --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  4. The following code creates a producer and pushes a text message:

    echo "key:test message" | <path to the directory with Apache Kafka® files>\bin\windows\kafka-console-producer.bat `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --property parse.key=true `
    --property key.separator=":" `
    --producer-property acks=all `
    --producer-property security.protocol=SASL_SSL `
    --producer-property sasl.mechanism=SCRAM-SHA-512 `
    --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  5. If you've completed all the steps successfully, the terminal with the consumer will show "test message".

  1. Install the Snapd package:

    Installation prerequisites for Linux Mint users

    To allow installation of snapd in Linux Mint, execute the following in your terminal:

    sudo rm /etc/apt/preferences.d/nosnap.pref
    
    sudo apt install snapd
    
  2. Install the latest version of PowerShell from Snap:

    sudo snap install powershell --classic
    
  3. Download and unpack the archive with Apache Kafka® binaries of the same version your cluster uses (see the example after the tip below).

    Tip

    Unpack the Apache Kafka® files to your home directory, /home/<username>.

    If the path to the Apache Kafka® executable and script files is too long, you'll get the following error when you try to run them: The input line is too long.
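
    For example, a sketch of this step, assuming version 2.6.0 (the same example version as in the Windows tip above) and your home directory as the target:

    wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz && \
    tar -xzf kafka_2.12-2.6.0.tgz -C ~/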

  4. Start a new PowerShell session:

    pwsh
    
  5. Type the following to create a consumer:

    ./<path to folder with unpacked Apache Kafka® files>/bin/kafka-console-consumer.sh `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --consumer-property security.protocol=SASL_SSL `
    --consumer-property sasl.mechanism=SCRAM-SHA-512 `
    --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  6. Open a new terminal instance, start another PowerShell session, and type the following to create a producer:

    echo "key:test message" | .\<path to folder with unpacked Apache Kafka® files>\bin\kafka-console-producer.sh `
    --bootstrap-server <your Apache Kafka® cluster connection string> `
    --topic <topic name> `
    --property parse.key=true `
    --property key.separator=":" `
    --producer-property acks=all `
    --producer-property security.protocol=SASL_SSL `
    --producer-property sasl.mechanism=SCRAM-SHA-512 `
    --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
    
  7. If you've completed all the steps correctly, the terminal with the consumer will show the following output: test message.

Sample connection scripts

We tested the connections in the following environments:

  • Ubuntu 20.04 LTS:

    • Python: 3.8.10, pip: 20.0.2

    • NodeJS: 10.19.0, npm: 6.14.4

    • Go: 1.17.2

    • C#: 7.3, .NET Framework 4.7.2

    • Java: 11.0.8, Maven: 3.6.3

Python

  1. Install the software and dependencies:

    sudo apt update && sudo apt install -y python3 python3-pip && pip3 install kafka-python
    
  2. The following code creates a consumer:

    consumer.py

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
       '<topic name>',
       bootstrap_servers='<FQDN of the broker>:9091',
       security_protocol="SASL_SSL",
       sasl_mechanism="SCRAM-SHA-512",
       sasl_plain_username='<username>',
       sasl_plain_password='<password>')
    
    for msg in consumer:
       print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
    
  3. The following code creates a producer and pushes a text message:

    producer.py

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
       bootstrap_servers='<FQDN of the broker host>:9091',
       security_protocol="SASL_SSL",
       sasl_mechanism="SCRAM-SHA-512",
       sasl_plain_username='<username>',
       sasl_plain_password='<password>')
    
    producer.send('<topic name>', b'test message', b'key')
    producer.flush()
    producer.close()
    
  4. Run the applications, each in its own terminal:

    python3 consumer.py
    
    python3 producer.py
    
  5. If you've completed all the steps successfully, the consumer will show key:test message.

NodeJS

  1. Install the software and dependencies:

    sudo apt update && sudo apt install -y nodejs npm && \
    npm install node-rdkafka
    
  2. The following code creates a producer and pushes a text message:

    producer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<FQDN of the broker>:9091";
    const TOPIC = "<topic name>";
    const USER = "<username>";
    const PASS = "<password>";
    
    const producer = new Kafka.Producer({
       'bootstrap.servers': HOST,
       'sasl.username': USER,
       'sasl.password': PASS,
       'security.protocol': "SASL_SSL",
       'sasl.mechanisms': "SCRAM-SHA-512"
    });
    
    producer.connect();
    
    producer.on('ready', function() {
       try {
          for (let i = 0; i < MSG_COUNT; ++i) {
             producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
             console.log("Produced: test message");
          }
    
          producer.flush(10000, () => {
             producer.disconnect();
          });
       } catch (err) {
          console.error('Error');
          console.error(err);
       }
    });
    
  3. The following code creates a consumer:

    consumer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<FQDN of the broker>:9091";
    const TOPIC = "<topic name>";
    const USER = "<username>";
    const PASS = "<password>";
    
    const consumer = new Kafka.KafkaConsumer({
       'bootstrap.servers': HOST,
       'sasl.username': USER,
       'sasl.password': PASS,
       'security.protocol': "SASL_SSL",
       'sasl.mechanisms': "SCRAM-SHA-512",
       'group.id': "demo"
    });
    
    consumer.connect();
    
    consumer
       .on('ready', function() {
          consumer.subscribe([TOPIC]);
          consumer.consume();
       })
       .on('data', function(data) {
          console.log(data.value.toString());
       });
    
    process.on('SIGINT', () => {
       console.log('\nDisconnecting consumer ...');
       consumer.disconnect();
    });
    
  4. Run the applications, each in its own terminal:

    node consumer.js
    
    node producer.js
    
  5. If you've completed all the steps successfully, the producer will send several test message strings and the consumer will display them.

Go

  1. Install the software and dependencies:

    sudo apt update && sudo apt install -y golang git && \
    go get github.com/Shopify/sarama && \
    go get github.com/xdg/scram
    
  2. Create a directory for the project:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Create the scram.go file with the SCRAM authentication code. This code is the same for the producer and consumer applications:

    scram.go
    package main
     
    import (
    "crypto/sha256"
    "crypto/sha512"
    "hash"
     
    "github.com/xdg/scram"
    )
     
    var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
     
    type XDGSCRAMClient struct {
       *scram.Client
       *scram.ClientConversation
       scram.HashGeneratorFcn
    }
     
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
       x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
       if err != nil {
          return err
       }
       x.ClientConversation = x.Client.NewConversation()
       return nil
    }
     
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
       response, err = x.ClientConversation.Step(challenge)
       return
    }
     
    func (x *XDGSCRAMClient) Done() bool {
       return x.ClientConversation.Done()
    }
    
  4. Copy scram.go to the directory of the producer application and the consumer application:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    
  5. The following code creates a producer and pushes a text message:

    producer/main.go

    package main
    
    import (
       "fmt"
       "os"
       "strings"
    
       "github.com/Shopify/sarama"
    )
    
    func main() {
       brokers := "<FQDN of the broker host>:9091"
       splitBrokers := strings.Split(brokers, ",")
       conf := sarama.NewConfig()
       conf.Producer.RequiredAcks = sarama.WaitForAll
       conf.Producer.Return.Successes = true
       conf.Version = sarama.V0_10_0_0
       conf.ClientID = "sasl_scram_client"
       conf.Net.SASL.Enable = true
       conf.Net.SASL.Handshake = true
       conf.Net.SASL.User = "<username>"
       conf.Net.SASL.Password = "<password>"
       conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
       conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
       conf.Net.TLS.Enable = true
    
       syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
       if err != nil {
          fmt.Println("Couldn't create producer: ", err.Error())
          os.Exit(0)
       }
       publish("test message", syncProducer)
    }
    
    func publish(message string, producer sarama.SyncProducer) {
       // publish sync
       msg := &sarama.ProducerMessage{
          Topic: "<topic name>",
          Value: sarama.StringEncoder(message),
       }
       p, o, err := producer.SendMessage(msg)
       if err != nil {
          fmt.Println("Error publish: ", err.Error())
       }
    
       fmt.Println("Partition: ", p)
       fmt.Println("Offset: ", o)
    }
    
  6. The following code creates a consumer:

    consumer/main.go

    package main
    
    import (
       "fmt"
       "os"
       "os/signal"
       "strings"
    
       "github.com/Shopify/sarama"
    )
    
    func main() {
       brokers := "<FQDN of the broker host>:9091"
       splitBrokers := strings.Split(brokers, ",")
       conf := sarama.NewConfig()
       conf.Producer.RequiredAcks = sarama.WaitForAll
       conf.Version = sarama.V0_10_0_0
       conf.Consumer.Return.Errors = true
       conf.ClientID = "sasl_scram_client"
       conf.Metadata.Full = true
       conf.Net.SASL.Enable = true
       conf.Net.SASL.User =  "<username>"
       conf.Net.SASL.Password = "<password>"
       conf.Net.SASL.Handshake = true
       conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
       conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
       conf.Net.TLS.Enable = true
    
       master, err := sarama.NewConsumer(splitBrokers, conf)
       if err != nil {
          fmt.Println("Coulnd't create consumer: ", err.Error())
          os.Exit(1)
       }
    
       defer func() {
          if err := master.Close(); err != nil {
             panic(err)
          }
       }()
    
       topic := "<topic name>"
    
       consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
       if err != nil {
          panic(err)
       }
    
       signals := make(chan os.Signal, 1)
       signal.Notify(signals, os.Interrupt)
    
       // Count how many message processed
       msgCount := 0
    
       // Get signal for finish
       doneCh := make(chan struct{})
       go func() {
          for {
             select {
             case err := <-consumer.Errors():
                fmt.Println(err)
             case msg := <-consumer.Messages():
                msgCount++
                fmt.Println("Received messages", string(msg.Key), string(msg.Value))
             case <-signals:
                fmt.Println("Interrupt is detected")
                   doneCh <- struct{}{}
             }
          }
       }()
    
       <-doneCh
       fmt.Println("Processed", msgCount, "messages")
    }
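
  7. Run the applications, each in its own terminal. This is a sketch that assumes the go get commands above fetched the dependencies into your GOPATH:

    cd ~/go-project/consumer && go run main.go scram.go
    
    cd ~/go-project/producer && go run main.go scram.go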
    
  8. If you've completed all the steps successfully, the producer will send a test message and the consumer will display it.

C#

  1. Install the software and dependencies:

    wget https://packages.microsoft.com/config/ubuntu/20.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb && \
    sudo dpkg -i packages-microsoft-prod.deb && \
    sudo apt-get update && \
    sudo apt-get install -y apt-transport-https dotnet-sdk-5.0
    
  2. Create a directory for the project:

    cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
    
  3. Create a configuration file:

    App.csproj

    <Project Sdk="Microsoft.NET.Sdk">
       <PropertyGroup>
          <OutputType>Exe</OutputType>
          <TargetFramework>net5.0</TargetFramework>
       </PropertyGroup>
    
       <ItemGroup>
          <PackageReference Include="Confluent.Kafka" Version="1.4.2" />
       </ItemGroup>
    </Project>
    
  4. Copy App.csproj to the directories of the producer application and consumer application:

    cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
    
  5. The following code creates a consumer:

    cs-project/consumer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace CCloud
    {
       class Program
       {
          public static void Main(string[] args)
          {
             string HOST = "<FQDN of the broker host>:9091";
             string TOPIC = "<topic name>";
             string USER = "<username>";
             string PASS = "<password>";
    
             var consumerConfig = new ConsumerConfig(
                new Dictionary<string,string>{
                   {"bootstrap.servers", HOST},
                   {"security.protocol", "SASL_SSL"},
                   {"sasl.mechanisms", "SCRAM-SHA-512"},
                   {"sasl.username", USER},
                   {"sasl.password", PASS},
                   {"group.id", "demo"}
                }
             );
    
             var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
             consumer.Subscribe(TOPIC);
             try
             {
                while (true)
                {
                   var cr = consumer.Consume();
                   Console.WriteLine(cr.Message.Value);
                }
             }
             catch (OperationCanceledException)
             {
                // Ctrl-C pressed.
             }
             finally
             {
                consumer.Close();
             }
          }
       }
    }
    
  6. The following code creates a producer and pushes a text message:

    cs-project/producer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace App
    {
       class Program
       {
          public static void Main(string[] args)
          {
             int MSG_COUNT = 5;
    
             string HOST = "<FQDN of the broker host>:9091";
             string TOPIC = "<topic name>";
             string USER = "<producer name>";
             string PASS = "<producer password>";
    
             var producerConfig = new ProducerConfig(
                new Dictionary<string,string>{
                   {"bootstrap.servers", HOST},
                   {"security.protocol", "SASL_SSL"},
                   {"sasl.mechanisms", "SCRAM-SHA-512"},
                   {"sasl.username", USER},
                   {"sasl.password", PASS}
                }
             );
    
             var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    
             for(int i=0; i<MSG_COUNT; i++)
             {
                producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                (deliveryReport) =>
                   {
                      if (deliveryReport.Error.Code != ErrorCode.NoError)
                      {
                         Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                      }
                      else
                      {
                         Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                      }
                   });
             }
             producer.Flush(TimeSpan.FromSeconds(10));
          }
       }
    }
    
  7. Build and launch both applications, each in its own terminal:

    cd ~/cs-project/consumer && dotnet build && dotnet run
    
    cd ~/cs-project/producer && dotnet build && dotnet run
    
  8. If you've completed all the steps successfully, the producer will send several test message strings and the consumer will display them.

Java

  1. Install the dependencies:

    sudo apt update && sudo apt install -y default-jdk maven
    
  2. Create a folder for the Maven project:

    cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
    
  3. Create a configuration file for Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    You can check for the current versions of these Maven dependencies on Maven Central.

  4. Copy pom.xml to the directories of the producer application and consumer application:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    
  5. The following code creates a producer and pushes a text message:

    producer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;
    
    public class App {
    
       public static void main(String[] args) {
    
          int MSG_COUNT = 5;
    
          String HOST = "<FQDN of the broker>:9091";
          String TOPIC = "<topic name>";
          String USER = "<username>";
          String PASS = "<password>";
    
          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";
    
          String serializer = StringSerializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
    
          Producer<String, String> producer = new KafkaProducer<>(props);
    
          try {
             for (int i = 1; i <= MSG_COUNT; i++) {
                producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                System.out.println("Test message " + i);
             }
             producer.flush();
             producer.close();
          } catch (Exception ex) {
             System.out.println(ex);
             producer.close();
          }
       }
    }
    
  6. The following code creates a consumer:

    consumer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;
    
    public class App {
    
       public static void main(String[] args) {
    
          String HOST = "<FQDN of the broker>:9091";
          String TOPIC = "<topic name>";
          String USER = "<username>";
          String PASS = "<password>";
    
          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";
    
          String deserializer = StringDeserializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
    
          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
    
          while(true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
             }
          }
       }
    }
    
  7. Build applications:

    cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
    
  8. Run applications:

    java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
    
    java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
    
  9. If you've completed all the steps successfully, the producer will send one or more key:test message messages and the consumer will display them.

Connect with public and private connection strings

When you connect to a cluster over a VPC peering connection, use a private address instead of the usual public address.

To obtain a cluster's private connection string, go to the cluster overview page. Under Connection strings, switch to the Private tab:

connection strings tabs

You can also connect to a specific host in your cluster. Cluster and host connection strings have the following structures (see the example after the list):

  • Public address:

    rw.<cluster id>.at.double.cloud
    # or 
    <host name>.<cluster id>.at.double.cloud
    
  • Private address:

    rw.<cluster id>.private.at.double.cloud
    # or 
    <host name>.<cluster id>.private.at.double.cloud
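
For example, the kcat consumer from the CLI section above can use the private address by changing only the broker host; the port and SASL options stay the same (a sketch):

    kafkacat -C \
          -b rw.<cluster id>.private.at.double.cloud:9091 \
          -t <topic name> \
          -X security.protocol=SASL_SSL \
          -X sasl.mechanisms=SCRAM-SHA-512 \
          -X sasl.username="<cluster username>" \
          -X sasl.password="<cluster password>" \
          -Z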