
Commit 8b987bb

test: add success e2e tests for stream processing commands MONGOSH-2127
1 parent 5238d59

2 files changed (+366 −0)

.evergreen.yml (8 additions, 0 deletions)
@@ -214,6 +214,10 @@ functions:
           MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           TASK_NAME: ${task_name}
     - command: s3.put
       params:
@@ -3772,6 +3776,10 @@ functions:
           NODE_JS_VERSION: ${node_js_version}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
           TASK_NAME: ${task_name}
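
Both hunks export the four new Streams E2E secrets into the environment of the e2e test tasks; the spec file below reads them from process.env and skips the whole suite when any of them is missing.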

New test file (358 additions, 0 deletions)

@@ -0,0 +1,358 @@
import { bson } from '@mongosh/service-provider-core';
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
import { MongoClient } from 'mongodb';
import { expect } from 'chai';
import type { TestShell } from './test-shell';
import { ensureTestShellAfterHook } from './test-shell-context';
import { sleep } from './util-helpers';

const {
  STREAMS_E2E_SPI_CONNECTION_STRING = '',
  STREAMS_E2E_DB_USER = '',
  STREAMS_E2E_DB_PASSWORD = '',
  STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
} = process.env;

describe('e2e Streams', function () {
  this.timeout(60_000);
  let shell: TestShell;

  before(function () {
    if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
      console.error(
        'Stream Instance connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
      console.error(
        'Cluster connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_USER) {
      console.error(
        'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_PASSWORD) {
      console.error(
        'Password for Atlas database user not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }
  });

  context('basic stream processor operations', function () {
    let processorName = '';
    let db: Db;
    let collection: Collection<Document>;
    let client: MongoClient;

    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase',
          'admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });

      processorName = `spi${new bson.ObjectId().toHexString()}`;
      client = await MongoClient.connect(
        STREAMS_E2E_CLUSTER_CONNECTION_STRING,
        {}
      );
      db = client.db(processorName);
      const collectionName = 'processedData';
      collection = db.collection(collectionName);

      // this stream processor reads from the sample stream and inserts documents into an Atlas database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collectionName,
          },
        },
      };

      const aggPipeline = [sourceStage, mergeStage];

      const createResult = await shell.executeLine(
        `sp.createStreamProcessor("${processorName}", ${JSON.stringify(
          aggPipeline
        )})`
      );
      expect(createResult).to.include(
        `Atlas Stream Processor: ${processorName}`
      );
    });

    afterEach(async function () {
      try {
        await db.dropDatabase();
        await client.close();

        const result = await shell.executeLine(`sp.${processorName}.drop()`);
        expect(result).to.include(`{ ok: 1 }`);
      } catch (err: any) {
        console.error(
          `Could not clean up stream processor ${processorName}:`,
          err
        );
      }
    });

    ensureTestShellAfterHook('afterEach', this);

    it('can list stream processors', async function () {
      const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
      // make sure the processor created in the beforeEach is present
      expect(listResult).to.include(`name: '${processorName}'`);
    });

    it('can start and stop a stream processor', async function () {
      // this should be a unique collection for this test run, so no data to start
      const initialDocsCount = await collection.countDocuments();
      expect(initialDocsCount).to.eq(0);

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor insert some documents
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const updatedDocCount = await collection.countDocuments();
      expect(updatedDocCount).to.be.greaterThan(0);

      // sleep again to make sure the processor isn't doing any more inserts
      await sleep(500);

      const countAfterStopping = await collection.countDocuments();
      expect(countAfterStopping).to.eq(updatedDocCount);
    });

    it(`can modify an existing stream processor's pipeline`, async function () {
      // this field is not present on any docs emitted by the stream processor
      // created in the beforeEach
      const newField = 'newField';

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor insert some documents
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const initialDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(initialDocsWithNewField).to.eq(0);

      // define a new pipeline that will append our newField to the docs the stream
      // processor inserts into the database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const addFieldStage = {
        $addFields: {
          newField,
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collection.collectionName,
          },
        },
      };

      const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];

      const modifyResult = await shell.executeLine(
        `sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
      );
      expect(modifyResult).to.include('{ ok: 1 }');

      const secondStartResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(secondStartResult).to.include('{ ok: 1 }');

      // sleep again to let the processor work with the updated pipeline
      await sleep(500);

      const updatedDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(updatedDocsWithNewField).to.be.greaterThan(0);
    });

    it('can view stats for a stream processor', async function () {
      const statsResult = await shell.executeLine(
        `sp.${processorName}.stats()`
      );
      expect(statsResult).to.include(`name: '${processorName}'`);
      expect(statsResult).to.include(`state: 'CREATED'`);
      expect(statsResult).to.include('stats: {');
      expect(statsResult).to.include(`pipeline: [`);
      expect(statsResult).to.include(
        `{ '$source': { connectionName: 'sample_stream_solar' } },`
      );
    });
  });

  context('sampling from a running stream processor', function () {
    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase',
          'admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });
    });

    it('should output streamed documents to the shell', async function () {
      if (process.platform === 'win32') {
        return this.skip(); // No SIGINT on Windows.
      }

      // this processor is pre-defined on the cloud-dev test project;
      // it reads from the sample solar stream, appends a field with the processor
      // name to each doc, and inserts the docs into an Atlas collection
      const immortalProcessorName = 'immortalProcessor';

      const sampleCall = shell.executeLine(
        `sp.${immortalProcessorName}.sample()`
      );
      setTimeout(() => shell.kill('SIGINT'), 3000);
      await sampleCall;

      // data from the sample solar stream isn't deterministic, so just assert that
      // the processorName field appears in the shell output after sampling
      shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
    });
  });

  context(
    'creating an interactive stream processor with .process()',
    function () {
      let interactiveId = '';
      let db: Db;
      let collection: Collection<Document>;
      let client: MongoClient;

      beforeEach(async function () {
        shell = this.startTestShell({
          args: [
            STREAMS_E2E_SPI_CONNECTION_STRING,
            '--tls',
            '--authenticationDatabase',
            'admin',
            '--username',
            STREAMS_E2E_DB_USER,
            '--password',
            STREAMS_E2E_DB_PASSWORD,
          ],
          removeSigintListeners: true,
        });
        await shell.waitForPromptOrExit({ timeout: 60_000 });

        interactiveId = new bson.ObjectId().toHexString();
        client = await MongoClient.connect(
          STREAMS_E2E_CLUSTER_CONNECTION_STRING,
          {}
        );
        db = client.db(interactiveId);
        const collectionName = 'processedData';
        collection = db.collection(collectionName);
      });

      afterEach(async function () {
        await db.dropDatabase();
        await client.close();
      });

      it('should output streamed documents to the shell', async function () {
        if (process.platform === 'win32') {
          return this.skip(); // No SIGINT on Windows.
        }

        // the pipeline for our interactive processor reads from the sample solar stream,
        // adds a unique test id to each document, and inserts it into an Atlas collection
        const sourceStage = {
          $source: {
            connectionName: 'sample_stream_solar',
          },
        };

        const addFieldStage = {
          $addFields: {
            interactiveId,
          },
        };

        const mergeStage = {
          $merge: {
            into: {
              connectionName: 'testClusterConnection',
              db: db.databaseName,
              coll: collection.collectionName,
            },
          },
        };

        const aggPipeline = [sourceStage, addFieldStage, mergeStage];

        const processCall = shell.executeLine(
          `sp.process(${JSON.stringify(aggPipeline)})`
        );
        setTimeout(() => shell.kill('SIGINT'), 10000);
        await processCall;

        // data from the sample solar stream isn't deterministic, so just assert that
        // the interactiveId field appears in the shell output after processing
        shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
      });
    }
  );
});
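
The tests above lean on a sleep helper imported from ./util-helpers to give each processor time to run before asserting on document counts. Its implementation is not part of this diff; a minimal sketch of the promise-based delay these tests assume:

// Hypothetical sketch — the actual ./util-helpers implementation is not shown in this commit.
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

A fixed delay keeps the tests simple, at the cost of a timing assumption: the assertions after sleep(500) expect the processor to have merged at least one batch into the Atlas collection by then.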
