
Commit 75b6dc4

feat: support replaying records for newly assigned partitions
This commit supports replaying records for newly assigned partitions so an instance can rebuild its in-memory usage data before serving queries.
1 parent b665282 commit 75b6dc4

9 files changed: +620 -113 lines changed
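
At a high level, the new PartitionLifecycler (see pkg/limits/partition_lifecycler.go below) hooks into the consumer's rebalance callbacks. As a rough sketch of how the pieces fit together — the broker address, consumer group, and topic name here are placeholders, not values from this commit — its Assign and Revoke methods match the franz-go callback signatures and could be wired in like this:

// In package limits, for illustration only. All string literals below
// are hypothetical placeholders.
func newClient(lifecycler *PartitionLifecycler) (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),       // placeholder broker
		kgo.ConsumerGroup("ingest-limits"),      // placeholder group
		kgo.ConsumeTopics("loki.ingest-limits"), // placeholder topic
		// Assign and Revoke implement kgo.OnPartitionsAssigned and
		// kgo.OnPartitionsRevoked respectively.
		kgo.OnPartitionsAssigned(lifecycler.Assign),
		kgo.OnPartitionsRevoked(lifecycler.Revoke),
	)
}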

pkg/blockbuilder/scheduler/scheduler_test.go

Lines changed: 3 additions & 0 deletions
@@ -29,6 +29,9 @@ func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
 func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]partition.Lag, error) {
 	return nil, nil
 }
+func (m *mockOffsetManager) NextOffset(_ context.Context, _ int32, _ time.Time) (int64, error) {
+	return 0, nil
+}
 func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
 	return 0, nil
 }

pkg/kafka/partition/offset_manager.go

Lines changed: 27 additions & 1 deletion
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -72,6 +73,7 @@ type OffsetManager interface {
 	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
 	FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
 	FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
+	NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error)
 	Commit(ctx context.Context, partition int32, offset int64) error
 }

@@ -130,6 +132,31 @@ func (r *KafkaOffsetManager) ConsumerGroup() string {
 	return r.cfg.GetConsumerGroup(r.instanceID)
 }

+// NextOffset returns the first offset after the timestamp t. If the partition
+// does not have an offset after t, it returns the current end offset.
+func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) {
+	resp, err := r.adminClient.ListOffsetsAfterMilli(ctx, t.UnixMilli(), r.cfg.Topic)
+	if err != nil {
+		return 0, err
+	}
+	// If a topic does not exist, a special -1 partition for each non-existing
+	// topic is added to the response.
+	partitions := resp[r.cfg.Topic]
+	if special, ok := partitions[-1]; ok {
+		return 0, special.Err
+	}
+	// If a partition does not exist, it will be missing.
+	listed, ok := partitions[partition]
+	if !ok {
+		return 0, fmt.Errorf("unknown partition %d", partition)
+	}
+	// Err is non-nil if the partition has a load error.
+	if listed.Err != nil {
+		return 0, listed.Err
+	}
+	return listed.Offset, nil
+}
+
 // FetchLastCommittedOffset retrieves the last committed offset for this partition
 func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
 	req := kmsg.NewPtrOffsetFetchRequest()
@@ -218,7 +245,6 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition
 	if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
 		return 0, err
 	}
-
 	return partition.Offset, nil
 }
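
For illustration, NextOffset can be used to find where a replay window begins. A minimal sketch, assuming an already-configured KafkaOffsetManager and a one-hour window — the partition number and window are illustrative, not from this commit:

// Sketch only: mgr is an assumed, already-configured KafkaOffsetManager.
func windowStart(ctx context.Context, mgr *KafkaOffsetManager) (int64, error) {
	// First offset at or after one hour ago on partition 0. If nothing
	// was produced in the last hour, this is the current end offset.
	return mgr.NextOffset(ctx, 0, time.Now().Add(-time.Hour))
}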

pkg/limits/http_test.go

Lines changed: 4 additions & 2 deletions
@@ -42,8 +42,10 @@ func TestIngestLimits_ServeHTTP(t *testing.T) {
 		},
 		logger: log.NewNopLogger(),
 		partitionManager: &PartitionManager{
-			partitions: map[int32]int64{
-				0: time.Now().UnixNano(),
+			partitions: map[int32]partitionEntry{
+				0: {
+					assignedAt: time.Now().UnixNano(),
+				},
 			},
 		},
 	}

pkg/limits/partition_lifecycler.go

Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
+package limits
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/twmb/franz-go/pkg/kgo"
+
+	kafka_partition "github.com/grafana/loki/v3/pkg/kafka/partition"
+)
+
+// PartitionLifecycler manages assignment and revocation of partitions.
+type PartitionLifecycler struct {
+	cfg              Config
+	partitionManager *PartitionManager
+	offsetManager    kafka_partition.OffsetManager
+	usage            *UsageStore
+	logger           log.Logger
+}
+
+// NewPartitionLifecycler returns a new PartitionLifecycler.
+func NewPartitionLifecycler(
+	cfg Config,
+	partitionManager *PartitionManager,
+	offsetManager kafka_partition.OffsetManager,
+	usage *UsageStore,
+	logger log.Logger,
+) *PartitionLifecycler {
+	return &PartitionLifecycler{
+		cfg:              cfg,
+		partitionManager: partitionManager,
+		offsetManager:    offsetManager,
+		usage:            usage,
+		logger:           logger,
+	}
+}
+
+// Assign implements kgo.OnPartitionsAssigned.
+func (l *PartitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
+	for _, partitions := range topics {
+		l.partitionManager.Assign(ctx, partitions)
+		for _, partition := range partitions {
+			if err := l.checkOffsets(ctx, partition); err != nil {
+				level.Error(l.logger).Log(
+					"msg", "failed to check offsets, partition is ready",
+					"partition", partition,
+					"err", err,
+				)
+				l.partitionManager.SetReady(partition)
+			}
+		}
+		return
+	}
+}
+
+// Revoke implements kgo.OnPartitionsRevoked.
+func (l *PartitionLifecycler) Revoke(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
+	for _, partitions := range topics {
+		l.partitionManager.Revoke(ctx, partitions)
+		l.usage.EvictPartitions(partitions)
+		return
+	}
+}
+
+func (l *PartitionLifecycler) checkOffsets(ctx context.Context, partition int32) error {
+	logger := log.With(l.logger, "partition", partition)
+	// Get the start offset for the partition. This can be greater than zero
+	// if a retention period has deleted old records.
+	startOffset, err := l.offsetManager.FetchPartitionOffset(
+		ctx, partition, kafka_partition.KafkaStartOffset)
+	if err != nil {
+		return fmt.Errorf("failed to get start offset: %w", err)
+	}
+	// The last produced offset is the next offset after the last produced
+	// record. For example, if a partition contains 1 record, then the last
+	// produced offset is 1. However, the offset of the record is 0, as
+	// offsets start from 0.
+	lastProducedOffset, err := l.offsetManager.FetchPartitionOffset(
+		ctx, partition, kafka_partition.KafkaEndOffset)
+	if err != nil {
+		return fmt.Errorf("failed to get last produced offset: %w", err)
+	}
+	// Get the first offset after the window. This can be the same offset as
+	// the last produced offset if no records have been produced within that
+	// time.
+	nextOffset, err := l.offsetManager.NextOffset(ctx, partition, time.Now().Add(-l.cfg.WindowSize))
+	if err != nil {
+		return fmt.Errorf("failed to get next offset: %w", err)
+	}
+	level.Debug(logger).Log(
+		"msg", "fetched offsets",
+		"start_offset", startOffset,
+		"last_produced_offset", lastProducedOffset,
+		"next_offset", nextOffset,
+	)
+	if startOffset >= lastProducedOffset {
+		// The partition has no records. This happens when either the
+		// partition has never produced a record, or all records that have
+		// been produced have been deleted due to the retention period.
+		level.Debug(logger).Log("msg", "no records in partition, partition is ready")
+		l.partitionManager.SetReady(partition)
+		return nil
+	}
+	if nextOffset == lastProducedOffset {
+		level.Debug(logger).Log("msg", "no records within window size, partition is ready")
+		l.partitionManager.SetReady(partition)
+		return nil
+	}
+	// Since we want to fetch all records up to and including the last
+	// produced record, we must fetch all records up to and including the
+	// last produced offset - 1.
+	level.Debug(logger).Log("msg", "partition is replaying")
+	l.partitionManager.SetReplaying(partition, lastProducedOffset-1)
+	return nil
+}
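
Once a partition is marked as replaying, a consume loop has to advance it to ready when the target offset is reached. That loop is not part of this commit; the following is a hypothetical sketch, where the process callback stands in for whatever rebuilds the in-memory usage data:

// Hypothetical replay loop, not part of this commit.
func replay(ctx context.Context, client *kgo.Client, pm *PartitionManager, process func(*kgo.Record)) {
	for ctx.Err() == nil {
		fetches := client.PollFetches(ctx)
		fetches.EachRecord(func(rec *kgo.Record) {
			process(rec) // placeholder: rebuild usage data
			// Flip the partition to ready once the last produced
			// record (the target offset) has been consumed.
			if pm.TargetOffsetReached(rec.Partition, rec.Offset) {
				pm.SetReady(rec.Partition)
			}
		})
	}
}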

pkg/limits/partition_manager.go

Lines changed: 112 additions & 22 deletions
@@ -5,43 +5,90 @@ import (
 	"sync"

 	"github.com/coder/quartz"
-	"github.com/go-kit/log"
-	"github.com/twmb/franz-go/pkg/kgo"
 )

+// PartitionState is the state of a partition in [PartitionManager].
+type PartitionState int
+
+const (
+	PartitionPending PartitionState = iota
+	PartitionReplaying
+	PartitionReady
+)
+
+// String implements the [fmt.Stringer] interface.
+func (s PartitionState) String() string {
+	switch s {
+	case PartitionPending:
+		return "pending"
+	case PartitionReplaying:
+		return "replaying"
+	case PartitionReady:
+		return "ready"
+	default:
+		return "unknown"
+	}
+}
+
 // PartitionManager keeps track of the partitions assigned and for
-// each partition a timestamp of when it was last updated.
+// each partition a timestamp of when it was assigned.
 type PartitionManager struct {
-	// partitions maps partitionID to last updated (unix nanoseconds).
-	partitions map[int32]int64
+	partitions map[int32]partitionEntry
 	mtx        sync.Mutex
-	logger     log.Logger

 	// Used for tests.
 	clock quartz.Clock
 }

+// partitionEntry contains metadata about an assigned partition.
+type partitionEntry struct {
+	assignedAt   int64
+	targetOffset int64
+	state        PartitionState
+}
+
 // NewPartitionManager returns a new [PartitionManager].
-func NewPartitionManager(logger log.Logger) *PartitionManager {
+func NewPartitionManager() *PartitionManager {
 	return &PartitionManager{
-		partitions: make(map[int32]int64),
-		logger:     log.With(logger, "component", "limits.PartitionManager"),
+		partitions: make(map[int32]partitionEntry),
 		clock:      quartz.NewReal(),
 	}
 }

 // Assign assigns the partitions and sets the last updated timestamp for each
 // partition to the current time.
-func (m *PartitionManager) Assign(_ context.Context, _ *kgo.Client, topicPartitions map[string][]int32) {
+func (m *PartitionManager) Assign(_ context.Context, partitions []int32) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
-	for _, partitions := range topicPartitions {
-		for _, partition := range partitions {
-			m.partitions[partition] = m.clock.Now().UnixNano()
+	for _, partition := range partitions {
+		m.partitions[partition] = partitionEntry{
+			assignedAt: m.clock.Now().UnixNano(),
+			state:      PartitionPending,
 		}
 	}
 }

+// GetState returns the current state of the partition. It returns false
+// if the partition does not exist.
+func (m *PartitionManager) GetState(partition int32) (PartitionState, bool) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	entry, ok := m.partitions[partition]
+	return entry.state, ok
+}
+
+// TargetOffsetReached returns true if the partition is replaying and the
+// target offset has been reached.
+func (m *PartitionManager) TargetOffsetReached(partition int32, offset int64) bool {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	entry, ok := m.partitions[partition]
+	if ok {
+		return entry.state == PartitionReplaying && entry.targetOffset <= offset
+	}
+	return false
+}
+
 // Has returns true if the partition is assigned, otherwise false.
 func (m *PartitionManager) Has(partition int32) bool {
 	m.mtx.Lock()
@@ -50,23 +97,66 @@ func (m *PartitionManager) Has(partition int32) bool {
 	return ok
 }

-// List returns a map of all assigned partitions and their last updated timestamps.
+// List returns a map of all assigned partitions and their last updated
+// timestamps.
 func (m *PartitionManager) List() map[int32]int64 {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
-	v := make(map[int32]int64)
-	for partition, lastUpdated := range m.partitions {
-		v[partition] = lastUpdated
+	result := make(map[int32]int64)
+	for partition, entry := range m.partitions {
+		result[partition] = entry.assignedAt
 	}
-	return v
+	return result
 }

-func (m *PartitionManager) Remove(_ context.Context, _ *kgo.Client, topicPartitions map[string][]int32) {
+// ListByState returns all partitions with the specified state and their last
+// updated timestamps.
+func (m *PartitionManager) ListByState(state PartitionState) map[int32]int64 {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
-	for _, partitions := range topicPartitions {
-		for _, partition := range partitions {
-			delete(m.partitions, partition)
+	result := make(map[int32]int64)
+	for partition, entry := range m.partitions {
+		if entry.state == state {
+			result[partition] = entry.assignedAt
 		}
 	}
+	return result
+}
+
+// SetReplaying sets the partition as replaying and the offset that must
+// be consumed for it to become ready. It returns false if the partition
+// does not exist.
+func (m *PartitionManager) SetReplaying(partition int32, offset int64) bool {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	entry, ok := m.partitions[partition]
+	if ok {
+		entry.state = PartitionReplaying
+		entry.targetOffset = offset
+		m.partitions[partition] = entry
+	}
+	return ok
+}
+
+// SetReady sets the partition as ready. It returns false if the partition
+// does not exist.
+func (m *PartitionManager) SetReady(partition int32) bool {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	entry, ok := m.partitions[partition]
+	if ok {
+		entry.state = PartitionReady
+		entry.targetOffset = 0
+		m.partitions[partition] = entry
+	}
+	return ok
+}
+
+// Revoke deletes the partitions.
+func (m *PartitionManager) Revoke(_ context.Context, partitions []int32) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	for _, partition := range partitions {
+		delete(m.partitions, partition)
+	}
 }
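
Since ListByState exposes partitions by state, a caller could, for example, report partitions that are still replaying. A sketch, assuming a PartitionManager named pm; the logging itself is illustrative:

for partition, assignedAt := range pm.ListByState(PartitionReplaying) {
	// assignedAt is unix nanoseconds, as set in Assign.
	fmt.Printf("partition %d replaying since %s\n", partition, time.Unix(0, assignedAt))
}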
