Description
I’m working with Kafka in a multi-region setup, where I use replica-placement configurations within topic settings. I developed an internal tool using the segmentio/kafka-go package to automate topic creation in our pipeline whenever a branch is merged. The topics are created automatically from YAML configuration files.
Example topic-config.yaml:
topics:
  topic_one:
    partitions: 1
    replicas: 3
    configs:
      retention.ms: 720000
  topic_two:
    partitions: 6
    replicas: 2
    configs:
      min.insync.replicas: 3
This configuration works smoothly.
However, we have a Kafka cluster where topics need to be created with the replica-placement option. We have been doing this manually and would like to automate it. For example:
kafka-topics --bootstrap-server $kafka_broker --create --topic topic_name --partitions 5 --config "min.insync.replicas=3" --replica-placement topic-reg.json
# content of topic-reg.json
{
  "version": 1,
  "replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}],
  "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]
}
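A hand-written placement file is easy to get subtly wrong (an unbalanced brace or a missing comma), so a quick structural check before handing it to the CLI or to a client library may save a round trip to the broker. The sketch below is only an illustration: the function name checkPlacement is hypothetical, and treating "version" and "replicas" as the required top-level keys is an assumption based on the example file, not on a schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkPlacement is a hypothetical helper. It reports an error when data is
// not well-formed JSON or lacks the top-level keys assumed to be required
// ("version" and "replicas", taken from the example file).
func checkPlacement(data []byte) error {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(data, &doc); err != nil {
		return fmt.Errorf("placement file is not valid JSON: %w", err)
	}
	for _, key := range []string{"version", "replicas"} {
		if _, ok := doc[key]; !ok {
			return fmt.Errorf("placement file is missing %q", key)
		}
	}
	return nil
}

func main() {
	good := []byte(`{"version": 1, "replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}]}`)
	// Same content with the closing brace of the array element dropped.
	bad := []byte(`{"version": 1, "replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}]}`)

	fmt.Println("good:", checkPlacement(good))
	fmt.Println("bad: ", checkPlacement(bad))
}
```

The first call should report no error and the second should report a JSON syntax error, since the array element is never closed.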
I tried implementing this in code using the segmentio/kafka-go package, but it’s not working properly. I attempted to pass the contents of this JSON file in a kafka.ConfigEntry, and the code looks something like this:
constraints := PlacementConstraints{
	Version: 1,
	Replicas: []Replica{
		{Count: 3, Constraints: Constraint{Rack: "us-east-1"}},
	},
	Observers: []Replica{
		{Count: 3, Constraints: Constraint{Rack: "us-east-2"}},
	},
}
// Serialize the placement constraints to a JSON string.
jsonConstraints, err := json.Marshal(constraints)
if err != nil {
	log.Fatalf("marshal placement constraints: %v", err)
}
topicConfig := kafka.TopicConfig{
	Topic: "topic-name",
	ConfigEntries: []kafka.ConfigEntry{
		{
			ConfigName:  "confluent.placement.constraints",
			ConfigValue: string(jsonConstraints),
		},
	},
}
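The PlacementConstraints, Replica, and Constraint types are not shown in the snippet, so the following is a minimal sketch under the assumption that they are plain structs. One thing worth checking is the struct tags: without `json` tags, encoding/json emits the exported Go field names as-is (`Version`, `Replicas`, `Rack`), which would not match the lowercase keys used in topic-reg.json. The type definitions and the helper name placementJSON below are hypothetical, and whether this is related to the Invalid Request error is not established.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Hypothetical definitions; the original post does not show these types.
// The json tags produce the lowercase keys used in topic-reg.json.
type Constraint struct {
	Rack string `json:"rack"`
}

type Replica struct {
	Count       int        `json:"count"`
	Constraints Constraint `json:"constraints"`
}

type PlacementConstraints struct {
	Version   int       `json:"version"`
	Replicas  []Replica `json:"replicas"`
	Observers []Replica `json:"observers,omitempty"`
}

// placementJSON builds the placement document for one replica rack and one
// observer rack and returns it as a compact JSON string.
func placementJSON(replicaRack, observerRack string, count int) (string, error) {
	pc := PlacementConstraints{
		Version:   1,
		Replicas:  []Replica{{Count: count, Constraints: Constraint{Rack: replicaRack}}},
		Observers: []Replica{{Count: count, Constraints: Constraint{Rack: observerRack}}},
	}
	b, err := json.Marshal(pc)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := placementJSON("us-east-1", "us-east-2", 3)
	if err != nil {
		log.Fatal(err)
	}
	// This string is what would be used as the ConfigValue.
	fmt.Println(s)
}
```

With these tags, the program prints `{"version":1,"replicas":[{"count":3,"constraints":{"rack":"us-east-1"}}],"observers":[{"count":3,"constraints":{"rack":"us-east-2"}}]}`, which has the same structure as the file passed to the CLI.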
However, I’m getting an error from the segmentio/kafka-go package:
Invalid Request: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. Check the broker logs for more details.
After creation, describing the topic should produce output like this:
Topic: topic_one TopicId: DRdkdkd0djdjkdkd PartitionCount: 3 Configs: min.insync.replicas=3,retention.ms=720000,confluent.placement.constraints={"version": 1,"replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}], "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]}
The confluent.placement.constraints option is within the Configs object, but it seems the library doesn’t support it.
Supporting documentation