REST API reference

Topics

The Topics resource allows you to receive information about the topics in your Apache Kafka® cluster and their current state. It also lets you produce messages by making POST requests to specific topics.

GET / topics

Get a list of Apache Kafka® topics on the cluster.

  • Response JSON object

    Object Data type Description
    topics array List of topic names

Example request

GET /topics HTTP/1.1
Host: double.cloud
Accept: application/vnd.kafka.v2+json

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

["topic1", "topic2"]

GET / topics / topic_name

Get metadata about a specified topic

  • Parameters

    Parameter Data type Description
    topic_name string The name of the topic for which you want to get metadata
  • Response JSON object

    Object Data type Description
    name string The name of the topic
    configs map Available per-topic configuration overrides
    partitions array List of all available partitions in the specified topic
    partitions → partition int the identifier (ID) of the current partition
    partitions → leader int the broker ID of the leader of the current partition
    partitions → replicas array list of replicas of the current partition, including the leader
    partitions → replicas → leader bool Returns true if the current replica is the leader of the partition
    partitions → replicas → in_sync bool Returns true if the current replica is in sync with the leader
  • Error codes

    Status Error code Error message
    404 Not found 40401 Topic not found

Example request

GET /topics/test HTTP/1.1
Accept: application/vnd.kafka.v2+json

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "name": "test",
  "configs": {
     "cleanup.policy": "compact"
  },
  "partitions": [
    {
      "partition": 1,
      "leader": 1,
      "replicas": [
        {
          "broker": 1,
          "leader": true,
          "in_sync": true,
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true,
        }
      ]
    },
    {
      "partition": 2,
      "leader": 2,
      "replicas": [
        {
          "broker": 1,
          "leader": false,
          "in_sync": true,
        },
        {
          "broker": 2,
          "leader": true,
          "in_sync": true,
        }
      ]
    }
  ]
}

POST / topics / topic_name

This request allows you to produce messages to a topic, optionally specifying keys or partitions for the messages.

  • If no partition is provided, one will be chosen based on the key hash.

  • If no key is provided, the partition will be chosen for each message based in the schedule quota process (similar to round-robin ).

For the Avro , JSON Schema , and Protobuf embedded formats, provide information about schemas. You also need to configure the REST Proxy with the URL to access Schema Registry (schema.registry.url).

Schemas may be provided as the full schema encoded as a string. Alternatively, you can provide it as the schema ID returned with the first response after the initial request.

  • Parameters

    Parameter Data type Description
    topic_name string The name of the topic for which you want to produce messages
  • Request JSON object

    Object Data type Description
    key_schema string Full schema encoded as a string (for example, a serialized JSON for Avro schema)
    key_schema_id int The identifier (ID) returned by a previous request that used the same schema. This ID corresponds to the ID of the schema in the registry.
    value_schema string Full schema encoded as a string (for example, a serialized JSON for Avro schema)
    value_schema_id int The identifier (ID) returned by a previous request that used the same schema. This ID corresponds to the ID of the schema in the registry.
  • Request JSON array of objects

    Object Data type Description
    records A list of records to produce to the topic
    records → key object Message key according to the declared format (Avro , JSON Schema , or Protobuf ), null to omit a key
    records → value object Message value according to the specified format (Avro , JSON Schema , or Protobuf )
    records → partition int (optional) The number of the partition which will hold the message
  • Response JSON object

    Object Data type Description
    key_schema_id int The ID of the schema used to produce keys, or null if no keys used
    value_schema_id int THe ID of the schema used to produce values
  • Response JSON array of objects

    Object Data type Description
    offsets object List of offsets and partitions to which the messages were published
    offsets → partition int The partition to which the message was published, null if the message failed to publish
    offsets → offset long The offset of the message, null if the message failed to publish
    offsets → error_code long null if operation is successful or an error code classifying the failure type
    offsets → error string null if successful or an error message with further information on the failure
  • Error codes

    Status Error code Error message
    404 Not found 40401 Topic not found
    408 Request timeout 40801 Schema registration lookup failed
    422 Unprocessable entity 42201 Request includes keys and uses a format that requires schemas, but doesn't include the key_schema or key_schema_id fields
    42202 Request includes values and uses a format that requires schemas, but doesn't include the value_schema or value_schema_id fields
    42205 Request includes invalid schema

Example request

POST /topics/test HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.binary.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "a2V5",
      "value": "Y29uZmx1ZW50"
    },
    {
      "value": "a2Fma2E=",
      "partition": 1
    },
    {
      "value": "bG9ncw=="
    }
  ]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 2,
      "offset": 100
    },
    {
      "partition": 1,
      "offset": 101
    },
    {
      "partition": 2,
      "offset": 102
    }
  ]
}

Example Avro request

POST /topics/test HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}

Example Avro response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

Partitions

The Partitions resource provides per-partition metadata, including the current leaders and replicas for each partition. It also allows you to consume and produce messages to a single partition using GET and POST requests.

GET topics / topic_name / partitions

This request allows you to get a list of partitions to the topic.

  • Parameters

    Parameter Data type Description
    topic_name string The name of the topic for which you want to get the list of partitions
  • Response JSON Array of Objects

    Object Data type Description
    partition int Partition ID
    leader int Broker ID for the leader of the current partition
    replicas array List of brokers acting as replicas for the current partition
    replicas → broker int Broker ID of the replica
    replicas → leader bool Returns true if the current broker is the leader of the current partition
    replicas → in sync bool Returns true if the current replica is in sync with the leader
  • Error codes

    Status Error code Error message
    404 Not found 40401 Topic not found

Example request

GET /topics/test/partitions HTTP/1.1
Host: double.cloud
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

[
  {
    "partition": 1,
    "leader": 1,
    "replicas": [
      {
        "broker": 1,
        "leader": true,
        "in_sync": true,
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true,
      },
      {
        "broker": 3,
        "leader": false,
        "in_sync": false,
      }
    ]
  },
  {
    "partition": 2,
    "leader": 2,
    "replicas": [
      {
        "broker": 1,
        "leader": false,
        "in_sync": true,
      },
      {
        "broker": 2,
        "leader": true,
        "in_sync": true,
      },
      {
        "broker": 3,
        "leader": false,
        "in_sync": false,
      }
    ]
  }
]

GET / topics / topic_name / partitions / partition_id

This request allows you to get metadata about a specific partition in the topic.

  • Parameters

    Parameter Data type Description
    topic_name string The name of the topic containing the partition in question
    partition_id int ID of the partition for which you want to get metadata
  • Request JSON Object

    Object Data type Description
    partition int Partition ID
    leader int Broker ID for the leader of the current partition
    replicas array List of brokers acting as replicas for the current partition
    replicas → broker int Broker ID of the replica
    replicas → leader bool Returns true if the current broker is the leader of the current partition
    replicas → in sync bool Returns true if the current replica is in sync with the leader
  • Error codes

    Status Error code Error message
    404 Not found 40401 Topic not found
    40402 Partition not found

Example request

GET /topics/test/partitions/1 HTTP/1.1
Host: double.cloud
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "partition": 1,
  "leader": 1,
  "replicas": [
    {
      "broker": 1,
      "leader": true,
      "in_sync": true,
    },
    {
      "broker": 2,
      "leader": false,
      "in_sync": true,
    },
    {
      "broker": 3,
      "leader": false,
      "in_sync": false,
    }
  ]
}

GET / topics / topic_name / partitions / partition_id / offsets

This request allows you to get a summary of the offsets in the specified topic partition.

  • Parameters

    Parameter Data type Description
    topic_name string The name of the topic containing the partition in question
    partition_id int ID of the partition for which you want to summarize the offsets
  • Response JSON Object

    Object Data type Description
    beginning_offset int First offset in the current partition
    end_offset int Final offset in the current partition
  • Error codes

    Status Error code Error message
    404 Not found 40401 Topic not found

Example request

GET /topics/test/partitions/1/offsets HTTP/1.1
Host: double.cloud
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "beginning_offset": 10,
  "end_offset": 50,
}

POST / topics / topic_name / partitions / partition_id

Produce messages to the specified partition of the topic.

For the Avro, JSON Schema, and Protobuf embedded formats, you must provide information about schemas. This may be provided as the full schema encoded as a string, or, after the initial request may be provided as the schema ID returned with the first response.

  • Parameters

    Parameter Data type Description
    topic_name string The name of the topic to which you want to produce messages
    partition_id int ID of the partition for which you want to produce messages
  • Request JSON object

    Object Data type Description
    key_schema string Full schema encoded as a string (for example, a serialized JSON for Avro schema)
    key_schema_id int The identifier (ID) returned by a previous request that used the same schema. This ID corresponds to the ID of the schema in the registry.
    value_schema string Full schema encoded as a string (for example, a serialized JSON for Avro schema)
    value_schema_id int The identifier (ID) returned by a previous request that used the same schema. This ID corresponds to the ID of the schema in the registry.
    records A list of records to produce to the partition
  • Request JSON array of objects

    Object Data type Description
    records → key object Message key according to the declared format (Avro , JSON Schema , or Protobuf ), null to omit a key
    records → value object Message value according to the specified format (Avro , JSON Schema , or Protobuf )
  • Response JSON object

    Object Data type Description
    key_schema_id int The ID of the schema used to produce keys, or null if no keys used
    value_schema_id int THe ID of the schema used to produce values
  • Response JSON array of objects

    Object Data type Description
    offsets object List of offsets and partitions to which the messages were published
    offsets → partition int The partition to which the message was published. This will be the same as the partition_id parameter and is provided only to maintain consistency with responses from producing to a topic
    offsets → offset long The offset of the message, null if the message failed to publish
    offsets → error_code long null if operation is successful or an error code classifying the failure type
    offsets → error string null if successful or an error message with further information on the failure
  • Error codes

    Status Error code Error message
    404 Not found 40401 Topic not found
    40402 Partition not found
    422 Unprocessable entity 42201 Request includes keys and uses a format that requires schemas, but doesn't include the key_schema or key_schema_id fields
    42202 Request includes values and uses a format that requires schemas, but doesn't include the value_schema or value_schema_id fields
    42205 Request includes invalid schema

Example request

POST /topics/test/partitions/1 HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.binary.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "a2V5",
      "value": "Y29uZmx1ZW50"
    },
    {
      "value": "a2Fma2E="
    }
  ]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example request

POST /topics/test/partitions/1 HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
  "records": [
    {
      "value": 25
    },
    {
      "value": 26
    }
  ]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example request

POST /topics/test/partitions/1 HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "somekey",
      "value": {"foo": "bar"}
    },
    {
      "value": 53.5
    }
  ]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example request

POST /topics/test/partitions/1 HTTP/1.1
Content-Type: application/vnd.kafka.protobuf.v2+json
Accept: application/vnd.kafka.v2+json, application/json

{
  "value_schema": "syntax=\"proto3\"; message Foo { string f1 = 1; }"
  "records": [{"value": {"f1": "foo"}}]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example request

POST /topics/test/partitions/1 HTTP/1.1
Content-Type: application/vnd.kafka.jsonschema.v2+json
Accept: application/vnd.kafka.v2+json, application/json

{
  "value_schema": "{\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}",
  "records": [{"value": {"f1": "bar"}}]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Consumers

The Consumers resource provides access to the current state of consumer groups, allowing you to do the following:

  • Create a consumer in a consumer group
  • Consume messages from topics and partitions.

REST Proxy can convert data stored in Apache Kafka® in serialized form into a JSON-compatible embedded format. We support the following formats:

  • Raw binary data as base64 strings
  • Avro data is converted into embedded JSON objects
  • JSON is embedded directly
  • Protobuf
  • JSON Schema

As consumers are stateful, any consumer instances created with the REST API are tied to a specific REST Proxy instance.

A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found.

If a REST Proxy instance is shut down, it will attempt to cleanly destroy any consumers before it's terminated.

POST / consumers / group_name

Create a new consumer instance in the specified consumer group.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group in which you want to create a new consumer instance
  • Request JSON object

    Object Data type Description
    name string Name of the consumer instance to create. Must be unique within the cluster. Leave it empty to generate an automatic name
    format string The format of the consumed messages used to convert them into a JSON-compatible form. Possible values: avro, binary, json, jsonschema, protobuf.
    auto.offset.reset string Sets the auto.offset.reset parameter for the consumer
    auto.commit.enable string Sets the auto.commit.enableparameter for the consumer
    fetch.min.bytes string Sets the fetch.min.bytes parameter for the consumer
    consumer.request.timeout.ms string Sets the consumer.request.timeout parameter in milliseconds for the consumer
  • Response JSON object

    Object Data type Description
    instance_id string The unique ID of the consumer instance created in the specified group
    base_uri string Base URI used in constructing the requests to the created consumer instance.
  • Error codes

    Status Error code Error message
    409 Conflict 40902 Consumer instance with the specified name already exists
    422 Unprocessable entity 42204 Invalid consumer configuration. One of the settings specified in the request contained an invalid value.

Example request

POST /consumers/testgroup/ HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.v2+json


{
  "name": "my_consumer",
  "format": "binary",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.double.cloud/consumers/testgroup/instances/my_consumer"
}

Example request

POST /consumers/testgroup/ HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.protobuf.v2+json


{
  "name": "my_consumer",
  "format": "protobuf",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.protobuf.v2+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.double.cloud/consumers/my_protobuf_consumer"
}

Example request

POST /consumers/testgroup/ HTTP/1.1
Host: double.cloud
Content-Type: application/vnd.kafka.jsonschema.v2+json

{
  "name": "my_consumer",
  "format": "jsonschema",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.jsonschema.v2+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.double.cloud/consumers/my_jsonschema_consumer"
}

DELETE / consumers / group_name / instance / instance_name

Delete the specified consumer instance.

Note

You must make this request to the specific REST Proxy instance holding the consumer instance in question.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group from which you want to delete a consumer instance
    instance string The ID of the consumer instance you want to delete
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found

Example request

DELETE /consumers/testgroup/instances/my_consumer HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json

Example response

HTTP/1.1 204 No Content

POST / consumers / group_name / instances / instance_name / offsets

Commit a list of offsets for the consumer.

When the post body is empty, it commits all the records fetched by the consumer instance.

Note

You must make this request to the specific REST Proxy instance holding the consumer instance in question.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group from which you want to delete a consumer instance
    instance string The ID of the consumer instance you want to delete
  • Request JSON array of objects

    Object Data type Description
    offsets List of offsets to commit for partitions
    offsets → topic string Topic name
    offsets → partition Int Partition ID
    offset The offset to commit
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found

Example request

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}

GET / consumers / group_name / instances / instance_name / offsets

Get the last committed offsets for the specified partitions (whether the commit happened by the current process or another).

Note

You must make this request to the specific REST Proxy instance holding the consumer instance in question.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    partitions List of partitions for which you want to view the last committed offsets
    partitions → topic string Topic name
    partitions → partition int Partition ID
  • Response JSON array of objects

    Object Data type Description
    offsets List of committed offsets
    offsets → topic string Name of the topic for which the offset was committed
    offsets → partition int ID of the partition for which the offset was committed
    offsets → offset int The committed offset
    offsets → metadata string Metadata for the committed offset
  • Error codes

    Status Error code Error message
    404 Not found 40402 Partition not found
    40403 Consumer instance not found

Example request

GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{"offsets":
[
  {
    "topic": "test",
    "partition": 0,
    "offset": 21,
    "metadata":""
  },
  {
    "topic": "test",
    "partition": 1,
    "offset": 31,
    "metadata":""
  }
]
}

POST / consumers / group_name / instances / instance / subscription

Subscribe to the specified list of topics or a topic pattern to get dynamically assigned partitions.

If a prior subscription exists, it will be replaced by the latest subscription.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    topics List of topics to subscribe
    topics → topic string Topic name
  • Request JSON object

    Object Data type Description
    topic_pattern string List of committed offsets
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found
    409 Conflict 40903 Subscriptions to topics, partitions and pattern are mutually exclusive

Example request

POST /consumers/testgroup/instances/my_consumer/subscription HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json

{
  "topics": [
    "test1",
    "test2"
  ]
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{"offsets":
[
  {
    "topic": "test",
    "partition": 0,
    "offset": 21,
    "metadata":""
  },
  {
    "topic": "test",
    "partition": 1,
    "offset": 31,
    "metadata":""
  }
]
}

GET / consumers / group_name / instances / instance / assignments

Get the list of partitions manually assigned to the specified consumer.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    partitions List of partitions manually assigned to the specified consumer
    partitions → topic string Topic name
    partitions → partition int Partition ID
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found

Example request

GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.double.cloud
Accept: application/vnd.kafka.v2+json

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

POST / consumers / group_name / instances / instance / assignments

Manually assign a list of partitions to the specified consumer.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    partitions List of partitions to assign to the specified consumer
    partitions → topic string Name of the topic
    partitions → partition int ID of the partition
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found
    409 Conflict 40903 Subscription to topics, partitions and pattern are mutually exclusive

Example request

POST /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response

HTTP/1.1 204 No Content

POST / consumers / group_name / instances / instance / positions

Overrides the fetch offsets that the consumer will use for the next set of records to fetch.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    offsets List of offsets
    offsets → topic string Name of the topic for which to override the offsets
    offsets → partition int ID of the partition for which to override the offsets
    offsets → offset int Seek to offset for the next set of records to fetch
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found

Example request

POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json


{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}

Example response

HTTP/1.1 204 No Content

POST / consumers / group_name / instances / instance / positions / beginning

Seek to the first offset for each of the specified partitions.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    partitions List of partitions manually assigned to the specified consumer
    partitions → topic string Topic name
    partitions → partition int Partition ID
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found

Example request

POST /consumers/testgroup/instances/my_consumer/positions/beginning HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response

HTTP/1.1 204 No Content

POST / consumers / group_name / instances / instance / positions / end

Seek to the last offset for each of the specified partitions.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Request JSON array of objects

    Object Data type Description
    partitions List of partitions manually assigned to the specified consumer
    partitions → topic string Topic name
    partitions → partition int Partition ID
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found

Example request

POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
Host: proxy-instance.double.cloud
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response

HTTP/1.1 204 No Content

GET / consumers / group_name / instances / instance / records

Fetch data for the specified topics or partitions using one of the subscribe/assign APIs.

The format of the embedded data returned by this request is determined by the format specified in the initial consumer instance creation request and must match the format of the Accept header. Mismatches will result in error code 40601.

Note

You must make this request to the specific REST Proxy instance holding the consumer instance in question.

  • Parameters

    Parameter Data type Description
    group_name string The name of the consumer group
    instance string The ID of the consumer instance
  • Query parameters

    Parameter

    Description

    timeout

    Maximum amount of milliseconds the REST Proxy will spend fetching records.

    Other parameters controlling actual time spent fetching records:

    • max_bytes
    • fetch.min.bytes.

    Default value: undefined.

    This parameter is used only if it’s smaller than the consumer.timeout.ms defined either during consumer instance creation or in the REST Proxy’s configuration file.

    max_bytes

    The maximum number of bytes of unencoded keys and values to be included in the response.

    This provides approximate control over the size of responses and the amount of memory required to store the decoded response.

    The actual limit will be the minimum of this setting and the server-side configuration of consumer.request.max.bytes.

    Default value: unlimited.

  • Response JSON Array of objects

    Object Data type Description
    topic string The topic
    key string The message key formatted according to the embedded format (Avro , JSON Schema , or Protobuf )
    value string The message value formatted according to the embedded format (Avro , JSON Schema , or Protobuf )
    partition int Partition containing the message
    offset long Offset of the message
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found
    406 Not acceptable 40601 Consumer format does not match the embedded format requested by the Accept header

Example request

 GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
 Host: proxy-instance.double.cloud
 Accept: application/vnd.kafka.binary.v2+json

Example response

 HTTP/1.1 200 OK
 Content-Type: application/vnd.kafka.binary.v2+json

 [
 {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100,
 },
 {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101,
 }
 ]

Example request

 GET /consumers/avrogroup/instances/my_avro_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
 Host: proxy-instance.double.cloud
 Accept: application/vnd.kafka.avro.v2+json

Example response

 HTTP/1.1 200 OK
 Content-Type: application/vnd.kafka.avro.v2+json

 [
 {
    "topic": "test",
    "key": 1,
    "value": {
       "id": 1,
       "name": "Bill"
    },
    "partition": 1,
    "offset": 100,
 },
 {
    "topic": "test",
    "key": 2,
    "value": {
       "id": 2,
       "name": "Melinda"
    },
    "partition": 2,
    "offset": 101,
 }
 ]

Example request

 GET /consumers/jsongroup/instances/my_json_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
 Host: proxy-instance.double.cloud
 Accept: application/vnd.kafka.json.v2+json

Example response

 HTTP/1.1 200 OK
 Content-Type: application/vnd.kafka.json.v2+json

 [
 {
    "topic": "test",
    "key": "somekey",
    "value": {"foo":"bar"},
    "partition": 1,
    "offset": 10,
 },
 {
    "topic": "test",
    "key": "somekey",
    "value": ["foo", "bar"],
    "partition": 2,
    "offset": 11,
 }
 ]

Brokers

The Brokers resource provides access to the current state of Apache Kafka® brokers in the cluster.

GET / brokers

Get a list of brokers in the cluster.

  • Response JSON object

    Object Data type Description
    brokers array List of broker IDs
  • Error codes

    Status Error code Error message
    404 Not found 40403 Consumer instance not found
    406 Not acceptable 40601 Consumer format does not match the embedded format requested by the Accept header

Example request

  GET /brokers HTTP/1.1
  Host: <your_doublecloud_host_address>
  Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response

  HTTP/1.1 200 OK
  Content-Type: application/vnd.kafka.v2+json

  {
  "brokers": [1, 2, 3]
  }

Errors

REST API endpoints return standard HTTP error codes as a JSON. For example, an error report for a topic that doesn't exist looks as follows:

HTTP/1.1 404 Not Found
Content-Type: application/vnd.kafka.v2+json

{
    "error_code": 40401,
    "message": "Topic not found"
}

Below you can find the list of most frequent error statuses, codes and messages:

Status Error code Error message
401 Unauthorized 40101 Kafka authentication error
403 Forbidden 40301 Kafka Authorization error
404 Not found 40401 Topic not found
40402 Partition not found
500 Internal server error 50001 Zookeeper error
50002 Kafka error
50003 Retriable Kafka error. Although the operation failed, it’s possible that retrying the request will be successful.
50101 Found only SSL endpoints for the specified broker, but SSL isn't supported for the invoked API yet.