Use Transfer API
Transfer is a tool that allows you to copy or replicate data between databases and stream processing services by creating endpoints and configuring transfers between them.
This tutorial transfers a CSV file from an Amazon S3 bucket to a Managed ClickHouse® cluster.
Before you start
- Create a service account:
  - Go to the Service accounts tab of the Members page in the console.
  - Name your service account.
  - From the drop-down menu, select the Admin user role - we need both read and write access.
  - Click Submit. You'll see your new service account appear on the list.
- Issue an API key for your service account:
  - Go to the Service accounts tab of the Members page in the console.
  - Open the information page of the service account for which you want to create an API key.
  - Under API keys, click Create key to create your account's first Secret key.
  - Click Download file with keys. You'll use it to authenticate API requests, as shown in the sketch below.
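The code samples below assume an initialized SDK object. Here's a minimal sketch of that setup; the file name authorized_key.json is a placeholder for the key file you just downloaded, and passing its contents as service_account_key follows the SDK's own authentication examples:
import json

import doublecloud

# Assumption: the downloaded key file is a JSON document that the SDK
# accepts as a service account key.
with open("authorized_key.json") as key_file:
    sa_key = json.load(key_file)

# All later snippets take this sdk object as their first argument.
sdk = doublecloud.SDK(service_account_key=sa_key)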
Prepare a Managed ClickHouse® cluster
A Managed ClickHouse® cluster consists of one or more database hosts.
First, we need to create a cluster and prepare it for data upload.
Create a Managed ClickHouse® cluster
To create a ClickHouse® cluster, use the ClusterService create method with the following parameters:
- project_id - the ID of your project. You can get this value on your project's information page.
- cloud_type - aws.
- region_id - for the purpose of this quickstart, let's specify eu-central-1.
- name - clickhouse-dev.
- resources - specify the following from the doublecloud.clickhouse.v1.ClusterResources model:
  - resource_preset_id - s2-c2-m4.
  - disk_size - let's go with the default 34359738368 bytes (32 GB).
  - replica_count - 1.
  - shard_count - 1.
import json
import logging

from google.protobuf.wrappers_pb2 import Int64Value

import doublecloud
from doublecloud.clickhouse.v1.cluster_pb2 import ClusterResources
from doublecloud.clickhouse.v1.cluster_service_pb2 import (
    CreateClusterRequest,
    DeleteClusterRequest,
)
from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub


def create_cluster(sdk, project_id, region_id, name, network_id):
    cluster_service = sdk.client(ClusterServiceStub)
    operation = cluster_service.Create(
        CreateClusterRequest(
            project_id=project_id,
            cloud_type="aws",
            region_id=region_id,  # "eu-central-1" in this quickstart
            name=name,  # "clickhouse-dev" in this quickstart
            resources=ClusterResources(
                clickhouse=ClusterResources.Clickhouse(
                    resource_preset_id="s2-c2-m4",
                    disk_size=Int64Value(value=34359738368),  # 32 GB
                    replica_count=Int64Value(value=1),
                    shard_count=Int64Value(value=1),
                )
            ),
            network_id=network_id,
        )
    )
    logging.info("Creating initiated")
    return operation
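To try it out, call create_cluster and wait for the operation to complete before moving on. This is a minimal sketch: the project and network IDs are placeholders, and it assumes the SDK exposes the wait_operation_and_get_result helper used in its example scripts:
# Placeholders: substitute your real project and network IDs.
operation = create_cluster(
    sdk,
    project_id="<your_project_id>",
    region_id="eu-central-1",
    name="clickhouse-dev",
    network_id="<your_network_id>",
)

# Assumption: the SDK provides a helper that blocks until the operation finishes.
sdk.wait_operation_and_get_result(operation)

# Assumption: the finished operation exposes the new cluster's ID as resource_id.
print(f"Cluster ID: {operation.resource_id}")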
Connect to the cluster and create a database
Tip
This tutorial shows how to use a CLI client with Docker.
For more connection options, see Connect to a ClickHouse® database.
- Install the ClickHouse® client.
Now you have a CLI tool to manage your Managed ClickHouse® cluster. Let's connect to it:
- Go to the Clusters page.
- Select the name of your cluster to open its information page. By default, you'll see the Overview tab.
- Under Connection strings, find the Native interface string and click Copy.
- Connect to your ClickHouse® cluster:
  Docker:
  docker run --network host --rm -it clickhouse/<Native interface connection string>
  The complete Docker command structure:
  docker run --network host --rm -it \
    clickhouse/clickhouse-client \
    --host <FQDN of your cluster> \
    --secure \
    --user <cluster user name> \
    --password <cluster user password> \
    --port 9440
  Native clickhouse-client:
  <Native interface connection string>
- After you have connected to the cluster, run the CREATE query:
  CREATE DATABASE IF NOT EXISTS db_for_s3
- Send the following query to make sure that your database is successfully created:
  SHOW DATABASES
  The terminal readout should be the following:
  ┌─name───────────────┐
  │ INFORMATION_SCHEMA │
  │ _system            │
  │ db_for_s3          │
  │ default            │
  │ information_schema │
  │ system             │
  └────────────────────┘
- Keep this terminal open - we'll need it later.
At this stage, you're ready to configure the transfer with the API.
Transfer data from S3
Now it's time to set up the tools that will get the data from a remote source and transfer it to your db_for_s3
ClickHouse® database.
Create a source endpoint
To create a source endpoint, let's use the EndpointService create method and pass the following parameters:
- project_id - the ID of the project in which you want to create your endpoint. You can get this value on your project's information page.
- name - the name of your new source endpoint, first-s3-endpoint.
- settings - the type of source endpoint. For this quickstart, use the s3_source, as reflected in the S3Source endpoint model:
  - dataset - hits, this is the name of our dataset at the S3 source.
  - path_pattern - data-sets/hits_sample.csv, this is the path to our dataset at the S3 source.
  - schema - for this quickstart we don't need to define a schema, use the default value {}.
  - format - csv, this is the dataset's format. We need to pass an additional parameter for this data type:
  - provider - bucket with the value doublecloud-docs. As the source bucket is public, you don't need to provide any other parameters from the doublecloud.transfer.v1.endpoint.airbyte.S3Source.Provider model.
import argparse
import json
import logging

import doublecloud
from doublecloud.transfer.v1.endpoint_pb2 import EndpointSettings
from doublecloud.transfer.v1.endpoint.airbyte.s3_source_pb2 import S3Source
from doublecloud.transfer.v1.endpoint_service_pb2 import CreateEndpointRequest
from doublecloud.transfer.v1.endpoint_service_pb2_grpc import EndpointServiceStub


def create_s3_src_endpoint(sdk, project_id, name):
    svc = sdk.client(EndpointServiceStub)
    operation = svc.Create(
        CreateEndpointRequest(
            project_id=project_id,
            name=name,  # "first-s3-endpoint" in this quickstart
            settings=EndpointSettings(
                s3_source=S3Source(
                    dataset="hits",
                    path_pattern="data-sets/hits_sample.csv",
                    schema="{}",
                    format=S3Source.Format(csv=S3Source.Csv()),
                    provider=S3Source.Provider(bucket="doublecloud-docs"),
                )
            ),
        )
    )
    return operation
The data source endpoint is ready to go. Now we need to create an endpoint that will receive the data from a remote source.
Create a target endpoint
This is your receiver. It will acquire the data sent by the source endpoint and write it to the database on your Managed ClickHouse® cluster.
To create a target endpoint, let's use the EndpointService create method and pass the following parameters:
- project_id - the ID of the project in which you want to create your endpoint. You can get this value on your project's information page.
- name - the name of your new target endpoint, clickhouse-target-endpoint.
- settings - the type of target endpoint. For this quickstart, use the clickhouse_target, as reflected in the clickhouse_target endpoint model:
  - connection.connection_options - your endpoint's connection settings:
    - address - mdb_cluster_id. In this quickstart, we'll connect to a Managed ClickHouse® cluster, so we need to pass the cluster ID value. To get your cluster's ID, get a list of clusters in the project (see the sketch after the code example below).
    - database - db_for_s3, this is your ClickHouse® database.
    - user - admin, your ClickHouse® cluster username.
    - password.value.raw - your ClickHouse® cluster password.
  - clickhouse_cluster_name - clickhouse-dev, this is your cluster's name.
  - cleanup_policy - CLICKHOUSE_CLEANUP_POLICY_DROP.
import json
import logging

import doublecloud
from doublecloud.transfer.v1.endpoint_pb2 import EndpointSettings
from doublecloud.transfer.v1.endpoint.clickhouse_pb2 import (
    ClickhouseConnection,
    ClickhouseConnectionOptions,
    ClickhouseTarget,
)
from doublecloud.transfer.v1.endpoint.common_pb2 import Secret
from doublecloud.transfer.v1.endpoint_service_pb2 import CreateEndpointRequest
from doublecloud.transfer.v1.endpoint_service_pb2_grpc import EndpointServiceStub


def create_ch_dst_endpoint(sdk, project_id, name):
    svc = sdk.client(EndpointServiceStub)
    operation = svc.Create(
        CreateEndpointRequest(
            project_id=project_id,
            name=name,  # "clickhouse-target-endpoint" in this quickstart
            settings=EndpointSettings(
                clickhouse_target=ClickhouseTarget(
                    connection=ClickhouseConnection(
                        connection_options=ClickhouseConnectionOptions(
                            mdb_cluster_id="<your_cluster_id>",
                            database="db_for_s3",
                            user="admin",
                            password=Secret(raw="<cluster_password>"),
                        )
                    )
                )
            ),
        )
    )
    return operation
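To fill in mdb_cluster_id, you can look up the cluster you created earlier. Here's a minimal sketch; it assumes the ClusterService List method and the ListClustersRequest message follow the same request/response pattern as the Create call above, with the response exposing a clusters list:
from doublecloud.clickhouse.v1.cluster_service_pb2 import ListClustersRequest
from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub


def find_cluster_id(sdk, project_id, name="clickhouse-dev"):
    cluster_service = sdk.client(ClusterServiceStub)
    # Assumption: the response carries a `clusters` field whose entries
    # expose `id` and `name`.
    response = cluster_service.List(ListClustersRequest(project_id=project_id))
    for cluster in response.clusters:
        if cluster.name == name:
            return cluster.id
    return None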
Good work. Now we've created an endpoint that will receive and write the data to your ClickHouse® database. All we need now is the service that will connect both endpoints and transfer the data.
Create a transfer
This is the service that activates the transfer process through the data pipeline. It will connect your endpoints and ensure the integrity of the data.
Let's create a transfer using the TransferService create method with the following parameters:
- source_id - the endpoint ID for the source endpoint. To find the endpoint ID, get a list of endpoints in the project (see the sketch after this list).
- target_id - the endpoint ID for the target endpoint.
- name - the transfer name, transfer-quickstart.
- project_id - the ID of the project in which you create a transfer. You can get this value on your project's information page.
- type - SNAPSHOT_ONLY.
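The endpoint IDs can be looked up the same way as the cluster ID above, this time through the EndpointService. A minimal sketch, assuming a List method with a ListEndpointsRequest message and a response exposing an endpoints list:
from doublecloud.transfer.v1.endpoint_service_pb2 import ListEndpointsRequest
from doublecloud.transfer.v1.endpoint_service_pb2_grpc import EndpointServiceStub


def find_endpoint_id(sdk, project_id, name):
    svc = sdk.client(EndpointServiceStub)
    # Assumption: the response carries an `endpoints` field whose entries
    # expose `id` and `name`.
    response = svc.List(ListEndpointsRequest(project_id=project_id))
    for endpoint in response.endpoints:
        if endpoint.name == name:
            return endpoint.id
    return None


src_id = find_endpoint_id(sdk, "<your_project_id>", "first-s3-endpoint")
dst_id = find_endpoint_id(sdk, "<your_project_id>", "clickhouse-target-endpoint")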
import json
import logging

import doublecloud
from doublecloud.transfer.v1.transfer_pb2 import TransferType
from doublecloud.transfer.v1.transfer_service_pb2 import CreateTransferRequest
from doublecloud.transfer.v1.transfer_service_pb2_grpc import TransferServiceStub


def create_transfer(sdk, project_id, name, src_id, dst_id):
    svc = sdk.client(TransferServiceStub)
    operation = svc.Create(
        CreateTransferRequest(
            source_id=src_id,
            target_id=dst_id,
            name=name,  # "transfer-quickstart" in this quickstart
            project_id=project_id,
            type=TransferType.SNAPSHOT_ONLY,
        )
    )
    return operation
Activate a transfer
Now, let's activate it using the TransferService activate method and pass the transfer ID in the transfer_id request parameter.
To find the transfer ID, get a list of transfers in the project.
import doublecloud
from doublecloud.transfer.v1.transfer_pb2 import TransferType
from doublecloud.transfer.v1.transfer_service_pb2 import ActivateTransferRequest
from doublecloud.transfer.v1.transfer_service_pb2_grpc import TransferServiceStub


def activate_transfer(svc, transfer_id: str):
    return svc.Activate(ActivateTransferRequest(transfer_id=transfer_id))
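Putting the last two steps together, a minimal sketch might look like this. It assumes the wait_operation_and_get_result helper from the SDK's examples and that the finished create operation exposes the new transfer's ID as resource_id; activating a snapshot transfer can take a few minutes, so wait for it to finish before querying the data:
svc = sdk.client(TransferServiceStub)

# Create the transfer and wait until it's provisioned.
operation = create_transfer(
    sdk, "<your_project_id>", "transfer-quickstart", src_id, dst_id
)
sdk.wait_operation_and_get_result(operation)

# Activate the transfer and wait for the snapshot to complete.
# Assumption: the create operation's resource_id holds the new transfer's ID.
activation = activate_transfer(svc, operation.resource_id)
sdk.wait_operation_and_get_result(activation)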
Query the data in the Managed ClickHouse® cluster
Check the data transferred to your ClickHouse® database:
- Open the terminal you used to create the database, or connect to your cluster once again as we did above:
  Docker:
  docker run --network host --rm -it clickhouse/<Native interface connection string>
  The complete Docker command structure:
  docker run --network host --rm -it \
    clickhouse/clickhouse-client \
    --host <FQDN of your cluster> \
    --secure \
    --user <cluster user name> \
    --password <cluster user password> \
    --port 9440
  Native clickhouse-client:
  <Native interface connection string>
- Send the following query to check if your data exists in the cluster. The name of a table in the db_for_s3 database corresponds to the name of the source dataset (hits).
  SELECT * FROM db_for_s3.hits
  The terminal readout should display the following data:
  ┌─Browser─┬─Cookie_Enabled─┬─Date───────┬─Gender─┬─Hit_ID─┬─Region_ID─┬─Technology───────────┬─Time_Spent─────────┬─Traffic_Source─┐
  │ Firefox │              0 │ 2016-01-10 │ Male   │  67112 │       184 │ PC (Mac)             │ 388.93975903614455 │ Direct         │
  │ Chrome  │              0 │ 2016-03-12 │ Female │  54562 │        51 │ Smartphone (I0S)     │ 325.20392156862744 │ Search engine  │
  │ Chrome  │              1 │ 2016-03-18 │ Female │  63833 │        51 │ Smartphone (I0S)     │ 316.09774436090225 │ Search engine  │
  │ Firefox │              1 │ 2016-03-24 │ Male   │  43941 │        51 │ PC (Windows)         │  263.7365269461078 │ Search engine  │
  │ Safari  │              0 │ 2016-03-30 │ Female │  38583 │        51 │ Smartphone (Android) │  363.8421052631579 │ Internal       │
Nice work! You've transferred all the data from a remote source and replicated it with complete integrity in your own ClickHouse® database. Now let's put this data to work.
Download the complete code example
You can download the full code listing for all the steps above from our Python SDK GitHub repository