Skip to content

Commit ebd733a

Browse files
[1.x] Fix trimming direct to storage ingest (#318)
* remove extra closure * disable lottery for test * remove only call * add test to ensure trimming is called * change comparison to match with database query * Only collect queries we care about to remove trimming concerns --------- Co-authored-by: Tim MacDonald <hello@timacdonald.me>
1 parent bfc4bb3 commit ebd733a

File tree

4 files changed

+41
-13
lines changed

4 files changed

+41
-13
lines changed

src/Pulse.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public function ingest(): int
311311
$odds = $this->app->make('config')->get('pulse.ingest.trim.lottery') ?? $this->app->make('config')->get('pulse.ingest.trim_lottery');
312312

313313
Lottery::odds(...$odds)
314-
->winner(fn () => $this->rescue(fn () => $ingest->trim(...)))
314+
->winner(fn () => $this->rescue($ingest->trim(...)))
315315
->choose();
316316

317317
$this->flush();

tests/Feature/PulseTest.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@
3434
expect($storage->stored[1]->key)->toBe('keep');
3535
});
3636

37+
it('can trim records', function () {
38+
App::instance(Storage::class, $storage = new StorageFake);
39+
40+
Pulse::record('foo', 'delete', 0, now()->subMonth());
41+
Pulse::record('foo', 'keep', 0);
42+
43+
Pulse::ingest();
44+
45+
expect($storage->stored)->toHaveCount(1);
46+
});
47+
3748
it('can lazily capture entries', function () {
3849
App::instance(Storage::class, $storage = new StorageFake);
3950

tests/Feature/Storage/DatabaseStorageTest.php

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,12 @@
123123
});
124124

125125
it('combines duplicate count aggregates before upserting', function () {
126-
Config::set('pulse.ingest.trim.lottery', [0, 1]);
127126
$queries = collect();
128-
DB::listen(fn ($query) => $queries[] = $query);
127+
DB::listen(function (QueryExecuted $event) use (&$queries) {
128+
if (str_starts_with($event->sql, 'insert')) {
129+
$queries[] = $event;
130+
}
131+
});
129132

130133
Pulse::record('type', 'key1')->count();
131134
Pulse::record('type', 'key1')->count();
@@ -150,9 +153,12 @@
150153
});
151154

152155
it('combines duplicate min aggregates before upserting', function () {
153-
Config::set('pulse.ingest.trim.lottery', [0, 1]);
154156
$queries = collect();
155-
DB::listen(fn ($query) => $queries[] = $query);
157+
DB::listen(function (QueryExecuted $event) use (&$queries) {
158+
if (str_starts_with($event->sql, 'insert')) {
159+
$queries[] = $event;
160+
}
161+
});
156162

157163
Pulse::record('type', 'key1', 200)->min();
158164
Pulse::record('type', 'key1', 100)->min();
@@ -177,9 +183,12 @@
177183
});
178184

179185
it('combines duplicate max aggregates before upserting', function () {
180-
Config::set('pulse.ingest.trim.lottery', [0, 1]);
181186
$queries = collect();
182-
DB::listen(fn ($query) => $queries[] = $query);
187+
DB::listen(function (QueryExecuted $event) use (&$queries) {
188+
if (str_starts_with($event->sql, 'insert')) {
189+
$queries[] = $event;
190+
}
191+
});
183192

184193
Pulse::record('type', 'key1', 100)->max();
185194
Pulse::record('type', 'key1', 300)->max();
@@ -204,9 +213,12 @@
204213
});
205214

206215
it('combines duplicate sum aggregates before upserting', function () {
207-
Config::set('pulse.ingest.trim.lottery', [0, 1]);
208216
$queries = collect();
209-
DB::listen(fn ($query) => $queries[] = $query);
217+
DB::listen(function (QueryExecuted $event) use (&$queries) {
218+
if (str_starts_with($event->sql, 'insert')) {
219+
$queries[] = $event;
220+
}
221+
});
210222

211223
Pulse::record('type', 'key1', 100)->sum();
212224
Pulse::record('type', 'key1', 300)->sum();
@@ -231,9 +243,12 @@
231243
});
232244

233245
it('combines duplicate average aggregates before upserting', function () {
234-
Config::set('pulse.ingest.trim.lottery', [0, 1]);
235246
$queries = collect();
236-
DB::listen(fn ($query) => $queries[] = $query);
247+
DB::listen(function (QueryExecuted $event) use (&$queries) {
248+
if (str_starts_with($event->sql, 'insert')) {
249+
$queries[] = $event;
250+
}
251+
});
237252

238253
Pulse::record('type', 'key1', 100)->avg();
239254
Pulse::record('type', 'key1', 300)->avg();
@@ -466,7 +481,9 @@
466481
it('collapses values with the same key into a single upsert', function () {
467482
$bindings = [];
468483
DB::listen(function (QueryExecuted $event) use (&$bindings) {
469-
$bindings = $event->bindings;
484+
if (str_starts_with($event->sql, 'insert')) {
485+
$bindings = $event->bindings;
486+
}
470487
});
471488

472489
Pulse::set('read_counter', 'post:321', 123);

tests/StorageFake.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public function store(Collection $items): void
3131
*/
3232
public function trim(): void
3333
{
34-
//
34+
$this->stored = $this->stored->reject(fn($record) => $record->timestamp <= now()->subWeek()->timestamp);
3535
}
3636

3737
/**

0 commit comments

Comments
 (0)