Skip to content

Commit ff2d8de

Browse files
committed
test: add success e2e tests for stream processing commands MONGOSH-2127
1 parent 5238d59 commit ff2d8de

File tree

2 files changed

+352
-0
lines changed

2 files changed

+352
-0
lines changed

.evergreen.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@ functions:
214214
MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
215215
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
216216
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
217+
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
218+
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
219+
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
220+
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
217221
TASK_NAME: ${task_name}
218222
- command: s3.put
219223
params:
@@ -3772,6 +3776,10 @@ functions:
37723776
NODE_JS_VERSION: ${node_js_version}
37733777
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
37743778
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
3779+
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
3780+
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
3781+
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
3782+
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
37753783
DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
37763784
TASK_NAME: ${task_name}
37773785

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
import { bson } from '@mongosh/service-provider-core';
2+
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
3+
import { MongoClient } from 'mongodb';
4+
import { expect } from 'chai';
5+
import type { TestShell } from './test-shell';
6+
import { ensureTestShellAfterHook } from './test-shell-context';
7+
import { sleep } from './util-helpers';
8+
9+
const {
10+
STREAMS_E2E_SPI_CONNECTION_STRING = '',
11+
STREAMS_E2E_DB_USER = '',
12+
STREAMS_E2E_DB_PASSWORD = '',
13+
STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
14+
} = process.env;
15+
16+
describe.only('e2e Streams', function () {
17+
let shell: TestShell;
18+
19+
before(function () {
20+
this.timeout(60_000);
21+
if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
22+
console.error(
23+
'Stream Instance connection string not found - skipping Streams E2E tests...'
24+
);
25+
return this.skip();
26+
}
27+
28+
if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
29+
console.error(
30+
'Cluster connection string not found - skipping Streams E2E tests...'
31+
);
32+
return this.skip();
33+
}
34+
35+
if (!STREAMS_E2E_DB_USER) {
36+
console.error(
37+
'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
38+
);
39+
return this.skip();
40+
}
41+
42+
if (!STREAMS_E2E_DB_PASSWORD) {
43+
console.error(
44+
'Password for Atlas database user not found - skipping Streams E2E tests...'
45+
);
46+
return this.skip();
47+
}
48+
});
49+
50+
context('basic stream processor operations', function () {
51+
let processorName = '';
52+
let db: Db;
53+
let collection: Collection<Document>;
54+
let client: MongoClient;
55+
56+
beforeEach(async function () {
57+
shell = this.startTestShell({
58+
args: [
59+
STREAMS_E2E_SPI_CONNECTION_STRING,
60+
'--tls',
61+
'--authenticationDatabase admin',
62+
'--username',
63+
STREAMS_E2E_DB_USER,
64+
'--password',
65+
STREAMS_E2E_DB_PASSWORD,
66+
],
67+
removeSigintListeners: true,
68+
});
69+
await shell.waitForPromptOrExit({ timeout: 60_000 });
70+
71+
processorName = `spi${new bson.ObjectId().toHexString()}`;
72+
client = await MongoClient.connect(
73+
STREAMS_E2E_CLUSTER_CONNECTION_STRING,
74+
{}
75+
);
76+
db = client.db(processorName);
77+
const collectionName = 'processedData';
78+
collection = db.collection(collectionName);
79+
80+
// this stream processor reads from the sample stream and inserts documents into an Atlas database
81+
const sourceStage = {
82+
$source: {
83+
connectionName: 'sample_stream_solar',
84+
},
85+
};
86+
87+
const mergeStage = {
88+
$merge: {
89+
into: {
90+
connectionName: 'testClusterConnection',
91+
db: processorName,
92+
coll: collectionName,
93+
},
94+
},
95+
};
96+
97+
const aggPipeline = [sourceStage, mergeStage];
98+
99+
const createResult = await shell.executeLine(
100+
`sp.createStreamProcessor("${processorName}", ${JSON.stringify(
101+
aggPipeline
102+
)})`
103+
);
104+
expect(createResult).to.include(
105+
`Atlas Stream Processor: ${processorName}`
106+
);
107+
});
108+
109+
afterEach(async function () {
110+
try {
111+
await db.dropDatabase();
112+
await client.close();
113+
114+
const result = await shell.executeLine(`sp.${processorName}.drop()`);
115+
expect(result).to.include(`{ ok: 1 }`);
116+
} catch (err: any) {
117+
console.error(
118+
`Could not clean up stream processor ${processorName}:`,
119+
err
120+
);
121+
}
122+
});
123+
124+
ensureTestShellAfterHook('afterEach', this);
125+
126+
it('can list stream processors', async function () {
127+
const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
128+
// make sure the processor created in the beforeEach is present
129+
expect(listResult).to.include(`name: '${processorName}'`);
130+
});
131+
132+
it('can start and stop a stream processor', async function () {
133+
// this should be a unique collection for this test run, so no data to start
134+
const initialDocsCount = await collection.countDocuments();
135+
expect(initialDocsCount).to.eq(0);
136+
137+
const startResult = await shell.executeLine(
138+
`sp.${processorName}.start()`
139+
);
140+
expect(startResult).to.include('{ ok: 1 }');
141+
142+
// sleep for a bit to let the processor do stuff
143+
await sleep(500);
144+
145+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
146+
expect(stopResult).to.include('{ ok: 1 }');
147+
148+
const updatedDocCount = await collection.countDocuments();
149+
expect(updatedDocCount).to.be.greaterThan(0);
150+
151+
// sleep again to make sure the processor isn't doing any more inserts
152+
await sleep(500);
153+
154+
const countAfterStopping = await collection.countDocuments();
155+
expect(countAfterStopping).to.eq(updatedDocCount);
156+
});
157+
158+
it(`can modify an existing stream processor's pipeline`, async function () {
159+
// this field is not present on any docs emit by the stream processor
160+
// created in the beforeEach
161+
const newField = 'newField';
162+
163+
const startResult = await shell.executeLine(
164+
`sp.${processorName}.start()`
165+
);
166+
expect(startResult).to.include('{ ok: 1 }');
167+
168+
// sleep for a bit to let the processor do stuff
169+
await sleep(500);
170+
171+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
172+
expect(stopResult).to.include('{ ok: 1 }');
173+
174+
const initialDocsWithNewField = await collection.countDocuments({
175+
[newField]: { $exists: true },
176+
});
177+
expect(initialDocsWithNewField).to.eq(0);
178+
179+
// define a new pipeline that will append our newField to the docs the stream
180+
// processor inserts into the database
181+
const sourceStage = {
182+
$source: {
183+
connectionName: 'sample_stream_solar',
184+
},
185+
};
186+
187+
const addFieldStage = {
188+
$addFields: {
189+
newField,
190+
},
191+
};
192+
193+
const mergeStage = {
194+
$merge: {
195+
into: {
196+
connectionName: 'testClusterConnection',
197+
db: processorName,
198+
coll: collection.collectionName,
199+
},
200+
},
201+
};
202+
203+
const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];
204+
205+
const modifyResult = await shell.executeLine(
206+
`sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
207+
);
208+
expect(modifyResult).to.include('{ ok: 1 }');
209+
210+
const secondStartResult = await shell.executeLine(
211+
`sp.${processorName}.start()`
212+
);
213+
expect(secondStartResult).to.include('{ ok: 1 }');
214+
215+
// sleep again to let the processor work again with the updated pipeline
216+
await sleep(500);
217+
218+
const updatedDocsWithNewField = await collection.countDocuments({
219+
[newField]: { $exists: true },
220+
});
221+
expect(updatedDocsWithNewField).to.be.greaterThan(0);
222+
});
223+
224+
it('can view stats for a stream processor', async function () {
225+
const statsResult = await shell.executeLine(
226+
`sp.${processorName}.stats()`
227+
);
228+
expect(statsResult).to.include(`name: '${processorName}'`);
229+
expect(statsResult).to.include(`state: 'CREATED'`);
230+
expect(statsResult).to.include('stats: {');
231+
expect(statsResult).to.include(`pipeline: [`);
232+
expect(statsResult).to.include(
233+
`{ '$source': { connectionName: 'sample_stream_solar' } },`
234+
);
235+
});
236+
});
237+
238+
context('sampling from a running stream processor', function () {
239+
beforeEach(async function () {
240+
shell = this.startTestShell({
241+
args: [
242+
STREAMS_E2E_SPI_CONNECTION_STRING,
243+
'--tls',
244+
'--authenticationDatabase admin',
245+
'--username',
246+
STREAMS_E2E_DB_USER,
247+
'--password',
248+
STREAMS_E2E_DB_PASSWORD,
249+
],
250+
removeSigintListeners: true,
251+
});
252+
await shell.waitForPromptOrExit({ timeout: 60_000 });
253+
});
254+
255+
it('should output streamed documents to the shell', async function () {
256+
if (process.platform === 'win32') {
257+
return this.skip(); // No SIGINT on Windows.
258+
}
259+
260+
// this processor is pre-defined on the cloud-dev test project
261+
// it reads from sample solar stream, appends a field with the processor name to each doc, and
262+
// inserts the docs into an Atlas collection
263+
const immortalProcessorName = 'immortalProcessor';
264+
265+
const sampleCall = shell.executeLine(
266+
`sp.${immortalProcessorName}.sample()`
267+
);
268+
setTimeout(() => shell.kill('SIGINT'), 3000);
269+
await sampleCall;
270+
271+
// data from the sample solar stream isn't deterministic, so just assert that
272+
// the processorName field appears in the shell output after sampling
273+
shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
274+
});
275+
});
276+
277+
context(
278+
'creating an interactive stream processor with .process()',
279+
function () {
280+
let interactiveId = '';
281+
const collectionName = 'processedData';
282+
283+
beforeEach(async function () {
284+
shell = this.startTestShell({
285+
args: [
286+
STREAMS_E2E_SPI_CONNECTION_STRING,
287+
'--tls',
288+
'--authenticationDatabase admin',
289+
'--username',
290+
STREAMS_E2E_DB_USER,
291+
'--password',
292+
STREAMS_E2E_DB_PASSWORD,
293+
],
294+
removeSigintListeners: true,
295+
});
296+
await shell.waitForPromptOrExit({ timeout: 60_000 });
297+
298+
interactiveId = new bson.ObjectId().toHexString();
299+
});
300+
301+
it('should output streamed documents to the shell', async function () {
302+
if (process.platform === 'win32') {
303+
return this.skip(); // No SIGINT on Windows.
304+
}
305+
306+
// the pipeline for our interactive processor reads from sample solar stream, adds a
307+
// unique test id to each document, and inserts it into an Atlas collection
308+
const sourceStage = {
309+
$source: {
310+
connectionName: 'sample_stream_solar',
311+
},
312+
};
313+
314+
const addFieldStage = {
315+
$addFields: {
316+
interactiveId,
317+
},
318+
};
319+
320+
const mergeStage = {
321+
$merge: {
322+
into: {
323+
connectionName: 'testClusterConnection',
324+
db: interactiveId,
325+
coll: collectionName,
326+
},
327+
},
328+
};
329+
330+
const aggPipeline = [sourceStage, addFieldStage, mergeStage];
331+
332+
const processCall = shell.executeLine(
333+
`sp.process(${JSON.stringify(aggPipeline)})`
334+
);
335+
setTimeout(() => shell.kill('SIGINT'), 10000);
336+
await processCall;
337+
338+
// data from the sample solar stream isn't deterministic, so just assert that
339+
// the interactiveId field appears in the shell output after sampling
340+
shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
341+
});
342+
}
343+
);
344+
});

0 commit comments

Comments
 (0)