Work with Apache Kafka® schema registry

DoubleCloud Managed Service for Apache Kafka® uses Karapace as its schema registry to standardize message structure.

Schema examples

Here is an example of a Reservation object with a name and a number of persons, represented in each of the supported schema formats: Avro, JSON Schema, and Protobuf. All the fields are required.

Avro

{
  "type": "record",
  "name": "Reservation",
  "fields": [
      {
        "type": "string",
        "name": "name"
      },
      {
        "type": "int",
        "name": "persons"
      }
  ]
}
JSON Schema

{
  "type": "object",
  "title": "Reservation",
  "properties": {
    "name": {
      "type": "string"
    },
    "persons": {
      "type": "integer"
    }
  },
  "required": ["name", "table"],
  "additionalProperties": false
}
Protobuf

syntax = "proto3";

message Reservation {
  string name = 1;
  int32 persons = 2;
}

Schema backwards compatibility

All versions of your schema must be backwards-compatible. This means that consumers using the new version of the schema must be able to correctly read data written with the previous version.

For example, these schema iterations are compatible. The v2 schema includes all the fields from v1 and adds a new optional field. Consumers using the v2 schema can still read data written by producers using v1:

v1

v2

{
  "type": "object",
  "title": "Reservation",
  "properties": {
    "name": {
      "type": "string"
    },
    "persons": {
      "type": "integer"
    }
  },
  "required": ["name", "table"],
  "additionalProperties": false
}
{
  "type": "object",
  "title": "Reservation",
  "properties": {
    "name": {
      "type": "string"
    },
    "persons": {
      "type": "integer"
    },
    "comment": {
      "type": "string"
    }
  },
  "required": ["name", "table"],
  "additionalProperties": false
}

Below is an example of incompatible versions: here the added field is required. Messages written with the v1 schema lack the comment field, so they can't be read with v2. The compatibility check fails, the registry returns an error, and the new schema version isn't registered:

v1

v2

{
  "type": "object",
  "title": "Reservation",
  "properties": {
    "name": {
      "type": "string"
    },
    "persons": {
      "type": "integer"
    }
  },
#>"required": ["name", "table"],
  "additionalProperties": false
}
{
  "type": "object",
  "title": "Reservation",
  "properties": {
    "name": {
      "type": "string"
    },
    "persons": {
      "type": "integer"
    },
    "comment": {
      "type": "string"
    }
  },
#>"required": ["name", "table", "comment"],
  "additionalProperties": false
}

Authorizing in Schema Registry

To send requests to Schema Registry, you must use basic access authentication. Pass the username and password of an Apache Kafka® user in the Authorization header.

To get the username and password for your Apache Kafka® cluster:

  1. Go to the Clusters page in the console.

  2. Click the cluster whose credentials you need.

  3. You'll see the username and password on the Overview tab.
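The Authorization header value for basic access authentication is the word Basic followed by the Base64 encoding of <username>:<password>. A minimal Python sketch (the credentials here are placeholders, not real cluster values):

```python
import base64

def basic_auth_header(username: str, password: str) -> str:
    """Build the Authorization header value for basic access authentication."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return f"Basic {token}"

# Placeholder credentials for illustration only:
print(basic_auth_header("admin", "secret"))  # Basic YWRtaW46c2VjcmV0
```

Most HTTP clients (including curl with the -u flag) build this header for you, so you rarely need to construct it by hand.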

Reading a schema

To read a specific version of your schema, use the following query:

GET <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/{version}/schema

To read the latest version of your schema:

GET <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/latest/schema

To read the schema with a specific ID:

GET <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/schemas/ids/{schema_id}

For example, you can read the available schema using curl:

curl -u admin:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/latest/schema

Posting a schema

To send a schema to the registry, specify the application/vnd.schemaregistry.v1+json content type and wrap the schema in the following JSON object:

{"schemaType": "<SCHEMA_TYPE>", "schema": "<SCHEMA_AS_STRING>"}

The above SCHEMA_TYPE can be one of the following:

  • JSON
  • AVRO
  • PROTOBUF
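Note that the schema itself is passed as a JSON-escaped string inside the wrapper object, not as a nested object. A sketch that builds the request body for the Reservation JSON schema shown earlier (the variable names are illustrative):

```python
import json

# The Reservation schema from the examples above, as a Python dict.
reservation_schema = {
    "type": "object",
    "title": "Reservation",
    "properties": {
        "name": {"type": "string"},
        "persons": {"type": "integer"},
    },
    "required": ["name", "persons"],
    "additionalProperties": False,
}

# The registry expects the schema serialized to a string inside the wrapper,
# so json.dumps is applied twice: once for the schema, once for the body.
body = json.dumps({
    "schemaType": "JSON",
    "schema": json.dumps(reservation_schema),
})

headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
```

Sending `body` with these headers in a POST request (for example, with curl --data) uploads the schema as a new version of the subject.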

To upload the new schema version, use the POST method:

POST <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/subjects/{subject}/versions/

To check the compatibility of a schema without uploading it, use the following query:

POST <kafka_username>:<kafka_cluster_password> \
https://<your_kafka_host>/compatibility/subjects/<subject_name>/versions/<version_number>

See also