
Commit 1673975

test: add success e2e tests for stream processing commands MONGOSH-2127

1 parent 5238d59

2 files changed: +371 −0 lines changed


.evergreen.yml

Lines changed: 8 additions & 0 deletions
@@ -214,6 +214,10 @@ functions:
           MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           TASK_NAME: ${task_name}
     - command: s3.put
       params:
@@ -3772,6 +3776,10 @@ functions:
           NODE_JS_VERSION: ${node_js_version}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
           TASK_NAME: ${task_name}
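
These expansions populate the four STREAMS_E2E_* environment variables that the new spec below reads from process.env: the shell connects mongosh to the SPI connection string and authenticates with the database user and password, while the cluster connection string lets the tests verify the documents the stream processors write directly via the Node driver.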

Lines changed: 363 additions & 0 deletions
@@ -0,0 +1,363 @@
import { bson } from '@mongosh/service-provider-core';
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
import { MongoClient } from 'mongodb';
import { expect } from 'chai';
import type { TestShell } from './test-shell';
import { ensureTestShellAfterHook } from './test-shell-context';
import { sleep } from './util-helpers';

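// These values are supplied by CI via the .evergreen.yml expansions above;
// defaulting to '' lets the before() hook skip the suite cleanly when the
// credentials are not configured.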
const {
  STREAMS_E2E_SPI_CONNECTION_STRING = '',
  STREAMS_E2E_DB_USER = '',
  STREAMS_E2E_DB_PASSWORD = '',
  STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
} = process.env;

describe('e2e Streams', function () {
  let shell: TestShell;

  before(function () {
    if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
      console.error(
        'Stream Instance connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
      console.error(
        'Cluster connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_USER) {
      console.error(
        'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_PASSWORD) {
      console.error(
        'Password for Atlas database user not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }
  });

  context('basic stream processor operations', function () {
    let processorName = '';
    let db: Db;
    let collection: Collection<Document>;
    let client: MongoClient;

    beforeEach(async function () {
      this.timeout(60_000);

      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });

      // unique name per test run; it doubles as the scratch database name below
      processorName = `spi${new bson.ObjectId().toHexString()}`;
      client = await MongoClient.connect(
        STREAMS_E2E_CLUSTER_CONNECTION_STRING,
        {}
      );
      db = client.db(processorName);
      const collectionName = 'processedData';
      collection = db.collection(collectionName);

      // this stream processor reads from the sample stream and inserts documents into an Atlas database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collectionName,
          },
        },
      };

      const aggPipeline = [sourceStage, mergeStage];

      const createResult = await shell.executeLine(
        `sp.createStreamProcessor("${processorName}", ${JSON.stringify(
          aggPipeline
        )})`
      );
      expect(createResult).to.include(
        `Atlas Stream Processor: ${processorName}`
      );
    });

    afterEach(async function () {
      try {
        await db.dropDatabase();
        await client.close();

        const result = await shell.executeLine(`sp.${processorName}.drop()`);
        expect(result).to.include(`{ ok: 1 }`);
      } catch (err: any) {
        console.error(
          `Could not clean up stream processor ${processorName}:`,
          err
        );
      }
    });

    ensureTestShellAfterHook('afterEach', this);

    it('can list stream processors', async function () {
      const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
      // make sure the processor created in the beforeEach is present
      expect(listResult).to.include(`name: '${processorName}'`);
    });

    it('can start and stop a stream processor', async function () {
      // this should be a unique collection for this test run, so no data to start
      const initialDocsCount = await collection.countDocuments();
      expect(initialDocsCount).to.eq(0);

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor insert some documents
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const updatedDocCount = await collection.countDocuments();
      expect(updatedDocCount).to.be.greaterThan(0);

      // sleep again to make sure the processor isn't doing any more inserts
      await sleep(500);

      const countAfterStopping = await collection.countDocuments();
      expect(countAfterStopping).to.eq(updatedDocCount);
    });

    it(`can modify an existing stream processor's pipeline`, async function () {
      // this field is not present on any docs emitted by the stream processor
      // created in the beforeEach
      const newField = 'newField';

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor insert some documents
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const initialDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(initialDocsWithNewField).to.eq(0);

      // define a new pipeline that will append our newField to the docs the stream
      // processor inserts into the database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const addFieldStage = {
        $addFields: {
          newField,
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collection.collectionName,
          },
        },
      };

      const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];

      const modifyResult = await shell.executeLine(
        `sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
      );
      expect(modifyResult).to.include('{ ok: 1 }');

      const secondStartResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(secondStartResult).to.include('{ ok: 1 }');

      // sleep again to let the processor work with the updated pipeline
      await sleep(500);

      const updatedDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(updatedDocsWithNewField).to.be.greaterThan(0);
    });

    it('can view stats for a stream processor', async function () {
      const statsResult = await shell.executeLine(
        `sp.${processorName}.stats()`
      );
      expect(statsResult).to.include(`name: '${processorName}'`);
      expect(statsResult).to.include(`state: 'CREATED'`);
      expect(statsResult).to.include('stats: {');
      expect(statsResult).to.include(`pipeline: [`);
      expect(statsResult).to.include(
        `{ '$source': { connectionName: 'sample_stream_solar' } },`
      );
    });
  });

  context('sampling from a running stream processor', function () {
    beforeEach(async function () {
      this.timeout(60_000);

      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });
    });

    it('should output streamed documents to the shell', async function () {
      if (process.platform === 'win32') {
        return this.skip(); // No SIGINT on Windows.
      }

      // this processor is pre-defined on the cloud-dev test project;
      // it reads from the sample solar stream, appends a field with the processor
      // name to each doc, and inserts the docs into an Atlas collection
      const immortalProcessorName = 'immortalProcessor';

      const sampleCall = shell.executeLine(
        `sp.${immortalProcessorName}.sample()`
      );
      // sample() keeps streaming output, so interrupt it after a few seconds
      setTimeout(() => shell.kill('SIGINT'), 3000);
      await sampleCall;

      // data from the sample solar stream isn't deterministic, so just assert that
      // the processorName field appears in the shell output after sampling
      shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
    });
  });

  context(
    'creating an interactive stream processor with .process()',
    function () {
      let interactiveId = '';
      let db: Db;
      let collection: Collection<Document>;
      let client: MongoClient;

      beforeEach(async function () {
        this.timeout(60_000);

        shell = this.startTestShell({
          args: [
            STREAMS_E2E_SPI_CONNECTION_STRING,
            '--tls',
            '--authenticationDatabase admin',
            '--username',
            STREAMS_E2E_DB_USER,
            '--password',
            STREAMS_E2E_DB_PASSWORD,
          ],
          removeSigintListeners: true,
        });
        await shell.waitForPromptOrExit({ timeout: 60_000 });

        interactiveId = new bson.ObjectId().toHexString();
        client = await MongoClient.connect(
          STREAMS_E2E_CLUSTER_CONNECTION_STRING,
          {}
        );
        db = client.db(interactiveId);
        const collectionName = 'processedData';
        collection = db.collection(collectionName);
      });

      afterEach(async function () {
        await db.dropDatabase();
        await client.close();
      });

      it('should output streamed documents to the shell', async function () {
        if (process.platform === 'win32') {
          return this.skip(); // No SIGINT on Windows.
        }

        // the pipeline for our interactive processor reads from the sample solar
        // stream, adds a unique test id to each document, and inserts it into an
        // Atlas collection
        const sourceStage = {
          $source: {
            connectionName: 'sample_stream_solar',
          },
        };

        const addFieldStage = {
          $addFields: {
            interactiveId,
          },
        };

        const mergeStage = {
          $merge: {
            into: {
              connectionName: 'testClusterConnection',
              db: db.databaseName,
              coll: collection.collectionName,
            },
          },
        };

        const aggPipeline = [sourceStage, addFieldStage, mergeStage];

        const processCall = shell.executeLine(
          `sp.process(${JSON.stringify(aggPipeline)})`
        );
        // .process() also streams output until interrupted
        setTimeout(() => shell.kill('SIGINT'), 10000);
        await processCall;

        // data from the sample solar stream isn't deterministic, so just assert that
        // the interactiveId field appears in the shell output from .process()
        shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
      });
    }
  );
});
