Skip to content

Commit 38c5c4f

Browse files
wip
1 parent c5c2cab commit 38c5c4f

File tree

5 files changed

+131
-38
lines changed

5 files changed

+131
-38
lines changed

pkg/limits/partition_lifecycler.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,33 @@ package limits
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/go-kit/log"
8+
"github.com/go-kit/log/level"
79
"github.com/twmb/franz-go/pkg/kgo"
10+
11+
offset "github.com/grafana/loki/v3/pkg/kafka/partition"
812
)
913

1014
// PartitionLifecycler
1115
type PartitionLifecycler struct {
1216
partitionManager *PartitionManager
17+
offsetManager offset.OffsetManager
1318
usage *UsageStore
1419
logger log.Logger
1520
}
1621

1722
// NewPartitionLifecycler returns a new PartitionLifecycler.
18-
func NewPartitionLifecycler(partitionManager *PartitionManager, usage *UsageStore, logger log.Logger) *PartitionLifecycler {
23+
func NewPartitionLifecycler(
24+
partitionManager *PartitionManager,
25+
offsetManager offset.OffsetManager,
26+
usage *UsageStore,
27+
logger log.Logger,
28+
) *PartitionLifecycler {
1929
return &PartitionLifecycler{
2030
partitionManager: partitionManager,
31+
offsetManager: offsetManager,
2132
usage: usage,
2233
logger: log.With(logger, "component", "limits.PartitionLifecycler"),
2334
}
@@ -27,6 +38,16 @@ func NewPartitionLifecycler(partitionManager *PartitionManager, usage *UsageStor
2738
func (l *PartitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
2839
for _, partitions := range topics {
2940
l.partitionManager.Assign(ctx, partitions)
41+
for _, partition := range partitions {
42+
if err := l.checkOffsets(ctx, partition); err != nil {
43+
level.Error(l.logger).Log(
44+
"msg", "failed to check offsets for partition",
45+
"partition", partition,
46+
"err", err,
47+
)
48+
l.partitionManager.SetState(partition, PartitionReady)
49+
}
50+
}
3051
return
3152
}
3253
}
@@ -39,3 +60,33 @@ func (l *PartitionLifecycler) Revoke(ctx context.Context, _ *kgo.Client, topics
3960
return
4061
}
4162
}
63+
64+
func (l *PartitionLifecycler) checkOffsets(ctx context.Context, partition int32) error {
65+
lastProducedOffset, err := l.offsetManager.FetchPartitionOffset(
66+
ctx, partition, offset.KafkaEndOffset)
67+
if err != nil {
68+
return fmt.Errorf("failed to get last produced offset: %w", err)
69+
}
70+
level.Info(l.logger).Log("lastProducedOffset", lastProducedOffset, "partition", partition)
71+
if lastProducedOffset == 0 {
72+
// The partition has never produced a record which means there is
73+
// nothing to replay and the partition can be marked as ready.
74+
l.partitionManager.SetState(partition, PartitionReady)
75+
return nil
76+
}
77+
// Get the last committed offset.
78+
lastCommittedOffset, err := l.offsetManager.FetchLastCommittedOffset(ctx, partition)
79+
if err != nil {
80+
return fmt.Errorf("failed to get last produced offset: %w", err)
81+
}
82+
level.Info(l.logger).Log("lastProducedOffset", lastProducedOffset, "lastCommittedOffset", lastCommittedOffset, "partition", partition)
83+
if lastCommittedOffset == lastProducedOffset {
84+
// The last produced record is at least as old as the last committed
85+
// offset which means there is nothing to replay either and the
86+
// partition can be marked as ready.
87+
l.partitionManager.SetState(partition, PartitionReady)
88+
return nil
89+
}
90+
l.partitionManager.SetState(partition, PartitionReplaying)
91+
return nil
92+
}

pkg/limits/partition_manager.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/coder/quartz"
88
"github.com/go-kit/log"
9+
"github.com/go-kit/log/level"
910
)
1011

1112
// PartitionState is the state of a partition in [PartitionManager].
@@ -17,6 +18,20 @@ const (
1718
PartitionReady
1819
)
1920

21+
// String implements the [fmt.Stringer] interface.
22+
func (s PartitionState) String() string {
23+
switch s {
24+
case PartitionPending:
25+
return "pending"
26+
case PartitionReplaying:
27+
return "replaying"
28+
case PartitionReady:
29+
return "ready"
30+
default:
31+
return "unknown"
32+
}
33+
}
34+
2035
// PartitionManager keeps track of the partitions assigned and for
2136
// each partition a timestamp of when it was assigned.
2237
type PartitionManager struct {
@@ -72,6 +87,7 @@ func (m *PartitionManager) SetState(partition int32, state PartitionState) bool
7287
defer m.mtx.Unlock()
7388
entry, ok := m.partitions[partition]
7489
if ok {
90+
level.Info(m.logger).Log("msg", "set partition state", "partition", partition, "state", state, "current_state", entry.state)
7591
entry.state = state
7692
m.partitions[partition] = entry
7793
}

pkg/limits/partition_manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ func TestPartitionManager_Revoke(t *testing.T) {
173173
state: PartitionPending,
174174
},
175175
}, m.partitions)
176-
// Revoke partitions 2 and 3.
177-
m.Revoke(context.Background(), []int32{2, 3})
176+
// Remove partitions 2 and 3.
177+
m.Remove(context.Background(), []int32{2, 3})
178178
require.Equal(t, map[int32]partitionEntry{
179179
1: {
180180
assignedAt: now,

pkg/limits/playback.go

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func NewPlaybackManager(
4646
usage: usage,
4747
readinessCheck: readinessCheck,
4848
logger: logger,
49+
clock: quartz.NewReal(),
4950
}
5051
}
5152

@@ -60,55 +61,68 @@ func (m *PlaybackManager) Run(ctx context.Context) error {
6061
case <-ctx.Done():
6162
return ctx.Err()
6263
default:
63-
if err := m.processRecords(ctx); err != nil {
64+
if err := m.pollFetches(ctx); err != nil {
6465
if errors.Is(err, kgo.ErrClientClosed) {
6566
return fmt.Errorf("exited: %w", err)
6667
}
67-
level.Error(m.logger).Log("msg", "failed to process records", "err", err.Error())
68+
level.Error(m.logger).Log("msg", "failed to poll fetches", "err", err.Error())
6869
b.Wait()
6970
}
7071
}
7172
}
7273
return nil
7374
}
7475

75-
func (m *PlaybackManager) processRecords(ctx context.Context) error {
76-
fetches := m.client.PollRecords(ctx, 100)
76+
func (m *PlaybackManager) pollFetches(ctx context.Context) error {
77+
fetches := m.client.PollFetches(ctx)
7778
if err := fetches.Err(); err != nil {
7879
return err
7980
}
80-
iter := fetches.RecordIter()
81-
for !iter.Done() {
82-
if err := m.processRecord(ctx, iter.Next()); err != nil {
83-
level.Error(m.logger).Log("msg", "failed to process record", "err", err.Error())
81+
fetches.EachPartition(m.processFetchTopicPartition(ctx))
82+
return nil
83+
}
84+
85+
func (m *PlaybackManager) processFetchTopicPartition(ctx context.Context) func(kgo.FetchTopicPartition) {
86+
return func(p kgo.FetchTopicPartition) {
87+
state, ok := m.partitionManager.GetState(p.Partition)
88+
if !ok {
89+
level.Warn(m.logger).Log("msg", "discarding records for partition as the partition is not assigned to this client", "partition", p.Partition)
90+
return
91+
}
92+
// Keep a reference to the last processed record. We will use it
93+
// for the readiness check if the partition is replaying.
94+
var lastProcessed *kgo.Record
95+
for _, r := range p.Records {
96+
fmt.Println("processing record", r.Timestamp, "partition", p.Partition)
97+
if err := m.processRecord(ctx, r); err != nil {
98+
level.Error(m.logger).Log("msg", "failed to process record", "err", err.Error())
99+
} else {
100+
lastProcessed = r
101+
}
102+
}
103+
if lastProcessed != nil {
104+
if state == PartitionReplaying {
105+
passed, err := m.readinessCheck(lastProcessed)
106+
if err != nil {
107+
level.Error(m.logger).Log("msg", "failed to run readiness check", "err", err.Error())
108+
} else if passed {
109+
m.partitionManager.SetState(p.Partition, PartitionReady)
110+
}
111+
}
112+
// If the records were more than one hour ago, commit the offset.
113+
// if m.clock.Since(lastProcessed.Timestamp) > time.Second {
114+
// m.client.CommitRecords(ctx, lastProcessed)
115+
// }
84116
}
85117
}
86-
return nil
87118
}
88119

89120
func (m *PlaybackManager) processRecord(ctx context.Context, r *kgo.Record) error {
90-
rec := proto.StreamMetadataRecord{}
91-
if err := rec.Unmarshal(r.Value); err != nil {
121+
s := proto.StreamMetadataRecord{}
122+
if err := s.Unmarshal(r.Value); err != nil {
92123
return fmt.Errorf("corrupted record: %w", err)
93124
}
94-
state, ok := m.partitionManager.GetState(r.Partition)
95-
if !ok {
96-
return fmt.Errorf("partition is not known to PartitionManager: %d", r.Partition)
97-
}
98-
// Check if the partition is ready or needs time to replay.
99-
if state == PartitionPending || state == PartitionReplaying {
100-
ready, err := m.readinessCheck(r)
101-
if err != nil {
102-
level.Error(m.logger).Log("msg", "failed to check readiness for partition %d: %w", r.Partition, err)
103-
} else if ready {
104-
m.partitionManager.SetState(r.Partition, PartitionReady)
105-
}
106-
}
107-
m.usage.Update(
108-
rec.Tenant,
109-
[]*proto.StreamMetadata{rec.Metadata},
110-
r.Timestamp.UnixNano(),
111-
)
125+
m.usage.Update(s.Tenant, []*proto.StreamMetadata{s.Metadata}, r.Timestamp, nil)
112126
return nil
113127
}
114128

pkg/limits/service.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/twmb/franz-go/pkg/kgo"
1717

1818
"github.com/grafana/loki/v3/pkg/kafka/client"
19+
"github.com/grafana/loki/v3/pkg/kafka/partition"
1920
"github.com/grafana/loki/v3/pkg/limits/proto"
2021
"github.com/grafana/loki/v3/pkg/util"
2122
"github.com/grafana/loki/v3/pkg/util/constants"
@@ -91,10 +92,11 @@ func newMetrics(reg prometheus.Registerer) *metrics {
9192
type IngestLimits struct {
9293
services.Service
9394

94-
cfg Config
95-
logger log.Logger
96-
clientReader *kgo.Client
97-
clientWriter *kgo.Client
95+
cfg Config
96+
logger log.Logger
97+
clientReader *kgo.Client
98+
clientWriter *kgo.Client
99+
offsetManager partition.OffsetManager
98100

99101
lifecycler *ring.Lifecycler
100102
lifecyclerWatcher *services.FailureWatcher
@@ -134,9 +136,9 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
134136
cfg: cfg,
135137
logger: logger,
136138
usage: NewUsageStore(cfg),
139+
partitionManager: NewPartitionManager(logger),
137140
metrics: newMetrics(reg),
138141
limits: lims,
139-
partitionManager: NewPartitionManager(logger),
140142
clock: quartz.NewReal(),
141143
}
142144

@@ -161,7 +163,16 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
161163
kCfg.AutoCreateTopicEnabled = true
162164
kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions
163165

164-
s.partitionLifecycler = NewPartitionLifecycler(s.partitionManager, s.usage, logger)
166+
s.offsetManager, err = partition.NewKafkaOffsetManager(
167+
kCfg,
168+
"ingest-limits",
169+
logger,
170+
prometheus.NewRegistry(),
171+
)
172+
if err != nil {
173+
return nil, fmt.Errorf("failed to create offset manager: %w", err)
174+
}
175+
s.partitionLifecycler = NewPartitionLifecycler(s.partitionManager, s.offsetManager, s.usage, logger)
165176

166177
s.clientReader, err = client.NewReaderClient("ingest-limits-reader", kCfg, logger, reg,
167178
kgo.ConsumerGroup(consumerGroup),
@@ -280,6 +291,7 @@ func (s *IngestLimits) starting(ctx context.Context) (err error) {
280291
func (s *IngestLimits) running(ctx context.Context) error {
281292
// Start the eviction goroutine
282293
go s.evictOldStreamsPeriodic(ctx)
294+
go s.playback.Run(ctx)
283295

284296
for {
285297
select {

0 commit comments

Comments
 (0)