Skip to content

SWIFT-1191, SWIFT-1604: Switch to unified format change streams tests #772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 8 additions & 293 deletions Tests/MongoSwiftSyncTests/SyncChangeStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,309 +3,24 @@ import Nimble
import TestsCommon
import XCTest

/// The entity on which to start a change stream.
internal enum ChangeStreamTarget: String, Decodable {
/// Indicates the change stream will be opened to watch a client.
case client

/// Indicates the change stream will be opened to watch a database.
case database

/// Indicates the change stream will be opened to watch a collection.
case collection

/// Open a change stream against this target. An error will be thrown if the necessary namespace information is not
/// provided.
internal func watch(
_ client: MongoClient,
_ database: String?,
_ collection: String?,
_ pipeline: [BSONDocument],
_ options: ChangeStreamOptions
) throws -> ChangeStream<BSONDocument> {
switch self {
case .client:
return try client.watch(pipeline, options: options, withEventType: BSONDocument.self)
case .database:
guard let database = database else {
throw TestError(message: "missing db in watch")
}
return try client.db(database).watch(pipeline, options: options, withEventType: BSONDocument.self)
case .collection:
guard let collection = collection, let database = database else {
throw TestError(message: "missing db or collection in watch")
}
return try client.db(database)
.collection(collection)
.watch(pipeline, options: options, withEventType: BSONDocument.self)
}
}
}

/// An operation performed as part of a `ChangeStreamTest` (e.g. a CRUD operation, an drop, etc.)
/// This struct includes the namespace against which it should be run.
internal struct ChangeStreamTestOperation: Decodable {
/// The operation itself to run.
private let operation: AnyTestOperation

/// The database to run the operation against.
private let database: String

/// The collection to run the operation against.
private let collection: String

private enum CodingKeys: String, CodingKey {
case database, collection
}

public init(from decoder: Decoder) throws {
let container = try decoder.container(keyedBy: CodingKeys.self)
self.database = try container.decode(String.self, forKey: .database)
self.collection = try container.decode(String.self, forKey: .collection)
self.operation = try AnyTestOperation(from: decoder)
}

/// Run the operation against the namespace associated with this operation.
internal func execute(using client: MongoClient) throws -> TestOperationResult? {
let db = client.db(self.database)
let coll = db.collection(self.collection)
return try self.operation.op.execute(on: coll, sessions: [:])
}
}

/// The outcome of a given `ChangeStreamTest`.
internal enum ChangeStreamTestResult: Decodable {
/// Describes an error received during the test
case error(code: Int, labels: [String]?)

/// An array of event documents expected to be received from the change stream without error during the test.
case success([BSONDocument])

/// Top-level coding keys. Used for determining whether this result is a success or failure.
internal enum CodingKeys: CodingKey {
case error, success
}

/// Coding keys used specifically for decoding the `.error` case.
internal enum ErrorCodingKeys: CodingKey {
case code, errorLabels
}

/// Asserts that the given error matches the one expected by this result.
internal func assertMatchesError(error: Error, description: String) {
guard case let .error(code, labels) = self else {
fail("\(description) failed: got error but result success")
return
}
guard let seenError = error as? MongoError.CommandError else {
fail("\(description) failed: didn't get command error")
return
}

expect(seenError.code).to(equal(code), description: description)
if let labels = labels {
expect(seenError.errorLabels).toNot(beNil(), description: description)
expect(seenError.errorLabels).to(equal(labels), description: description)
}
}

public init(from decoder: Decoder) throws {
let container = try decoder.container(keyedBy: CodingKeys.self)
if container.contains(.success) {
self = .success(try container.decode([BSONDocument].self, forKey: .success))
} else {
let nested = try container.nestedContainer(keyedBy: ErrorCodingKeys.self, forKey: .error)
let code = try nested.decode(Int.self, forKey: .code)
let labels = try nested.decodeIfPresent([String].self, forKey: .errorLabels)
self = .error(code: code, labels: labels)
}
}
}

/// Struct representing a single test within a spec test JSON file.
internal struct ChangeStreamTest: Decodable, FailPointConfigured {
/// The title of this test.
let description: String

/// The minimum server version that this test can be run against.
let minServerVersion: ServerVersion

/// The maximum server version that this test can be run against.
let maxServerVersion: ServerVersion?

/// The fail point that should be set prior to running this test.
let failPoint: FailPoint?

/// The entity on which to run the change stream.
let target: ChangeStreamTarget

/// An array of server topologies against which to run the test.
let topology: [TestTopologyConfiguration]

/// An array of additional aggregation pipeline stages to pass to the `watch` used to create the change stream for
/// this test.
let changeStreamPipeline: [BSONDocument]

/// Additional options to pass to the `watch` used to create the change stream for this test.
let changeStreamOptions: ChangeStreamOptions

/// An array of documents, each describing an operation that should be run as part of this test.
let operations: [ChangeStreamTestOperation]

/// A list of command-started events that are expected to have been emitted by the client that starts the change
/// stream for this test.
let expectations: [TestCommandStartedEvent]?

// The expected result of running this test.
let result: ChangeStreamTestResult

var activeFailPoint: FailPoint?
var targetedHost: ServerAddress?

internal mutating func run(globalClient: MongoClient, database: String, collection: String) throws {
let client = try MongoClient.makeTestClient()
let monitor = client.addCommandMonitor()

if let failPoint = self.failPoint {
try failPoint.enable(using: globalClient)
}
defer { self.failPoint?.disable(using: globalClient) }

monitor.captureEvents {
do {
let changeStream = try self.target.watch(
client,
database,
collection,
self.changeStreamPipeline,
self.changeStreamOptions
)
for operation in self.operations {
_ = try operation.execute(using: globalClient)
}

switch self.result {
case .error:
_ = try changeStream.nextWithTimeout()
fail("\(self.description) failed: expected error but got none while iterating")
case let .success(events):
var seenEvents: [BSONDocument] = []
for _ in 0..<events.count {
guard let event = try changeStream.nextWithTimeout() else {
XCTFail("Unexpectedly got no event from change stream in test: \(self.description)")
return
}
seenEvents.append(event)
}
expect(seenEvents).to(match(events), description: self.description)
}
} catch {
self.result.assertMatchesError(error: error, description: self.description)
}
}

if let expectations = self.expectations {
let commandEvents = monitor.commandStartedEvents()
.filter { ![LEGACY_HELLO, "hello", "killCursors"].contains($0.commandName) }
.map { TestCommandStartedEvent(from: $0) }
expect(commandEvents).to(match(expectations), description: self.description)
}
}
}

/// Struct representing a single change-streams spec test JSON file.
private struct ChangeStreamTestFile: Decodable {
private enum CodingKeys: String, CodingKey {
case databaseName = "database_name",
collectionName = "collection_name",
database2Name = "database2_name",
collection2Name = "collection2_name",
tests
}

/// The default database.
let databaseName: String

/// The default collection.
let collectionName: String

/// Secondary database.
let database2Name: String?

// Secondary collection.
let collection2Name: String?

/// An array of tests that are to be run independently of each other.
let tests: [ChangeStreamTest]
}

/// Class covering the JSON spec tests associated with change streams.
final class ChangeStreamSpecTests: MongoSwiftTestCase {
func testChangeStreamSpec() throws {
let tests = try retrieveSpecTestFiles(
specName: "change-streams",
subdirectory: "legacy",
asType: ChangeStreamTestFile.self
)

let globalClient = try MongoClient.makeTestClient()

for (testName, testFile) in tests {
let db1 = globalClient.db(testFile.databaseName)
// only some test files use a second database.
let db2: MongoDatabase?
if let db2Name = testFile.database2Name {
db2 = globalClient.db(db2Name)
} else {
db2 = nil
}
defer {
try? db1.drop()
try? db2?.drop()
}
print("\n------------\nExecuting tests from file \(testName)...\n")
for var test in testFile.tests {
let testRequirements = TestRequirement(
minServerVersion: test.minServerVersion,
maxServerVersion: test.maxServerVersion,
acceptableTopologies: test.topology
)

let unmetRequirement = try globalClient.getUnmetRequirement(testRequirements)
guard unmetRequirement == nil else {
printSkipMessage(testName: test.description, unmetRequirement: unmetRequirement!)
continue
}

print("Executing test: \(test.description)")

try db1.drop()
try db2?.drop()
_ = try db1.createCollection(testFile.collectionName)
_ = try db2?.createCollection(testFile.collection2Name ?? "foo")

try test.run(
globalClient: globalClient,
database: testFile.databaseName,
collection: testFile.collectionName
)
}
}
}

final class SyncChangeStreamTests: MongoSwiftTestCase {
let excludeFiles = [
// TODO: SWIFT-1458 Unskip.
"change-streams-showExpandedEvents.json",
// TODO: SWIFT-1472 Unskip.
"change-streams-pre_and_post_images.json"
]
func testChangeStreamSpecUnified() throws {
let tests = try retrieveSpecTestFiles(
specName: "change-streams",
subdirectory: "unified",
excludeFiles: excludeFiles,
asType: UnifiedTestFile.self
).map { $0.1 }
let testRunner = try UnifiedTestRunner()
try testRunner.runFiles(tests)
}
}

/// Class for spec prose tests and other integration tests associated with change streams.
final class SyncChangeStreamTests: MongoSwiftTestCase {
/// How long in total a change stream should poll for an event or error before returning.
/// Used as a default value for `ChangeStream.nextWithTimeout`
public static let TIMEOUT: TimeInterval = 15
Expand Down
Loading