Skip to content

Commit f6f23b4

Browse files
feat: support replaying records for newly assigned partitions
This commit supports replaying records for newly assigned partitions so an instance can rebuild it's in-memory usage data before serving queries.
1 parent a580d52 commit f6f23b4

12 files changed

+716
-185
lines changed

pkg/blockbuilder/scheduler/scheduler_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
2929
func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]partition.Lag, error) {
3030
return nil, nil
3131
}
32+
func (m *mockOffsetManager) NextOffset(_ context.Context, _ int32, _ time.Time) (int64, error) {
33+
return 0, nil
34+
}
3235
func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
3336
return 0, nil
3437
}

pkg/kafka/partition/offset_manager.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/go-kit/log"
910
"github.com/go-kit/log/level"
@@ -72,6 +73,7 @@ type OffsetManager interface {
7273
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
7374
FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
7475
FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
76+
NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error)
7577
Commit(ctx context.Context, partition int32, offset int64) error
7678
}
7779

@@ -130,6 +132,31 @@ func (r *KafkaOffsetManager) ConsumerGroup() string {
130132
return r.cfg.GetConsumerGroup(r.instanceID)
131133
}
132134

135+
// NextOffset returns the first offset after the timestamp t. If the partition
136+
// does not have an offset after t, it returns the current end offset.
137+
func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) {
138+
resp, err := r.adminClient.ListOffsetsAfterMilli(ctx, t.UnixMilli(), r.cfg.Topic)
139+
if err != nil {
140+
return 0, err
141+
}
142+
// If a topic does not exist, a special -1 partition for each non-existing
143+
// topic is added to the response.
144+
partitions := resp[r.cfg.Topic]
145+
if special, ok := partitions[-1]; ok {
146+
return 0, special.Err
147+
}
148+
// If a partition does not exist, it will be missing.
149+
listed, ok := partitions[partition]
150+
if !ok {
151+
return 0, fmt.Errorf("unknown partition %d", partition)
152+
}
153+
// Err is non-nil if the partition has a load error.
154+
if listed.Err != nil {
155+
return 0, listed.Err
156+
}
157+
return listed.Offset, nil
158+
}
159+
133160
// FetchLastCommittedOffset retrieves the last committed offset for this partition
134161
func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
135162
req := kmsg.NewPtrOffsetFetchRequest()
@@ -218,7 +245,6 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition
218245
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
219246
return 0, err
220247
}
221-
222248
return partition.Offset, nil
223249
}
224250

pkg/limits/http_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ func TestIngestLimits_ServeHTTP(t *testing.T) {
4242
},
4343
logger: log.NewNopLogger(),
4444
partitionManager: &PartitionManager{
45-
partitions: map[int32]int64{
46-
0: time.Now().UnixNano(),
45+
partitions: map[int32]partitionEntry{
46+
0: {
47+
assignedAt: time.Now().UnixNano(),
48+
},
4749
},
4850
},
4951
}

pkg/limits/mock_test.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ import (
66
"testing"
77

88
"github.com/stretchr/testify/require"
9-
10-
"github.com/grafana/loki/v3/pkg/limits/proto"
9+
"github.com/twmb/franz-go/pkg/kgo"
1110
)
1211

1312
type MockLimits struct {
@@ -27,24 +26,26 @@ func (m *MockLimits) IngestionBurstSizeBytes(_ string) int {
2726
return 1000
2827
}
2928

30-
type mockWAL struct {
31-
t *testing.T
32-
NumAppendsTotal int
33-
ExpectedAppendsTotal int
34-
mtx sync.Mutex
35-
}
29+
// mockKafka mocks a [kgo.Client].
30+
type mockKafka struct {
31+
t *testing.T
32+
mtx sync.Mutex
3633

37-
func (m *mockWAL) Append(_ context.Context, _ string, _ *proto.StreamMetadata) error {
38-
m.mtx.Lock()
39-
defer m.mtx.Unlock()
40-
m.NumAppendsTotal++
41-
return nil
34+
expectedNumRecords int
35+
numRecords int
4236
}
4337

44-
func (m *mockWAL) Close() error {
45-
return nil
38+
func (k *mockKafka) Produce(
39+
_ context.Context,
40+
r *kgo.Record,
41+
promise func(*kgo.Record, error),
42+
) {
43+
k.mtx.Lock()
44+
defer k.mtx.Unlock()
45+
k.numRecords++
46+
promise(r, nil)
4647
}
4748

48-
func (m *mockWAL) AssertAppendsTotal() {
49-
require.Equal(m.t, m.ExpectedAppendsTotal, m.NumAppendsTotal)
49+
func (k *mockKafka) AssertExpectedNumRecords() {
50+
require.Equal(k.t, k.expectedNumRecords, k.numRecords)
5051
}

pkg/limits/partition_lifecycler.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,39 @@ package limits
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
"github.com/go-kit/log"
9+
"github.com/go-kit/log/level"
710
"github.com/twmb/franz-go/pkg/kgo"
11+
12+
kafka_partition "github.com/grafana/loki/v3/pkg/kafka/partition"
813
)
914

1015
// PartitionLifecycler manages assignment and revocation of partitions.
1116
type PartitionLifecycler struct {
17+
cfg Config
1218
partitionManager *PartitionManager
19+
offsetManager kafka_partition.OffsetManager
1320
usage *UsageStore
1421
logger log.Logger
1522
}
1623

1724
// NewPartitionLifecycler returns a new PartitionLifecycler.
18-
func NewPartitionLifecycler(partitionManager *PartitionManager, usage *UsageStore, logger log.Logger) *PartitionLifecycler {
25+
func NewPartitionLifecycler(
26+
cfg Config,
27+
partitionManager *PartitionManager,
28+
offsetManager kafka_partition.OffsetManager,
29+
usage *UsageStore,
30+
logger log.Logger,
31+
) *PartitionLifecycler {
1932
return &PartitionLifecycler{
33+
cfg: cfg,
2034
partitionManager: partitionManager,
35+
offsetManager: offsetManager,
2136
usage: usage,
22-
logger: log.With(logger, "component", "limits.PartitionLifecycler"),
37+
logger: logger,
2338
}
2439
}
2540

@@ -29,6 +44,16 @@ func (l *PartitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics
2944
// TODO(grobinson): Figure out what to do if this is not the case.
3045
for _, partitions := range topics {
3146
l.partitionManager.Assign(ctx, partitions)
47+
for _, partition := range partitions {
48+
if err := l.checkOffsets(ctx, partition); err != nil {
49+
level.Error(l.logger).Log(
50+
"msg", "failed to check offsets, partition is ready",
51+
"partition", partition,
52+
"err", err,
53+
)
54+
l.partitionManager.SetReady(partition)
55+
}
56+
}
3257
return
3358
}
3459
}
@@ -43,3 +68,55 @@ func (l *PartitionLifecycler) Revoke(ctx context.Context, _ *kgo.Client, topics
4368
return
4469
}
4570
}
71+
72+
func (l *PartitionLifecycler) checkOffsets(ctx context.Context, partition int32) error {
73+
logger := log.With(l.logger, "partition", partition)
74+
// Get the start offset for the partition. This can be greater than zero
75+
// if a retention period has deleted old records.
76+
startOffset, err := l.offsetManager.FetchPartitionOffset(
77+
ctx, partition, kafka_partition.KafkaStartOffset)
78+
if err != nil {
79+
return fmt.Errorf("failed to get last produced offset: %w", err)
80+
}
81+
// The last produced offset is the next offset after the last produced
82+
// record. For example, if a partition contains 1 record, then the last
83+
// produced offset is 1. However, the offset of the last produced record
84+
// is 0, as offsets start from 0.
85+
lastProducedOffset, err := l.offsetManager.FetchPartitionOffset(
86+
ctx, partition, kafka_partition.KafkaEndOffset)
87+
if err != nil {
88+
return fmt.Errorf("failed to get last produced offset: %w", err)
89+
}
90+
// Get the first offset produced within the window. This can be the same
91+
// offset as the last produced offset if no records have been produced
92+
// within that time.
93+
nextOffset, err := l.offsetManager.NextOffset(ctx, partition, time.Now().Add(-l.cfg.WindowSize))
94+
if err != nil {
95+
return fmt.Errorf("failed to get next offset: %w", err)
96+
}
97+
level.Debug(logger).Log(
98+
"msg", "fetched offsets",
99+
"start_offset", startOffset,
100+
"last_produced_offset", lastProducedOffset,
101+
"next_offset", nextOffset,
102+
)
103+
if startOffset >= lastProducedOffset {
104+
// The partition has no records. This happens when either the
105+
// partition has never produced a record, or all records that have
106+
// been produced have been deleted due to the retention period.
107+
level.Debug(logger).Log("msg", "no records in partition, partition is ready")
108+
l.partitionManager.SetReady(partition)
109+
return nil
110+
}
111+
if nextOffset == lastProducedOffset {
112+
level.Debug(logger).Log("msg", "no records within window size, partition is ready")
113+
l.partitionManager.SetReady(partition)
114+
return nil
115+
}
116+
// Since we want to fetch all records up to and including the last
117+
// produced record, we must fetch all records up to and including the
118+
// last produced offset - 1.
119+
level.Debug(logger).Log("msg", "partition is replaying")
120+
l.partitionManager.SetReplaying(partition, lastProducedOffset-1)
121+
return nil
122+
}

0 commit comments

Comments
 (0)