Skip to content

Commit d937389

Browse files
committed
feat(core): Add Supabase Queues support
1 parent 56137a8 commit d937389

File tree

5 files changed

+279
-6
lines changed

5 files changed

+279
-6
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://public@dsn.ingest.sentry.io/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues.rpc('enqueue', {
22+
queue_name: 'todos',
23+
msg: { title: 'Test Todo' },
24+
});
25+
26+
await queues.rpc('dequeue', {
27+
queue_name: 'todos',
28+
});
29+
} catch (error) {
30+
Sentry.captureException(error);
31+
}
32+
}
33+
34+
performQueueOperations();
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import type { Page} from '@playwright/test';
2+
import { expect } from '@playwright/test';
3+
import type { Event } from '@sentry/core';
4+
5+
import { sentryTest } from '../../../../utils/fixtures';
6+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
7+
8+
async function mockSupabaseRoute(page: Page) {
9+
await page.route('**/rest/v1/rpc**', route => {
10+
return route.fulfill({
11+
status: 200,
12+
body: JSON.stringify({
13+
foo: ['bar', 'baz'],
14+
}),
15+
headers: {
16+
'Content-Type': 'application/json',
17+
},
18+
});
19+
});
20+
}
21+
22+
sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
23+
await mockSupabaseRoute(page);
24+
25+
if (shouldSkipTracingTest()) {
26+
return;
27+
}
28+
29+
const url = await getLocalTestUrl({ testDir: __dirname });
30+
31+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
32+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
33+
34+
expect(queueSpans).toHaveLength(2);
35+
36+
expect(queueSpans![0]).toMatchObject({
37+
description: 'supabase.db.rpc',
38+
parent_span_id: event.contexts?.trace?.span_id,
39+
span_id: expect.any(String),
40+
start_timestamp: expect.any(Number),
41+
timestamp: expect.any(Number),
42+
trace_id: event.contexts?.trace?.trace_id,
43+
data: expect.objectContaining({
44+
'sentry.op': 'queue.publish',
45+
'sentry.origin': 'auto.db.supabase',
46+
'messaging.destination.name': 'todos',
47+
'messaging.message.id': 'Test Todo',
48+
}),
49+
});
50+
51+
expect(queueSpans![1]).toMatchObject({
52+
description: 'supabase.db.rpc',
53+
parent_span_id: event.contexts?.trace?.span_id,
54+
span_id: expect.any(String),
55+
start_timestamp: expect.any(Number),
56+
timestamp: expect.any(Number),
57+
trace_id: event.contexts?.trace?.trace_id,
58+
data: expect.objectContaining({
59+
'sentry.op': 'queue.process',
60+
'sentry.origin': 'auto.db.supabase',
61+
'messaging.destination.name': 'todos',
62+
}),
63+
});
64+
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://public@dsn.ingest.sentry.io/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues
22+
.schema('pgmq_public')
23+
.rpc('enqueue', {
24+
queue_name: 'todos',
25+
msg: { title: 'Test Todo' },
26+
});
27+
28+
await queues
29+
.schema('pgmq_public')
30+
.rpc('dequeue', {
31+
queue_name: 'todos',
32+
});
33+
} catch (error) {
34+
Sentry.captureException(error);
35+
}
36+
}
37+
38+
performQueueOperations();
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { type Page, expect } from '@playwright/test';
2+
import type { Event } from '@sentry/core';
3+
4+
import { sentryTest } from '../../../../utils/fixtures';
5+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
6+
7+
async function mockSupabaseRoute(page: Page) {
8+
await page.route('**/rest/v1/rpc**', route => {
9+
return route.fulfill({
10+
status: 200,
11+
body: JSON.stringify({
12+
foo: ['bar', 'baz'],
13+
}),
14+
headers: {
15+
'Content-Type': 'application/json',
16+
},
17+
});
18+
});
19+
}
20+
21+
sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
22+
await mockSupabaseRoute(page);
23+
24+
if (shouldSkipTracingTest()) {
25+
return;
26+
}
27+
28+
const url = await getLocalTestUrl({ testDir: __dirname });
29+
30+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
31+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
32+
33+
expect(queueSpans).toHaveLength(2);
34+
35+
expect(queueSpans![0]).toMatchObject({
36+
description: 'supabase.db.rpc',
37+
parent_span_id: event.contexts?.trace?.span_id,
38+
span_id: expect.any(String),
39+
start_timestamp: expect.any(Number),
40+
timestamp: expect.any(Number),
41+
trace_id: event.contexts?.trace?.trace_id,
42+
data: expect.objectContaining({
43+
'sentry.op': 'queue.publish',
44+
'sentry.origin': 'auto.db.supabase',
45+
'messaging.destination.name': 'todos',
46+
'messaging.message.id': 'Test Todo',
47+
}),
48+
});
49+
50+
expect(queueSpans![1]).toMatchObject({
51+
description: 'supabase.db.rpc',
52+
parent_span_id: event.contexts?.trace?.span_id,
53+
span_id: expect.any(String),
54+
start_timestamp: expect.any(Number),
55+
timestamp: expect.any(Number),
56+
trace_id: event.contexts?.trace?.trace_id,
57+
data: expect.objectContaining({
58+
'sentry.op': 'queue.process',
59+
'sentry.origin': 'auto.db.supabase',
60+
'messaging.destination.name': 'todos',
61+
}),
62+
});
63+
});

packages/core/src/integrations/supabase.ts

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ import type { IntegrationFn } from '../types-hoist/integration';
1212
import { isPlainObject } from '../utils-hoist/is';
1313
import { logger } from '../utils-hoist/logger';
1414

15+
export interface SupabaseClientConstructor {
16+
prototype: {
17+
from: (table: string) => PostgRESTQueryBuilder;
18+
schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> };
19+
};
20+
rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
21+
}
22+
1523
const AUTH_OPERATIONS_TO_INSTRUMENT = [
1624
'reauthenticate',
1725
'signInAnonymously',
@@ -113,12 +121,6 @@ export interface SupabaseBreadcrumb {
113121
};
114122
}
115123

116-
export interface SupabaseClientConstructor {
117-
prototype: {
118-
from: (table: string) => PostgRESTQueryBuilder;
119-
};
120-
}
121-
122124
export interface PostgRESTProtoThenable {
123125
then: <T>(
124126
onfulfilled?: ((value: T) => T | PromiseLike<T>) | null,
@@ -214,6 +216,76 @@ export function translateFiltersIntoMethods(key: string, query: string): string
214216
return `${method}(${key}, ${value.join('.')})`;
215217
}
216218

219+
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
220+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
221+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
222+
{
223+
apply(target, thisArg, argumentsList) {
224+
const rv = Reflect.apply(target, thisArg, argumentsList);
225+
226+
return instrumentRpc(rv);
227+
},
228+
},
229+
);
230+
}
231+
232+
function instrumentRpc(SupabaseClient: unknown): unknown {
233+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy(
234+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc,
235+
{
236+
apply(target, thisArg, argumentsList) {
237+
const isProducerSpan = argumentsList[0] === 'enqueue';
238+
const isConsumerSpan = argumentsList[0] === 'dequeue';
239+
240+
const maybeQueueParams = argumentsList[1];
241+
242+
// If the second argument is not an object, it's not a queue operation
243+
if (!isPlainObject(maybeQueueParams)) {
244+
return Reflect.apply(target, thisArg, argumentsList);
245+
}
246+
247+
const msg = maybeQueueParams?.msg as { title: string };
248+
249+
const messageId = msg?.title;
250+
const queueName = maybeQueueParams?.queue_name as string;
251+
252+
const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : '';
253+
254+
// If the operation is not a queue operation, return the original function
255+
if (!op) {
256+
return Reflect.apply(target, thisArg, argumentsList);
257+
}
258+
259+
return startSpan(
260+
{
261+
name: 'supabase.db.rpc',
262+
attributes: {
263+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
264+
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op,
265+
},
266+
},
267+
async span => {
268+
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => {
269+
if (messageId) {
270+
span.setAttribute('messaging.message.id', messageId);
271+
}
272+
273+
if (queueName) {
274+
span.setAttribute('messaging.destination.name', queueName);
275+
}
276+
277+
span.end();
278+
return res;
279+
});
280+
},
281+
);
282+
},
283+
},
284+
);
285+
286+
return SupabaseClient;
287+
}
288+
217289
function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn {
218290
return new Proxy(operation, {
219291
apply(target, thisArg, argumentsList) {
@@ -495,6 +567,8 @@ export const instrumentSupabaseClient = (supabaseClient: unknown): void => {
495567
supabaseClient.constructor === Function ? supabaseClient : supabaseClient.constructor;
496568

497569
instrumentSupabaseClientConstructor(SupabaseClientConstructor);
570+
instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor);
571+
instrumentRpc(supabaseClient as SupabaseClientInstance);
498572
instrumentSupabaseAuthClient(supabaseClient as SupabaseClientInstance);
499573
};
500574

0 commit comments

Comments
 (0)