AWS CloudWatch can't find groupID #930

Open
@kscooo

Description

Describe the bug

The group ID passed to kafka.NewConsumerGroup is not created automatically in AWS CloudWatch.

Kafka Version

  • kafka 2.6.2
  • kafka-go v0.4.32

To Reproduce

package main

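import (
	"context"
	"crypto/tls"
	"time"

	"github.com/aws/aws-sdk-go/aws/credentials"
	sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
)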

func NewMulReader(opts ...Option) *MulReader {
	// read config file
	cfg := new(options)
	for _, opt := range opts {
		opt.apply(cfg)
	}
        
	// init ConsumerGroup
	creds := credentials.NewStaticCredentials(cfg.auth.accessKeyId, cfg.auth.secretAccessKey, "")
	mechanism := &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(creds),
		Region: cfg.region,
	}
	
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
		TLS:           &tls.Config{},
	}
	
	c, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      cfg.groupID, // this group ID is not created automatically in AWS CloudWatch, although `ListGroups` can find it
		Brokers: cfg.brokers,
		Dialer:  dialer,
		Topics:  []string{cfg.topic},
	})
	if err != nil {
		panic(err)
	}
	// read example: https://pkg.go.dev/github.com/segmentio/kafka-go#example-Generation.Start-ConsumerGroupParallelReaders
}

func NewKafkaClient(opts ...Option) *Reader {
	// read config file
	cfg := new(options)
	for _, opt := range opts {
		opt.apply(cfg)
	}
	
	creds := credentials.NewStaticCredentials(cfg.auth.accessKeyId, cfg.auth.secretAccessKey, "")
	mechanism := &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(creds),
		Region: cfg.region,
	}
	
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
		TLS:           &tls.Config{},
	}
	
	r := kafka.NewReader(kafka.ReaderConfig{
		GroupID: cfg.groupID, // this group ID is created automatically in AWS CloudWatch
		Brokers: cfg.brokers,
		Topic:   cfg.topic,
		Dialer:  dialer,
	})

	r.ReadMessage(context.Background())

	r.Close()
}

Expected Behavior

NewConsumerGroup should automatically create the group ID in AWS CloudWatch, the same way NewReader does.

Observed Behavior

NewReader automatically creates the group ID in AWS CloudWatch, but NewConsumerGroup does not.
ListGroups finds both the NewReader group ID and the NewConsumerGroup group ID.

Additional Context

I consulted AWS technical support, but the engineer was not familiar with the code. He suspects that AWS MSK may not behave consistently with open-source Kafka, which would explain why group IDs are not created automatically in AWS CloudWatch. He suggested that I ask the kafka-go community first to see whether that would help.
