Use Transfer API

Transfer is a tool that allows you to copy or replicate data between databases and stream processing services by creating endpoints and configuring transfers between them.

This tutorial transfers a CSV file from an Amazon S3 bucket to a Managed ClickHouse® cluster.

Before you start

  1. Create a service account:

    1. Go to the Service accounts tab of the Members page in the console.

    2. Name your service account.

    3. From the drop-down menu, select the Admin user role - we will need both read and write access.

    4. Click Submit. You'll see your new service account appear on the list.

  2. Issue an API key for your service account:

    1. Go to the Service accounts tab of the Members page in the console.

    2. Open the information page of the service account for which you want to create an API key.

    3. Under API keys, click Create key to create your account's first Secret key.

    4. Click Download file with keys. You'll use it to authenticate API requests.

  3. Install the DoubleCloud API Python SDK.
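
    A minimal sketch of installing the SDK and authenticating with the key file from the previous step - the PyPI package name doublecloud and the SDK(service_account_key=...) constructor follow the SDK's own examples, so check its README for the exact call:

    pip install doublecloud

    Then, in Python:

    import json

    import doublecloud

    # Initialize the SDK with the API key file you downloaded earlier.
    # "authorized_key.json" is a placeholder for your actual file name.
    with open("authorized_key.json") as infile:
        sdk = doublecloud.SDK(service_account_key=json.load(infile))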

Prepare a Managed ClickHouse® cluster

A Managed ClickHouse® cluster consists of one or more database hosts.

First of all, we need to create a cluster and prepare it for data upload.

Create a Managed ClickHouse® cluster

To create a ClickHouse® cluster, use the ClusterService create method with the following parameters:

  • project_id - the ID of your project. You can get this value on your project's information page.

  • cloud_type - aws.

  • region_id - for the purpose of this quickstart, let's specify eu-central-1.

  • name - clickhouse-dev.

  • resources - specify the following from the doublecloud.clickhouse.v1.ClusterResources model:

    • resource_preset_id - s2-c2-m4.

    • disk_size - let's go with the default 34359738368 bytes (32 GB).

    • replica_count - 1.

    • shard_count - 1.

import json
import logging

from google.protobuf.wrappers_pb2 import Int64Value

import doublecloud
from doublecloud.clickhouse.v1.cluster_pb2 import ClusterResources
from doublecloud.clickhouse.v1.cluster_service_pb2 import (
    CreateClusterRequest,
    DeleteClusterRequest,
)
from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub

def create_cluster(sdk, project_id, region_id, name, network_id):
    cluster_service = sdk.client(ClusterServiceStub)
    operation = cluster_service.Create(
        CreateClusterRequest(
            project_id=project_id,
            cloud_type="aws",
            region_id="eu-central-1",
            name="clickhouse-dev",
            resources=ClusterResources(
                clickhouse=ClusterResources.Clickhouse(
                    resource_preset_id="s2-c2-m4",
                    disk_size=Int64Value(value=34359738368),
                    replica_count=Int64Value(value=1),
                    shard_count=Int64Value(value=1),
                )
            ),
            network_id=network_id,
        )
    )
    logging.info("Creating initiated")
    return operation
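
A hedged usage sketch of the function above: cluster creation is a long-running operation, so wait for it to finish before connecting. The wait_operation_and_get_result helper mirrors the SDK's examples and may be named differently in your SDK version:

# Assumes the sdk object initialized earlier and a VPC network to place the cluster in.
operation = create_cluster(
    sdk,
    project_id="<your_project_id>",
    region_id="eu-central-1",
    name="clickhouse-dev",
    network_id="<your_network_id>",
)

# Block until the cluster is ready.
sdk.wait_operation_and_get_result(operation)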

Connect to the cluster and create a database

Tip

This tutorial shows how to use a CLI client with Docker and with clickhouse-client on DEB-based and RPM-based Linux distributions. You can also use other tools of your choice.

For more connection options, see Connect to a ClickHouse® database.

  1. Install the ClickHouse® client.

Now you have a CLI tool to manage your Managed ClickHouse® cluster. Let's connect to it:

  1. Go to the Clusters page in the console.

  2. Select the name of your cluster to open its information page. By default, you'll see the Overview tab.

  3. Under Connection strings, find the Native interface string and click Copy.

  4. Connect to your ClickHouse® cluster:

    docker run --network host --rm -it clickhouse/<Native interface connection string>
    
    The complete Docker command structure
    docker run --network host --rm -it \
                clickhouse/clickhouse-client \
                --host <FQDN of your cluster> \
                --secure \
                --user <cluster user name> \
                --password <cluster user password> \
                --port 9440
    
  5. After you have connected to the cluster, run the CREATE query:

    CREATE DATABASE IF NOT EXISTS db_for_s3
    
  6. Send the following query to make sure that your database is successfully created:

    SHOW DATABASES
    

    The terminal readout should be the following:

    ┌─name───────────────┐
    │ INFORMATION_SCHEMA │
    │ _system            │
    │ db_for_s3          │
    │ default            │
    │ information_schema │
    │ system             │
    └────────────────────┘
    
  7. Keep this terminal open - we'll need it later.

At this stage, your cluster is ready, and you can configure the transfer.

Transfer data from S3

Now it's time to set up the tools that will get the data from a remote source and transfer it to your db_for_s3 ClickHouse® database.

Create a source endpoint

To create a source endpoint, let's use the EndpointService create method and pass the following parameters:

  • project_id - The ID of the project in which you want to create your endpoint. You can get this value on your project's information page.

  • name - the name of your new source endpoint, first-s3-endpoint.

  • settings - the type of source endpoint. For this quickstart, use the s3_source, as reflected in the S3Source endpoint model:

    • dataset - hits, this is the name of our dataset at the S3 source.

    • path_pattern - data-sets/hits_sample.csv, this is the path to our dataset at the S3 source.

    • schema - for this quickstart we don't need to define a schema, use the default value {}.

    • format - csv, this is the dataset's format.

    • provider - the bucket setting with the value doublecloud-docs. As the source bucket is public, you don't need to provide any other parameters from the doublecloud.transfer.v1.endpoint.airbyte.S3Source.Provider model.

import argparse
import json
import logging

import doublecloud

from doublecloud.transfer.v1.endpoint.airbyte.s3_source_pb2 import S3Source
from doublecloud.transfer.v1.endpoint_pb2 import EndpointSettings
from doublecloud.transfer.v1.endpoint_service_pb2 import CreateEndpointRequest
from doublecloud.transfer.v1.endpoint_service_pb2_grpc import EndpointServiceStub

def create_s3_src_endpoint(sdk, project_id, name):
    svc = sdk.client(EndpointServiceStub)
    operation = svc.Create(
        CreateEndpointRequest(
            project_id=project_id,
            name="first-s3-endpoint",
            settings=EndpointSettings(
                s3_source=S3Source(
                    dataset="hits",
                    path_pattern="data-sets/hits_sample.csv",
                    schema="{}",
                    format=S3Source.Format(csv=S3Source.Csv()),
                    provider=S3Source.Provider(bucket="doublecloud-docs"),
                )
            ),
        )
    )
    return operation

The data source endpoint is ready to go. Now we need to create an endpoint that will receive the data from a remote source.

Create a target endpoint

This is your receiver. It will acquire the data sent by the source endpoint and write it to the database on your Managed ClickHouse® cluster.

To create a target endpoint, let's use the EndpointService create method and pass the following parameters:

  • project_id - The ID of the project in which you want to create your endpoint. You can get this value on your project's information page.

  • name - the name of your new target endpoint, clickhouse-target-endpoint.

  • settings - the type of target endpoint. For this quickstart, use the clickhouse_target, as reflected in the ClickhouseTarget endpoint model:

    • connection.connection_options - your endpoint's connection settings:

      • address - the connection address. In this quickstart, we connect to a Managed ClickHouse® cluster, so pass the cluster ID in the mdb_cluster_id field.

        To get your cluster's ID, get a list of clusters in the project (see the sketch after the code sample below).

      • database - db_for_s3, this is your ClickHouse® database.

      • user - admin, your ClickHouse® cluster username.

      • password.value.raw - your ClickHouse® cluster password.

    • clickhouse_cluster_name - clickhouse-dev, this is your cluster's name.

    • cleanup_policy - CLICKHOUSE_CLEANUP_POLICY_DROP.

import json
import logging

import doublecloud

from doublecloud.transfer.v1.endpoint.clickhouse_pb2 import (
    ClickhouseConnection,
    ClickhouseConnectionOptions,
    ClickhouseTarget,
)
from doublecloud.transfer.v1.endpoint.common_pb2 import Secret
from doublecloud.transfer.v1.endpoint_pb2 import EndpointSettings
from doublecloud.transfer.v1.endpoint_service_pb2 import CreateEndpointRequest
from doublecloud.transfer.v1.endpoint_service_pb2_grpc import EndpointServiceStub

def create_ch_dst_endpoint(sdk, project_id, name):
    svc = sdk.client(EndpointServiceStub)
    operation = svc.Create(
        CreateEndpointRequest(
            project_id=project_id,
            name="clickhouse-target-endpoint",
            settings=EndpointSettings(
                clickhouse_target=ClickhouseTarget(
                    connection=ClickhouseConnection(
                        connection_options=ClickhouseConnectionOptions(
                            mdb_cluster_id="<your_cluster_id>",
                            database="db_for_s3",
                            user="admin",
                            password=Secret(raw="<cluster_password>"),
                        )
                    )
                )
            ),
        )
    )
    return operation
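
The mdb_cluster_id placeholder above is the ID of the cluster you created earlier. One way to look it up programmatically is to list the clusters in the project; this sketch assumes the ClusterService exposes a List method with a ListClustersRequest message and a clusters field in the response, analogous to the calls above - check the API reference for the exact shape:

from doublecloud.clickhouse.v1.cluster_service_pb2 import ListClustersRequest
from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub

def find_cluster_id(sdk, project_id, name="clickhouse-dev"):
    # List the Managed ClickHouse® clusters in the project and return
    # the ID of the one whose name matches.
    cluster_service = sdk.client(ClusterServiceStub)
    response = cluster_service.List(ListClustersRequest(project_id=project_id))
    for cluster in response.clusters:
        if cluster.name == name:
            return cluster.id
    return None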

Good work. Now we've created an endpoint that will receive and write the data to your ClickHouse® database. All we need now is the service that will connect both endpoints and transfer the data.

Create a transfer

This is the service that activates the transfer process through the data pipeline. It will connect your endpoints and ensure the integrity of the data.

  1. Let's create a transfer using the TransferService create method with the following parameters:

    • source_id - the endpoint ID for the source endpoint.

      To find the endpoint ID, get a list of endpoints in the project (see the sketch after the code sample below).

    • target_id - the endpoint ID for the target endpoint.

    • name - the transfer name, transfer-quickstart.

    • project_id - the ID of the project in which you create a transfer. You can get this value on your project's information page.

    • type - SNAPSHOT_ONLY.

import json
import logging

import doublecloud

from doublecloud.transfer.v1.transfer_pb2 import TransferType
from doublecloud.transfer.v1.transfer_service_pb2 import CreateTransferRequest
from doublecloud.transfer.v1.transfer_service_pb2_grpc import TransferServiceStub

def create_transfer(sdk, project_id, name, src_id, dst_id):
    svc = sdk.client(TransferServiceStub)
    operation = svc.Create(
        CreateTransferRequest(
            source_id=src_id,  # ID of the S3 source endpoint
            target_id=dst_id,  # ID of the ClickHouse® target endpoint
            name=name,  # "transfer-quickstart" in this quickstart
            project_id=project_id,
            type=TransferType.SNAPSHOT_ONLY,
        )
    )
    return operation
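
The source_id and target_id values are the IDs of the endpoints you created above. A sketch of looking them up by name - the List method, ListEndpointsRequest message, and the endpoints field in the response are assumptions modeled on the calls above, so check the API reference:

from doublecloud.transfer.v1.endpoint_service_pb2 import ListEndpointsRequest
from doublecloud.transfer.v1.endpoint_service_pb2_grpc import EndpointServiceStub

def find_endpoint_id(sdk, project_id, name):
    # List the transfer endpoints in the project and return the ID of the
    # one whose name matches, for example "first-s3-endpoint".
    svc = sdk.client(EndpointServiceStub)
    response = svc.List(ListEndpointsRequest(project_id=project_id))
    for endpoint in response.endpoints:
        if endpoint.name == name:
            return endpoint.id
    return None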

Activate a transfer

Now, let's activate it using the TransferService activate method and pass the transfer ID in the transfer_id request parameter.

To find the transfer ID, get a list of transfers in the project (see the sketch after the code sample below).

import doublecloud

from doublecloud.transfer.v1.transfer_pb2 import TransferType
from doublecloud.transfer.v1.transfer_service_pb2 import ActivateTransferRequest
from doublecloud.transfer.v1.transfer_service_pb2_grpc import TransferServiceStub

def activate_transfer(sdk, transfer_id: str):
    # Activate the transfer by its ID.
    svc = sdk.client(TransferServiceStub)
    return svc.Activate(ActivateTransferRequest(transfer_id=transfer_id))
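
If you don't have the transfer ID at hand, you can look it up the same way as the endpoints - this sketch assumes a List method with a ListTransfersRequest message and a transfers field in the response; check the API reference for the exact shape:

from doublecloud.transfer.v1.transfer_service_pb2 import ListTransfersRequest
from doublecloud.transfer.v1.transfer_service_pb2_grpc import TransferServiceStub

def find_transfer_id(sdk, project_id, name="transfer-quickstart"):
    # List the transfers in the project and return the ID of the one
    # whose name matches.
    svc = sdk.client(TransferServiceStub)
    response = svc.List(ListTransfersRequest(project_id=project_id))
    for transfer in response.transfers:
        if transfer.name == name:
            return transfer.id
    return None

# Example: activate the quickstart transfer.
# transfer_id = find_transfer_id(sdk, "<your_project_id>")
# activate_transfer(sdk, transfer_id)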

Query the data in the Managed ClickHouse® cluster

Check the data transferred to your ClickHouse® database:

  1. Open the terminal you used to create the database or connect to your cluster once again as we did above:

    docker run --network host --rm -it clickhouse/<Native interface connection string>
    
    The complete Docker command structure
    docker run --network host --rm -it \
                clickhouse/clickhouse-client \
                --host <FQDN of your cluster> \
                --secure \
                --user <cluster user name> \
                --password <cluster user password> \
                --port 9440
    
  2. Send the following query to check that your data exists in the cluster. The name of the table in the db_for_s3 database corresponds to the name of the source dataset (hits).

    SELECT * FROM db_for_s3.hits
    

    The terminal readout should display the following data:

    ┌─Browser─┬─Cookie_Enabled─┬─Date───────┬─Gender─┬─Hit_ID─┬─Region_ID─┬─Technology───────────┬─Time_Spent─────────┬─Traffic_Source─┐
    │ Firefox │              0 │ 2016-01-10 │ Male   │  67112 │       184 │ PC (Mac)             │ 388.93975903614455 │ Direct         │
    │ Chrome  │              0 │ 2016-03-12 │ Female │  54562 │        51 │ Smartphone (iOS)     │ 325.20392156862744 │ Search engine  │
    │ Chrome  │              1 │ 2016-03-18 │ Female │  63833 │        51 │ Smartphone (iOS)     │ 316.09774436090225 │ Search engine  │
    │ Firefox │              1 │ 2016-03-24 │ Male   │  43941 │        51 │ PC (Windows)         │  263.7365269461078 │ Search engine  │ 
    │ Safari  │              0 │ 2016-03-30 │ Female │  38583 │        51 │ Smartphone (Android) │  363.8421052631579 │ Internal       │
    

Nice work! You have all the data transferred from a remote source and replicated with complete integrity in your own ClickHouse® database. Now let's make this data earn its worth.

Download the complete code example

You can download the full code listing for all the steps above from our Python SDK GitHub repository.

See also