Skip to content

Commit a580d52

Browse files
chore: add PartitionLifecycler to manage assignment and revocation (#17709)
1 parent 973ddbf commit a580d52

File tree

5 files changed

+76
-61
lines changed

5 files changed

+76
-61
lines changed

pkg/limits/partition_lifecycler.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package limits
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/twmb/franz-go/pkg/kgo"
8+
)
9+
10+
// PartitionLifecycler manages assignment and revocation of partitions.
11+
type PartitionLifecycler struct {
12+
partitionManager *PartitionManager
13+
usage *UsageStore
14+
logger log.Logger
15+
}
16+
17+
// NewPartitionLifecycler returns a new PartitionLifecycler.
18+
func NewPartitionLifecycler(partitionManager *PartitionManager, usage *UsageStore, logger log.Logger) *PartitionLifecycler {
19+
return &PartitionLifecycler{
20+
partitionManager: partitionManager,
21+
usage: usage,
22+
logger: log.With(logger, "component", "limits.PartitionLifecycler"),
23+
}
24+
}
25+
26+
// Assign implements kgo.OnPartitionsAssigned.
27+
func (l *PartitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
28+
// We expect the client to just consume one topic.
29+
// TODO(grobinson): Figure out what to do if this is not the case.
30+
for _, partitions := range topics {
31+
l.partitionManager.Assign(ctx, partitions)
32+
return
33+
}
34+
}
35+
36+
// Revoke implements kgo.OnPartitionsRevoked.
37+
func (l *PartitionLifecycler) Revoke(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
38+
// We expect the client to just consume one topic.
39+
// TODO(grobinson): Figure out what to do if this is not the case.
40+
for _, partitions := range topics {
41+
l.partitionManager.Revoke(ctx, partitions)
42+
l.usage.EvictPartitions(partitions)
43+
return
44+
}
45+
}

pkg/limits/partition_manager.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ import (
66

77
"github.com/coder/quartz"
88
"github.com/go-kit/log"
9-
"github.com/twmb/franz-go/pkg/kgo"
109
)
1110

12-
// PartitionManager keeps track of the partitions assigned and for
13-
// each partition a timestamp of when it was last updated.
11+
// PartitionManager keeps track of the partitions assigned and the timestamp
12+
// of when it was assigned.
1413
type PartitionManager struct {
15-
// partitions maps partitionID to last updated (unix nanoseconds).
1614
partitions map[int32]int64
1715
mtx sync.Mutex
1816
logger log.Logger
@@ -30,15 +28,12 @@ func NewPartitionManager(logger log.Logger) *PartitionManager {
3028
}
3129
}
3230

33-
// Assign assigns the partitions and sets the last updated timestamp for each
34-
// partition to the current time.
35-
func (m *PartitionManager) Assign(_ context.Context, _ *kgo.Client, topicPartitions map[string][]int32) {
31+
// Assign assigns the partitions.
32+
func (m *PartitionManager) Assign(_ context.Context, partitions []int32) {
3633
m.mtx.Lock()
3734
defer m.mtx.Unlock()
38-
for _, partitions := range topicPartitions {
39-
for _, partition := range partitions {
40-
m.partitions[partition] = m.clock.Now().UnixNano()
41-
}
35+
for _, partition := range partitions {
36+
m.partitions[partition] = m.clock.Now().UnixNano()
4237
}
4338
}
4439

@@ -50,23 +45,23 @@ func (m *PartitionManager) Has(partition int32) bool {
5045
return ok
5146
}
5247

53-
// List returns a map of all assigned partitions and their last updated timestamps.
48+
// List returns a map of all assigned partitions and the timestamp of when
49+
// each partition was assigned.
5450
func (m *PartitionManager) List() map[int32]int64 {
5551
m.mtx.Lock()
5652
defer m.mtx.Unlock()
57-
v := make(map[int32]int64)
53+
result := make(map[int32]int64)
5854
for partition, lastUpdated := range m.partitions {
59-
v[partition] = lastUpdated
55+
result[partition] = lastUpdated
6056
}
61-
return v
57+
return result
6258
}
6359

64-
func (m *PartitionManager) Remove(_ context.Context, _ *kgo.Client, topicPartitions map[string][]int32) {
60+
// Revoke revokes the partitions.
61+
func (m *PartitionManager) Revoke(_ context.Context, partitions []int32) {
6562
m.mtx.Lock()
6663
defer m.mtx.Unlock()
67-
for _, partitions := range topicPartitions {
68-
for _, partition := range partitions {
69-
delete(m.partitions, partition)
70-
}
64+
for _, partition := range partitions {
65+
delete(m.partitions, partition)
7166
}
7267
}

pkg/limits/partition_manager_test.go

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ func TestPartitionManager_Assign(t *testing.T) {
1717
// Advance the clock so we compare with a time that is not the default
1818
// value.
1919
c.Advance(1)
20-
m.Assign(context.Background(), nil, map[string][]int32{
21-
"foo": {1, 2, 3},
22-
})
20+
m.Assign(context.Background(), []int32{1, 2, 3})
2321
// Assert that the partitions were assigned and the timestamps are set to
2422
// the current time.
2523
now := c.Now().UnixNano()
@@ -32,9 +30,7 @@ func TestPartitionManager_Assign(t *testing.T) {
3230
// partition #4. We expect the updated timestamp is equal to the advanced
3331
// time.
3432
c.Advance(1)
35-
m.Assign(context.Background(), nil, map[string][]int32{
36-
"foo": {3, 4},
37-
})
33+
m.Assign(context.Background(), []int32{3, 4})
3834
later := c.Now().UnixNano()
3935
require.Equal(t, map[int32]int64{
4036
1: now,
@@ -48,9 +44,7 @@ func TestPartitionManager_Has(t *testing.T) {
4844
m := NewPartitionManager(log.NewNopLogger())
4945
c := quartz.NewMock(t)
5046
m.clock = c
51-
m.Assign(context.Background(), nil, map[string][]int32{
52-
"foo": {1, 2, 3},
53-
})
47+
m.Assign(context.Background(), []int32{1, 2, 3})
5448
require.True(t, m.Has(1))
5549
require.True(t, m.Has(2))
5650
require.True(t, m.Has(3))
@@ -64,9 +58,7 @@ func TestPartitionManager_List(t *testing.T) {
6458
// Advance the clock so we compare with a time that is not the default
6559
// value.
6660
c.Advance(1)
67-
m.Assign(context.Background(), nil, map[string][]int32{
68-
"foo": {1, 2, 3},
69-
})
61+
m.Assign(context.Background(), []int32{1, 2, 3})
7062
now := c.Now().UnixNano()
7163
result := m.List()
7264
require.Equal(t, map[int32]int64{
@@ -81,13 +73,11 @@ func TestPartitionManager_List(t *testing.T) {
8173
require.NotEqual(t, p1, p2)
8274
}
8375

84-
func TestPartitionManager_Remove(t *testing.T) {
76+
func TestPartitionManager_Revoke(t *testing.T) {
8577
m := NewPartitionManager(log.NewNopLogger())
8678
c := quartz.NewMock(t)
8779
m.clock = c
88-
m.Assign(context.Background(), nil, map[string][]int32{
89-
"foo": {1, 2, 3},
90-
})
80+
m.Assign(context.Background(), []int32{1, 2, 3})
9181
// Assert that the partitions were assigned and the timestamps are set to
9282
// the current time.
9383
now := c.Now().UnixNano()
@@ -96,10 +86,8 @@ func TestPartitionManager_Remove(t *testing.T) {
9686
2: now,
9787
3: now,
9888
}, m.partitions)
99-
// Remove partitions 2 and 3.
100-
m.Remove(context.Background(), nil, map[string][]int32{
101-
"foo": {2, 3},
102-
})
89+
// Revoke partitions 2 and 3.
90+
m.Revoke(context.Background(), []int32{2, 3})
10391
require.Equal(t, map[int32]int64{
10492
1: now,
10593
}, m.partitions)

pkg/limits/service.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ type IngestLimits struct {
110110
wal WAL
111111

112112
// Track partition assignments
113-
partitionManager *PartitionManager
113+
partitionManager *PartitionManager
114+
partitionLifecycler *PartitionLifecycler
114115

115116
// Used for tests.
116117
clock quartz.Clock
@@ -138,6 +139,8 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
138139
clock: quartz.NewReal(),
139140
}
140141

142+
s.partitionLifecycler = NewPartitionLifecycler(s.partitionManager, s.usage, logger)
143+
141144
// Initialize internal metadata metrics
142145
if err := reg.Register(s); err != nil {
143146
return nil, fmt.Errorf("failed to register ingest limits internal metadata metrics: %w", err)
@@ -165,8 +168,8 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
165168
kgo.Balancers(kgo.CooperativeStickyBalancer()),
166169
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(s.clock.Now().Add(-s.cfg.WindowSize).UnixMilli())),
167170
kgo.DisableAutoCommit(),
168-
kgo.OnPartitionsAssigned(s.onPartitionsAssigned),
169-
kgo.OnPartitionsRevoked(s.onPartitionsRevoked),
171+
kgo.OnPartitionsAssigned(s.partitionLifecycler.Assign),
172+
kgo.OnPartitionsRevoked(s.partitionLifecycler.Revoke),
170173
)
171174
if err != nil {
172175
return nil, fmt.Errorf("failed to create kafka client: %w", err)
@@ -225,18 +228,6 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
225228
)
226229
}
227230

228-
func (s *IngestLimits) onPartitionsAssigned(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
229-
s.partitionManager.Assign(ctx, client, partitions)
230-
}
231-
232-
func (s *IngestLimits) onPartitionsRevoked(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
233-
s.partitionManager.Remove(ctx, client, partitions)
234-
235-
for _, ids := range partitions {
236-
s.usage.EvictPartitions(ids)
237-
}
238-
}
239-
240231
func (s *IngestLimits) CheckReady(ctx context.Context) error {
241232
if s.State() != services.Running {
242233
return fmt.Errorf("service is not running: %v", s.State())

pkg/limits/service_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,10 +338,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
338338
}
339339

340340
// Assign the Partition IDs.
341-
partitions := make(map[string][]int32)
342-
partitions["test"] = make([]int32, 0, len(tt.assignedPartitions))
343-
partitions["test"] = append(partitions["test"], tt.assignedPartitions...)
344-
s.partitionManager.Assign(context.Background(), nil, partitions)
341+
s.partitionManager.Assign(context.Background(), tt.assignedPartitions)
345342

346343
// Call ExceedsLimits.
347344
req := &proto.ExceedsLimitsRequest{
@@ -429,8 +426,7 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
429426
}
430427

431428
// Assign the Partition IDs.
432-
partitions := map[string][]int32{"tenant1": {0}}
433-
s.partitionManager.Assign(context.Background(), nil, partitions)
429+
s.partitionManager.Assign(context.Background(), []int32{0})
434430

435431
// Run concurrent requests
436432
concurrency := 10

0 commit comments

Comments
 (0)