Connect to an Apache Kafka® cluster
Note
You can connect to your cluster hosts only over an SSL connection.
Connect with CLI-based tools
We tested the connections in the following environments:
- Ubuntu 20.04 LTS:
  - Bash (kafkacat): 5.0.17
  - Docker: 20.10.12, build e91ed57
  - PowerShell: 7.2.5, OpenJDK 17.0.3, Apache Kafka® 3.0
- Windows 10 Enterprise 1909:
  - PowerShell: 7.2.4, OpenJDK 17.0.3, Apache Kafka® 3.0
DoubleCloud Managed Service for Apache Kafka® ports
You can connect to DoubleCloud Managed Service for Apache Kafka® clusters via the following ports:
- 9091 - the broker connection port. Use it with kcat, the CLI tools, and the client libraries in this article.
- 19091 - the VPC peering port.
- 443 - the schema registry interface port.
- 9363 - the metrics port to connect Prometheus or other third-party solutions.
All the above ports are SSL-encrypted.
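Before configuring a client, you can optionally check that the port you plan to use is reachable and serves TLS. The sketch below is an assumption: it presumes a Linux host with OpenSSL installed, and <broker FQDN> is a placeholder for a host name from your cluster's Overview tab:

```bash
# Optional sanity check: confirm that the broker listener on port 9091
# is reachable and presents a TLS certificate before configuring a client.
openssl s_client -connect <broker FQDN>:9091 </dev/null
```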
Bash
- Install the kcat (formerly kafkacat) utility:

  ```bash
  sudo apt update && sudo apt install kafkacat
  ```
- Run the command below to create a consumer and receive messages from a topic. The consumer username and consumer password parameters are your cluster's username and password. Substitute the fields below with the appropriate information available on your cluster's Overview tab:

  ```bash
  kafkacat -C \
      -b <broker host domain name>:9091 \
      -t <topic name> \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username="<cluster username>" \
      -X sasl.password="<cluster password>" \
      -Z
  ```
- Execute the following command to create a producer and send a text message. Note that you need to run this command in a separate terminal. Substitute the fields below with the appropriate information available on your cluster's Overview tab:

  ```bash
  echo "test message" | kafkacat -P \
      -b <broker host domain name>:9091 \
      -t <topic name> \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username="<cluster username>" \
      -X sasl.password="<cluster password>"
  ```
  Warning

  The command above assumes that you have a topic with a Cleanup policy of the DELETE type. If you set the COMPACT or COMPACT_AND_DELETE type, specify keys for your messages as "test_key:test message" and add the -K: flag to the commands above (see the example at the end of this section).

- If you've completed all the steps successfully, the terminal with the consumer will display "test message".
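For compacted topics, the adjusted producer call looks like the sketch below. The key name test_key and the ":" separator are only examples, so adapt them to your data:

```bash
# Producer for a topic whose Cleanup policy is COMPACT or COMPACT_AND_DELETE.
# -K: tells kcat to split each input line into a key and a value at the first ":".
echo "test_key:test message" | kafkacat -P \
    -b <broker host domain name>:9091 \
    -t <topic name> \
    -K: \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username="<cluster username>" \
    -X sasl.password="<cluster password>"
```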
Docker
- (Optional) Start Docker:

  ```bash
  sudo service docker start
  ```
- Pull the Edenhill kcat image:

  ```bash
  docker pull edenhill/kcat:<version>
  ```
- Run a command that contains a connection string in a container to create a consumer. You can use one of the Connection strings from the Overview tab on your cluster information page. The command has the following structure:

  ```bash
  docker run --name kcat --rm -i -t edenhill/kcat:<version> -C \
      -b <broker FQDN>:9091 \
      -t <topic name> \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username="<cluster username>" \
      -X sasl.password="<cluster password>" \
      -Z
  ```
- Execute the following command to create a producer and push a text string. It uses a different container name so it doesn't conflict with the running consumer container:

  ```bash
  echo "test message" | docker run --name kcat-producer --rm -i -t edenhill/kcat:<version> -P \
      -b <broker FQDN>:9091 \
      -t <topic name> \
      -k key \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username="<username>" \
      -X sasl.password="<password>"
  ```
  Warning

  The command above assumes that you have a topic with a Cleanup policy of the DELETE type. If you set the COMPACT or COMPACT_AND_DELETE type, specify keys for your messages as "test_key:test message" and add the -K: flag to the commands above (see the keyed-message example in the Bash tab).

- After you've completed all the above steps, the terminal with the consumer will show "test message".
PowerShell
- Install the latest available version of Microsoft OpenJDK.

- Download and unpack the archive with Apache Kafka® binary files.
  Tip

  Unpack the Apache Kafka® files to the disk's root directory, for example, C:\kafka_2.12-2.6.0\.

  If the path to the Apache Kafka® executable and batch files is too long, you'll get the following error when trying to run them: The input line is too long.

- Type the following command to create a consumer:

  ```powershell
  <path to the directory with Apache Kafka® files>\bin\windows\kafka-console-consumer.bat `
      --bootstrap-server <your Apache Kafka® cluster connection string> `
      --topic <topic name> `
      --consumer-property security.protocol=SASL_SSL `
      --consumer-property sasl.mechanism=SCRAM-SHA-512 `
      --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
  ```
- The following command creates a producer and pushes a text message:

  ```powershell
  echo "key:test message" | <path to the directory with Apache Kafka® files>\bin\windows\kafka-console-producer.bat `
      --bootstrap-server <your Apache Kafka® cluster connection string> `
      --topic <topic name> `
      --property parse.key=true `
      --property key.separator=":" `
      --producer-property acks=all `
      --producer-property security.protocol=SASL_SSL `
      --producer-property sasl.mechanism=SCRAM-SHA-512 `
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
  ```
- If you've completed all the steps successfully, the terminal with the consumer will show "test message".
- Install snapd.

  Installation prerequisites for Linux Mint users: to allow installation of snapd in Linux Mint, execute the following in your terminal:

  ```bash
  sudo rm /etc/apt/preferences.d/nosnap.pref
  ```

  Then install the package:

  ```bash
  sudo apt install snapd
  ```
- Install the latest version of PowerShell:

  ```bash
  sudo snap install powershell --classic
  ```
- Download and unpack the archive with Apache Kafka® binary files.

  Tip

  Unpack the Apache Kafka® files to your home directory, /home/<username>.

  If the path to the Apache Kafka® executable and batch files is too long, you'll get the following error when trying to run them: The input line is too long.

- Start a new PowerShell session:

  ```bash
  pwsh
  ```
- Type the following to create a consumer:

  ```powershell
  .\<path to folder with unpacked Apache Kafka® files>\bin\kafka-console-consumer.sh `
      --bootstrap-server <your Apache Kafka® cluster connection string> `
      --topic <topic name> `
      --consumer-property security.protocol=SASL_SSL `
      --consumer-property sasl.mechanism=SCRAM-SHA-512 `
      --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
  ```
- Open a new terminal instance, start another PowerShell session, and type the following to create a producer:

  ```powershell
  echo "key:test message" | .\<path to folder with unpacked Apache Kafka® files>\bin\kafka-console-producer.sh `
      --bootstrap-server <your Apache Kafka® cluster connection string> `
      --topic <topic name> `
      --property parse.key=true `
      --property key.separator=":" `
      --producer-property acks=all `
      --producer-property security.protocol=SASL_SSL `
      --producer-property sasl.mechanism=SCRAM-SHA-512 `
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<username>' password='<password>';"
  ```
- After you've completed all the above steps, the terminal with the consumer will show the following output: test message.
Sample connection scripts
We tested the connections in the following environments:
- Ubuntu 20.04 LTS
Python
- Install the software and dependencies:

  ```bash
  sudo apt update && sudo apt install -y python3 python3-pip && pip3 install requests
  ```
- Install the kafka-python library:

  ```bash
  pip install kafka-python
  ```
- The following code creates a consumer:

  consumer.py

  ```python
  from kafka import KafkaConsumer

  consumer = KafkaConsumer(
      '<topic name>',
      bootstrap_servers='<your cluster connection string>',
      security_protocol="SASL_SSL",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<username>',
      sasl_plain_password='<password>')

  for msg in consumer:
      print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
  ```
- The following code creates a producer and pushes a text message:

  producer.py

  ```python
  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='<your cluster connection string>',
      security_protocol="SASL_SSL",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<username>',
      sasl_plain_password='<password>')

  producer.send('<topic name>', b'test message', b'key')
  producer.flush()
  producer.close()
  ```
- Run the applications:

  ```bash
  python3 consumer.py
  ```

  ```bash
  python3 producer.py
  ```
- After you've completed all the above steps, the consumer will show "test message".
NodeJS
- Install the software and dependencies:

  ```bash
  sudo apt update && sudo apt install -y nodejs npm && \
  npm install kafkajs
  ```
- The following code creates a producer and pushes a text message:

  producer.js

  ```javascript
  import { Kafka } from "kafkajs";

  const topic = "<topic name>";
  const message_count = 5;

  const kf = new Kafka({
    brokers: ["<your cluster connection string>"],
    clientId: "doublecloud-example-client",
    sasl: {
      mechanism: "scram-sha-512",
      username: process.env.KAFKA_USER,
      password: process.env.KAFKA_PASSWORD,
    },
    ssl: {
      rejectUnauthorized: true,
    },
  });

  const producer = kf.producer();

  async function run() {
    await producer.connect();
    const now = new Date();
    for (let i = 0; i < message_count; i++) {
      let message = {
        key: Buffer.from(`msg-key-${now.getTime() / 1000}-${i}`),
        value: Buffer.from(JSON.stringify({
          "origin": "kafkajs",
          "scheduled_at": now,
        }))
      };
      producer
        .send({topic: topic, messages: [message]})
        .then((metadata) => {
          const md = metadata.pop();
          console.log(`[Produced message] Key: ${message.key}, Offset: ${md.baseOffset}, Partition: ${md.partition}`);
        })
        .catch((err) => {
          console.log(`[Producer Error] ${err}`)
        })
    }
  }

  run().catch((err) => {
    console.log("Unexpected Error: ", err);
  })
  ```
- The following code creates a consumer:

  consumer.js

  ```javascript
  import { Kafka } from "kafkajs";

  const topic = "<topic name>";

  const kf = new Kafka({
    brokers: ["<your cluster connection string>"],
    clientId: "doublecloud-example-client",
    sasl: {
      mechanism: "scram-sha-512",
      username: process.env.KAFKA_USER,
      password: process.env.KAFKA_PASSWORD,
    },
    ssl: {
      rejectUnauthorized: true,
    },
  });

  const consumer = kf.consumer({
    groupId: "doublecloud-example-group.v1",
  });

  async function run() {
    await consumer.connect()
    await consumer.subscribe({
      topic: topic,
      fromBeginning: true,
    })
    return consumer.run({
      autoCommit: true,
      eachMessage: (payload) => {
        const message = payload.message;
        console.log(`[Consumed Message] k: ${message.key.toString()}, v: ${message.value.toString()}`);
      }
    })
  }

  run().catch((err) => {
    console.log("Unexpected Error: ", err);
  })
  ```
- In separate terminals, run the applications (see the note on credentials after this list):

  ```bash
  node consumer.js
  ```

  ```bash
  node producer.js
  ```
- After you've completed all the above steps, the producer will send one or more keyed messages and the consumer will show the output.
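A note on credentials: producer.js and consumer.js read the username and password from the KAFKA_USER and KAFKA_PASSWORD environment variables referenced in the code, so export them in each terminal before running the scripts. Also, because the files use import syntax, Node.js must treat them as ES modules (for example, by adding "type": "module" to your package.json).

```bash
# Export the credentials expected by producer.js and consumer.js.
# Substitute the cluster username and password from the Overview tab.
export KAFKA_USER="<cluster username>"
export KAFKA_PASSWORD="<cluster password>"
```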
Go
- Install the software and dependencies:

  ```bash
  sudo apt update && sudo apt install -y golang git && \
  go get github.com/Shopify/sarama && \
  go get github.com/xdg-go/scram
  ```
- Create a directory for the project:

  ```bash
  cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
  ```
- Create the scram.go file with the code for running SCRAM authentication:

  scram.go

  ```go
  package scram

  import (
      "crypto/sha256"
      "crypto/sha512"

      "github.com/xdg-go/scram"
  )

  var (
      SHA256 scram.HashGeneratorFcn = sha256.New
      SHA512 scram.HashGeneratorFcn = sha512.New
  )

  type XDGSCRAMClient struct {
      *scram.Client
      *scram.ClientConversation
      scram.HashGeneratorFcn
  }

  func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
      x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
      if err != nil {
          return err
      }
      x.ClientConversation = x.Client.NewConversation()
      return nil
  }

  func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
      response, err = x.ClientConversation.Step(challenge)
      return
  }

  func (x *XDGSCRAMClient) Done() bool {
      return x.ClientConversation.Done()
  }
  ```
- Copy scram.go to the directories of the producer application and the consumer application:

  ```bash
  cp scram.go producer/scram.go && cp scram.go consumer/scram.go
  ```
- The following code creates a producer and pushes a text message:

  producer/main.go

  ```go
  package main

  import (
      "fmt"
      "log"
      "time"

      "github.com/Shopify/sarama"
      "github.com/yadc-io/examples-kafka-go/scram"
  )

  const (
      clientID = "double-cloud-example.v1"
      topic    = "<topic name>"
  )

  func main() {
      cfg := sarama.NewConfig()
      cfg.ClientID = clientID
      cfg.Net.TLS.Enable = true
      cfg.Net.SASL.Enable = true
      cfg.Net.SASL.Handshake = true
      cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
      cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
          // See https://github.com/Shopify/sarama/blob/main/examples/sasl_scram_client/scram_client.go
          // This workaround should be addressed in https://github.com/Shopify/sarama/pull/2086
          return &scram.XDGSCRAMClient{HashGeneratorFcn: scram.SHA512}
      }
      cfg.Net.SASL.User = "<username>"
      cfg.Net.SASL.Password = "<password>"

      brokers := []string{"<your cluster connection string>"}

      cfg.Producer.Return.Successes = true

      p, err := sarama.NewSyncProducer(brokers, cfg)
      if err != nil {
          panic(err)
      }

      msgCount := 5
      now := time.Now()
      for i := 0; i < msgCount; i++ {
          part, offset, err := p.SendMessage(&sarama.ProducerMessage{
              Topic:     topic,
              Key:       sarama.StringEncoder(fmt.Sprintf("sarama-msg-key-%d-%d", now.Unix(), i)),
              Value:     sarama.StringEncoder(fmt.Sprintf("sarama-msg-body-%s-%d", now, i)),
              Headers:   nil,
              Metadata:  nil,
              Offset:    0,
              Partition: 0,
              Timestamp: time.Time{},
          })
          if err != nil {
              panic(err)
          }
          log.Printf("[Published message] Partition: %d, Offset: %d", part, offset)
      }
  }
  ```
- The following code creates a consumer:

  consumer/main.go

  ```go
  package main

  import (
      "context"
      "log"
      "time"

      "github.com/Shopify/sarama"
      "github.com/yadc-io/examples-kafka-go/scram"
  )

  const (
      clientID = "double-cloud-example.v1"
      groupID  = "double-cloud-example-group.v1"
      topic    = "<topic name>"
  )

  func main() {
      cfg := sarama.NewConfig()
      cfg.ClientID = clientID
      cfg.Net.TLS.Enable = true
      cfg.Net.SASL.Enable = true
      cfg.Net.SASL.Handshake = true
      cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
      cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
          // See https://github.com/Shopify/sarama/blob/main/examples/sasl_scram_client/scram_client.go
          // This workaround should be addressed in https://github.com/Shopify/sarama/pull/2086
          return &scram.XDGSCRAMClient{HashGeneratorFcn: scram.SHA512}
      }
      cfg.Net.SASL.User = "<username>"
      cfg.Net.SASL.Password = "<password>"

      brokers := []string{"<your cluster connection string>"}

      cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

      c, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
      if err != nil {
          panic(err)
      }
      defer c.Close()

      timeout := 30 * time.Second
      ctx, cancel := context.WithTimeout(context.Background(), timeout)
      defer cancel()

      log.Printf("Consuming messages for %0.0f seconds", timeout.Seconds())
      if err = c.Consume(ctx, []string{topic}, &PrintHandler{}); err != nil {
          panic(err)
      }
  }

  type PrintHandler struct{}

  func (h *PrintHandler) Setup(sess sarama.ConsumerGroupSession) error { return nil }

  func (h *PrintHandler) Cleanup(sess sarama.ConsumerGroupSession) error { return nil }

  func (h *PrintHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
      for {
          select {
          case m := <-claim.Messages():
              log.Printf("[Message consumed] Key: %s, Value: %s", m.Key, m.Value)
              sess.MarkMessage(m, "")
          case <-sess.Context().Done():
              return nil
          }
      }
  }
  ```
- In separate terminals, open your consumer and producer folders and run the following command to create Go modules (see the note after this list on fetching dependencies and running the applications):

  ```bash
  go mod init main.go
  ```
- After completing all the above steps, the producer will send one or more keyed messages and the consumer will show the output.
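The list above stops at module creation, so here is a hedged sketch of the remaining commands. It assumes the scram helper is arranged so that the github.com/yadc-io/examples-kafka-go/scram import in main.go resolves (for example, by keeping scram.go in its own package directory and adjusting the module or import path accordingly); verify the paths against your own layout before running:

```bash
# Run inside the producer directory, then repeat inside the consumer directory.
go mod tidy   # downloads sarama and the SCRAM dependency declared by the imports
go run .      # builds and runs the application
```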
C#
- Install the software and dependencies:

  ```bash
  wget https://packages.microsoft.com/config/ubuntu/20.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb && \
  sudo dpkg -i packages-microsoft-prod.deb && \
  sudo apt-get update && \
  sudo apt-get install -y apt-transport-https dotnet-sdk-7.0
  ```
- Create a directory for the project:

  ```bash
  cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
  ```
- Create a configuration file:

  App.csproj

  ```xml
  <Project Sdk="Microsoft.NET.Sdk">
    <PropertyGroup>
      <OutputType>Exe</OutputType>
      <TargetFramework>net7.0</TargetFramework>
    </PropertyGroup>
    <ItemGroup>
      <PackageReference Include="Confluent.Kafka" Version="1.4.2" />
    </ItemGroup>
  </Project>
  ```
- Copy App.csproj to the directories of the producer application and the consumer application:

  ```bash
  cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
  ```
- The following code creates a consumer:

  cs-project/consumer/Program.cs

  ```csharp
  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace CCloud
  {
      class Program
      {
          public static void Main(string[] args)
          {
              string HOST = "<your cluster connection string>";
              string TOPIC = "<topic name>";
              string USER = "<username>";
              string PASS = "<password>";

              var consumerConfig = new ConsumerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_SSL"},
                      {"sasl.mechanisms", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS},
                      {"group.id", "demo"}
                  }
              );

              var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
              consumer.Subscribe(TOPIC);
              try
              {
                  while (true)
                  {
                      var cr = consumer.Consume();
                      Console.WriteLine(cr.Message.Value);
                  }
              }
              catch (OperationCanceledException)
              {
                  // Ctrl-C pressed.
              }
              finally
              {
                  consumer.Close();
              }
          }
      }
  }
  ```
- The following code creates a producer and pushes a text message:

  cs-project/producer/Program.cs

  ```csharp
  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace App
  {
      class Program
      {
          public static void Main(string[] args)
          {
              int MSG_COUNT = 5;

              string HOST = "<your cluster connection string>";
              string TOPIC = "<topic name>";
              string USER = "<producer name>";
              string PASS = "<producer password>";

              var producerConfig = new ProducerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_SSL"},
                      {"sasl.mechanisms", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS}
                  }
              );

              var producer = new ProducerBuilder<string, string>(producerConfig).Build();

              for (int i = 0; i < MSG_COUNT; i++)
              {
                  producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                      (deliveryReport) =>
                      {
                          if (deliveryReport.Error.Code != ErrorCode.NoError)
                          {
                              Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                          }
                          else
                          {
                              Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                          }
                      });
              }

              producer.Flush(TimeSpan.FromSeconds(10));
          }
      }
  }
  ```
- Build and launch both applications:

  ```bash
  cd ~/cs-project/consumer && dotnet build && \
  dotnet bin/Debug/net7.0/App.dll
  ```

  ```bash
  cd ~/cs-project/producer && dotnet build && \
  dotnet bin/Debug/net7.0/App.dll
  ```
- After you've completed all the above steps, the producer will send one or more key:test message messages and the consumer will show the output.
Java
- Install the dependencies:

  ```bash
  sudo apt update && sudo apt install -y default-jdk maven
  ```
- Create a folder for the Maven project:

  ```bash
  cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
  ```
- Create a configuration file for Maven:

  pom.xml

  ```xml
  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.example</groupId>
      <artifactId>app</artifactId>
      <packaging>jar</packaging>
      <version>0.1.0</version>
      <properties>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
      <dependencies>
          <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-simple</artifactId>
              <version>1.7.30</version>
          </dependency>
          <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-databind</artifactId>
              <version>2.11.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.6.0</version>
          </dependency>
      </dependencies>
      <build>
          <finalName>${project.artifactId}-${project.version}</finalName>
          <sourceDirectory>src</sourceDirectory>
          <resources>
              <resource>
                  <directory>src</directory>
              </resource>
          </resources>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <executions>
                      <execution>
                          <goals>
                              <goal>attached</goal>
                          </goals>
                          <phase>package</phase>
                          <configuration>
                              <descriptorRefs>
                                  <descriptorRef>jar-with-dependencies</descriptorRef>
                              </descriptorRefs>
                              <archive>
                                  <manifest>
                                      <mainClass>com.example.App</mainClass>
                                  </manifest>
                              </archive>
                          </configuration>
                      </execution>
                  </executions>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-jar-plugin</artifactId>
                  <version>3.1.0</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.example.App</mainClass>
                          </manifest>
                      </archive>
                  </configuration>
              </plugin>
          </plugins>
      </build>
  </project>
  ```
  You can check the current versions of these Maven dependencies on their respective project pages.
- Copy pom.xml to the directories of the producer application and the consumer application:

  ```bash
  cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
  ```
- The following code creates a producer and pushes a text message:

  producer/src/java/com/example/App.java

  ```java
  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.apache.kafka.clients.producer.*;

  public class App {

      public static void main(String[] args) {

          int MSG_COUNT = 5;

          String HOST = "<cluster connection string>";
          String TOPIC = "<topic name>";
          String USER = "<username>";
          String PASS = "<password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";

          String serializer = StringSerializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);

          Producer<String, String> producer = new KafkaProducer<>(props);

          try {
              for (int i = 1; i <= MSG_COUNT; i++) {
                  producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                  System.out.println("Test message " + i);
              }
              producer.flush();
              producer.close();
          } catch (Exception ex) {
              System.out.println(ex);
              producer.close();
          }
      }
  }
  ```
- The following code creates a consumer:

  consumer/src/java/com/example/App.java

  ```java
  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.clients.consumer.*;

  public class App {

      public static void main(String[] args) {

          String HOST = "<cluster connection string>";
          String TOPIC = "<topic name>";
          String USER = "<username>";
          String PASS = "<password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";

          String deserializer = StringDeserializer.class.getName();
          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);

          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record.key() + ":" + record.value());
              }
          }
      }
  }
  ```
- Build the applications:

  ```bash
  cd ~/project/producer && mvn clean package && \
  cd ~/project/consumer && mvn clean package
  ```
- Run the applications:

  ```bash
  java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
  ```

  ```bash
  java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
  ```
- After you've completed all the above steps, the producer will send one or more key:test message messages and the consumer will show the output.
Connect with public and private connection strings
When you connect to a cluster through a VPC peering connection, use a private address instead of the usual public address.
To obtain a cluster's private connection string, go to the cluster overview page. Under Connection strings, switch to the Private tab.
You can also connect to a certain host on your cluster. The structures of a cluster and a host connection string differ as follows:
- Public address:

  ```text
  rw.<cluster_id>.at.double.cloud
  # or <host_name>.<cluster_id>.at.double.cloud
  ```

- Private address:

  ```text
  rw.<cluster_id>.private.at.double.cloud
  # or <host_name>.<cluster_id>.private.at.double.cloud
  ```
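If you're not sure which address type your environment can reach, a quick check from the machine that will run the client helps. The sketch below rests on assumptions: the private name resolves only from a network peered with the cluster's VPC, and <cluster_id>, the username, and the password are placeholders from your cluster's Overview tab:

```bash
# Resolve the private connection string (works only from the peered network).
nslookup rw.<cluster_id>.private.at.double.cloud

# The broker port (9091) and credentials are the same for public and private
# addresses, so any command from this article works after swapping in the name.
kafkacat -L -b rw.<cluster_id>.private.at.double.cloud:9091 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username="<cluster username>" \
    -X sasl.password="<cluster password>"
```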