Commit 3939ecc

feat: discard records for our own zone
1 parent f6f23b4 · commit 3939ecc

4 files changed: 62 additions, 29 deletions

pkg/limits/playback_manager.go

Lines changed: 47 additions & 25 deletions
@@ -4,7 +4,6 @@ import (
     "context"
     "errors"
     "fmt"
-    "strconv"
     "time"

     "github.com/coder/quartz"
@@ -33,11 +32,15 @@ type PlaybackManager struct {
     // readinessCheck checks if a waiting or replaying partition can be
     // switched to ready.
     readinessCheck PartitionReadinessCheck
-    logger         log.Logger
+    // zone is used to discard our own records.
+    zone   string
+    logger log.Logger

     // Metrics.
-    recordsProcessed *prometheus.CounterVec
     lag              prometheus.Histogram
+    recordsFetched   prometheus.Counter
+    recordsDiscarded prometheus.Counter
+    recordsInvalid   prometheus.Counter

     // Used for tests.
     clock quartz.Clock
@@ -49,6 +52,7 @@ func NewPlaybackManager(
     partitionManager *PartitionManager,
     usage *UsageStore,
     readinessCheck PartitionReadinessCheck,
+    zone string,
     logger log.Logger,
     reg prometheus.Registerer,
 ) *PlaybackManager {
@@ -57,15 +61,9 @@ func NewPlaybackManager(
         partitionManager: partitionManager,
         usage:            usage,
         readinessCheck:   readinessCheck,
+        zone:             zone,
         logger:           logger,
         clock:            quartz.NewReal(),
-        recordsProcessed: promauto.With(reg).NewCounterVec(
-            prometheus.CounterOpts{
-                Name: "loki_ingest_limits_records_processed_total",
-                Help: "The total number of records processed.",
-            },
-            []string{"partition"},
-        ),
         lag: promauto.With(reg).NewHistogram(
             prometheus.HistogramOpts{
                 Name: "loki_ingest_limits_lag_seconds",
@@ -76,6 +74,24 @@ func NewPlaybackManager(
                 Buckets: prometheus.ExponentialBuckets(0.125, 2, 18),
             },
         ),
+        recordsFetched: promauto.With(reg).NewCounter(
+            prometheus.CounterOpts{
+                Name: "loki_ingest_limits_records_fetched_total",
+                Help: "The total number of records fetched.",
+            },
+        ),
+        recordsDiscarded: promauto.With(reg).NewCounter(
+            prometheus.CounterOpts{
+                Name: "loki_ingest_limits_records_discarded_total",
+                Help: "The total number of records discarded.",
+            },
+        ),
+        recordsInvalid: promauto.With(reg).NewCounter(
+            prometheus.CounterOpts{
+                Name: "loki_ingest_limits_records_invalid_total",
+                Help: "The total number of invalid records.",
+            },
+        ),
     }
 }

@@ -121,26 +137,26 @@ func (m *PlaybackManager) processFetchTopicPartition(ctx context.Context) func(k
             return
         }
         logger := log.With(m.logger, "partition", p.Partition)
+        m.recordsFetched.Add(float64(len(p.Records)))
+        // We need the state of the partition so we can discard any records
+        // that we produced (unless replaying) and mark a replaying partition
+        // as ready once it has finished replaying.
         state, ok := m.partitionManager.GetState(p.Partition)
         if !ok {
+            m.recordsDiscarded.Add(float64(len(p.Records)))
             level.Warn(logger).Log("msg", "discarding records for partition as the partition is not assigned to this client")
             return
         }
-        if state == PartitionReplaying {
-            // TODO(grobinson): For now we just consume records when replaying
-            // a partition. In a future commit this will be moved outside of
-            // this check, and records will be consumed both when replaying
-            // newly assigned partitions and merging records from other zones.
-            for _, r := range p.Records {
-                if err := m.processRecord(ctx, r); err != nil {
-                    level.Error(logger).Log("msg", "failed to process record", "err", err.Error())
-                }
+        for _, r := range p.Records {
+            if err := m.processRecord(ctx, state, r); err != nil {
+                level.Error(logger).Log("msg", "failed to process record", "err", err.Error())
             }
-            m.recordsProcessed.
-                WithLabelValues(strconv.FormatInt(int64(p.Partition), 10)).
-                Add(float64(len(p.Records)))
-            m.lag.Observe(m.clock.Since(p.Records[len(p.Records)-1].Timestamp).Seconds())
-            passed, err := m.readinessCheck(p.Partition, p.Records[len(p.Records)-1])
+        }
+        // Get the last record (has the latest offset and timestamp).
+        lastRecord := p.Records[len(p.Records)-1]
+        m.lag.Observe(m.clock.Since(lastRecord.Timestamp).Seconds())
+        if state == PartitionReplaying {
+            passed, err := m.readinessCheck(p.Partition, lastRecord)
             if err != nil {
                 level.Error(logger).Log("msg", "failed to run readiness check", "err", err.Error())
             } else if passed {
@@ -151,11 +167,17 @@ func (m *PlaybackManager) processFetchTopicPartition(ctx context.Context) func(k
     }
 }

-func (m *PlaybackManager) processRecord(_ context.Context, r *kgo.Record) error {
+func (m *PlaybackManager) processRecord(_ context.Context, state PartitionState, r *kgo.Record) error {
     s := proto.StreamMetadataRecord{}
     if err := s.Unmarshal(r.Value); err != nil {
+        m.recordsInvalid.Inc()
         return fmt.Errorf("corrupted record: %w", err)
     }
+    if state == PartitionReady && m.zone == s.Zone {
+        // Discard our own records so we don't count the same streams twice.
+        m.recordsDiscarded.Inc()
+        return nil
+    }
     m.usage.Update(s.Tenant, []*proto.StreamMetadata{s.Metadata}, r.Timestamp, nil)
     return nil
 }
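
The core change is the new check in processRecord: once a partition is ready, records whose Zone matches the consumer's own zone are dropped, because the streams they describe were already counted locally when the record was produced; while a partition is still replaying, every record is applied so local state can be rebuilt. The standalone sketch below restates that decision with simplified stand-in types (partitionState, record), which are illustrative assumptions rather than the package's real PartitionState and proto.StreamMetadataRecord.

package main

import "fmt"

// Simplified stand-ins for the package's PartitionState and record types;
// these are illustrative assumptions, not the real definitions.
type partitionState int

const (
    partitionReplaying partitionState = iota
    partitionReady
)

type record struct {
    zone   string // zone of the sender that produced the record
    tenant string
}

// shouldApply mirrors the check added to processRecord: a record from our
// own zone is discarded once the partition is ready, otherwise it counts.
func shouldApply(ownZone string, state partitionState, r record) bool {
    if state == partitionReady && r.zone == ownZone {
        return false // our own record: the stream was already counted locally
    }
    return true
}

func main() {
    recs := []record{
        {zone: "zone-a", tenant: "tenant-1"}, // produced by us
        {zone: "zone-b", tenant: "tenant-1"}, // produced by another zone
    }
    for _, r := range recs {
        fmt.Printf("zone=%s apply(ready)=%v apply(replaying)=%v\n",
            r.zone,
            shouldApply("zone-a", partitionReady, r),
            shouldApply("zone-a", partitionReplaying, r))
    }
}

Under this rule every fetched record ends up in exactly one bucket: applied to usage, counted as discarded (own zone, or partition not assigned), or counted as invalid, which is what the three new counters track.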

pkg/limits/sender.go

Lines changed: 4 additions & 1 deletion
@@ -26,18 +26,20 @@ type Sender struct {
     // set in the client.
     topic      string
     partitions int
+    zone       string
     logger     log.Logger

     produced       prometheus.Counter
     producedFailed prometheus.Counter
 }

 // NewSender returns a new Sender.
-func NewSender(producer Producer, topic string, partitions int, logger log.Logger, reg prometheus.Registerer) *Sender {
+func NewSender(producer Producer, topic string, partitions int, zone string, logger log.Logger, reg prometheus.Registerer) *Sender {
     return &Sender{
         producer:   producer,
         topic:      topic,
         partitions: partitions,
+        zone:       zone,
         logger:     logger,
         produced: promauto.With(reg).NewCounter(
             prometheus.CounterOpts{
@@ -59,6 +61,7 @@ func NewSender(producer Producer, topic string, partitions int, logger log.Logge
 // complete.
 func (s *Sender) Produce(ctx context.Context, tenant string, metadata *proto.StreamMetadata) error {
     v := proto.StreamMetadataRecord{
+        Zone:     s.zone,
         Tenant:   tenant,
         Metadata: metadata,
     }
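
On the producer side, Sender now carries the zone it was constructed with and stamps it onto every StreamMetadataRecord before producing it; that Zone field is what the playback path compares against. A minimal sketch of that pattern, using a simplified stand-in type rather than the generated proto message and a hypothetical buildRecord helper:

package main

import "fmt"

// streamMetadataRecord is a simplified stand-in for the generated
// proto.StreamMetadataRecord; only the fields relevant here are kept.
type streamMetadataRecord struct {
    Zone   string
    Tenant string
}

// sender mirrors the relevant part of Sender: it remembers the zone given
// at construction time and writes it into every record it builds.
type sender struct {
    zone string
}

func newSender(zone string) *sender { return &sender{zone: zone} }

func (s *sender) buildRecord(tenant string) streamMetadataRecord {
    return streamMetadataRecord{
        Zone:   s.zone, // lets consumers in the same zone recognise the record as their own
        Tenant: tenant,
    }
}

func main() {
    s := newSender("zone-a")
    fmt.Printf("%+v\n", s.buildRecord("tenant-1"))
}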

pkg/limits/service.go

Lines changed: 9 additions & 1 deletion
@@ -186,10 +186,18 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
         s.partitionManager,
         s.usage,
         NewOffsetReadinessCheck(s.partitionManager),
+        cfg.LifecyclerConfig.Zone,
+        logger,
+        reg,
+    )
+    s.sender = NewSender(
+        s.clientWriter,
+        kCfg.Topic,
+        s.cfg.NumPartitions,
+        cfg.LifecyclerConfig.Zone,
         logger,
         reg,
     )
-    s.sender = NewSender(s.clientWriter, kCfg.Topic, s.cfg.NumPartitions, logger, reg)

     s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
     return s, nil
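
The service wires the same value, cfg.LifecyclerConfig.Zone, into both constructors, so the zone written by the sender is exactly the zone the playback manager compares against; if the two ever diverged, the discard would silently stop matching and the same streams would be counted twice. A hypothetical, heavily reduced sketch of that wiring (config, newService, and the two structs are illustrative, not the real service types):

package main

import "fmt"

// config is a hypothetical, trimmed-down stand-in for the service config;
// only the zone matters for this sketch.
type config struct {
    zone string
}

type playbackManager struct{ zone string }
type sender struct{ zone string }

// newService shows the wiring pattern from NewIngestLimits: one zone value
// feeds both the consumer side and the producer side.
func newService(cfg config) (*playbackManager, *sender) {
    pm := &playbackManager{zone: cfg.zone}
    sn := &sender{zone: cfg.zone}
    return pm, sn
}

func main() {
    pm, sn := newService(config{zone: "zone-a"})
    fmt.Println("playback zone:", pm.zone, "sender zone:", sn.zone)
    // The discard in processRecord only fires when these two strings match.
}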

pkg/limits/service_test.go

Lines changed: 2 additions & 2 deletions
@@ -334,7 +334,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
                 usage:            tt.usage,
                 partitionManager: NewPartitionManager(),
                 clock:            clock,
-                sender:           NewSender(&kafkaClient, "test", tt.numPartitions, log.NewNopLogger(), reg),
+                sender:           NewSender(&kafkaClient, "test", tt.numPartitions, "", log.NewNopLogger(), reg),
             }

             // Assign the Partition IDs.
@@ -423,7 +423,7 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
         metrics: newMetrics(reg),
         limits:  limits,
         clock:   clock,
-        sender:  NewSender(&kafkaClient, "test", 1, log.NewNopLogger(), reg),
+        sender:  NewSender(&kafkaClient, "test", 1, "", log.NewNopLogger(), reg),
     }

     // Assign the Partition IDs.
