Skip to content

Commit f5dcf73

Browse files
committed
test: implement unified thread entities
1 parent 0a0fd7d commit f5dcf73

File tree

3 files changed

+62
-0
lines changed

3 files changed

+62
-0
lines changed

test/tools/unified-spec-runner/entities.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,35 @@ export interface UnifiedChangeStream extends ChangeStream {
5151
eventCollector: InstanceType<typeof import('../../tools/utils')['EventCollector']>;
5252
}
5353

54+
export class UnifiedThread {
55+
#promise: Promise<void>;
56+
#error: Error;
57+
#killed = false;
58+
59+
id: string;
60+
61+
constructor(id) {
62+
this.id = id;
63+
this.#promise = Promise.resolve();
64+
}
65+
66+
queue(functionToQueue: () => Promise<any>) {
67+
if (this.#killed || this.#error) {
68+
return;
69+
}
70+
71+
this.#promise = this.#promise.then(functionToQueue).catch(e => (this.#error = e));
72+
}
73+
74+
async finish() {
75+
this.#killed = true;
76+
await this.#promise;
77+
if (this.#error) {
78+
throw this.#error;
79+
}
80+
}
81+
}
82+
5483
export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent;
5584
export type CmapEvent =
5685
| ConnectionPoolCreatedEvent
@@ -285,6 +314,7 @@ export type EntityCtor =
285314
| typeof ChangeStream
286315
| typeof AbstractCursor
287316
| typeof GridFSBucket
317+
| typeof UnifiedThread
288318
| ClientEncryption;
289319

290320
export type EntityTypeId =
@@ -293,6 +323,7 @@ export type EntityTypeId =
293323
| 'collection'
294324
| 'session'
295325
| 'bucket'
326+
| 'thread'
296327
| 'cursor'
297328
| 'stream'
298329
| 'clientEncryption';
@@ -303,6 +334,7 @@ ENTITY_CTORS.set('db', Db);
303334
ENTITY_CTORS.set('collection', Collection);
304335
ENTITY_CTORS.set('session', ClientSession);
305336
ENTITY_CTORS.set('bucket', GridFSBucket);
337+
ENTITY_CTORS.set('thread', UnifiedThread);
306338
ENTITY_CTORS.set('cursor', AbstractCursor);
307339
ENTITY_CTORS.set('stream', ChangeStream);
308340

@@ -335,6 +367,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
335367
getEntity(type: 'collection', key: string, assertExists?: boolean): Collection;
336368
getEntity(type: 'session', key: string, assertExists?: boolean): ClientSession;
337369
getEntity(type: 'bucket', key: string, assertExists?: boolean): GridFSBucket;
370+
getEntity(type: 'thread', key: string, assertExists?: boolean): UnifiedThread;
338371
getEntity(type: 'cursor', key: string, assertExists?: boolean): AbstractCursor;
339372
getEntity(type: 'stream', key: string, assertExists?: boolean): UnifiedChangeStream;
340373
getEntity(type: 'clientEncryption', key: string, assertExists?: boolean): ClientEncryption;
@@ -474,6 +507,8 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
474507
}
475508

476509
map.set(entity.bucket.id, new GridFSBucket(db, options));
510+
} else if ('thread' in entity) {
511+
map.set(entity.thread.id, new UnifiedThread(entity.thread.id));
477512
} else if ('stream' in entity) {
478513
throw new Error(`Unsupported Entity ${JSON.stringify(entity)}`);
479514
} else if ('clientEncryption' in entity) {

test/tools/unified-spec-runner/operations.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,31 @@ operations.set('waitForPrimaryChange', async ({ entities, operation }) => {
572572
]);
573573
});
574574

575+
operations.set('runOnThread', async ({ entities, operation, client, testConfig }) => {
576+
const threadId: string = operation.arguments!.thread;
577+
expect(threadId).to.be.a('string');
578+
const thread = entities.getEntity('thread', threadId, true);
579+
const operationToQueue = operation.arguments!.operation;
580+
const executeFn = executeOperationAndCheck.bind(
581+
null,
582+
operationToQueue,
583+
entities,
584+
client,
585+
testConfig
586+
);
587+
thread.queue(executeFn);
588+
});
589+
590+
operations.set('waitForThread', async ({ entities, operation }) => {
591+
const threadId: string = operation.arguments!.thread;
592+
expect(threadId).to.be.a('string');
593+
const thread = entities.getEntity('thread', threadId, true);
594+
await Promise.race([
595+
thread.finish(),
596+
sleep(10000).then(() => Promise.reject(new Error(`Timed out waiting for thread: ${threadId}`)))
597+
]);
598+
});
599+
575600
operations.set('withTransaction', async ({ entities, operation, client, testConfig }) => {
576601
const session = entities.getEntity('session', operation.object);
577602

test/tools/unified-spec-runner/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { ReadPreferenceMode } from '../../../src/read_preference';
55
import type { TagSet } from '../../../src/sdam/server_description';
66
import type { W } from '../../../src/write_concern';
77
import { TestConfiguration } from '../runner/config';
8+
import { UnifiedThread } from './entities';
89

910
export const SupportedVersion = '^1.0';
1011

@@ -199,6 +200,7 @@ export type EntityDescription =
199200
| { database: DatabaseEntity }
200201
| { collection: CollectionEntity }
201202
| { bucket: BucketEntity }
203+
| { thread: UnifiedThread }
202204
| { stream: StreamEntity }
203205
| { session: SessionEntity }
204206
| { clientEncryption: ClientEncryptionEntity };

0 commit comments

Comments
 (0)