The Apache Kafka® connector

You can use this connector both for source and target endpoints.

Source endpoint

  1. Under Connection settings, select the Connection type:

    Managed cluster
    1. Select your Managed cluster from the drop-down list.

    2. Select the Authentication method:

      • To verify your identity with SASL, click + SASL, and specify the credentials:

        • Your User Name,

        • Password for this user name,

      • Set this property to No Authentication if you don't need authentication.

    On-premise
    1. Under Broker URLs, click + URL to add brokers.

      Enter IP address or domain names (FQDN) of broker hosts.

      Tip

      If the Apache Kafka® port number isn't standard, separate it with a colon as follows:

      <Broker host IP of FQDN>:<port number>

    2. Click SSL if you want to encrypt your connection.

    3. Add the PEM Certificate. Click Choose a file to upload a certificate file (public key) in PEM format or provide it as text.

    4. Select the Authentication method:

      1. To verify your identity with SASL, specify the following options:

        • Your User Name,

        • Password for this user name,

        • Encryption Mechanism.

      2. Set this property to No Authentication if you don't need authentication.

  2. Specify the Topic full name.

  3. Configure Advanced settingsConversion rules:

    1. Click + Conversion rules.

    2. Select the Data format. Currently, we support the JSON format.

    3. Choose the Data scheme:

      Field list
      1. Under Field listField click Add Field and specify the field properties:

      2. The name of the field.

      3. Select the field type.

      4. (optional) Check Key to make the field a table sorting key.

      5. (optional) Check the Required box to make the field obligatory.

      6. Provide the Path to redefine the names of columns in the table following

        library.shelf[a].book[b].title`
        
      JSON Spec

      Click Choose a file to provide a file with schema description in JSON format. The schema should look as follows:

      [
         {
            "name": "remote_addr",
            "type": "string"
         },
         {
            "name": "remote_user",
            "type": "string"
         },
         {
            "name": "time_local",
            "type": "string"
         },
         {
            "name": "request",
            "type": "string"
         },
         {
            "name": "status",
            "type": "int32"
         },
         {
            "name": "bytes_sent",
            "type": "int32"
         },
         {
            "name": "http_referer",
            "type": "string"
         },
         {
            "name": "http_user_agent",
            "type": "string"
         }
      ]
      
    4. Check the Add a column for missing keys box if you need to collect keys missing from the scheme.

    5. Check Enable null values in keys if needed.

For an example of using this endpoint in a real scenario, see How to combine Apache Kafka® and ClickHouse® to create data streams and visualizations.

To create an Apache Kafka® source endpoint with API, use the endpoint.KafkaSource model.

Target endpoint

  1. Under Connection settings, select the Connection type:

    Managed cluster
    1. Select your Managed Apache Kafka® cluster from the drop-down list.

    2. Select the Authentication method:

      • To verify your identity with SASL, click + SASL, and specify the credentials:

        • Your User Name,

        • Password for this user name.

      • Set this property to No Authentication if you don't need to provide credentials.

    On-premise cluster
    1. Provide Broker URLs:

    2. Click Broker URLs+ URL to add brokers.

      Enter the IP addresses or domain names (FQDN) of broker hosts.

      Tip

      If the Apache Kafka® port number isn't standard, separate it with a colon as follows:

      <Broker host IP of FQDN>:<port number>
      
    3. Click SSL if you want to encrypt your connection.

    4. Add the PEM Certificate. Click Choose a file to upload a certificate file (public key) in PEM format or paste it as text.

    5. Select the Authentication method:

      1. To verify your identity with SASL, specify the following options:

        • Your User Name,

        • Password for this user name,

        • Encryption Mechanism.

      2. Set this property to No Authentication if you don't need to provide credentials.

  2. Specify the Apache Kafka topic settings:

    Topic full name
    1. Specify the Topic full name in the target cluster as a full path.
    Topic prefix
    1. Specify the Topic prefix in the target cluster. The format is the following: topic_prefix.schema.table_name, it's similar to the Debezium settings.

    To parse multiple topic names in the above sections, you can use regular expressions:

    Collection of regular expression patterns to parse table names
    Pattern Description Example
    abc An explicit series of characters test returns the table names containing test.
    . A character wildcard. Use it to match an expression with defined character positions. t..t returns test, tent, tart etc.
    \ An escape character. Use it to match special characters. \_ returns the table names containing an underscore.
    ? Use it to express that a character (or a group of characters) is optional. c?.n returns can, con, in, on, en, etc.
    + Use it to express that a character (or a group of characters) can appear one and more times. -+ returns the table names containing -, --, --- etc.
    {n} Use it to express that a character (or a group of characters) must appear explicitly n times -{2} returns the table names containing --.
    {n,m} Use it to express that a character (or a group of characters) must appear between n and m times. _{1,3} returns the table names containing _, __ and ___.
    \w An alphanumeric wildcard. Use it to match any alphanumeric characters. Please note that the match pattern is case-sensitive. \w+ returns the table names containing letters and/or digits.
    \W A non-alphanumeric wildcard. Use it to match any non-alphanumeric character. Please note that the match pattern is case-sensitive. \W+ returns the table names containing characters other than letters or digits.
    \d A digit wildcard. Use it to match any digit characters. Please note that the match pattern is case-sensitive. \d+ returns the table names containing digits.
    \D A non-digit wildcard. Use it to match any non-digit characters. Please note that the match pattern is case-sensitive. \D+ returns the table names containing any characters other than digits.
    $ Use it to match the position after the table name's last character. metrics$ returns the table names ending with metrics.
    ^ Use it to match the position before the table name's first character. This position is useful to define database names. For example, ^monthly_sales returns all the tables from the monthly_sales database.
  3. Configure Advanced settings:

    1. Check the Save tx order box if you want to write database transactions in the same order.

      When you enable this setting, the service writes all tables from the source database into a single partition . The tables order is preserved.

      In the default mode, when this setting is disabled, the service splits the data you transfer by table names. Each table goes to a separate partition.

      Warning

      This setting applies only if a transfer meets both conditions below:

      • The source endpoint is PostgreSQL or MySQL.

      • You set Topic prefix in Apache Kafka topic settings.

To create a Apache Kafka® target endpoint with API, use the endpoint.KafkaTarget model.

See also