Apache Kafka® connector
You can use this connector both for source and target endpoints.
Source endpoint
-
Under Connection settings, select the Connection type:
Managed cluster-
Select your Managed cluster from the drop-down list.
-
Select the Authentication method:
-
To verify your identity with SASL, click + SASL, and specify the credentials:
-
Your User Name,
-
Password for this user name.
-
-
Set this property to No Authentication if you don't need authentication.
-
On-premise-
Under Broker URLs, click + URL to add brokers.
Enter the IP addresses or domain names (FQDN) of broker hosts.
Tip
If the Apache Kafka® port number isn't standard, separate it with a colon as follows:
<Broker host IP or FQDN>:<port number>
-
Click SSL if you want to encrypt your connection.
-
Add the PEM Certificate. Click Choose a file to upload a certificate file (public key) in PEM format or provide it as text.
-
Select the Authentication method:
-
To verify your identity with SASL, specify the following options:
-
Your User Name,
-
Password for this user name,
-
Encryption Mechanism.
-
-
Set this property to No Authentication if you don't need authentication.
-
-
-
Specify the Topic full name.
-
Configure Advanced settings → Conversion rules:
-
Click + Conversion rules.
-
Select the Data format. Currently, we support the JSON format.
-
Choose the Data scheme:
Field list-
Under Field list → Field, click Add Field and specify the field properties:
-
The name of the field.
-
Select the field type.
-
(optional) Check Key to make the field a table sorting key.
-
(optional) Check the Required box to make the field obligatory.
-
Provide the Path to redefine the names of columns in the table, following this pattern:
library.shelf[a].book[b].title
JSON Spec-
Click Choose a file to provide a file with a schema description in JSON format. The schema should look as follows:
[
  { "name": "remote_addr", "type": "string" },
  { "name": "remote_user", "type": "string" },
  { "name": "time_local", "type": "string" },
  { "name": "request", "type": "string" },
  { "name": "status", "type": "int32" },
  { "name": "bytes_sent", "type": "int32" },
  { "name": "http_referer", "type": "string" },
  { "name": "http_user_agent", "type": "string" }
]
-
-
Check the Add a column for missing keys box if you need to collect keys missing from the schema.
-
Check Enable null values in keys if needed.
-
For an example of using this endpoint in a real scenario, see How to combine Apache Kafka® and ClickHouse® to create data streams and visualizations.
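The broker address format from the Tip above can be sketched in Python as follows. This is only an illustration: the helper name `normalize_broker` is hypothetical, and treating 9092 as the standard port is an assumption (it is the conventional Apache Kafka® default, but this page does not state which port the service expects). IPv6 literals are not handled.

```python
DEFAULT_KAFKA_PORT = 9092  # assumption: the conventional Kafka default port


def normalize_broker(address: str, default_port: int = DEFAULT_KAFKA_PORT) -> str:
    """Return the address as host:port, adding the default port if none is given."""
    address = address.strip()
    if not address:
        raise ValueError("broker address is empty")
    host, sep, port = address.rpartition(":")
    if not sep:
        # No colon at all: the whole string is the host.
        return f"{address}:{default_port}"
    if not host or not port.isdigit() or not 0 < int(port) < 65536:
        raise ValueError(f"invalid broker address: {address!r}")
    return f"{host}:{int(port)}"


brokers = [normalize_broker(b) for b in ["10.0.0.5", "kafka-1.example.com:9093"]]
print(",".join(brokers))  # 10.0.0.5:9092,kafka-1.example.com:9093
```

Checking addresses this way before pasting them into Broker URLs can catch a missing host or a malformed port early.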
To create an Apache Kafka® source endpoint with the API, use the endpoint.KafkaSource model.
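To make the JSON Spec format from the conversion rules more concrete, here is a minimal Python sketch that applies such a schema to one JSON message. It is not the service's implementation: the `CASTS` mapping and the `apply_schema` helper are hypothetical, only the two types that appear in the example schema are covered, and the sample message is made up. Keys that are absent from the schema are returned separately, which roughly corresponds to what the Add a column for missing keys option collects; how the service actually stores them is not described here.

```python
import json

# Schema in the JSON Spec format: a list of {"name": ..., "type": ...} objects.
SCHEMA = json.loads("""
[
  {"name": "remote_addr", "type": "string"},
  {"name": "status", "type": "int32"},
  {"name": "bytes_sent", "type": "int32"}
]
""")

# Hypothetical mapping from schema type names to Python casts.
CASTS = {"string": str, "int32": int}


def apply_schema(message: str, schema: list) -> tuple:
    """Split a JSON message into typed schema fields and leftover keys."""
    record = json.loads(message)
    row = {}
    for field in schema:
        name = field["name"]
        value = record.pop(name, None)
        # Fields absent from the message become None (a null value).
        row[name] = None if value is None else CASTS[field["type"]](value)
    return row, record  # record now holds only keys missing from the schema


row, rest = apply_schema(
    '{"remote_addr": "203.0.113.7", "status": "200", "trace_id": "abc"}', SCHEMA
)
print(row)   # {'remote_addr': '203.0.113.7', 'status': 200, 'bytes_sent': None}
print(rest)  # {'trace_id': 'abc'}
```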
Target endpoint
-
Under Connection settings, select the Connection type:
Managed cluster-
Select your Managed Apache Kafka® cluster from the drop-down list.
-
Select the Authentication method:
-
To verify your identity with SASL, click + SASL, and specify the credentials:
-
Your User Name,
-
Password for this user name.
-
-
Set this property to No Authentication if you don't need to provide credentials.
-
On-premise cluster-
Provide Broker URLs:
-
Click Broker URLs → + URL to add brokers.
Enter the IP addresses or domain names (FQDN) of broker hosts.
Tip
If the Apache Kafka® port number isn't standard, separate it with a colon as follows:
<Broker host IP or FQDN>:<port number>
-
Click SSL if you want to encrypt your connection.
-
Add the PEM Certificate. Click Choose a file to upload a certificate file (public key) in PEM format or paste it as text.
-
Select the Authentication method:
-
To verify your identity with SASL, specify the following options:
-
Your User Name,
-
Password for this user name,
-
Encryption Mechanism.
-
-
Set this property to No Authentication if you don't need to provide credentials.
-
-
-
Specify the Apache Kafka topic settings:
Topic full name- Specify the Topic full name in the target cluster as a full path.
Topic prefix- Specify the Topic prefix in the target cluster. The resulting topic names have the following format:
topic_prefix.schema.table_name
This format is similar to the topic naming that Debezium uses.
To parse multiple topic names in the above sections, you can use regular expressions:
Collection of regular expression patterns to parse table names
| Pattern | Description | Example |
|---|---|---|
| abc | An explicit series of characters. | test returns the table names containing test. |
| . | A character wildcard. Use it to match an expression with defined character positions. | t..t returns test, tent, tart, etc. |
| \ | An escape character. Use it to match special characters. | \_ returns the table names containing an underscore. |
| ? | Use it to express that a character (or a group of characters) is optional. | c?.n returns can, con, in, on, en, etc. |
| + | Use it to express that a character (or a group of characters) can appear one or more times. | -+ returns the table names containing -, --, ---, etc. |
| {n} | Use it to express that a character (or a group of characters) must appear exactly n times. | -{2} returns the table names containing --. |
| {n,m} | Use it to express that a character (or a group of characters) must appear between n and m times. | _{1,3} returns the table names containing _, __, and ___. |
| \w | An alphanumeric wildcard. Use it to match any alphanumeric characters. The match pattern is case-sensitive. | \w+ returns the table names containing letters and/or digits. |
| \W | A non-alphanumeric wildcard. Use it to match any non-alphanumeric character. The match pattern is case-sensitive. | \W+ returns the table names containing characters other than letters or digits. |
| \d | A digit wildcard. Use it to match any digit characters. The match pattern is case-sensitive. | \d+ returns the table names containing digits. |
| \D | A non-digit wildcard. Use it to match any non-digit characters. The match pattern is case-sensitive. | \D+ returns the table names containing any characters other than digits. |
| $ | Use it to match the position after the table name's last character. | metrics$ returns the table names ending with metrics. |
| ^ | Use it to match the position before the table name's first character. This position is useful to define database names. | ^monthly_sales returns all the tables from the monthly_sales database. |
-
Configure Advanced settings:
-
Check the Save tx order box if you want to write database transactions in the same order.
When you enable this setting, the service writes all tables from the source database into a single partition.
In the default mode, when this setting is disabled, the service splits the data you transfer by table names. Each table goes to a separate partition.
Warning
This setting applies only if a transfer meets both conditions below:
-
The source endpoint is PostgreSQL or MySQL.
-
You set Topic prefix in Apache Kafka topic settings.
-
-
To create an Apache Kafka® target endpoint with the API, use the endpoint.KafkaTarget model.
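A few of the regular expression patterns listed in the topic settings can be tried out locally before you use them. The sketch below uses Python's `re` module as a stand-in; this page does not say which regular expression engine the service uses, so edge cases may behave differently. The table names and the `select` helper are hypothetical, and a pattern is treated as matching when it occurs anywhere in the name, in line with the "table names containing" wording.

```python
import re

# Hypothetical table names, used only for illustration.
TABLES = [
    "monthly_sales.orders",
    "monthly_sales.web_metrics",
    "test",
    "tent",
    "a--b",
    "users_2024",
]


def select(pattern: str, names: list) -> list:
    """Return the names in which the pattern matches anywhere (search semantics)."""
    regex = re.compile(pattern)
    return [name for name in names if regex.search(name)]


print(select(r"t..t", TABLES))            # ['test', 'tent']
print(select(r"-{2}", TABLES))            # ['a--b']
print(select(r"metrics$", TABLES))        # ['monthly_sales.web_metrics']
print(select(r"^monthly_sales", TABLES))  # ['monthly_sales.orders', 'monthly_sales.web_metrics']
print(select(r"\d+", TABLES))             # ['users_2024']
```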