Apache Kafka® connectors

A connector is a component of Kafka Connect that performs continuous data migration between Apache Kafka® and other systems, such as:

  • Databases
  • Cloud services
  • Search indexes
  • File systems
  • Key-value stores.

You can use connectors to perform continuous data migration to and from the DoubleCloud Managed Service for Apache Kafka® clusters.

Connectors perform the following functions:

  • Connecting Managed Service for Apache Kafka® clusters to other data stores.

  • Reading and writing data.

  • Coordinating data streaming.

For each connector, Kafka Connect starts a worker thread to perform the following duties:

  • Managing the task configuration.

    This includes performing validation, configuring each task, and reconfiguring upon request.

  • Handling the global state.

    The Kafka Connect runtime ensures that a single instance of the worker thread is available to perform the required actions. For example, this may be a task that constantly lists topics on the source.

For any connector type, you can specify a task limit, which determines the number of workers that run in parallel, as well as other connector-specific properties.

Supported connectors

DoubleCloud Managed Service for Apache Kafka® supports the following connector types:

Name

Description

Further information

MirrorMaker

This connector type replicates Apache Kafka® topics between clusters.

A worker accesses the Managed Service for Apache Kafka® cluster's broker hosts and replicates the topics specified in the connector filtering template. Depending on the connector's replication factor, it connects to one or more brokers.

Topic names in the target cluster are the same as in the source.

Geo-replication in Apache Kafka®

S3 Sink

This connector periodically requests data from Apache Kafka® and uploads it to a specified Amazon S3 storage.

For transmission, the connector splits data into chunks, each uploaded as an S3 file object. The size of each data chunk is determined by the maximum number of records it contains.

S3 connector GitHub repository
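
The chunking behavior described for the S3 Sink connector can be illustrated with a minimal Python sketch. This is not the connector's actual implementation: the function name split_into_chunks and the sample records are hypothetical, and the sketch only assumes that a chunk is closed once it holds the maximum number of records.

```python
from typing import Iterable, Iterator, List


def split_into_chunks(records: Iterable[bytes], max_records: int) -> Iterator[List[bytes]]:
    """Yield lists holding at most max_records records, preserving order."""
    if max_records < 1:
        raise ValueError("max_records must be a positive integer")
    chunk: List[bytes] = []
    for record in records:
        chunk.append(record)
        if len(chunk) == max_records:
            yield chunk
            chunk = []
    if chunk:  # the final chunk may hold fewer records
        yield chunk


# Hypothetical sample data: seven records, at most three per chunk.
records = [f"record-{i}".encode() for i in range(7)]
chunks = list(split_into_chunks(records, max_records=3))
print([len(c) for c in chunks])  # [3, 3, 1]
```

In this sketch, each yielded list stands for the content of one file object.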

Connector parameters

Under ConnectorSpecConnectorConfigMirrorMakerSpec

  • topics: A template for selecting topics to replicate. Separate listed topic names with , or |. You can use the .* expression, for example, analysis.*. To migrate all topics, specify .*.

  • source_cluster: Parameters for connecting to the source cluster, under doublecloud.kafka.v1.ClusterConnectionSpec:

    • alias: A prefix to indicate the source cluster in the connector settings.

      Note

      Topics in the target cluster are created with the indicated prefix.

    • this_cluster: Select this option to use the current cluster as a source, under ExternalClusterConnectionSpec:

      • bootstrap_services: A comma-separated list of the FQDNs of the source cluster's broker hosts with the port numbers to connect to. For example, broker1.example.com:9091,broker2.example.com:9091.

      • sasl_username: The username for connecting the connector to the source cluster.

      • sasl_password: The user password for connecting the connector to the source cluster.

      • sasl_mechanism: Select a mechanism for name and password encryption.

      • security_protocol: Select a protocol for connecting the connector:

        • PLAINTEXT, SASL_PLAINTEXT: For non-SSL connections.
        • SSL, SASL_SSL: For SSL connections.
      • ssl_truststore_certificates: Provide a PEM certificate to access the external cluster. Separate the lines of the certificate with \n.

  • target_cluster: Parameters for connecting to the target cluster:

    • external_cluster: Specify the cluster to use as a target, under ExternalClusterConnectionSpec:

      • bootstrap_services: A comma-separated list of the FQDNs of the target cluster's broker hosts with the port numbers to connect to. For example, broker1.example.com:9091,broker2.example.com:9091.

      • sasl_username: The username for connecting the connector to the target cluster.

      • sasl_password: The user password for connecting the connector to the target cluster.

      • sasl_mechanism: Select a mechanism for name and password encryption.

      • security_protocol: Select a protocol for connecting the connector:

        • PLAINTEXT, SASL_PLAINTEXT: For non-SSL connections.
        • SSL, SASL_SSL: For SSL connections.
      • ssl_truststore_certificates: Provide a PEM certificate to access the external cluster. Separate the lines of the certificate with \n.

  • replication_factor: The number of topic copies stored in the cluster.
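
As an illustration of how the parameters above fit together, here is a minimal Python sketch that assembles a MirrorMaker specification as a nested dictionary and serializes it to JSON. Several points are assumptions rather than documented behavior: that the specification is sent as JSON, that an empty this_cluster object selects the current cluster, and that SCRAM-SHA-512 is an accepted sasl_mechanism value. The helper normalize_pem, the credentials, and the certificate body are hypothetical placeholders.

```python
import json


def normalize_pem(pem_text: str) -> str:
    """Drop blank lines and surrounding whitespace, joining the rest with newlines."""
    lines = [line.strip() for line in pem_text.splitlines() if line.strip()]
    return "\n".join(lines)


# Placeholder text, not a real certificate.
pem = """
-----BEGIN CERTIFICATE-----
PLACEHOLDERLINE1
PLACEHOLDERLINE2
-----END CERTIFICATE-----
"""

mirrormaker_spec = {
    "topics": "analysis.*",
    "source_cluster": {
        "alias": "source",
        # Assumption: an empty object selects the current cluster as the source.
        "this_cluster": {},
    },
    "target_cluster": {
        "external_cluster": {
            "bootstrap_services": "broker1.example.com:9091,broker2.example.com:9091",
            "sasl_username": "connector-user",  # hypothetical credentials
            "sasl_password": "change-me",
            "sasl_mechanism": "SCRAM-SHA-512",  # assumed to be an accepted value
            "security_protocol": "SASL_SSL",
            "ssl_truststore_certificates": normalize_pem(pem),
        }
    },
    "replication_factor": 1,
}

# json.dumps escapes each newline in the certificate as the two characters \n.
payload = json.dumps(mirrormaker_spec, indent=2)
print(payload)
```

Keeping real newlines in the Python string and letting the JSON encoder escape them avoids accidental double escaping of the certificate.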

For the list of common connector settings, see the official Apache Kafka® documentation.
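
To preview which topics a topics template would select, the matching can be approximated in Python. This sketch rests on assumptions: each alternative separated by , or | is treated as a regular expression that must match the whole topic name, and Python's re module stands in for the regular expression engine the service actually uses. The function select_topics and the topic names are hypothetical.

```python
import re
from typing import Iterable, List


def select_topics(template: str, topics: Iterable[str]) -> List[str]:
    """Return the topics whose full name matches any alternative in the template."""
    alternatives = [part.strip() for part in re.split(r"[,|]", template) if part.strip()]
    compiled = [re.compile(alternative) for alternative in alternatives]
    return [topic for topic in topics if any(c.fullmatch(topic) for c in compiled)]


available = ["analysis.clicks", "analysis.views", "billing", "logs"]

print(select_topics("analysis.*", available))    # ['analysis.clicks', 'analysis.views']
print(select_topics("billing,logs", available))  # ['billing', 'logs']
print(select_topics(".*", available))            # all four topics
```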

Under the S3 Sink connector specification

  • topics: A template for selecting topics to replicate. Separate listed topic names with , or |. You can use the .* expression, for example, analysis.*. To migrate all topics, specify .*.

  • file_compression_type: Select the codec to compress messages:

    • none (default): No compression.
    • gzip: The gzip codec.
    • snappy: The snappy codec.
    • zstd: The zstd codec.

    You can't change this parameter after creating the cluster.

  • (Optional) file_max_records: Maximum number of records to write to a single file in S3-compatible storage.

  • s3_connection: Storage connection parameters, under doublecloud.kafka.v1.S3ConnectionSpec:

    • bucket_name: Storage bucket name.
    • endpoint: Endpoint for storage access (get information on the endpoint from your storage provider).
    • (Optional) region: The endpoint's AWS region (us-east-1 by default). See the complete list of available regions in the AWS documentation.
    • (Optional) access_key_id, secret_access_key: The access credentials, provided as strings, if your endpoint requires authorization.

    For the list of all connector settings, see the connector documentation.

    For the list of most common connector settings, see the official Apache Kafka® documentation.
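
The S3 Sink parameters above can be assembled and checked before submission with a small Python helper. This is a sketch under stated assumptions: the function build_s3_sink_spec is hypothetical, the nesting mirrors the parameter list rather than a confirmed request schema, and the bucket name and endpoint are placeholders.

```python
from typing import Any, Dict, Optional

# Codecs listed for file_compression_type.
ALLOWED_COMPRESSION = {"none", "gzip", "snappy", "zstd"}


def build_s3_sink_spec(
    topics: str,
    bucket_name: str,
    endpoint: str,
    file_compression_type: str = "none",
    file_max_records: Optional[int] = None,
    region: Optional[str] = None,
    access_key_id: Optional[str] = None,
    secret_access_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Validate the arguments and return the S3 Sink parameters as a dictionary."""
    if file_compression_type not in ALLOWED_COMPRESSION:
        raise ValueError(f"unsupported codec: {file_compression_type!r}")
    if file_max_records is not None and file_max_records < 1:
        raise ValueError("file_max_records must be a positive integer")
    if (access_key_id is None) != (secret_access_key is None):
        raise ValueError("provide both access_key_id and secret_access_key, or neither")

    s3_connection: Dict[str, Any] = {"bucket_name": bucket_name, "endpoint": endpoint}
    if region is not None:  # omitted, so the documented default (us-east-1) applies
        s3_connection["region"] = region
    if access_key_id is not None:
        s3_connection["access_key_id"] = access_key_id
        s3_connection["secret_access_key"] = secret_access_key

    spec: Dict[str, Any] = {
        "topics": topics,
        "file_compression_type": file_compression_type,
        "s3_connection": s3_connection,
    }
    if file_max_records is not None:
        spec["file_max_records"] = file_max_records
    return spec


# Placeholder bucket and endpoint values.
spec = build_s3_sink_spec(
    topics="analysis.*",
    bucket_name="example-bucket",
    endpoint="https://storage.example.com",
    file_compression_type="gzip",
    file_max_records=1000,
)
print(spec)
```

Optional parameters that are not supplied are left out of the dictionary rather than filled with guessed values.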

Connector statuses

You can see a status indicator on the Connectors tab on your Apache Kafka® cluster's information page.

Status

Description

RUNNING

Connector is operating normally.

PAUSED

Connector is intact but has been paused by the user or by Managed Service for Apache Kafka®. You can resume the connector at any moment.

ERROR

Connector has encountered a problem and can't operate. To resolve the issue, write a technical support request:

  • Specify the cluster ID.
  • List the last operations performed on the cluster.

INVALID

Connector state is unknown. To resolve the issue, write a technical support request:

  • Specify the cluster ID.
  • List the last operations performed on the cluster.
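
The status descriptions above can be turned into a simple decision helper, for example in a monitoring script. This Python sketch only encodes the guidance from the status list. The names ConnectorStatus, recommended_action, and support_request are hypothetical, and how the status value is retrieved is outside its scope.

```python
from enum import Enum
from typing import Dict, List


class ConnectorStatus(Enum):
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    ERROR = "ERROR"
    INVALID = "INVALID"


def recommended_action(status: ConnectorStatus) -> str:
    """Map a connector status to the follow-up suggested in the status list."""
    if status is ConnectorStatus.RUNNING:
        return "none"
    if status is ConnectorStatus.PAUSED:
        return "resume"
    # ERROR and INVALID both call for a technical support request.
    return "contact_support"


def support_request(cluster_id: str, last_operations: List[str]) -> Dict[str, object]:
    """Collect the details that a support request should include."""
    return {"cluster_id": cluster_id, "last_operations": list(last_operations)}


print(recommended_action(ConnectorStatus("PAUSED")))  # resume
print(recommended_action(ConnectorStatus("ERROR")))   # contact_support
```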

See also