Describe the bug
When using a kafka-go Reader to read messages from Kafka with StartOffset set to kafka.LastOffset and a group ID that has never been used before, the Reader simply doesn't read any messages.
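For quick reference, the reader is configured along these lines (a trimmed-down sketch of the full program in the To Reproduce section; the broker address is a placeholder). Per the kafka-go ReaderConfig documentation, StartOffset only takes effect for a consumer group that has no committed offset, which is exactly the case for a never-before-used group ID:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Same reader settings as the full program below; the broker address is a placeholder.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{"localhost:9091"},
		GroupID:     "test-group",     // a group ID that has never committed an offset
		Topic:       "json_test_topic",
		StartOffset: kafka.LastOffset, // start from the latest offset for new groups
	})
	defer r.Close()

	// In the failing scenario this call only ever times out.
	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatalf("read failed: %v", err)
	}
	fmt.Printf("offset %d: %s\n", msg.Offset, string(msg.Value))
}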
Kafka Version
- What version(s) of Kafka are you testing against?
7.6.0
- What version of kafka-go are you using?
v0.4.47
To Reproduce
Resources to reproduce the behavior:
# docker compose file generated with https://github.com/sknop/kafka-docker-composer
services:
controller-1:
image: confluentinc/cp-server:7.6.0
hostname: controller-1
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
CLUSTER_ID: Nk018hRAQFytWskYqtQduw
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://controller-1:19091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: CONTROLLER
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:19091
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: controller-1
KAFKA_BROKER_RACK: rack-0
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_OFFSET_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
KAFKA_CONFLUENT_METADATA_TOPIC_REPLICATION_FACTOR: 2
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 2
KAFKA_OPTS: -javaagent:/tmp/jmx_prometheus_javaagent-0.20.0.jar=8091:/tmp/kafka_config.yml
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093
cap_add:
- NET_ADMIN
ports:
- 19091:19091
volumes:
- $PWD/volumes/jmx_prometheus_javaagent-0.20.0.jar:/tmp/jmx_prometheus_javaagent-0.20.0.jar
- $PWD/volumes/kafka_config.yml:/tmp/kafka_config.yml
kafka-1:
image: confluentinc/cp-server:7.6.0
hostname: kafka-1
container_name: kafka-1
healthcheck:
test: curl -fail --silent http://kafka-1:8090/kafka/v3/clusters/ --output /dev/null || exit 1
interval: 10s
retries: 10
start_period: 20s
depends_on:
- controller-1
environment:
KAFKA_LISTENERS: PLAINTEXT://kafka-1:19092, EXTERNAL://0.0.0.0:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092, EXTERNAL://localhost:9091
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 10001
KAFKA_JMX_HOSTNAME: localhost
KAFKA_BROKER_RACK: rack-0
KAFKA_OPTS: -javaagent:/tmp/jmx_prometheus_javaagent-0.20.0.jar=8091:/tmp/kafka_config.yml
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
KAFKA_CONFLUENT_CLUSTER_LINK_ENABLE: False
KAFKA_CONFLUENT_REPORTERS_TELEMETRY_AUTO_ENABLE: False
KAFKA_NODE_ID: 2
CLUSTER_ID: Nk018hRAQFytWskYqtQduw
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:19091
KAFKA_PROCESS_ROLES: broker
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
cap_add:
- NET_ADMIN
ports:
- 9091:9091
- 10001:10001
- 10101:8091
- 10201:8090
volumes:
- $PWD/volumes/jmx_prometheus_javaagent-0.20.0.jar:/tmp/jmx_prometheus_javaagent-0.20.0.jar
- $PWD/volumes/kafka_config.yml:/tmp/kafka_config.yml
kafka-2:
image: confluentinc/cp-server:7.6.0
hostname: kafka-2
container_name: kafka-2
healthcheck:
test: curl -fail --silent http://kafka-2:8090/kafka/v3/clusters/ --output /dev/null || exit 1
interval: 10s
retries: 10
start_period: 20s
depends_on:
- controller-1
environment:
KAFKA_LISTENERS: PLAINTEXT://kafka-2:19093, EXTERNAL://0.0.0.0:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:19093, EXTERNAL://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 10002
KAFKA_JMX_HOSTNAME: localhost
KAFKA_BROKER_RACK: rack-0
KAFKA_OPTS: -javaagent:/tmp/jmx_prometheus_javaagent-0.20.0.jar=8091:/tmp/kafka_config.yml
KAFKA_MIN_INSYNC_REPLICAS: 1
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
KAFKA_CONFLUENT_CLUSTER_LINK_ENABLE: False
KAFKA_CONFLUENT_REPORTERS_TELEMETRY_AUTO_ENABLE: False
KAFKA_NODE_ID: 3
CLUSTER_ID: Nk018hRAQFytWskYqtQduw
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:19091
KAFKA_PROCESS_ROLES: broker
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
cap_add:
- NET_ADMIN
ports:
- 9092:9092
- 10002:10002
- 10102:8091
- 10202:8090
volumes:
- $PWD/volumes/jmx_prometheus_javaagent-0.20.0.jar:/tmp/jmx_prometheus_javaagent-0.20.0.jar
- $PWD/volumes/kafka_config.yml:/tmp/kafka_config.yml
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- controller-1
- kafka-1
- kafka-2
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: controller-1:19091,kafka-1:19092,kafka-2:19093
KAFKA_CLUSTERS_0_METRICS_PORT: 9999
DYNAMIC_CONFIG_ENABLED: 'true'
portainer:
image: portainer/portainer-ce:2.20.2
container_name: portainer
restart: unless-stopped
ports:
- 9000:9000
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- portainer_data:/data
volumes:
portainer_data:
Go consumer program:
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"strings"
"syscall"
"time"
"example.com/kafka/util/configurator"
timeutil "example.com/kafka/util/time-util"
"github.com/segmentio/kafka-go"
)
var (
cfgFileName string
)
func init() {
flag.StringVar(&cfgFileName, "cfg", "/home/veljko/Desktop/Faks/Master Studije/Prva Godina/2. Semestar/Big Data инфраструктуре и сервиси/Projekat/kod/Kafka/Code/kafka/doc/consumer.conf", "File path to the file containing the configuration")
}
type Message struct {
ID string `json:"id"`
Type int `json:"type"`
Kind int `json:"kind"`
Content string `json:"content"`
}
func main() {
flag.Parse()
defer timeutil.MeasureExecutionTime(time.Now())
var config configurator.KafkaConsumerConfig
// Load configuration
err := configurator.LoadConfiguration(cfgFileName, &config)
if err != nil {
log.Fatalf("Failed to load configuration: %s", err)
}
// Create new Kafka reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(config.BootstrapServers, ","),
GroupID: config.GroupID,
Topic: config.Topic,
StartOffset: kafka.LastOffset,
MaxBytes: config.MaxBytes, // maximum fetch size, taken from the configuration file
})
// Close the reader when main function exits
defer r.Close()
// Handle OS signals for graceful shutdown
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
// Start consuming messages
for {
select {
case <-sigchan:
slog.Info("Received shutdown signal. Shutting down consumer...")
return
default:
// Read a message from Kafka
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
msg, err := r.ReadMessage(ctx)
cancel()
if err != nil {
slog.Error(fmt.Sprintf("Error reading message: %v", err))
continue
}
// Process the message
var message Message
err = json.Unmarshal(msg.Value, &message)
if err != nil {
slog.Error(fmt.Sprintf("Error decoding message: %v", err))
continue
}
// Handle the message
slog.Info(fmt.Sprintf("Received message: %+v\n", message))
}
}
}
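The producer used to publish the test messages is not included in this report. A minimal kafka-go writer along the following lines (a sketch, not the actual producer; the broker address is a placeholder) can be used to put matching JSON messages on the topic:

package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9091"), // placeholder broker address
		Topic:    "json_test_topic",
		Balancer: &kafka.LeastBytes{},
	}
	defer w.Close()

	// Payload matching the Message struct expected by the consumer above.
	payload, err := json.Marshal(map[string]interface{}{
		"id": "1", "type": 1, "kind": 1, "content": "hello",
	})
	if err != nil {
		log.Fatalf("marshal failed: %v", err)
	}

	if err := w.WriteMessages(context.Background(), kafka.Message{Value: payload}); err != nil {
		log.Fatalf("write failed: %v", err)
	}
}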
Config for the Go program:
{
"topic":"json_test_topic",
"bootstrapServers":"localhost:9090,localhost:9091,localhost:9092",
"groupId": "test-group",
"batchBytes": 1000000
}
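The configurator package is not part of kafka-go and its source is not shown here. Based on the fields the program reads and the keys in this file, KafkaConsumerConfig presumably looks roughly like the sketch below (an assumption, not the actual definition). Note that the program uses config.MaxBytes while the file only sets batchBytes, so the mapping between the two is assumed:

// Assumed shape of configurator.KafkaConsumerConfig (not the real definition).
type KafkaConsumerConfig struct {
	Topic            string `json:"topic"`
	BootstrapServers string `json:"bootstrapServers"`
	GroupID          string `json:"groupId"`
	MaxBytes         int    `json:"batchBytes"` // read as MaxBytes by the consumer
}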
Expected Behavior
The program should consume messages from Kafka and print them to the console.
Observed Behavior
No messages are ever consumed; ReadMessage only returns context deadline exceeded errors:
2024/05/13 22:22:21 ERROR Error reading message: fetching message: context deadline exceeded
^C2024/05/13 22:22:26 ERROR Error reading message: fetching message: context deadline exceeded
2024/05/13 22:22:26 INFO Received shutdown signal. Shutting down consumer...
2024/05/13 22:22:34 INFO Elapsed time: 18.00940981s