WIP test: add success e2e tests for stream processing commands MONGOSH-2127 #2459

Draft: wants to merge 1 commit into main
.evergreen.yml (8 changes: 8 additions & 0 deletions)
@@ -214,6 +214,10 @@ functions:
MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
TASK_NAME: ${task_name}
- command: s3.put
params:
@@ -3772,6 +3776,10 @@ functions:
NODE_JS_VERSION: ${node_js_version}
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
TASK_NAME: ${task_name}

packages/e2e-tests/test/e2e-streams.spec.ts (334 changes: 334 additions & 0 deletions)
@@ -0,0 +1,334 @@
import { bson } from '@mongosh/service-provider-core';
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
import { MongoClient } from 'mongodb';
import { expect } from 'chai';
import type { TestShell } from './test-shell';
import { ensureTestShellAfterHook } from './test-shell-context';
import { sleep } from './util-helpers';
import { eventually } from '../../../testing/eventually';

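// Connection details are injected by Evergreen (see the .evergreen.yml additions above);
// the whole suite is skipped below when any of them is missing.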
const {
STREAMS_E2E_SPI_CONNECTION_STRING = '',
STREAMS_E2E_DB_USER = '',
STREAMS_E2E_DB_PASSWORD = '',
STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
} = process.env;

describe('e2e Streams', function () {
this.timeout(60_000);
let shell: TestShell;

before(function () {
if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
console.error(
'Stream Instance connection string not found - skipping Streams E2E tests...'
);
return this.skip();
}

if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
console.error(
'Cluster connection string not found - skipping Streams E2E tests...'
);
return this.skip();
}

if (!STREAMS_E2E_DB_USER) {
console.error(
'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
);
return this.skip();
}

if (!STREAMS_E2E_DB_PASSWORD) {
console.error(
'Password for Atlas database user not found - skipping Streams E2E tests...'
);
return this.skip();
}
});

context('basic stream processor operations', function () {
let processorName = '';
let db: Db;
let collection: Collection<Document>;
let client: MongoClient;

beforeEach(async function () {
shell = this.startTestShell({
args: [
STREAMS_E2E_SPI_CONNECTION_STRING,
'--tls',
'--authenticationDatabase admin',
'--username',
STREAMS_E2E_DB_USER,
'--password',
STREAMS_E2E_DB_PASSWORD,
],
removeSigintListeners: true,
});
await shell.waitForPromptOrExit({ timeout: 60_000 });

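// unique name per test run; it also doubles as the scratch database the processor writes into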
processorName = `spi${new bson.ObjectId().toHexString()}`;
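// connect directly to the backing Atlas cluster so the test can observe what the $merge stage writes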
client = await MongoClient.connect(
STREAMS_E2E_CLUSTER_CONNECTION_STRING,
{}
);
db = client.db(processorName);
const collectionName = 'processedData';
collection = db.collection(collectionName);

// this stream processor reads from the sample stream and inserts documents into an Atlas database
const sourceStage = {
$source: {
connectionName: 'sample_stream_solar',
},
};

const mergeStage = {
$merge: {
into: {
connectionName: 'testClusterConnection',
db: processorName,
coll: collectionName,
},
},
};

const aggPipeline = [sourceStage, mergeStage];

const createResult = await shell.executeLine(
`sp.createStreamProcessor("${processorName}", ${JSON.stringify(
aggPipeline
)})`
);
expect(createResult).to.include(
`Atlas Stream Processor: ${processorName}`
);
});

afterEach(async function () {
try {
await db.dropDatabase();
await client.close();

const result = await shell.executeLine(`sp.${processorName}.drop()`);
expect(result).to.include(`{ ok: 1 }`);
} catch (err: any) {
console.error(
`Could not clean up stream processor ${processorName}:`,
err
);
}
});

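// make sure automatic test-shell cleanup runs after the afterEach above,
// which still needs the shell to drop the processor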
ensureTestShellAfterHook('afterEach', this);

it('can list stream processors', async function () {
const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
// make sure the processor created in the beforeEach is present
expect(listResult).to.include(`name: '${processorName}'`);
});

it('can start and stop a stream processor', async function () {
// the collection is unique to this test run, so it should start out empty
const initialDocsCount = await collection.countDocuments();
expect(initialDocsCount).to.eq(0);

const startResult = await shell.executeLine(
`sp.${processorName}.start()`
);
expect(startResult).to.include('{ ok: 1 }');

// sleep briefly to give the processor time to process some documents
await sleep(500);

const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
expect(stopResult).to.include('{ ok: 1 }');

const updatedDocCount = await collection.countDocuments();
expect(updatedDocCount).to.be.greaterThan(0);

// sleep again to make sure the processor isn't doing any more inserts
await sleep(500);

const countAfterStopping = await collection.countDocuments();
expect(countAfterStopping).to.eq(updatedDocCount);
});

it(`can modify an existing stream processor's pipeline`, async function () {
// this field is not present on any docs emitted by the stream processor
// created in the beforeEach
const newField = 'newField';

const startResult = await shell.executeLine(
`sp.${processorName}.start()`
);
expect(startResult).to.include('{ ok: 1 }');

// sleep briefly to give the processor time to process some documents
await sleep(500);

const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
expect(stopResult).to.include('{ ok: 1 }');

const initialDocsWithNewField = await collection.countDocuments({
[newField]: { $exists: true },
});
expect(initialDocsWithNewField).to.eq(0);

// define a new pipeline that will append our newField to the docs the stream
// processor inserts into the database
const sourceStage = {
$source: {
connectionName: 'sample_stream_solar',
},
};

const addFieldStage = {
$addFields: {
newField,
},
};

const mergeStage = {
$merge: {
into: {
connectionName: 'testClusterConnection',
db: processorName,
coll: collection.collectionName,
},
},
};

const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];

const modifyResult = await shell.executeLine(
`sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
);
expect(modifyResult).to.include('{ ok: 1 }');

const secondStartResult = await shell.executeLine(
`sp.${processorName}.start()`
);
expect(secondStartResult).to.include('{ ok: 1 }');

// sleep again to give the processor time to run with the updated pipeline
await sleep(500);

const updatedDocsWithNewField = await collection.countDocuments({
[newField]: { $exists: true },
});
expect(updatedDocsWithNewField).to.be.greaterThan(0);
});

it('can view stats for a stream processor', async function () {
const statsResult = await shell.executeLine(
`sp.${processorName}.stats()`
);
expect(statsResult).to.include(`name: '${processorName}'`);
expect(statsResult).to.include(`state: 'CREATED'`);
expect(statsResult).to.include('stats: {');
expect(statsResult).to.include(`pipeline: [`);
expect(statsResult).to.include(
`{ '$source': { connectionName: 'sample_stream_solar' } },`
);
});
});

context('sampling from a running stream processor', function () {
beforeEach(async function () {
shell = this.startTestShell({
args: [
STREAMS_E2E_SPI_CONNECTION_STRING,
'--tls',
'--authenticationDatabase admin',
'--username',
STREAMS_E2E_DB_USER,
'--password',
STREAMS_E2E_DB_PASSWORD,
],
removeSigintListeners: true,
});
await shell.waitForPromptOrExit({ timeout: 60_000 });
});

it('should output streamed documents to the shell', async function () {
// this processor is pre-defined on the cloud-dev test project
// it reads from sample solar stream, appends a field with the processor name to each doc, and
// inserts the docs into an Atlas collection
const immortalProcessorName = 'immortalProcessor';

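// .sample() keeps streaming documents until interrupted, so write the input directly
// instead of using executeLine(), which would wait for the command to finish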
shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
// data from the sample solar stream isn't deterministic, so just assert that
// the processorName field appears in the shell output after sampling
await eventually(() => {
shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
});
});
});

context(
'creating an interactive stream processor with .process()',
function () {
let interactiveId = '';
const collectionName = 'processedData';

beforeEach(async function () {
shell = this.startTestShell({
args: [
STREAMS_E2E_SPI_CONNECTION_STRING,
'--tls',
'--authenticationDatabase admin',
'--username',
STREAMS_E2E_DB_USER,
'--password',
STREAMS_E2E_DB_PASSWORD,
],
removeSigintListeners: true,
});
await shell.waitForPromptOrExit({ timeout: 60_000 });

interactiveId = new bson.ObjectId().toHexString();
});

it('should output streamed documents to the shell', async function () {
// the pipeline for our interactive processor reads from the sample solar stream, adds a
// unique test id to each document, and inserts it into an Atlas collection
const sourceStage = {
$source: {
connectionName: 'sample_stream_solar',
},
};

const addFieldStage = {
$addFields: {
interactiveId,
},
};

const mergeStage = {
$merge: {
into: {
connectionName: 'testClusterConnection',
db: interactiveId,
coll: collectionName,
},
},
};

const aggPipeline = [sourceStage, addFieldStage, mergeStage];

shell.writeInputLine(`sp.process(${JSON.stringify(aggPipeline)})`);
// data from the sample solar stream isn't deterministic, so just assert that
// the interactiveId field appears in the shell output after sampling
await eventually(
() => {
shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
},
{ timeout: 60_000 }
);
});
}
);
});