Description
Describe the bug
kafka.NewConsumerGroup can't automatically create the group ID in AWS CloudWatch.
Kafka Version
- kafka 2.6.2
- kafka-go v0.4.32
To Reproduce
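package main

import (
	"context"
	"crypto/tls"
	"time"

	"github.com/aws/aws-sdk-go/aws/credentials"
	sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
)
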
func NewMulReader(opts ...Option) *MulReader {
	// read config file
	cfg := new(options)
	for _, opt := range opts {
		opt.apply(cfg)
	}

	// init ConsumerGroup
	creds := credentials.NewStaticCredentials(cfg.auth.accessKeyId, cfg.auth.secretAccessKey, "")
	mechanism := &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(creds),
		Region: cfg.region,
	}
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
		TLS:           &tls.Config{},
	}
	c, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      cfg.groupID, // this group ID is not created automatically in AWS CloudWatch, although `ListGroups` can find it
		Brokers: cfg.brokers,
		Dialer:  dialer,
		Topics:  []string{cfg.topic},
	})
	if err != nil {
		panic(err)
	}
	// read example: https://pkg.go.dev/github.com/segmentio/kafka-go#example-Generation.Start-ConsumerGroupParallelReaders
}

func NewKafkaClient(opts ...Option) *Reader {
	// read config file
	cfg := new(options)
	for _, opt := range opts {
		opt.apply(cfg)
	}

	creds := credentials.NewStaticCredentials(cfg.auth.accessKeyId, cfg.auth.secretAccessKey, "")
	mechanism := &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(creds),
		Region: cfg.region,
	}
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
		TLS:           &tls.Config{},
	}
	r := kafka.NewReader(kafka.ReaderConfig{
		GroupID: cfg.groupID, // this group ID is created automatically in AWS CloudWatch
		Brokers: cfg.brokers,
		Topic:   cfg.topic,
		Dialer:  dialer,
	})
	r.ReadMessage(context.Background())
	r.Close()
}
Expected Behavior
NewConsumerGroup should automatically create the group ID in AWS CloudWatch.
Observed Behavior
NewReader automatically creates the group ID in AWS CloudWatch, but NewConsumerGroup doesn't.
ListGroups can find both the NewReader group ID and the NewConsumerGroup group ID.
Additional Context
I've consulted AWS tech support, but the engineer isn't familiar with the code. He suspects that AWS MSK may not behave consistently with open source Kafka, which would explain why group IDs are not created automatically in AWS CloudWatch. He suggested that I ask the kafka-go community first to see if that would help.