Skip to content

Commit c5c2cab

Browse files
feat: create partition lifecycler to manage assignment and revocation
1 parent 422a2d0 commit c5c2cab

6 files changed

+98
-93
lines changed

pkg/limits/http_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestIngestLimits_ServeHTTP(t *testing.T) {
4444
partitionManager: &PartitionManager{
4545
partitions: map[int32]partitionEntry{
4646
0: {
47-
lastUpdated: time.Now().UnixNano(),
47+
assignedAt: time.Now().UnixNano(),
4848
},
4949
},
5050
},

pkg/limits/partition_lifecycler.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package limits
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/twmb/franz-go/pkg/kgo"
8+
)
9+
10+
// PartitionLifecycler
11+
type PartitionLifecycler struct {
12+
partitionManager *PartitionManager
13+
usage *UsageStore
14+
logger log.Logger
15+
}
16+
17+
// NewPartitionLifecycler returns a new PartitionLifecycler.
18+
func NewPartitionLifecycler(partitionManager *PartitionManager, usage *UsageStore, logger log.Logger) *PartitionLifecycler {
19+
return &PartitionLifecycler{
20+
partitionManager: partitionManager,
21+
usage: usage,
22+
logger: log.With(logger, "component", "limits.PartitionLifecycler"),
23+
}
24+
}
25+
26+
// Assign implements kgo.OnPartitionsAssigned.
27+
func (l *PartitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
28+
for _, partitions := range topics {
29+
l.partitionManager.Assign(ctx, partitions)
30+
return
31+
}
32+
}
33+
34+
// Revoke implements kgo.OnPartitionsRevoked.
35+
func (l *PartitionLifecycler) Revoke(ctx context.Context, _ *kgo.Client, topics map[string][]int32) {
36+
for _, partitions := range topics {
37+
l.partitionManager.Revoke(ctx, partitions)
38+
l.usage.EvictPartitions(partitions)
39+
return
40+
}
41+
}

pkg/limits/partition_manager.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ import (
66

77
"github.com/coder/quartz"
88
"github.com/go-kit/log"
9-
"github.com/twmb/franz-go/pkg/kgo"
109
)
1110

12-
// PartitionState contains the current state of an assigned partition.
11+
// PartitionState is the state of a partition in [PartitionManager].
1312
type PartitionState int
1413

1514
const (
@@ -19,20 +18,20 @@ const (
1918
)
2019

2120
// PartitionManager keeps track of the partitions assigned and for
22-
// each partition a timestamp of when it was last updated.
21+
// each partition a timestamp of when it was assigned.
2322
type PartitionManager struct {
2423
partitions map[int32]partitionEntry
25-
mtx sync.Mutex
2624
logger log.Logger
25+
mtx sync.Mutex
2726

2827
// Used for tests.
2928
clock quartz.Clock
3029
}
3130

3231
// partitionEntry contains metadata about an assigned partition.
3332
type partitionEntry struct {
34-
lastUpdated int64
35-
state PartitionState
33+
assignedAt int64
34+
state PartitionState
3635
}
3736

3837
// NewPartitionManager returns a new [PartitionManager].
@@ -46,15 +45,13 @@ func NewPartitionManager(logger log.Logger) *PartitionManager {
4645

4746
// Assign assigns the partitions and sets the last updated timestamp for each
4847
// partition to the current time.
49-
func (m *PartitionManager) Assign(_ context.Context, _ *kgo.Client, topicPartitions map[string][]int32) {
48+
func (m *PartitionManager) Assign(_ context.Context, partitions []int32) {
5049
m.mtx.Lock()
5150
defer m.mtx.Unlock()
52-
for _, partitions := range topicPartitions {
53-
for _, partition := range partitions {
54-
m.partitions[partition] = partitionEntry{
55-
lastUpdated: m.clock.Now().UnixNano(),
56-
state: PartitionPending,
57-
}
51+
for _, partition := range partitions {
52+
m.partitions[partition] = partitionEntry{
53+
assignedAt: m.clock.Now().UnixNano(),
54+
state: PartitionPending,
5855
}
5956
}
6057
}
@@ -96,7 +93,7 @@ func (m *PartitionManager) List() map[int32]int64 {
9693
defer m.mtx.Unlock()
9794
result := make(map[int32]int64)
9895
for partition, entry := range m.partitions {
99-
result[partition] = entry.lastUpdated
96+
result[partition] = entry.assignedAt
10097
}
10198
return result
10299
}
@@ -109,18 +106,16 @@ func (m *PartitionManager) ListByState(state PartitionState) map[int32]int64 {
109106
result := make(map[int32]int64)
110107
for partition, entry := range m.partitions {
111108
if entry.state == state {
112-
result[partition] = entry.lastUpdated
109+
result[partition] = entry.assignedAt
113110
}
114111
}
115112
return result
116113
}
117114

118-
func (m *PartitionManager) Remove(_ context.Context, _ *kgo.Client, topicPartitions map[string][]int32) {
115+
func (m *PartitionManager) Revoke(_ context.Context, partitions []int32) {
119116
m.mtx.Lock()
120117
defer m.mtx.Unlock()
121-
for _, partitions := range topicPartitions {
122-
for _, partition := range partitions {
123-
delete(m.partitions, partition)
124-
}
118+
for _, partition := range partitions {
119+
delete(m.partitions, partition)
125120
}
126121
}

pkg/limits/partition_manager_test.go

Lines changed: 33 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,46 @@ func TestPartitionManager_Assign(t *testing.T) {
1717
// Advance the clock so we compare with a time that is not the default
1818
// value.
1919
c.Advance(1)
20-
m.Assign(context.Background(), nil, map[string][]int32{
21-
"foo": {1, 2, 3},
22-
})
20+
m.Assign(context.Background(), []int32{1, 2, 3})
2321
// Assert that the partitions were assigned and the timestamps are set to
2422
// the current time.
2523
now := c.Now().UnixNano()
2624
require.Equal(t, map[int32]partitionEntry{
2725
1: {
28-
lastUpdated: now,
29-
state: PartitionPending,
26+
assignedAt: now,
27+
state: PartitionPending,
3028
},
3129
2: {
32-
lastUpdated: now,
33-
state: PartitionPending,
30+
assignedAt: now,
31+
state: PartitionPending,
3432
},
3533
3: {
36-
lastUpdated: now,
37-
state: PartitionPending,
34+
assignedAt: now,
35+
state: PartitionPending,
3836
},
3937
}, m.partitions)
4038
// Advance the clock again, re-assign partition #3 and assign a new
4139
// partition #4. We expect the updated timestamp is equal to the advanced
4240
// time.
4341
c.Advance(1)
44-
m.Assign(context.Background(), nil, map[string][]int32{
45-
"foo": {3, 4},
46-
})
42+
m.Assign(context.Background(), []int32{3, 4})
4743
later := c.Now().UnixNano()
4844
require.Equal(t, map[int32]partitionEntry{
4945
1: {
50-
lastUpdated: now,
51-
state: PartitionPending,
46+
assignedAt: now,
47+
state: PartitionPending,
5248
},
5349
2: {
54-
lastUpdated: now,
55-
state: PartitionPending,
50+
assignedAt: now,
51+
state: PartitionPending,
5652
},
5753
3: {
58-
lastUpdated: later,
59-
state: PartitionPending,
54+
assignedAt: later,
55+
state: PartitionPending,
6056
},
6157
4: {
62-
lastUpdated: later,
63-
state: PartitionPending,
58+
assignedAt: later,
59+
state: PartitionPending,
6460
},
6561
}, m.partitions)
6662
}
@@ -69,9 +65,7 @@ func TestPartitionManager_GetState(t *testing.T) {
6965
m := NewPartitionManager(log.NewNopLogger())
7066
c := quartz.NewMock(t)
7167
m.clock = c
72-
m.Assign(context.Background(), nil, map[string][]int32{
73-
"foo": {1, 2, 3},
74-
})
68+
m.Assign(context.Background(), []int32{1, 2, 3})
7569
// Getting the state for an assigned partition should return true.
7670
state, ok := m.GetState(1)
7771
require.True(t, ok)
@@ -85,9 +79,7 @@ func TestPartitionManager_SetState(t *testing.T) {
8579
m := NewPartitionManager(log.NewNopLogger())
8680
c := quartz.NewMock(t)
8781
m.clock = c
88-
m.Assign(context.Background(), nil, map[string][]int32{
89-
"foo": {1, 2, 3},
90-
})
82+
m.Assign(context.Background(), []int32{1, 2, 3})
9183
// Setting the state for an assigned partition should return true.
9284
require.True(t, m.SetState(1, PartitionReady))
9385
state, ok := m.GetState(1)
@@ -101,9 +93,7 @@ func TestPartitionManager_Has(t *testing.T) {
10193
m := NewPartitionManager(log.NewNopLogger())
10294
c := quartz.NewMock(t)
10395
m.clock = c
104-
m.Assign(context.Background(), nil, map[string][]int32{
105-
"foo": {1, 2, 3},
106-
})
96+
m.Assign(context.Background(), []int32{1, 2, 3})
10797
require.True(t, m.Has(1))
10898
require.True(t, m.Has(2))
10999
require.True(t, m.Has(3))
@@ -117,9 +107,7 @@ func TestPartitionManager_List(t *testing.T) {
117107
// Advance the clock so we compare with a time that is not the default
118108
// value.
119109
c.Advance(1)
120-
m.Assign(context.Background(), nil, map[string][]int32{
121-
"foo": {1, 2, 3},
122-
})
110+
m.Assign(context.Background(), []int32{1, 2, 3})
123111
now := c.Now().UnixNano()
124112
result := m.List()
125113
require.Equal(t, map[int32]int64{
@@ -141,9 +129,7 @@ func TestPartitionManager_ListByState(t *testing.T) {
141129
// Advance the clock so we compare with a time that is not the default
142130
// value.
143131
c.Advance(1)
144-
m.Assign(context.Background(), nil, map[string][]int32{
145-
"foo": {1, 2, 3},
146-
})
132+
m.Assign(context.Background(), []int32{1, 2, 3})
147133
now := c.Now().UnixNano()
148134
result := m.ListByState(PartitionPending)
149135
require.Equal(t, map[int32]int64{
@@ -165,38 +151,34 @@ func TestPartitionManager_ListByState(t *testing.T) {
165151
require.Equal(t, map[int32]int64{1: now}, result)
166152
}
167153

168-
func TestPartitionManager_Remove(t *testing.T) {
154+
func TestPartitionManager_Revoke(t *testing.T) {
169155
m := NewPartitionManager(log.NewNopLogger())
170156
c := quartz.NewMock(t)
171157
m.clock = c
172-
m.Assign(context.Background(), nil, map[string][]int32{
173-
"foo": {1, 2, 3},
174-
})
158+
m.Assign(context.Background(), []int32{1, 2, 3})
175159
// Assert that the partitions were assigned and the timestamps are set to
176160
// the current time.
177161
now := c.Now().UnixNano()
178162
require.Equal(t, map[int32]partitionEntry{
179163
1: {
180-
lastUpdated: now,
181-
state: PartitionPending,
164+
assignedAt: now,
165+
state: PartitionPending,
182166
},
183167
2: {
184-
lastUpdated: now,
185-
state: PartitionPending,
168+
assignedAt: now,
169+
state: PartitionPending,
186170
},
187171
3: {
188-
lastUpdated: now,
189-
state: PartitionPending,
172+
assignedAt: now,
173+
state: PartitionPending,
190174
},
191175
}, m.partitions)
192-
// Remove partitions 2 and 3.
193-
m.Remove(context.Background(), nil, map[string][]int32{
194-
"foo": {2, 3},
195-
})
176+
// Revoke partitions 2 and 3.
177+
m.Revoke(context.Background(), []int32{2, 3})
196178
require.Equal(t, map[int32]partitionEntry{
197179
1: {
198-
lastUpdated: now,
199-
state: PartitionPending,
180+
assignedAt: now,
181+
state: PartitionPending,
200182
},
201183
}, m.partitions)
202184
}

pkg/limits/service.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ type IngestLimits struct {
111111
playback *PlaybackManager
112112

113113
// Track partition assignments
114-
partitionManager *PartitionManager
114+
partitionManager *PartitionManager
115+
partitionLifecycler *PartitionLifecycler
115116

116117
// Used for tests.
117118
clock quartz.Clock
@@ -160,14 +161,16 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
160161
kCfg.AutoCreateTopicEnabled = true
161162
kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions
162163

164+
s.partitionLifecycler = NewPartitionLifecycler(s.partitionManager, s.usage, logger)
165+
163166
s.clientReader, err = client.NewReaderClient("ingest-limits-reader", kCfg, logger, reg,
164167
kgo.ConsumerGroup(consumerGroup),
165168
kgo.ConsumeTopics(kCfg.Topic),
166169
kgo.Balancers(kgo.CooperativeStickyBalancer()),
167170
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(s.clock.Now().Add(-s.cfg.WindowSize).UnixMilli())),
168171
kgo.DisableAutoCommit(),
169-
kgo.OnPartitionsAssigned(s.onPartitionsAssigned),
170-
kgo.OnPartitionsRevoked(s.onPartitionsRevoked),
172+
kgo.OnPartitionsAssigned(s.partitionLifecycler.Assign),
173+
kgo.OnPartitionsRevoked(s.partitionLifecycler.Revoke),
171174
)
172175
if err != nil {
173176
return nil, fmt.Errorf("failed to create kafka client: %w", err)
@@ -235,18 +238,6 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
235238
)
236239
}
237240

238-
func (s *IngestLimits) onPartitionsAssigned(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
239-
s.partitionManager.Assign(ctx, client, partitions)
240-
}
241-
242-
func (s *IngestLimits) onPartitionsRevoked(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
243-
s.partitionManager.Remove(ctx, client, partitions)
244-
245-
for _, ids := range partitions {
246-
s.usage.EvictPartitions(ids)
247-
}
248-
}
249-
250241
func (s *IngestLimits) CheckReady(ctx context.Context) error {
251242
if s.State() != services.Running {
252243
return fmt.Errorf("service is not running: %v", s.State())

pkg/limits/service_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,10 +338,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
338338
}
339339

340340
// Assign the Partition IDs.
341-
partitions := make(map[string][]int32)
342-
partitions["test"] = make([]int32, 0, len(tt.assignedPartitions))
343-
partitions["test"] = append(partitions["test"], tt.assignedPartitions...)
344-
s.partitionManager.Assign(context.Background(), nil, partitions)
341+
s.partitionManager.Assign(context.Background(), tt.assignedPartitions)
345342

346343
// Call ExceedsLimits.
347344
req := &proto.ExceedsLimitsRequest{
@@ -429,8 +426,7 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
429426
}
430427

431428
// Assign the Partition IDs.
432-
partitions := map[string][]int32{"tenant1": {0}}
433-
s.partitionManager.Assign(context.Background(), nil, partitions)
429+
s.partitionManager.Assign(context.Background(), []int32{0})
434430

435431
// Run concurrent requests
436432
concurrency := 10

0 commit comments

Comments
 (0)