Kafka REST Proxy monitoring tool

Rest Proxy

TECH WOLF
Dec 27, 2022

Get all topics using REST Proxy:

GET /topics
Ex:
http://localhost:8082/topics

[
"connect-configs",
"_confluent-ksql-default__command_topic",
"_confluent-controlcenter-6-1-0-1-group-aggregate-store-ONE_MINUTE-repartition",
"_confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-repartition",
"purchases"
]
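A minimal Python sketch of calling this endpoint with only the standard library. The base URL is the one from the example above; the helper names `list_topics` and `user_topics`, and the convention of treating names that start with an underscore as internal topics, are assumptions for illustration and not part of REST Proxy itself.

```python
import json
from urllib.request import urlopen

BASE_URL = "http://localhost:8082"  # assumed REST Proxy address, as in the example above


def list_topics(base_url=BASE_URL):
    """Return the topic names reported by GET /topics."""
    with urlopen(f"{base_url}/topics") as resp:
        return json.load(resp)


def user_topics(topics):
    """Drop topics whose names start with "_" (assumed here to be internal)."""
    return [t for t in topics if not t.startswith("_")]


# Usage (needs a running REST Proxy):
#   print(user_topics(list_topics()))
```

With the response shown above, `user_topics` would keep only "connect-configs" and "purchases".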

Get topic by name:

GET /topics/${topic_name}
Ex:
http://localhost:8082/topics/purchases

Response (200 OK):
{
"name": "purchases",
"configs": {
"compression.type": "producer",
"confluent.value.schema.validation": "false",
"leader.replication.throttled.replicas": "",
"confluent.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"cleanup.policy": "delete",
"flush.ms": "9223372036854775807",
"confluent.tier.local.hotset.ms": "86400000",
"follower.replication.throttled.replicas": "",
"confluent.tier.local.hotset.bytes": "-1",
"confluent.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
"segment.bytes": "1073741824",
"retention.ms": "604800000",
"flush.messages": "9223372036854775807",
"confluent.tier.enable": "false",
"confluent.tier.segment.hotset.roll.min.bytes": "104857600",
"confluent.segment.speculative.prefetch.enable": "false",
"message.format.version": "2.7-IV2",
"max.compaction.lag.ms": "9223372036854775807",
"file.delete.delay.ms": "60000",
"max.message.bytes": "1048588",
"min.compaction.lag.ms": "0",
"message.timestamp.type": "CreateTime",
"preallocate": "false",
"confluent.placement.constraints": "",
"min.cleanable.dirty.ratio": "0.5",
"index.interval.bytes": "4096",
"unclean.leader.election.enable": "false",
"retention.bytes": "-1",
"delete.retention.ms": "86400000",
"confluent.prefer.tier.fetch.ms": "-1",
"confluent.key.schema.validation": "false",
"segment.ms": "604800000",
"message.timestamp.difference.max.ms": "9223372036854775807",
"segment.index.bytes": "10485760"
},
"partitions": [
{
"partition": 0,
"leader": 0,
"replicas": [
{
"broker": 0,
"leader": true,
"in_sync": true
}
]
}
]
}
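For monitoring, usually only a few of these fields matter. The sketch below pulls some of them out of an already decoded response; the function name `summarize_topic` and the choice of fields are hypothetical. Note that every config value arrives as a string and has to be converted before doing arithmetic.

```python
MS_PER_DAY = 86_400_000


def summarize_topic(topic):
    """Pick a few monitoring-relevant fields out of a GET /topics/{name} response."""
    configs = topic["configs"]
    return {
        "name": topic["name"],
        "partitions": len(topic["partitions"]),
        "cleanup_policy": configs["cleanup.policy"],
        # Config values are strings, so convert before dividing.
        "retention_days": int(configs["retention.ms"]) / MS_PER_DAY,
    }
```

For the "purchases" response above, "retention.ms" is "604800000", which works out to 7 days.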

View partitions for a specific topic:

GET /topics/${topic_name}/partitions
Ex:
http://localhost:8082/topics/purchases/partitions

Response (200):
[
{
"partition": 0,
"leader": 0,
"replicas": [
{
"broker": 0,
"leader": true,
"in_sync": true
}
]
}
]
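The "in_sync" flag on each replica is the useful part of this response for health checks. A small sketch, assuming the response has already been decoded into Python lists and dicts; the function name `out_of_sync_replicas` is hypothetical.

```python
def out_of_sync_replicas(partitions):
    """Return (partition, broker) pairs for replicas whose in_sync flag is false."""
    return [
        (p["partition"], r["broker"])
        for p in partitions
        for r in p["replicas"]
        if not r["in_sync"]
    ]
```

An empty result means every replica listed in the response is in sync, as is the case for the single-broker response above.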

Get a specific partition:

GET /topics/${topic_name}/partitions/${partition}
http://localhost:8082/topics/purchases/partitions/0

Response:

{
"partition": 0,
"leader": 0,
"replicas": [
{
"broker": 0,
"leader": true,
"in_sync": true
}
]
}

Get the beginning and end offsets of a specific partition:

GET /topics/${topic_name}/partitions/${partition}/offsets
http://localhost:8082/topics/purchases/partitions/0/offsets

Response:
{
"beginning_offset": 0,
"end_offset": 3
}
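The two offsets give a rough size for the partition. The sketch below fetches them and computes the difference; the function names `fetch_offsets` and `offset_span` are hypothetical. On a compacted topic the difference is only an upper bound on the number of records, since compaction leaves gaps in the offset sequence.

```python
import json
from urllib.request import urlopen


def fetch_offsets(base_url, topic, partition):
    """Call GET /topics/{topic}/partitions/{partition}/offsets and decode the JSON."""
    url = f"{base_url}/topics/{topic}/partitions/{partition}/offsets"
    with urlopen(url) as resp:
        return json.load(resp)


def offset_span(offsets):
    """Number of offsets between the beginning and the end of the partition."""
    return offsets["end_offset"] - offsets["beginning_offset"]


# Usage (needs a running REST Proxy):
#   print(offset_span(fetch_offsets("http://localhost:8082", "purchases", 0)))
```

For the response above, the span is 3.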

Brokers:

List the IDs of all brokers:

GET /brokers

Ex:
http://localhost:8082/brokers

Response (200):

{
"brokers": [
0
]
}

REST Proxy API v3:

These APIs are available both on Confluent Server (as a part of Confluent Enterprise) and REST Proxy. When using the API in Confluent Server, all paths should be prefixed with /kafka. For example, the path to list clusters is:

  • Confluent Server: /kafka/v3/clusters
  • REST Proxy: /v3/clusters
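One way to keep client code independent of where the API is served is to build v3 URLs through a single helper. This is a sketch only; the function name `v3_url`, the `confluent_server` flag, and the host names in the usage lines are hypothetical.

```python
def v3_url(base_url, path, confluent_server=False):
    """Build a v3 URL, adding the /kafka prefix when targeting Confluent Server."""
    prefix = "/kafka" if confluent_server else ""
    return f"{base_url.rstrip('/')}{prefix}/v3/{path.lstrip('/')}"


# Usage (host names are placeholders):
#   v3_url("http://localhost:8082", "clusters")
#   v3_url("http://broker-host:8090", "clusters", confluent_server=True)
```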

List Clusters:

GET /v3/clusters
http://localhost:8082/v3/clusters

Response:

{
"kind": "KafkaClusterList",
"metadata": {
"self": "http://localhost:8082/v3/clusters",
"next": null
},
"data": [
{
"kind": "KafkaCluster",
"metadata": {
"self": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA",
"resource_name": "crn:///kafka=VqZyobc4Q-GAClbDY3HEGA"
},
"cluster_id": "VqZyobc4Q-GAClbDY3HEGA",
"controller": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/brokers/0"
},
"acls": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/acls"
},
"brokers": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/brokers"
},
"broker_configs": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs"
},
"consumer_groups": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/consumer-groups"
},
"topics": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/topics"
},
"partition_reassignments": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/topics/-/partitions/-/reassignment"
}
}
]
}
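Each cluster entry carries "related" links to its sub-resources, so a client can follow them without assembling URLs by hand. The sketch below collects those links per cluster from an already decoded response; the function name `cluster_links` is hypothetical.

```python
def cluster_links(cluster_list):
    """Map each cluster_id to a dict of its related resource URLs."""
    links = {}
    for cluster in cluster_list["data"]:
        links[cluster["cluster_id"]] = {
            key: value["related"]
            for key, value in cluster.items()
            # Only sub-resource entries such as "brokers" or "topics" have "related".
            if isinstance(value, dict) and "related" in value
        }
    return links
```

With the response above, `cluster_links(response)["VqZyobc4Q-GAClbDY3HEGA"]["topics"]` would be the URL of that cluster's topic list.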

Get Cluster details:

GET /v3/clusters/${cluster_id}
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA

Response:

{
"kind": "KafkaCluster",
"metadata": {
"self": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA",
"resource_name": "crn:///kafka=VqZyobc4Q-GAClbDY3HEGA"
},
"cluster_id": "VqZyobc4Q-GAClbDY3HEGA",
"controller": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/brokers/0"
},
"acls": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/acls"
},
"brokers": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/brokers"
},
"broker_configs": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs"
},
"consumer_groups": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/consumer-groups"
},
"topics": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/topics"
},
"partition_reassignments": {
"related": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/topics/-/partitions/-/reassignment"
}
}

List Cluster Configs:

GET /v3/clusters/${cluster_id}/broker-configs
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs

Response (200):

{
"kind": "KafkaClusterConfigList",
"metadata": {
"self": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs",
"next": null
},
"data": []
}

Alter Cluster Configs:

POST /v3/clusters/${cluster_id}/broker-configs:alter
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs:alter

Response (204):
No content
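The example above does not show a request body. As far as I understand the v3 API, the batch alter call takes a JSON object with a "data" list, where each entry either sets a value or carries "operation": "DELETE" to reset the config; treat that shape as an assumption and check it against the API reference for your version. The sketch below only builds the request and does not send it; the function name `alter_request` is hypothetical.

```python
import json
from urllib.request import Request


def alter_request(base_url, cluster_id, changes):
    """Build (but do not send) a broker-configs:alter request.

    changes maps a config name to its new value, or to None to reset it.
    """
    data = []
    for name, value in changes.items():
        if value is None:
            data.append({"name": name, "operation": "DELETE"})
        else:
            data.append({"name": name, "value": value})
    return Request(
        f"{base_url}/v3/clusters/{cluster_id}/broker-configs:alter",
        data=json.dumps({"data": data}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage (needs a running REST Proxy):
#   from urllib.request import urlopen
#   urlopen(alter_request("http://localhost:8082", "VqZyobc4Q-GAClbDY3HEGA",
#                         {"compression.type": "gzip"}))
```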

Get Cluster Config by Name:

GET /v3/clusters/${cluster_id}/broker-configs/${name} 
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs/compression.type

Response (200):
{
"kind": "KafkaClusterConfig",
"metadata": {
"self": "http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs/compression.type",
"resource_name": "crn:///kafka=VqZyobc4Q-GAClbDY3HEGA/broker-config=compression.type"
},
"cluster_id": "VqZyobc4Q-GAClbDY3HEGA",
"name": "compression.type",
"value": "gzip",
"is_read_only": false,
"is_sensitive": false,
"source": "DYNAMIC_DEFAULT_BROKER_CONFIG",
"synonyms": [
{
"name": "compression.type",
"value": "gzip",
"source": "DYNAMIC_DEFAULT_BROKER_CONFIG"
}
],
"config_type": "BROKER",
"is_default": false
}

Update Cluster Config:

PUT /v3/clusters/${cluster_id}/broker-configs/${name}
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs/compression.type

Response (204):
No response content
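The PUT call also needs a body, which the example above omits. To my knowledge it is a JSON object with a single "value" field; that shape is an assumption worth verifying against the API reference. The sketch builds the request without sending it; the function name `update_config_request` is hypothetical.

```python
import json
from urllib.request import Request


def update_config_request(base_url, cluster_id, name, value):
    """Build (but do not send) a PUT request that sets one cluster config."""
    return Request(
        f"{base_url}/v3/clusters/{cluster_id}/broker-configs/{name}",
        data=json.dumps({"value": value}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Usage (needs a running REST Proxy):
#   from urllib.request import urlopen
#   urlopen(update_config_request("http://localhost:8082",
#                                 "VqZyobc4Q-GAClbDY3HEGA",
#                                 "compression.type", "gzip"))
```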

Delete Cluster Config:

DELETE /v3/clusters/${cluster_id}/broker-configs/${name}
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/broker-configs/compression.type

Response (204):
No content

List all configs for a broker:

GET /v3/clusters/${cluster_id}/brokers/${broker_id}/configs
http://localhost:8082/v3/clusters/VqZyobc4Q-GAClbDY3HEGA/brokers/0/configs

Response:

JSON File
