Work with Apache Kafka® schema registry
DoubleCloud Managed Service for Apache Kafka® uses Karapace
Schema examples
Here is an example of a Reservation object with name and number of persons represented in all the supported schema formats. All the fields are required.
{
"type": "record",
"name": "Reservation",
"fields": [
{
"type": "string",
"name": "name"
},
{
"type": "int",
"name": "persons"
}
]
}
{
"type": "object",
"title": "Reservation",
"properties": {
"name": {
"type": "string"
},
"persons": {
"type": "integer"
}
},
"required": ["name", "table"],
"additionalProperties": false
}
syntax = "proto3";
message Reservation {
string name = 1;
int32 persons = 2;
}
Schema backwards compatibility
All the versions of your schema need to be backwards-compatible. This means that you must be able to correctly apply the previous version of the schema to the current data.
For example, these schema iterations are compatible. The v2
schema includes all the fields from the v1
, and adds a new optional field after. Consumers using the new schema can read data written by producers using the latest registered schema:
v1 |
v2 |
|
|
Below is an example of incompatible versions. Now the added field is required. This won't allow to read incoming messages by applying the previous version of the schema. Thus, it'll return an error, and the new version of the schema wouldn't apply:
v1 |
v2 |
|
|
Authorizing in Schema Registry
To send requests to Schema Registry, you must use the basic access authentication Authorization
header.
To get the username and password for your Apache Kafka® cluster:
-
Go to the Clusters
-
Click the cluster for which you want to know the credentials.
-
You'll see the user and password on the Overview tab.
Reading a schema
To read a specific version of your schema, use the following query:
GET <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/{version}/schema
To read the latest version of your schema:
GET <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/latest/schema
To read the schema with a specific ID:
GET <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/schemas/ids/{schema_id}
For example, you can read the available schema using curl
curl -u admin:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/latest/schema
Posting a schema
To send a schema to the registry, you should specify application/vnd.schemaregistry.v1+json
content type and wrap schema in the following JSON object:
{"schemaType": "<SCHEMA_TYPE>", "schema": "<SCHEMA_AS_STRING>"}
The above SCHEMA_TYPE
can be one of the following:
JSON
AVRO
PROTOBUF
To upload the new schema version, use the POST
method:
POST <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/
To check the compatibility of a schema without uploading it, use the following query:
POST <kafka_username>:<kafka_cluster_password> \ https://<your_kafka_host>/compatibility/subjects/<subject_name>/versions/<version_number>