An experimental approach to migrate from ElasticSearch to ClickHouse quickly and easily
Written by: Eraldo Jaku, DoubleCloud Senior Backend Engineer.
October 2, 2023
10 mins to read
Migrating data from ElasticSearch to ClickHouse is quite easy with our ElasticSearch connector. It comes with handy functionality that simplifies the configuration process and relies on a few basic assumptions to work around the limitations we came across when working with ES documents.
So what exactly does this ElasticSearch connector look like?
Let’s start with the most important step of the process — the schema deduction. To sync data to a ClickHouse target, we need to provide the target with a schema.
The ElasticSearch connector in this case relies heavily on the data already present in the configured cluster to deduce the said schema.
The ElasticSearch connector uses the Get mapping API to extract the mappings of each index. Using this mapping, we then extract the corresponding column names and data types from the JSON key/value pairs.
Here’s what a simple mapping example could look like:
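For instance, the response to a Get mapping request (`GET /employees/_mapping`) for an index with the fields used in this walkthrough could look like the following. The index name and all field types except `integer` for `age` are illustrative assumptions:

```json
{
  "employees": {
    "mappings": {
      "properties": {
        "age":         { "type": "integer" },
        "email":       { "type": "keyword" },
        "employee_id": { "type": "keyword" },
        "name":        { "type": "text" }
      }
    }
  }
}
```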
From these sample mappings, the ElasticSearch connector would extract the following column names: _id, age, email, employee_id, and name. Currently, only first-level key-value pairs are considered, so nested objects are not extracted into their own columns. The odd one out is the _id column: it is prepended by default to every schema as the first column and is filled with the _id of each document in ElasticSearch.
Each column derived in the previous step also needs a corresponding data type. These data types are extracted from the same mapping response. Internally, we map each possible data type coming from ElasticSearch to one of our internal data types. For example, for the age column we would first extract the ElasticSearch integer type from the mappings, which internally maps to our int32 data type. For a more exhaustive list of these data type mappings, see our Source data type mapping.
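The two steps above — extracting first-level column names and mapping their ElasticSearch types to internal types — can be sketched as follows. The index name, the type-mapping table, and the internal type names are illustrative assumptions, not the connector’s actual identifiers:

```python
# Hypothetical mapping of ElasticSearch field types to internal types.
ES_TO_INTERNAL = {
    "integer": "int32",
    "long": "int64",
    "keyword": "string",
    "text": "string",
    "boolean": "boolean",
    "date": "datetime",
    "double": "double",
}

def deduce_schema(mapping_response: dict, index: str) -> list[tuple[str, str]]:
    """Extract (column, internal type) pairs from a Get-mapping response."""
    props = mapping_response[index]["mappings"]["properties"]
    # _id is prepended by default as the first column of every schema.
    schema = [("_id", "string")]
    for field, spec in props.items():
        # Only first-level key/value pairs are considered; nested
        # objects are not extracted into their own columns.
        if "type" in spec:
            schema.append((field, ES_TO_INTERNAL.get(spec["type"], "any")))
    return schema

sample = {
    "employees": {
        "mappings": {
            "properties": {
                "age": {"type": "integer"},
                "email": {"type": "keyword"},
                "employee_id": {"type": "keyword"},
                "name": {"type": "text"},
            }
        }
    }
}

print(deduce_schema(sample, "employees"))
# [('_id', 'string'), ('age', 'int32'), ('email', 'string'),
#  ('employee_id', 'string'), ('name', 'string')]
```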
There are a few pitfalls and particularities here though, namely ElasticSearch Arrays and Alias data types.
ElasticSearch has the particularity that every field can contain zero or more values by default, so potentially any field can be an array (see the ElasticSearch docs for more information on Arrays). This makes processing quite tricky: at runtime, a field in a document might return more than one value, at which point we can no longer amend the table schema or column data type. This results in a mismatch between the extracted and deduced data types and leads to an error being raised.
Since the mappings provided by ElasticSearch do not help in detecting such cases, the only exact solution would be to iterate over all the documents and analyze each field separately for array-ness. To avoid such a lengthy and inefficient process, we decided to introduce a small but important assumption.
We assume the first document in the index (sorted by _id ASC) is our baseline and all following documents follow the same format as the baseline. We then analyze all fields in the baseline document and enrich the deduced schema by updating all the field data types where we encounter arrays to our own internal Any type, which can store JSON arrays.
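The baseline-document assumption can be sketched as a small enrichment pass over the deduced schema. Field names and the internal "any" type name are illustrative assumptions:

```python
def enrich_with_arrays(schema: dict[str, str], baseline_doc: dict) -> dict[str, str]:
    """Switch fields that hold a JSON array in the baseline document
    (the first document, sorted by _id ASC) to an internal 'any' type."""
    enriched = dict(schema)
    for field, value in baseline_doc.items():
        # Any field may hold zero or more values in ElasticSearch, so
        # only an actual document reveals whether it is an array.
        if field in enriched and isinstance(value, list):
            enriched[field] = "any"  # stores the raw JSON array
    return enriched

schema = {"_id": "string", "age": "int32", "email": "string"}
baseline = {"age": 29, "email": ["a@example.com", "b@example.com"]}

print(enrich_with_arrays(schema, baseline))
# {'_id': 'string', 'age': 'int32', 'email': 'any'}
```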
All first-level fields of the Alias data type in the mappings require inferring the actual data type in order to produce a valid schema. The ElasticSearch connector infers this data type from the mappings themselves by searching them for the actual data type. This search also descends into any nested key-value pairs until the correct value is found or an error occurs.
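In ElasticSearch mappings, an alias field carries a "path" pointing at its target field, so one way to sketch the resolution is to walk that (possibly dotted) path through nested properties until a concrete type is found. The field names here are illustrative assumptions:

```python
def resolve_alias(properties: dict, path: str) -> str:
    """Follow a dotted alias path through nested mapping properties
    and return the target field's concrete data type."""
    parts = path.split(".")
    node = properties
    for i, part in enumerate(parts):
        spec = node[part]
        if i == len(parts) - 1:
            # Raise if the target is missing a type or is itself an alias.
            if spec.get("type") in (None, "alias"):
                raise ValueError(f"cannot resolve alias target {path!r}")
            return spec["type"]
        node = spec["properties"]  # descend into the nested object

props = {
    "employee_id": {"type": "keyword"},
    "staff_id": {"type": "alias", "path": "employee_id"},
    "contact": {"properties": {"email": {"type": "keyword"}}},
    "mail": {"type": "alias", "path": "contact.email"},
}

print(resolve_alias(props, props["staff_id"]["path"]))  # keyword
print(resolve_alias(props, props["mail"]["path"]))      # keyword
```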
In general, the ElasticSearch connector uses the Scrolling Search Results mechanism to fetch all the documents of an index.
For big indexes, however, this can be a lengthy process, since one document equals one row in ClickHouse. To increase the performance of our transfers, we rely on ElasticSearch’s Sliced Scrolling for parallel document processing.
The parallelization degree depends on the number of active shards in the source ElasticSearch cluster. This parallelization factor is then used to slice each index into chunks of data that can be ingested and processed in parallel, increasing our throughput.