diff --git a/.evergreen.yml b/.evergreen.yml
index af405a3e8..d63a908b2 100644
--- a/.evergreen.yml
+++ b/.evergreen.yml
@@ -214,6 +214,10 @@ functions:
           MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           TASK_NAME: ${task_name}
     - command: s3.put
       params:
@@ -3772,6 +3776,10 @@ functions:
           NODE_JS_VERSION: ${node_js_version}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
           TASK_NAME: ${task_name}
diff --git a/packages/e2e-tests/test/e2e-streams.spec.ts b/packages/e2e-tests/test/e2e-streams.spec.ts
new file mode 100644
index 000000000..12d1cc763
--- /dev/null
+++ b/packages/e2e-tests/test/e2e-streams.spec.ts
@@ -0,0 +1,334 @@
+import { bson } from '@mongosh/service-provider-core';
+import type { Db, Collection, Document } from '@mongosh/service-provider-core';
+import { MongoClient } from 'mongodb';
+import { expect } from 'chai';
+import type { TestShell } from './test-shell';
+import { ensureTestShellAfterHook } from './test-shell-context';
+import { sleep } from './util-helpers';
+import { eventually } from '../../../testing/eventually';
+
+const {
+  STREAMS_E2E_SPI_CONNECTION_STRING = '',
+  STREAMS_E2E_DB_USER = '',
+  STREAMS_E2E_DB_PASSWORD = '',
+  STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
+} = process.env;
+
+describe('e2e Streams', function () {
+  this.timeout(60_000);
+  let shell: TestShell;
+
+  before(function () {
+    if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
+      console.error(
+        'Stream Instance connection string not found - skipping Streams E2E tests...'
+      );
+      return this.skip();
+    }
+
+    if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
+      console.error(
+        'Cluster connection string not found - skipping Streams E2E tests...'
+      );
+      return this.skip();
+    }
+
+    if (!STREAMS_E2E_DB_USER) {
+      console.error(
+        'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
+      );
+      return this.skip();
+    }
+
+    if (!STREAMS_E2E_DB_PASSWORD) {
+      console.error(
+        'Password for Atlas database user not found - skipping Streams E2E tests...'
+      );
+      return this.skip();
+    }
+  });
+
+  context('basic stream processor operations', function () {
+    let processorName = '';
+    let db: Db;
+    let collection: Collection;
+    let client: MongoClient;
+
+    beforeEach(async function () {
+      shell = this.startTestShell({
+        args: [
+          STREAMS_E2E_SPI_CONNECTION_STRING,
+          '--tls',
+          '--authenticationDatabase admin',
+          '--username',
+          STREAMS_E2E_DB_USER,
+          '--password',
+          STREAMS_E2E_DB_PASSWORD,
+        ],
+        removeSigintListeners: true,
+      });
+      await shell.waitForPromptOrExit({ timeout: 60_000 });
+
+      processorName = `spi${new bson.ObjectId().toHexString()}`;
+      client = await MongoClient.connect(
+        STREAMS_E2E_CLUSTER_CONNECTION_STRING,
+        {}
+      );
+      db = client.db(processorName);
+      const collectionName = 'processedData';
+      collection = db.collection(collectionName);
+
+      // this stream processor reads from the sample stream and inserts documents into an Atlas database
+      const sourceStage = {
+        $source: {
+          connectionName: 'sample_stream_solar',
+        },
+      };
+
+      const mergeStage = {
+        $merge: {
+          into: {
+            connectionName: 'testClusterConnection',
+            db: processorName,
+            coll: collectionName,
+          },
+        },
+      };
+
+      const aggPipeline = [sourceStage, mergeStage];
+
+      const createResult = await shell.executeLine(
+        `sp.createStreamProcessor("${processorName}", ${JSON.stringify(
+          aggPipeline
+        )})`
+      );
+      expect(createResult).to.include(
+        `Atlas Stream Processor: ${processorName}`
+      );
+    });
+
+    afterEach(async function () {
+      try {
+        await db.dropDatabase();
+        await client.close();
+
+        const result = await shell.executeLine(`sp.${processorName}.drop()`);
+        expect(result).to.include(`{ ok: 1 }`);
+      } catch (err: any) {
+        console.error(
+          `Could not clean up stream processor ${processorName}:`,
+          err
+        );
+      }
+    });
+
+    ensureTestShellAfterHook('afterEach', this);
+
+    it('can list stream processors', async function () {
+      const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
+      // make sure the processor created in the beforeEach is present
+      expect(listResult).to.include(`name: '${processorName}'`);
+    });
+
+    it('can start and stop a stream processor', async function () {
+      // this should be a unique collection for this test run, so no data to start
+      const initialDocsCount = await collection.countDocuments();
+      expect(initialDocsCount).to.eq(0);
+
+      const startResult = await shell.executeLine(
+        `sp.${processorName}.start()`
+      );
+      expect(startResult).to.include('{ ok: 1 }');
+
+      // sleep for a bit to let the processor do stuff
+      await sleep(500);
+
+      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
+      expect(stopResult).to.include('{ ok: 1 }');
+
+      const updatedDocCount = await collection.countDocuments();
+      expect(updatedDocCount).to.be.greaterThan(0);
+
+      // sleep again to make sure the processor isn't doing any more inserts
+      await sleep(500);
+
+      const countAfterStopping = await collection.countDocuments();
+      expect(countAfterStopping).to.eq(updatedDocCount);
+    });
+
+    it(`can modify an existing stream processor's pipeline`, async function () {
+      // this field is not present on any docs emitted by the stream processor
+      // created in the beforeEach
+      const newField = 'newField';
+
+      const startResult = await shell.executeLine(
+        `sp.${processorName}.start()`
+      );
+      expect(startResult).to.include('{ ok: 1 }');
+
+      // sleep for a bit to let the processor do stuff
+      await sleep(500);
+
+      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
+      expect(stopResult).to.include('{ ok: 1 }');
+
+      const initialDocsWithNewField = await collection.countDocuments({
+        [newField]: { $exists: true },
+      });
+      expect(initialDocsWithNewField).to.eq(0);
+
+      // define a new pipeline that will append our newField to the docs the stream
+      // processor inserts into the database
+      const sourceStage = {
+        $source: {
+          connectionName: 'sample_stream_solar',
+        },
+      };
+
+      const addFieldStage = {
+        $addFields: {
+          newField,
+        },
+      };
+
+      const mergeStage = {
+        $merge: {
+          into: {
+            connectionName: 'testClusterConnection',
+            db: processorName,
+            coll: collection.collectionName,
+          },
+        },
+      };
+
+      const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];
+
+      const modifyResult = await shell.executeLine(
+        `sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
+      );
+      expect(modifyResult).to.include('{ ok: 1 }');
+
+      const secondStartResult = await shell.executeLine(
+        `sp.${processorName}.start()`
+      );
+      expect(secondStartResult).to.include('{ ok: 1 }');
+
+      // sleep again to let the processor work again with the updated pipeline
+      await sleep(500);
+
+      const updatedDocsWithNewField = await collection.countDocuments({
+        [newField]: { $exists: true },
+      });
+      expect(updatedDocsWithNewField).to.be.greaterThan(0);
+    });
+
+    it('can view stats for a stream processor', async function () {
+      const statsResult = await shell.executeLine(
+        `sp.${processorName}.stats()`
+      );
+      expect(statsResult).to.include(`name: '${processorName}'`);
+      expect(statsResult).to.include(`state: 'CREATED'`);
+      expect(statsResult).to.include('stats: {');
+      expect(statsResult).to.include(`pipeline: [`);
+      expect(statsResult).to.include(
+        `{ '$source': { connectionName: 'sample_stream_solar' } },`
+      );
+    });
+  });
+
+  context('sampling from a running stream processor', function () {
+    beforeEach(async function () {
+      shell = this.startTestShell({
+        args: [
+          STREAMS_E2E_SPI_CONNECTION_STRING,
+          '--tls',
+          '--authenticationDatabase admin',
+          '--username',
+          STREAMS_E2E_DB_USER,
+          '--password',
+          STREAMS_E2E_DB_PASSWORD,
+        ],
+        removeSigintListeners: true,
+      });
+      await shell.waitForPromptOrExit({ timeout: 60_000 });
+    });
+
+    it('should output streamed documents to the shell', async function () {
+      // this processor is pre-defined on the cloud-dev test project
+      // it reads from sample solar stream, appends a field with the processor name to each doc, and
+      // inserts the docs into an Atlas collection
+      const immortalProcessorName = 'immortalProcessor';
+
+      shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
+      // data from the sample solar stream isn't deterministic, so just assert that
+      // the processorName field appears in the shell output after sampling
+      await eventually(() => {
+        shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
+      });
+    });
+  });
+
+  context(
+    'creating an interactive stream processor with .process()',
+    function () {
+      let interactiveId = '';
+      const collectionName = 'processedData';
+
+      beforeEach(async function () {
+        shell = this.startTestShell({
+          args: [
+            STREAMS_E2E_SPI_CONNECTION_STRING,
+            '--tls',
+            '--authenticationDatabase admin',
+            '--username',
+            STREAMS_E2E_DB_USER,
+            '--password',
+            STREAMS_E2E_DB_PASSWORD,
+          ],
+          removeSigintListeners: true,
+        });
+        await shell.waitForPromptOrExit({ timeout: 60_000 });
+
+        interactiveId = new bson.ObjectId().toHexString();
+      });
+
+      it('should output streamed documents to the shell', async function () {
+        // the pipeline for our interactive processor reads from sample solar stream, adds a
+        // unique test id to each document, and inserts it into an Atlas collection
+        const sourceStage = {
+          $source: {
+            connectionName: 'sample_stream_solar',
+          },
+        };
+
+        const addFieldStage = {
+          $addFields: {
+            interactiveId,
+          },
+        };
+
+        const mergeStage = {
+          $merge: {
+            into: {
+              connectionName: 'testClusterConnection',
+              db: interactiveId,
+              coll: collectionName,
+            },
+          },
+        };
+
+        const aggPipeline = [sourceStage, addFieldStage, mergeStage];
+
+        shell.writeInputLine(`sp.process(${JSON.stringify(aggPipeline)})`);
+        // data from the sample solar stream isn't deterministic, so just assert that
+        // the interactiveId field appears in the shell output after sampling
+        await eventually(
+          () => {
+            shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
+          },
+          { timeout: 60_000 }
+        );
+      });
+    }
+  );
+});