Skip to content

Commit 9c21f46

Browse files
committed
test: add success e2e tests for stream processing commands MONGOSH-2127
1 parent 5238d59 commit 9c21f46

File tree

2 files changed

+339
-0
lines changed

2 files changed

+339
-0
lines changed

.evergreen.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@ functions:
214214
MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
215215
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
216216
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
217+
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
218+
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
219+
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
220+
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
217221
TASK_NAME: ${task_name}
218222
- command: s3.put
219223
params:
@@ -3772,6 +3776,10 @@ functions:
37723776
NODE_JS_VERSION: ${node_js_version}
37733777
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
37743778
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
3779+
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
3780+
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
3781+
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
3782+
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
37753783
DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
37763784
TASK_NAME: ${task_name}
37773785

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
import { bson } from '@mongosh/service-provider-core';
2+
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
3+
import { MongoClient } from 'mongodb';
4+
import { expect } from 'chai';
5+
import type { TestShell } from './test-shell';
6+
import { ensureTestShellAfterHook } from './test-shell-context';
7+
import { sleep } from './util-helpers';
8+
import { eventually } from '../../../testing/eventually';
9+
10+
const {
11+
STREAMS_E2E_SPI_CONNECTION_STRING = '',
12+
STREAMS_E2E_DB_USER = '',
13+
STREAMS_E2E_DB_PASSWORD = '',
14+
STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
15+
} = process.env;
16+
17+
describe('e2e Streams', function () {
18+
this.timeout(60_000);
19+
let shell: TestShell;
20+
21+
before(function () {
22+
if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
23+
console.error(
24+
'Stream Instance connection string not found - skipping Streams E2E tests...'
25+
);
26+
return this.skip();
27+
}
28+
29+
if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
30+
console.error(
31+
'Cluster connection string not found - skipping Streams E2E tests...'
32+
);
33+
return this.skip();
34+
}
35+
36+
if (!STREAMS_E2E_DB_USER) {
37+
console.error(
38+
'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
39+
);
40+
return this.skip();
41+
}
42+
43+
if (!STREAMS_E2E_DB_PASSWORD) {
44+
console.error(
45+
'Password for Atlas database user not found - skipping Streams E2E tests...'
46+
);
47+
return this.skip();
48+
}
49+
});
50+
51+
context('basic stream processor operations', function () {
52+
let processorName = '';
53+
let db: Db;
54+
let collection: Collection<Document>;
55+
let client: MongoClient;
56+
57+
beforeEach(async function () {
58+
shell = this.startTestShell({
59+
args: [
60+
STREAMS_E2E_SPI_CONNECTION_STRING,
61+
'--tls',
62+
'--authenticationDatabase admin',
63+
'--username',
64+
STREAMS_E2E_DB_USER,
65+
'--password',
66+
STREAMS_E2E_DB_PASSWORD,
67+
],
68+
removeSigintListeners: true,
69+
});
70+
await shell.waitForPromptOrExit({ timeout: 60_000 });
71+
72+
processorName = `spi${new bson.ObjectId().toHexString()}`;
73+
client = await MongoClient.connect(
74+
STREAMS_E2E_CLUSTER_CONNECTION_STRING,
75+
{}
76+
);
77+
db = client.db(processorName);
78+
const collectionName = 'processedData';
79+
collection = db.collection(collectionName);
80+
81+
// this stream processor reads from the sample stream and inserts documents into an Atlas database
82+
const sourceStage = {
83+
$source: {
84+
connectionName: 'sample_stream_solar',
85+
},
86+
};
87+
88+
const mergeStage = {
89+
$merge: {
90+
into: {
91+
connectionName: 'testClusterConnection',
92+
db: processorName,
93+
coll: collectionName,
94+
},
95+
},
96+
};
97+
98+
const aggPipeline = [sourceStage, mergeStage];
99+
100+
const createResult = await shell.executeLine(
101+
`sp.createStreamProcessor("${processorName}", ${JSON.stringify(
102+
aggPipeline
103+
)})`
104+
);
105+
expect(createResult).to.include(
106+
`Atlas Stream Processor: ${processorName}`
107+
);
108+
});
109+
110+
afterEach(async function () {
111+
try {
112+
await db.dropDatabase();
113+
await client.close();
114+
115+
const result = await shell.executeLine(`sp.${processorName}.drop()`);
116+
expect(result).to.include(`{ ok: 1 }`);
117+
} catch (err: any) {
118+
console.error(
119+
`Could not clean up stream processor ${processorName}:`,
120+
err
121+
);
122+
}
123+
});
124+
125+
ensureTestShellAfterHook('afterEach', this);
126+
127+
it('can list stream processors', async function () {
128+
const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
129+
// make sure the processor created in the beforeEach is present
130+
expect(listResult).to.include(`name: '${processorName}'`);
131+
});
132+
133+
it('can start and stop a stream processor', async function () {
134+
// this should be a unique collection for this test run, so no data to start
135+
const initialDocsCount = await collection.countDocuments();
136+
expect(initialDocsCount).to.eq(0);
137+
138+
const startResult = await shell.executeLine(
139+
`sp.${processorName}.start()`
140+
);
141+
expect(startResult).to.include('{ ok: 1 }');
142+
143+
// sleep for a bit to let the processor do stuff
144+
await sleep(500);
145+
146+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
147+
expect(stopResult).to.include('{ ok: 1 }');
148+
149+
const updatedDocCount = await collection.countDocuments();
150+
expect(updatedDocCount).to.be.greaterThan(0);
151+
152+
// sleep again to make sure the processor isn't doing any more inserts
153+
await sleep(500);
154+
155+
const countAfterStopping = await collection.countDocuments();
156+
expect(countAfterStopping).to.eq(updatedDocCount);
157+
});
158+
159+
it(`can modify an existing stream processor's pipeline`, async function () {
160+
// this field is not present on any docs emit by the stream processor
161+
// created in the beforeEach
162+
const newField = 'newField';
163+
164+
const startResult = await shell.executeLine(
165+
`sp.${processorName}.start()`
166+
);
167+
expect(startResult).to.include('{ ok: 1 }');
168+
169+
// sleep for a bit to let the processor do stuff
170+
await sleep(500);
171+
172+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
173+
expect(stopResult).to.include('{ ok: 1 }');
174+
175+
const initialDocsWithNewField = await collection.countDocuments({
176+
[newField]: { $exists: true },
177+
});
178+
expect(initialDocsWithNewField).to.eq(0);
179+
180+
// define a new pipeline that will append our newField to the docs the stream
181+
// processor inserts into the database
182+
const sourceStage = {
183+
$source: {
184+
connectionName: 'sample_stream_solar',
185+
},
186+
};
187+
188+
const addFieldStage = {
189+
$addFields: {
190+
newField,
191+
},
192+
};
193+
194+
const mergeStage = {
195+
$merge: {
196+
into: {
197+
connectionName: 'testClusterConnection',
198+
db: processorName,
199+
coll: collection.collectionName,
200+
},
201+
},
202+
};
203+
204+
const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];
205+
206+
const modifyResult = await shell.executeLine(
207+
`sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
208+
);
209+
expect(modifyResult).to.include('{ ok: 1 }');
210+
211+
const secondStartResult = await shell.executeLine(
212+
`sp.${processorName}.start()`
213+
);
214+
expect(secondStartResult).to.include('{ ok: 1 }');
215+
216+
// sleep again to let the processor work again with the updated pipeline
217+
await sleep(500);
218+
219+
const updatedDocsWithNewField = await collection.countDocuments({
220+
[newField]: { $exists: true },
221+
});
222+
expect(updatedDocsWithNewField).to.be.greaterThan(0);
223+
});
224+
225+
it('can view stats for a stream processor', async function () {
226+
const statsResult = await shell.executeLine(
227+
`sp.${processorName}.stats()`
228+
);
229+
expect(statsResult).to.include(`name: '${processorName}'`);
230+
expect(statsResult).to.include(`state: 'CREATED'`);
231+
expect(statsResult).to.include('stats: {');
232+
expect(statsResult).to.include(`pipeline: [`);
233+
expect(statsResult).to.include(
234+
`{ '$source': { connectionName: 'sample_stream_solar' } },`
235+
);
236+
});
237+
});
238+
239+
context('sampling from a running stream processor', function () {
240+
beforeEach(async function () {
241+
shell = this.startTestShell({
242+
args: [
243+
STREAMS_E2E_SPI_CONNECTION_STRING,
244+
'--tls',
245+
'--authenticationDatabase admin',
246+
'--username',
247+
STREAMS_E2E_DB_USER,
248+
'--password',
249+
STREAMS_E2E_DB_PASSWORD,
250+
],
251+
removeSigintListeners: true,
252+
});
253+
await shell.waitForPromptOrExit({ timeout: 60_000 });
254+
});
255+
256+
it('should output streamed documents to the shell', async function () {
257+
// this processor is pre-defined on the cloud-dev test project
258+
// it reads from sample solar stream, appends a field with the processor name to each doc, and
259+
// inserts the docs into an Atlas collection
260+
const immortalProcessorName = 'immortalProcessor';
261+
262+
shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
263+
// data from the sample solar stream isn't deterministic, so just assert that
264+
// the processorName field appears in the shell output after sampling
265+
await eventually(() => {
266+
shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
267+
});
268+
});
269+
});
270+
271+
context(
272+
'creating an interactive stream processor with .process()',
273+
function () {
274+
let interactiveId = '';
275+
const collectionName = 'processedData';
276+
277+
beforeEach(async function () {
278+
shell = this.startTestShell({
279+
args: [
280+
STREAMS_E2E_SPI_CONNECTION_STRING,
281+
'--tls',
282+
'--authenticationDatabase admin',
283+
'--username',
284+
STREAMS_E2E_DB_USER,
285+
'--password',
286+
STREAMS_E2E_DB_PASSWORD,
287+
],
288+
removeSigintListeners: true,
289+
});
290+
await shell.waitForPromptOrExit({ timeout: 60_000 });
291+
292+
interactiveId = new bson.ObjectId().toHexString();
293+
});
294+
295+
it('should output streamed documents to the shell', async function () {
296+
// the pipeline for our interactive processor reads from sample solar stream, adds a
297+
// unique test id to each document, and inserts it into an Atlas collection
298+
const sourceStage = {
299+
$source: {
300+
connectionName: 'sample_stream_solar',
301+
},
302+
};
303+
304+
const addFieldStage = {
305+
$addFields: {
306+
interactiveId,
307+
},
308+
};
309+
310+
const mergeStage = {
311+
$merge: {
312+
into: {
313+
connectionName: 'testClusterConnection',
314+
db: interactiveId,
315+
coll: collectionName,
316+
},
317+
},
318+
};
319+
320+
const aggPipeline = [sourceStage, addFieldStage, mergeStage];
321+
322+
shell.writeInputLine(`sp.process(${JSON.stringify(aggPipeline)})`);
323+
// data from the sample solar stream isn't deterministic, so just assert that
324+
// the interactiveId field appears in the shell output after sampling
325+
await eventually(() => {
326+
shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
327+
});
328+
});
329+
}
330+
);
331+
});

0 commit comments

Comments
 (0)