Support for replica placement constraints topic configuration #1327

Open
@rca0

Description

I'm currently working with Kafka in a multi-region setup, where I use replica-placement configurations in topic settings. I developed an internal tool with the segmentio/kafka-go package that automates topic creation in our pipeline whenever a branch is merged. The topics are created from YAML configuration files.

Example topic-config.yaml:

topics:
  topic_one:
    partitions: 1
    replicas: 3
    configs:
        retention.ms: 720000

  topic_two:
    partitions: 6
    replicas: 2
    configs:
        min.insync.replicas: 3

This configuration works smoothly.
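For context, here is a minimal sketch of how the `configs` map from the YAML above might be turned into name/value entries. The `configEntry` type is a local stand-in that mirrors the `ConfigName`/`ConfigValue` fields of `kafka.ConfigEntry`, and `toConfigEntries` is a hypothetical helper, not the actual tool's code. It assumes the YAML has already been decoded into a `map[string]any`.

```go
package main

import (
	"fmt"
	"sort"
)

// configEntry is a local stand-in for kafka.ConfigEntry (assumption:
// only the ConfigName and ConfigValue string fields matter here).
type configEntry struct {
	ConfigName  string
	ConfigValue string
}

// toConfigEntries converts a decoded YAML "configs" map into entries.
// YAML scalars such as 720000 decode as numbers, so each value is
// rendered as a string. Names are sorted for deterministic output.
func toConfigEntries(configs map[string]any) []configEntry {
	names := make([]string, 0, len(configs))
	for name := range configs {
		names = append(names, name)
	}
	sort.Strings(names)

	entries := make([]configEntry, 0, len(names))
	for _, name := range names {
		entries = append(entries, configEntry{
			ConfigName:  name,
			ConfigValue: fmt.Sprint(configs[name]),
		})
	}
	return entries
}

func main() {
	entries := toConfigEntries(map[string]any{
		"retention.ms":        720000,
		"min.insync.replicas": 3,
	})
	for _, e := range entries {
		fmt.Printf("%s=%s\n", e.ConfigName, e.ConfigValue)
	}
}
```

Plain scalar settings like these map cleanly onto string values, which is why this path works without special handling.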
However, we also have a Kafka cluster where topics must be created with the replica-placement option. So far we have been doing this manually, and we would like to automate it. For example:

kafka-topics --bootstrap-server $kafka_broker --create --topic topic_name --partitions 5 --config "min.insync.replicas=3" --replica-placement topic-reg.json

# content of topic-reg.json
{
  "version": 1,
  "replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}],
  "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]
}

I tried to implement this in code with the segmentio/kafka-go package, but it is not working properly. I attempted to pass the same JSON as the value of a kafka.ConfigEntry, and the code looks something like this:

    constraints := PlacementConstraints{
        Version: 1,
        Replicas: []Replica{
            {Count: 3, Constraints: Constraint{Rack: "us-east-1"}},
        },
        Observers: []Replica{
            {Count: 3, Constraints: Constraint{Rack: "us-east-2"}},
        },
    }

    // ConfigValue is a string, so the constraints are passed as JSON text.
    jsonConstraints, err := json.Marshal(constraints)
    if err != nil {
        log.Fatalf("marshal placement constraints: %v", err)
    }
    topicConfig := kafka.TopicConfig{
        Topic: "topic-name",
        ConfigEntries: []kafka.ConfigEntry{
            {
                ConfigName:  "confluent.placement.constraints",
                ConfigValue: string(jsonConstraints),
            },
        },
    }
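The snippet above uses `PlacementConstraints`, `Replica`, and `Constraint` without showing their definitions. A minimal sketch of what they might look like follows; the struct tags are assumptions chosen so that the marshaled output matches the shape of topic-reg.json, and `placementJSON` is a hypothetical helper. It only demonstrates the serialization step and does not talk to a broker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Constraint holds the rack label a replica must be placed on.
type Constraint struct {
	Rack string `json:"rack"`
}

// Replica describes how many replicas (or observers) satisfy a constraint.
type Replica struct {
	Count       int        `json:"count"`
	Constraints Constraint `json:"constraints"`
}

// PlacementConstraints mirrors the layout of topic-reg.json.
type PlacementConstraints struct {
	Version   int       `json:"version"`
	Replicas  []Replica `json:"replicas"`
	Observers []Replica `json:"observers,omitempty"`
}

// placementJSON serializes the constraints to the compact JSON string
// that would be used as the config value.
func placementJSON(p PlacementConstraints) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", fmt.Errorf("marshal placement constraints: %w", err)
	}
	return string(b), nil
}

func main() {
	value, err := placementJSON(PlacementConstraints{
		Version:   1,
		Replicas:  []Replica{{Count: 3, Constraints: Constraint{Rack: "us-east-1"}}},
		Observers: []Replica{{Count: 3, Constraints: Constraint{Rack: "us-east-2"}}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(value)
	// prints {"version":1,"replicas":[{"count":3,"constraints":{"rack":"us-east-1"}}],"observers":[{"count":3,"constraints":{"rack":"us-east-2"}}]}
}
```

Printing the value before sending the request is a quick way to confirm that the JSON itself is well formed, which helps narrow the error down to the request rather than the payload.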

However, I’m getting an error from the segmentio/kafka-go package:

Invalid Request: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. Check the broker logs for more details.

The output should look like this:

Topic: topic_one   TopicId: DRdkdkd0djdjkdkd PartitionCount: 3 Configs: min.insync.replicas=3,retention.ms=720000,confluent.placement.constraints={"version": 1,"replicas": [{"count": 3, "constraints": {"rack": "us-east-1"}}], "observers": [{"count": 3, "constraints": {"rack": "us-east-2"}}]}

The confluent.placement.constraints option appears under Configs in the topic description, but it seems the library doesn't support setting it.
