Skip to content

Commit 41ec4d6

Browse files
authored
SWIFT-1610, SWIFT-1378, SWIFT-1632: Spec tests sync + expose clusterTime on ChangeStreamEvent (#775)
1 parent 237452e commit 41ec4d6

20 files changed

+1546
-19
lines changed

Sources/MongoSwift/ChangeStreamEvent.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
122122
/// Only present for server versions 6.0 and above.
123123
public let wallTime: Date?
124124

125+
/// The cluster time at which the change occurred. Only present for server versions 4.0 and above.
126+
public let clusterTime: BSONTimestamp?
127+
125128
/**
126129
* Always present for operations of type `insert` and `replace`. Also present for operations of type `update` if
127130
* the user has specified `.updateLookup` for the `fullDocument` option in the `ChangeStreamOptions` used to create
@@ -136,7 +139,7 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
136139
public let fullDocument: T?
137140

138141
private enum CodingKeys: String, CodingKey {
139-
case operationType, _id, ns, to, documentKey, updateDescription, wallTime, fullDocument
142+
case operationType, _id, ns, to, documentKey, updateDescription, wallTime, clusterTime, fullDocument
140143
}
141144

142145
// Custom decode method to work around the fact that `invalidate` events do not have an `ns` field in the raw
@@ -164,6 +167,7 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
164167

165168
self.documentKey = try container.decodeIfPresent(BSONDocument.self, forKey: .documentKey)
166169
self.wallTime = try container.decodeIfPresent(Date.self, forKey: .wallTime)
170+
self.clusterTime = try container.decodeIfPresent(BSONTimestamp.self, forKey: .clusterTime)
167171
self.updateDescription = try container.decodeIfPresent(UpdateDescription.self, forKey: .updateDescription)
168172
self.fullDocument = try container.decodeIfPresent(T.self, forKey: .fullDocument)
169173
}

Sources/TestsCommon/APMUtils.swift

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,11 @@ public class TestCommandMonitor: CommandEventHandler {
8181
}
8282

8383
public enum EventType: String, Decodable {
84-
case commandStartedEvent, commandSucceededEvent, commandFailedEvent,
85-
connectionCreatedEvent, connectionReadyEvent, connectionClosedEvent,
86-
connectionCheckedInEvent, connectionCheckedOutEvent, connectionCheckOutFailedEvent,
87-
poolCreatedEvent, poolReadyEvent, poolClearedEvent, poolClosedEvent,
88-
topologyDescriptionChanged, topologyOpening, topologyClosed, serverDescriptionChanged,
89-
serverOpening, serverClosed, serverHeartbeatStarted, serverHeartbeatSucceeded,
90-
serverHeartbeatFailed
84+
case commandStartedEvent, commandSucceededEvent, commandFailedEvent, connectionCreatedEvent, connectionReadyEvent,
85+
connectionClosedEvent, connectionCheckedInEvent, connectionCheckOutStartedEvent, connectionCheckedOutEvent,
86+
connectionCheckOutFailedEvent, poolCreatedEvent, poolReadyEvent, poolClearedEvent, poolClosedEvent,
87+
topologyDescriptionChanged, topologyOpening, topologyClosed, serverDescriptionChanged, serverOpening,
88+
serverClosed, serverHeartbeatStarted, serverHeartbeatSucceeded, serverHeartbeatFailed
9189
}
9290

9391
extension CommandEvent {

Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ extension MongoClient {
165165
}
166166

167167
internal func getUnmetRequirement(_ testRequirement: TestRequirement) async throws -> UnmetRequirement? {
168-
async let topologyType = try self.topologyType()
169-
async let serverVersion = try self.serverVersion()
170-
async let params = try self.serverParameters()
168+
async let topologyType = try await self.topologyType()
169+
async let serverVersion = try await self.serverVersion()
170+
async let params = try await self.serverParameters()
171171
return try await testRequirement.getUnmetRequirement(givenCurrent: serverVersion, topologyType, params)
172172
}
173173

Tests/MongoSwiftTests/AsyncAwaitTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ final class MongoCursorAsyncAwaitTests: MongoSwiftTestCase {
234234
}
235235

236236
testAsync {
237-
let opts = CreateCollectionOptions(capped: true, size: 5)
237+
let opts = CreateCollectionOptions(capped: true, max: 5, size: 100_000)
238238
try await self.withTestNamespace(collectionOptions: opts) { _, _, coll in
239239
try await coll.insertMany([["x": 1], ["x": 2], ["x": 3]])
240240

Tests/MongoSwiftTests/ChangeStreamTests.swift

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ final class ChangeStreamTests: MongoSwiftTestCase {
171171
// TODO: SWIFT-1458 Unskip.
172172
"change-streams-showExpandedEvents.json",
173173
// TODO: SWIFT-1472 Unskip.
174-
"change-streams-pre_and_post_images.json"
174+
"change-streams-pre_and_post_images.json",
175+
// TODO: SWIFT-1614 Unskip.
176+
"change-streams-disambiguatedPaths.json"
175177
]
176178
let tests = try retrieveSpecTestFiles(
177179
specName: "change-streams",
@@ -182,5 +184,29 @@ final class ChangeStreamTests: MongoSwiftTestCase {
182184
let testRunner = try await UnifiedTestRunner()
183185
try await testRunner.runFiles(tests)
184186
}
187+
188+
func testClusterTime() async throws {
189+
try await self.withTestClient { client in
190+
// cluster time is only included as of 4.0.
191+
let requirement = TestRequirement(
192+
minServerVersion: ServerVersion(major: 4, minor: 0, patch: 0),
193+
acceptableTopologies: [.replicaSet, .sharded, .shardedReplicaSet, .loadBalanced]
194+
)
195+
let unmetRequirement = try await client.getUnmetRequirement(requirement)
196+
guard unmetRequirement == nil else {
197+
printSkipMessage(testName: self.name, unmetRequirement: unmetRequirement!)
198+
return
199+
}
200+
let db = client.db(Self.testDatabase)
201+
try await db.collection(self.getCollectionName()).drop()
202+
let coll = try await db.createCollection(self.getCollectionName())
203+
204+
let stream = try await coll.watch()
205+
206+
_ = try await coll.insertOne(["x": 1])
207+
let event = try await stream.next()
208+
expect(event?.clusterTime).toNot(beNil())
209+
}
210+
}
185211
}
186212
#endif

Tests/MongoSwiftTests/Failpoint.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ extension FailPoint {
2929
/// Enables the failpoint, and returns a `EnabledFailpoint` which can handle disabling when needed
3030
internal func enable(
3131
using client: MongoClient,
32-
options: RunCommandOptions? = nil
32+
options: RunCommandOptions? = nil,
33+
session: ClientSession? = nil
3334
) async throws -> EnabledFailpoint {
34-
try await client.db("admin").runCommand(self.failPoint, options: options)
35+
try await client.db("admin").runCommand(self.failPoint, options: options, session: session)
3536
return EnabledFailpoint(failPoint: self, client: client)
3637
}
3738

Tests/MongoSwiftTests/UnifiedTestRunner/SpecialTestOperations.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@ struct UnifiedFailPoint: UnifiedOperationProtocol {
1212
/// The client entity to use for setting the failpoint.
1313
let client: String
1414

15+
/// Optional name of a session entity to use for setting the failpoint.
16+
let session: String?
17+
1518
static var knownArguments: Set<String> {
16-
["failPoint", "client"]
19+
["failPoint", "client", "session"]
1720
}
1821

1922
func execute(on _: UnifiedOperation.Object, context: Context) async throws -> UnifiedOperationResult {
2023
let testClient = try context.entities.getEntity(id: self.client).asTestClient()
24+
let session = try context.entities.resolveSession(id: self.session)
2125
let opts = RunCommandOptions(readPreference: .primary)
22-
let fpGuard = try await self.failPoint.enable(using: testClient.client, options: opts)
26+
let fpGuard = try await self.failPoint.enable(using: testClient.client, options: opts, session: session)
2327
context.enabledFailPoints.append(fpGuard)
2428
return .none
2529
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
{
2+
"description": "change-streams-clusterTime",
3+
"schemaVersion": "1.3",
4+
"createEntities": [
5+
{
6+
"client": {
7+
"id": "client0",
8+
"useMultipleMongoses": false
9+
}
10+
},
11+
{
12+
"database": {
13+
"id": "database0",
14+
"client": "client0",
15+
"databaseName": "database0"
16+
}
17+
},
18+
{
19+
"collection": {
20+
"id": "collection0",
21+
"database": "database0",
22+
"collectionName": "collection0"
23+
}
24+
}
25+
],
26+
"runOnRequirements": [
27+
{
28+
"minServerVersion": "4.0.0",
29+
"topologies": [
30+
"replicaset",
31+
"sharded-replicaset",
32+
"load-balanced",
33+
"sharded"
34+
]
35+
}
36+
],
37+
"initialData": [
38+
{
39+
"collectionName": "collection0",
40+
"databaseName": "database0",
41+
"documents": []
42+
}
43+
],
44+
"tests": [
45+
{
46+
"description": "clusterTime is present",
47+
"operations": [
48+
{
49+
"name": "createChangeStream",
50+
"object": "collection0",
51+
"arguments": {
52+
"pipeline": []
53+
},
54+
"saveResultAsEntity": "changeStream0"
55+
},
56+
{
57+
"name": "insertOne",
58+
"object": "collection0",
59+
"arguments": {
60+
"document": {
61+
"_id": 1
62+
}
63+
}
64+
},
65+
{
66+
"name": "iterateUntilDocumentOrError",
67+
"object": "changeStream0",
68+
"expectResult": {
69+
"ns": {
70+
"db": "database0",
71+
"coll": "collection0"
72+
},
73+
"clusterTime": {
74+
"$$exists": true
75+
}
76+
}
77+
}
78+
]
79+
}
80+
]
81+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
description: "change-streams-clusterTime"
2+
schemaVersion: "1.3"
3+
createEntities:
4+
- client:
5+
id: &client0 client0
6+
useMultipleMongoses: false
7+
- database:
8+
id: &database0 database0
9+
client: *client0
10+
databaseName: *database0
11+
- collection:
12+
id: &collection0 collection0
13+
database: *database0
14+
collectionName: *collection0
15+
16+
runOnRequirements:
17+
- minServerVersion: "4.0.0"
18+
topologies: [ replicaset, sharded-replicaset, load-balanced, sharded ]
19+
20+
initialData:
21+
- collectionName: *collection0
22+
databaseName: *database0
23+
documents: []
24+
25+
tests:
26+
- description: "clusterTime is present"
27+
operations:
28+
- name: createChangeStream
29+
object: *collection0
30+
arguments: { pipeline: [] }
31+
saveResultAsEntity: &changeStream0 changeStream0
32+
- name: insertOne
33+
object: *collection0
34+
arguments:
35+
document: { _id: 1 }
36+
- name: iterateUntilDocumentOrError
37+
object: *changeStream0
38+
expectResult:
39+
ns: { db: *database0, coll: *collection0 }
40+
clusterTime: { $$exists: true }

0 commit comments

Comments
 (0)