
Commit b665282

chore: remove cutoff etc from [UsageStore.Update] (#17706)
1 parent d86e81f · commit b665282
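
In short: callers of [UsageStore.Update] previously had to pre-compute four nanosecond boundaries (record time, window cutoff, rate-bucket start, and rate-bucket cutoff); after this change they pass a single `time.Time` and the store derives those values from its own `Config`. The signature change, as it appears in the store.go diff below:

```go
// Before: every boundary supplied by the caller as int64 nanoseconds.
func (s *UsageStore) Update(tenant string, streams []*proto.StreamMetadata,
	lastSeenAt, cutoff, bucketStart, bucketCutOff int64, cond CondFunc,
) ([]*proto.StreamMetadata, []*proto.StreamMetadata)

// After: one time.Time; cutoff, bucketStart, and bucketCutoff are derived
// inside Update from s.cfg (WindowSize, BucketDuration, RateWindow).
func (s *UsageStore) Update(tenant string, streams []*proto.StreamMetadata,
	lastSeenAt time.Time, cond CondFunc,
) ([]*proto.StreamMetadata, []*proto.StreamMetadata)
```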

File tree (4 files changed, +59 −85 lines):

- pkg/limits/service.go
- pkg/limits/store.go
- pkg/limits/store_bench_test.go
- pkg/limits/store_test.go


pkg/limits/service.go

Lines changed: 1 addition & 9 deletions
@@ -344,14 +344,6 @@ func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *proto.GetAssi
 func (s *IngestLimits) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
 	var (
 		lastSeenAt = s.clock.Now()
-		// Use the provided lastSeenAt timestamp as the last seen time
-		recordTime = lastSeenAt.UnixNano()
-		// Calculate the cutoff for the window size
-		cutoff = lastSeenAt.Add(-s.cfg.WindowSize).UnixNano()
-		// Get the bucket for this timestamp using the configured interval duration
-		bucketStart = lastSeenAt.Truncate(s.cfg.BucketDuration).UnixNano()
-		// Calculate the rate window cutoff for cleaning up old buckets
-		bucketCutoff = lastSeenAt.Add(-s.cfg.RateWindow).UnixNano()
 		// Calculate the max active streams per tenant per partition
 		maxActiveStreams = uint64(s.limits.MaxGlobalStreamsPerUser(req.Tenant) / s.cfg.NumPartitions)
 	)
@@ -373,7 +365,7 @@ func (s *IngestLimits) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
 	streams = streams[:valid]
 
 	cond := streamLimitExceeded(maxActiveStreams)
-	accepted, rejected := s.usage.Update(req.Tenant, streams, recordTime, cutoff, bucketStart, bucketCutoff, cond)
+	accepted, rejected := s.usage.Update(req.Tenant, streams, lastSeenAt, cond)
 
 	var ingestedBytes uint64
 	for _, stream := range accepted {
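
The one derived value that stays at the call site is the per-partition stream limit. A minimal sketch of that arithmetic, with `maxGlobal` and `numPartitions` as illustrative stand-ins for `s.limits.MaxGlobalStreamsPerUser(req.Tenant)` and `s.cfg.NumPartitions`:

```go
package main

import "fmt"

// maxActiveStreamsPerPartition mirrors the surviving line in the hunk
// above: a tenant's global stream limit is split evenly across partitions,
// so each partition can enforce its share independently.
func maxActiveStreamsPerPartition(maxGlobal, numPartitions int) uint64 {
	return uint64(maxGlobal / numPartitions)
}

func main() {
	// e.g. a 10,000-stream global limit over 64 partitions allows 156
	// active streams per partition (integer division, rounded down).
	fmt.Println(maxActiveStreamsPerPartition(10000, 64))
}
```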

pkg/limits/store.go

Lines changed: 16 additions & 8 deletions
@@ -3,6 +3,7 @@ package limits
 import (
 	"hash/fnv"
 	"sync"
+	"time"
 
 	"github.com/coder/quartz"
 
@@ -98,10 +99,17 @@ func (s *UsageStore) ForTenant(tenant string, fn IterateFunc) {
 	})
 }
 
-func (s *UsageStore) Update(tenant string, streams []*proto.StreamMetadata, lastSeenAt, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
-	stored := make([]*proto.StreamMetadata, 0, len(streams))
-	rejected := make([]*proto.StreamMetadata, 0, len(streams))
-
+func (s *UsageStore) Update(tenant string, streams []*proto.StreamMetadata, lastSeenAt time.Time, cond CondFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
+	var (
+		// Calculate the cutoff for the window size
+		cutoff = lastSeenAt.Add(-s.cfg.WindowSize).UnixNano()
+		// Get the bucket for this timestamp using the configured interval duration
+		bucketStart = lastSeenAt.Truncate(s.cfg.BucketDuration).UnixNano()
+		// Calculate the rate window cutoff for cleaning up old buckets
+		bucketCutoff = lastSeenAt.Add(-s.cfg.RateWindow).UnixNano()
+		stored   = make([]*proto.StreamMetadata, 0, len(streams))
+		rejected = make([]*proto.StreamMetadata, 0, len(streams))
+	)
 	s.withLock(tenant, func(i int) {
 		if _, ok := s.stripes[i][tenant]; !ok {
 			s.stripes[i][tenant] = make(tenantUsage)
@@ -139,11 +147,11 @@ func (s *UsageStore) Update(tenant string, streams []*proto.StreamMetadata, last
 
 			// If the stream is stored and expired, reset the stream
 			if found && recorded.LastSeenAt < cutoff {
-				s.stripes[i][tenant][partition][stream.StreamHash] = Stream{Hash: stream.StreamHash, LastSeenAt: lastSeenAt}
+				s.stripes[i][tenant][partition][stream.StreamHash] = Stream{Hash: stream.StreamHash, LastSeenAt: lastSeenAt.UnixNano()}
 			}
 		}
 
-		s.storeStream(i, tenant, partition, stream.StreamHash, stream.TotalSize, lastSeenAt, bucketStart, bucketCutOff)
+		s.storeStream(i, tenant, partition, stream.StreamHash, stream.TotalSize, lastSeenAt, bucketStart, bucketCutoff)
 
 		stored = append(stored, stream)
 	}
@@ -185,15 +193,15 @@ func (s *UsageStore) EvictPartitions(partitionsToEvict []int32) {
 	})
 }
 
-func (s *UsageStore) storeStream(i int, tenant string, partition int32, streamHash, recTotalSize uint64, recordTime, bucketStart, bucketCutOff int64) {
+func (s *UsageStore) storeStream(i int, tenant string, partition int32, streamHash, recTotalSize uint64, recordTime time.Time, bucketStart, bucketCutOff int64) {
 	// Check if the stream already exists in the metadata
 	recorded, ok := s.stripes[i][tenant][partition][streamHash]
 
 	// Create new stream metadata with the initial interval
 	if !ok {
 		s.stripes[i][tenant][partition][streamHash] = Stream{
 			Hash:        streamHash,
-			LastSeenAt:  recordTime,
+			LastSeenAt:  recordTime.UnixNano(),
 			TotalSize:   recTotalSize,
 			RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: recTotalSize}},
 		}
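
The boundary math itself is unchanged; it simply moved from the caller into `Update`. A self-contained sketch of the three derivations, using the same field names as the diff (the `Config` stand-in here is trimmed to the fields the math touches; the real struct has more):

```go
package main

import (
	"fmt"
	"time"
)

// Config is trimmed to the three durations Update reads from s.cfg.
type Config struct {
	WindowSize     time.Duration // active-stream window
	BucketDuration time.Duration // width of one rate bucket
	RateWindow     time.Duration // how far back rate buckets are kept
}

// boundaries reproduces the arithmetic now inlined at the top of Update.
func boundaries(cfg Config, lastSeenAt time.Time) (cutoff, bucketStart, bucketCutoff int64) {
	// Streams last seen before this instant are considered expired.
	cutoff = lastSeenAt.Add(-cfg.WindowSize).UnixNano()
	// The bucket containing lastSeenAt, aligned to BucketDuration.
	bucketStart = lastSeenAt.Truncate(cfg.BucketDuration).UnixNano()
	// Buckets older than this are dropped when the stream is stored.
	bucketCutoff = lastSeenAt.Add(-cfg.RateWindow).UnixNano()
	return cutoff, bucketStart, bucketCutoff
}

func main() {
	cfg := Config{WindowSize: time.Hour, BucketDuration: time.Minute, RateWindow: 5 * time.Minute}
	cutoff, bucketStart, bucketCutoff := boundaries(cfg, time.Now())
	fmt.Println(cutoff, bucketStart, bucketCutoff)
}
```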

pkg/limits/store_bench_test.go

Lines changed: 10 additions & 28 deletions
@@ -9,13 +9,6 @@ import (
 )
 
 func BenchmarkUsageStore_Store(b *testing.B) {
-	const (
-		windowSize     = time.Hour
-		bucketDuration = time.Minute
-		rateWindow     = 5 * time.Minute
-		rateBuckets    = 5 // One bucket per minute in the 5-minute rate window
-	)
-
 	benchmarks := []struct {
 		name          string
 		numTenants    int
@@ -49,11 +42,15 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 	}
 
 	for _, bm := range benchmarks {
-		s := NewUsageStore(Config{NumPartitions: bm.numPartitions})
+		s := NewUsageStore(Config{
+			NumPartitions:  bm.numPartitions,
+			WindowSize:     time.Hour,
+			RateWindow:     5 * time.Minute,
+			BucketDuration: time.Minute,
+		})
 
 		b.Run(fmt.Sprintf("%s_create", bm.name), func(b *testing.B) {
 			now := time.Now()
-			cutoff := now.Add(-windowSize).UnixNano()
 
 			// Run the benchmark
 			for i := range b.N {
@@ -67,16 +64,12 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 					TotalSize:  1500,
 				}}
 
-				bucketStart := updateTime.Truncate(bucketDuration).UnixNano()
-				bucketCutOff := updateTime.Add(-rateWindow).UnixNano()
-
-				s.Update(tenant, metadata, updateTime.UnixNano(), cutoff, bucketStart, bucketCutOff, nil)
+				s.Update(tenant, metadata, updateTime, nil)
 			}
 		})
 
 		b.Run(fmt.Sprintf("%s_update", bm.name), func(b *testing.B) {
 			now := time.Now()
-			cutoff := now.Add(-windowSize).UnixNano()
 
 			// Run the benchmark
 			for i := range b.N {
@@ -90,10 +83,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 					TotalSize:  1500,
 				}}
 
-				bucketStart := updateTime.Truncate(bucketDuration).UnixNano()
-				bucketCutOff := updateTime.Add(-rateWindow).UnixNano()
-
-				s.Update(tenant, metadata, updateTime.UnixNano(), cutoff, bucketStart, bucketCutOff, nil)
+				s.Update(tenant, metadata, updateTime, nil)
 			}
 		})
 
@@ -102,7 +92,6 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 		// Run parallel benchmark
 		b.Run(bm.name+"_create_parallel", func(b *testing.B) {
 			now := time.Now()
-			cutoff := now.Add(-windowSize).UnixNano()
 			// Run parallel benchmark
 			b.RunParallel(func(pb *testing.PB) {
 				i := 0
@@ -116,18 +105,14 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 					TotalSize:  1500,
 				}}
 
-				bucketStart := updateTime.Truncate(bucketDuration).UnixNano()
-				bucketCutOff := updateTime.Add(-rateWindow).UnixNano()
-
-				s.Update(tenant, metadata, updateTime.UnixNano(), cutoff, bucketStart, bucketCutOff, nil)
+				s.Update(tenant, metadata, updateTime, nil)
 				i++
 			}
 		})
 	})
 
 		b.Run(bm.name+"_update_parallel", func(b *testing.B) {
 			now := time.Now()
-			cutoff := now.Add(-windowSize).UnixNano()
 			// Run parallel benchmark
 			b.RunParallel(func(pb *testing.PB) {
 				i := 0
@@ -141,10 +126,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 					TotalSize:  1500,
 				}}
 
-				bucketStart := updateTime.Truncate(bucketDuration).UnixNano()
-				bucketCutOff := updateTime.Add(-rateWindow).UnixNano()
-
-				s.Update(tenant, metadata, updateTime.UnixNano(), cutoff, bucketStart, bucketCutOff, nil)
+				s.Update(tenant, metadata, updateTime, nil)
 				i++
 			}
 		})
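
Net effect on the benchmarks: the per-iteration `Truncate` and `Add` calls move out of the loop body and into `Update` itself, so that arithmetic is now part of the measured work — arguably the more representative measurement, since production callers no longer perform it either. To run these benchmarks locally, the usual invocation should be something like:

```
go test -bench=BenchmarkUsageStore_Store -benchmem ./pkg/limits/
```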

pkg/limits/store_test.go

Lines changed: 32 additions & 40 deletions
@@ -11,22 +11,18 @@ import (
 )
 
 func TestUsageStore_All(t *testing.T) {
-	now := time.Now()
-	cutoff := now.Add(-5 * time.Minute).UnixNano()
 	// Create a store with 10 partitions.
-	s := NewUsageStore(Config{NumPartitions: 10})
+	s := NewUsageStore(Config{
+		NumPartitions: 10,
+		WindowSize:    time.Minute,
+	})
+	clock := quartz.NewMock(t)
+	s.clock = clock
 	// Create 10 streams. Since we use i as the hash, we can expect the
 	// streams to be sharded over all 10 partitions.
-	streams := make([]*proto.StreamMetadata, 10)
 	for i := 0; i < 10; i++ {
-		streams[i] = &proto.StreamMetadata{
-			StreamHash: uint64(i),
-		}
+		s.set("tenant", Stream{Hash: uint64(i)})
 	}
-	// Add the streams to the store, all streams should be accepted.
-	accepted, rejected := s.Update("tenant", streams, now.UnixNano(), cutoff, 0, 0, nil)
-	require.Len(t, accepted, 10)
-	require.Empty(t, rejected)
 	// Check that we can iterate all stored streams.
 	expected := []uint64{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9}
 	actual := make([]uint64, 0, len(expected))
@@ -37,26 +33,22 @@ func TestUsageStore_All(t *testing.T) {
 }
 
 func TestUsageStore_ForTenant(t *testing.T) {
-	now := time.Now()
-	cutoff := now.Add(-5 * time.Minute).UnixNano()
 	// Create a store with 10 partitions.
-	s := NewUsageStore(Config{NumPartitions: 10})
+	s := NewUsageStore(Config{
+		NumPartitions: 10,
+		WindowSize:    time.Minute,
+	})
+	clock := quartz.NewMock(t)
+	s.clock = clock
 	// Create 10 streams. Since we use i as the hash, we can expect the
 	// streams to be sharded over all 10 partitions.
-	streams := make([]*proto.StreamMetadata, 10)
 	for i := 0; i < 10; i++ {
-		streams[i] = &proto.StreamMetadata{
-			StreamHash: uint64(i),
+		tenant := "tenant1"
+		if i >= 5 {
+			tenant = "tenant2"
 		}
+		s.set(tenant, Stream{Hash: uint64(i)})
 	}
-	// Add the streams to the store, but with the streams shared between
-	// two tenants.
-	accepted, rejected := s.Update("tenant1", streams[0:5], now.UnixNano(), cutoff, 0, 0, nil)
-	require.Len(t, accepted, 5)
-	require.Empty(t, rejected)
-	accepted, rejected = s.Update("tenant2", streams[5:], now.UnixNano(), cutoff, 0, 0, nil)
-	require.Len(t, accepted, 5)
-	require.Empty(t, rejected)
 	// Check we can iterate just the streams for each tenant.
 	expected1 := []uint64{0x0, 0x1, 0x2, 0x3, 0x4}
 	actual1 := make([]uint64, 0, 5)
@@ -73,11 +65,6 @@ func TestUsageStore_ForTenant(t *testing.T) {
 }
 
 func TestUsageStore_Store(t *testing.T) {
-	now := time.Now()
-	cutoff := now.Add(-5 * time.Minute).UnixNano()
-	bucketStart := now.Truncate(time.Minute).UnixNano()
-	bucketCutoff := now.Add(-5 * time.Minute).UnixNano()
-
 	tests := []struct {
 		name          string
 		numPartitions int
@@ -180,10 +167,15 @@ func TestUsageStore_Store(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			s := NewUsageStore(Config{NumPartitions: test.numPartitions})
-			s.Update("tenant", test.seed, now.UnixNano(), cutoff, bucketStart, bucketCutoff, nil)
+			s := NewUsageStore(Config{
+				NumPartitions: test.numPartitions,
+				WindowSize:    time.Minute,
+			})
+			clock := quartz.NewMock(t)
+			s.clock = clock
+			s.Update("tenant", test.seed, clock.Now(), nil)
 			streamLimitCond := streamLimitExceeded(test.maxGlobalStreams)
-			accepted, rejected := s.Update("tenant", test.streams, now.UnixNano(), cutoff, bucketStart, bucketCutoff, streamLimitCond)
+			accepted, rejected := s.Update("tenant", test.streams, clock.Now(), streamLimitCond)
 			require.ElementsMatch(t, test.expectedAccepted, accepted)
 			require.ElementsMatch(t, test.expectedRejected, rejected)
 		})
@@ -225,17 +217,17 @@ func TestUsageStore_Evict(t *testing.T) {
 
 func TestUsageStore_EvictPartitions(t *testing.T) {
 	// Create a store with 10 partitions.
-	s := NewUsageStore(Config{NumPartitions: 10})
+	s := NewUsageStore(Config{
+		NumPartitions: 10,
+		WindowSize:    time.Minute,
+	})
+	clock := quartz.NewMock(t)
+	s.clock = clock
 	// Create 10 streams. Since we use i as the hash, we can expect the
 	// streams to be sharded over all 10 partitions.
-	streams := make([]*proto.StreamMetadata, 10)
 	for i := 0; i < 10; i++ {
-		streams[i] = &proto.StreamMetadata{
-			StreamHash: uint64(i),
-		}
+		s.set("tenant", Stream{Hash: uint64(i)})
 	}
-	now := time.Now()
-	s.Update("tenant", streams, now.UnixNano(), now.Add(-time.Minute).UnixNano(), 0, 0, nil)
 	// Evict the first 5 partitions.
 	s.EvictPartitions([]int32{0, 1, 2, 3, 4})
 	// The last 5 partitions should still have data.
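
These tests also replace `time.Now()` with the `quartz` mock clock the store already depends on (note the `github.com/coder/quartz` import in store.go above). A minimal sketch of the pattern, assuming only what the diff shows — that `quartz.NewMock(t)` returns a clock whose `Now()` does not move unless the test advances it:

```go
package limits_test

import (
	"testing"

	"github.com/coder/quartz"
)

// TestMockClockIsDeterministic is a hypothetical illustration: the mock
// clock stands still between calls, so window cutoffs derived from
// Update's lastSeenAt argument are reproducible across test runs.
// (The s.clock = clock wiring is internal to the limits package.)
func TestMockClockIsDeterministic(t *testing.T) {
	clock := quartz.NewMock(t)
	if !clock.Now().Equal(clock.Now()) {
		t.Fatal("expected the mock clock to stand still between calls")
	}
}
```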
