diff --git a/Sources/MongoSwift/APM.swift b/Sources/MongoSwift/APM.swift index 2f9f1df88..83bfc9bec 100644 --- a/Sources/MongoSwift/APM.swift +++ b/Sources/MongoSwift/APM.swift @@ -117,6 +117,111 @@ private protocol CommandEventProtocol { var serviceID: BSONObjectID? { get } } +#if compiler(>=5.5.2) && canImport(_Concurrency) +/// An asynchronous way to monitor command events that uses `AsyncSequence`. +/// Only available for Swift 5.5.2 and higher. +@available(macOS 10.15, *) +// sourcery: skipSyncExport +public struct CommandEventStream { + fileprivate let stream: AsyncStream + private let cmdHandler: CommandEventHandler + /// Initialize the stream with a `CommandEventHandler` + internal init(cmdHandler: CommandEventHandler, stream: AsyncStream) { + self.cmdHandler = cmdHandler + self.stream = stream + } +} + +/// An asynchronous way to monitor SDAM events that uses `AsyncSequence`. +/// Only available for Swift 5.5.2 and higher. +@available(macOS 10.15, *) +// sourcery: skipSyncExport +public struct SDAMEventStream { + fileprivate let stream: AsyncStream + private let sdamHandler: SDAMEventHandler + /// Initialize the stream with an `SDAMEventHandler` + internal init(sdamHandler: SDAMEventHandler, stream: AsyncStream) { + self.sdamHandler = sdamHandler + self.stream = stream + } +} + +@available(macOS 10.15, *) +extension CommandEventStream: AsyncSequence { + /// The type of element produced by this `CommandEventStream`. + public typealias Element = CommandEvent + + /// The asynchronous iterator of type `CommandEventStreamIterator` + /// that produces elements of this asynchronous sequence. + public typealias AsyncIterator = CommandEventStreamIterator + + /// Creates the asynchronous iterator that produces elements of this `CommandEventStream`. + public func makeAsyncIterator() -> CommandEventStreamIterator { + CommandEventStreamIterator(cmdEventStream: self) + } +} + +@available(macOS 10.15, *) +extension SDAMEventStream: AsyncSequence { + /// The type of element produced by this `SDAMEventStream`. + public typealias Element = SDAMEvent + + /// The asynchronous iterator of type `SDAMEventStreamIterator` + /// that produces elements of this asynchronous sequence. + public typealias AsyncIterator = SDAMEventStreamIterator + + /// Creates the asynchronous iterator that produces elements of this `SDAMEventStream`. + public func makeAsyncIterator() -> SDAMEventStreamIterator { + SDAMEventStreamIterator(sdamEventStream: self) + } +} + +/// The associated iterator for the `CommandEventStream`. +@available(macOS 10.15, *) +// sourcery: skipSyncExport +public struct CommandEventStreamIterator: AsyncIteratorProtocol { + private var iterator: AsyncStream.AsyncIterator + private let cmdEventStream: CommandEventStream + + /// Initialize the iterator + internal init(cmdEventStream: CommandEventStream) { + self.iterator = cmdEventStream.stream.makeAsyncIterator() + self.cmdEventStream = cmdEventStream + } + + /// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element. + public mutating func next() async -> CommandEvent? { + await self.iterator.next() + } + + /// The type of element iterated over by this `CommandEventStreamIterator`. + public typealias Element = CommandEvent +} + +/// The associated iterator for the `SDAMEventStream`. +@available(macOS 10.15, *) +// sourcery: skipSyncExport +public struct SDAMEventStreamIterator: AsyncIteratorProtocol { + private var iterator: AsyncStream.AsyncIterator + private let sdamEventStream: SDAMEventStream + + /// Initialize the iterator + internal init(sdamEventStream: SDAMEventStream) { + self.iterator = sdamEventStream.stream.makeAsyncIterator() + self.sdamEventStream = sdamEventStream + } + + /// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element. + public mutating func next() async -> SDAMEvent? { + await self.iterator.next() + } + + /// The type of element iterated over by this `SDAMEventStreamIterator`. + public typealias Element = SDAMEvent +} + +#endif + /// An event published when a command starts. public struct CommandStartedEvent: MongoSwiftEvent, CommandEventProtocol { /// Wrapper around a `mongoc_apm_command_started_t`. diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index e78d7fd0f..5a7f1a0e7 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -277,12 +277,15 @@ public class MongoClient { /// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete. private var wasClosed = false - /// Handlers for command monitoring events. + /// Handlers for command monitoring events. Should only be accessed when holding `eventHandlerLock`. internal var commandEventHandlers: [CommandEventHandler] - /// Handlers for SDAM monitoring events. + /// Handlers for SDAM monitoring events. Should only be accessed when holding `eventHandlerLock`. internal var sdamEventHandlers: [SDAMEventHandler] + /// Lock used to synchronize access to the event handler arrays to prevent data races. + private let eventHandlerLock: Lock = .init() + /// Counter for generating client _ids. internal static var clientIDGenerator = NIOAtomic.makeAtomic(value: 0) @@ -402,6 +405,124 @@ public class MongoClient { ) } +#if compiler(>=5.5.2) && canImport(_Concurrency) + @available(macOS 10.15, *) + internal class CmdHandler: CommandEventHandler { + private let continuation: AsyncStream.Continuation + internal init(continuation: AsyncStream.Continuation) { + self.continuation = continuation + } + + // Satisfies the protocol + internal func handleCommandEvent(_ event: CommandEvent) { + self.continuation.yield(event) + } + + internal func finish() { + self.continuation.finish() + } + } + + @available(macOS 10.15, *) + internal class SDAMHandler: SDAMEventHandler { + private let continuation: AsyncStream.Continuation + internal init(continuation: AsyncStream.Continuation) { + self.continuation = continuation + } + + // Satisfies the protocol + internal func handleSDAMEvent(_ event: SDAMEvent) { + self.continuation.yield(event) + } + + internal func finish() { + self.continuation.finish() + } + } + + /** + * Provides an `AsyncSequence` API for consuming command monitoring events. + * + * Example: printing the command events out would be written as + * ``` + * for try await event in client.commandEventStream() { + * print(event) + * } + * ``` + * If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`. + * The stream will be ended automatically if the `Task` it is running in is cancelled. + * - Returns: A `CommandEventStream` that implements `AsyncSequence`. + * - Note: Only the most recent 100 events are stored in the stream. + */ + @available(macOS 10.15, *) + public func commandEventStream() -> CommandEventStream { + var handler: CmdHandler? + let stream = AsyncStream( + CommandEvent.self, + bufferingPolicy: .bufferingNewest(100) + ) { con in + let cmdHandler = CmdHandler(continuation: con) + handler = cmdHandler + self.addCommandEventHandler(cmdHandler) + } + + // Ok to force unwrap since handler is set in the closure + // swiftlint:disable force_unwrapping + let commandEvents = CommandEventStream(cmdHandler: handler!, stream: stream) + + return commandEvents + } + + /** + * Provides an `AsyncSequence` API for consuming SDAM monitoring events. + * + * Example: printing the SDAM events out would be written as + * ``` + * for try await event in client.sdamEventStream() { + * print(event) + * } + * ``` + * If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`. + * The stream will be ended automatically if the `Task` it is running in is cancelled. + * - Returns: An `SDAMEventStream` that implements `AsyncSequence`. + * - Note: Only the most recent 100 events are stored in the stream. + */ + @available(macOS 10.15, *) + public func sdamEventStream() -> SDAMEventStream { + var handler: SDAMHandler? + let stream = AsyncStream( + SDAMEvent.self, + bufferingPolicy: .bufferingNewest(100) + ) { con in + let sdamHandler = SDAMHandler(continuation: con) + handler = sdamHandler + self.addSDAMEventHandler(sdamHandler) + } + // Ok to force unwrap since handler is set just above + // swiftlint:disable force_unwrapping + let sdamEvents = SDAMEventStream(sdamHandler: handler!, stream: stream) + return sdamEvents + } +#endif + + // Check which handlers are assoc. with streams and finish them + private func closeHandlers() { +#if compiler(>=5.5.2) && canImport(_Concurrency) + if #available(macOS 10.15, *) { + for handler in commandEventHandlers { + if let cmdHandler = handler as? WeakEventHandler { + cmdHandler.handler?.finish() + } + } + for handler in sdamEventHandlers { + if let sdamHandler = handler as? WeakEventHandler { + sdamHandler.handler?.finish() + } + } + } +#endif + } + /** * Closes this `MongoClient`, closing all connections to the server and cleaning up internal state. * @@ -422,6 +543,7 @@ public class MongoClient { self.operationExecutor.shutdown() } closeResult.whenComplete { _ in + self.closeHandlers() self.wasClosed = true } @@ -441,6 +563,7 @@ public class MongoClient { public func syncClose() throws { try self.connectionPool.close() try self.operationExecutor.syncShutdown() + self.closeHandlers() self.wasClosed = true } @@ -786,7 +909,9 @@ public class MongoClient { * to continue to receive events. */ public func addCommandEventHandler(_ handler: T) { - self.commandEventHandlers.append(WeakEventHandler(referencing: handler)) + self.eventHandlerLock.withLock { + self.commandEventHandlers.append(WeakEventHandler(referencing: handler)) + } } /** @@ -796,7 +921,9 @@ public class MongoClient { * strong reference cycle and potentially result in memory leaks. */ public func addCommandEventHandler(_ handlerFunc: @escaping (CommandEvent) -> Void) { - self.commandEventHandlers.append(CallbackEventHandler(handlerFunc)) + self.eventHandlerLock.withLock { + self.commandEventHandlers.append(CallbackEventHandler(handlerFunc)) + } } /** @@ -806,7 +933,9 @@ public class MongoClient { * to continue to receive events. */ public func addSDAMEventHandler(_ handler: T) { - self.sdamEventHandlers.append(WeakEventHandler(referencing: handler)) + self.eventHandlerLock.withLock { + self.sdamEventHandlers.append(WeakEventHandler(referencing: handler)) + } } /** @@ -816,7 +945,9 @@ public class MongoClient { * strong reference cycle and potentially result in memory leaks. */ public func addSDAMEventHandler(_ handlerFunc: @escaping (SDAMEvent) -> Void) { - self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc)) + self.eventHandlerLock.withLock { + self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc)) + } } /// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block @@ -882,7 +1013,7 @@ extension CallbackEventHandler: CommandEventHandler where EventType == CommandEv /// Event handler that stores a weak reference to the underlying handler. private class WeakEventHandler { - private weak var handler: T? + internal weak var handler: T? fileprivate init(referencing handler: T) { self.handler = handler diff --git a/Sources/MongoSwiftSync/Exports.swift b/Sources/MongoSwiftSync/Exports.swift index 091e4c6b4..2ebdd0ed6 100644 --- a/Sources/MongoSwiftSync/Exports.swift +++ b/Sources/MongoSwiftSync/Exports.swift @@ -1,4 +1,4 @@ -// Generated using Sourcery 1.6.0 — https://github.com/krzysztofzablocki/Sourcery +// Generated using Sourcery 1.6.1 — https://github.com/krzysztofzablocki/Sourcery // DO NOT EDIT // Re-export the BSON library diff --git a/Sources/TestsCommon/APMUtils.swift b/Sources/TestsCommon/APMUtils.swift index 56ec05fd5..a40aa9374 100644 --- a/Sources/TestsCommon/APMUtils.swift +++ b/Sources/TestsCommon/APMUtils.swift @@ -84,7 +84,10 @@ public enum EventType: String, Decodable { case commandStartedEvent, commandSucceededEvent, commandFailedEvent, connectionCreatedEvent, connectionReadyEvent, connectionClosedEvent, connectionCheckedInEvent, connectionCheckedOutEvent, connectionCheckOutFailedEvent, - poolCreatedEvent, poolReadyEvent, poolClearedEvent, poolClosedEvent + poolCreatedEvent, poolReadyEvent, poolClearedEvent, poolClosedEvent, + topologyDescriptionChanged, topologyOpening, topologyClosed, serverDescriptionChanged, + serverOpening, serverClosed, serverHeartbeatStarted, serverHeartbeatSucceeded, + serverHeartbeatFailed } extension CommandEvent { @@ -123,3 +126,113 @@ extension CommandEvent { return event } } + +extension SDAMEvent { + public var type: EventType { + switch self { + case .topologyDescriptionChanged: + return .topologyDescriptionChanged + case .topologyOpening: + return .topologyOpening + case .topologyClosed: + return .topologyClosed + case .serverDescriptionChanged: + return .serverDescriptionChanged + case .serverOpening: + return .serverOpening + case .serverClosed: + return .serverClosed + case .serverHeartbeatStarted: + return .serverHeartbeatStarted + case .serverHeartbeatSucceeded: + return .serverHeartbeatSucceeded + case .serverHeartbeatFailed: + return .serverHeartbeatFailed + } + } + + // Failable accessors for the different types of topology events. + + /// Returns this event as a `TopologyOpeningEvent` if it is one, nil otherwise. + public var topologyOpeningValue: TopologyOpeningEvent? { + guard case let .topologyOpening(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `TopologyClosedEvent` if it is one, nil otherwise. + public var topologyClosedValue: TopologyClosedEvent? { + guard case let .topologyClosed(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `TopologyDescriptionChangedEvent` if it is one, nil otherwise. + public var topologyDescriptionChangedValue: TopologyDescriptionChangedEvent? { + guard case let .topologyDescriptionChanged(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `ServerOpeningEvent` if it is one, nil otherwise. + public var serverOpeningValue: ServerOpeningEvent? { + guard case let .serverOpening(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `ServerClosedEvent` if it is one, nil otherwise. + public var serverClosedValue: ServerClosedEvent? { + guard case let .serverClosed(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `ServerDescriptionChangedEvent` if it is one, nil otherwise. + public var serverDescriptionChangedValue: ServerDescriptionChangedEvent? { + guard case let .serverDescriptionChanged(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `ServerHeartbeatStartedEvent` if it is one, nil otherwise. + public var serverHeartbeatStartedValue: ServerHeartbeatStartedEvent? { + guard case let .serverHeartbeatStarted(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `ServerHeartbeatSucceededEvent` if it is one, nil otherwise. + public var serverHeartbeatSucceededValue: ServerHeartbeatSucceededEvent? { + guard case let .serverHeartbeatSucceeded(event) = self else { + return nil + } + return event + } + + /// Returns this event as a `ServerHeartbeatFailedEvent` if it is one, nil otherwise. + public var serverHeartbeatFailedValue: ServerHeartbeatFailedEvent? { + guard case let .serverHeartbeatFailed(event) = self else { + return nil + } + return event + } + + /// Checks whether or not this event is a `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`, or + /// a `ServerHeartbeatFailedEvent`. + public var isHeartbeatEvent: Bool { + switch self { + case .serverHeartbeatFailed, .serverHeartbeatStarted, .serverHeartbeatSucceeded: + return true + default: + return false + } + } +} diff --git a/Tests/MongoSwiftSyncTests/SDAMTests.swift b/Tests/MongoSwiftSyncTests/SDAMTests.swift index b6cb83234..78b64806d 100644 --- a/Tests/MongoSwiftSyncTests/SDAMTests.swift +++ b/Tests/MongoSwiftSyncTests/SDAMTests.swift @@ -290,57 +290,3 @@ private class TestSDAMMonitor: SDAMEventHandler { self.topEvents.append(event) } } - -/// Failable accessors for the different types of topology events. -extension SDAMEvent { - fileprivate var topologyOpeningValue: TopologyOpeningEvent? { - guard case let .topologyOpening(event) = self else { - return nil - } - return event - } - - private var topologyClosedValue: TopologyClosedEvent? { - guard case let .topologyClosed(event) = self else { - return nil - } - return event - } - - fileprivate var topologyDescriptionChangedValue: TopologyDescriptionChangedEvent? { - guard case let .topologyDescriptionChanged(event) = self else { - return nil - } - return event - } - - fileprivate var serverOpeningValue: ServerOpeningEvent? { - guard case let .serverOpening(event) = self else { - return nil - } - return event - } - - private var serverClosedValue: ServerClosedEvent? { - guard case let .serverClosed(event) = self else { - return nil - } - return event - } - - fileprivate var serverDescriptionChangedValue: ServerDescriptionChangedEvent? { - guard case let .serverDescriptionChanged(event) = self else { - return nil - } - return event - } - - fileprivate var isHeartbeatEvent: Bool { - switch self { - case .serverHeartbeatFailed, .serverHeartbeatStarted, .serverHeartbeatSucceeded: - return true - default: - return false - } - } -} diff --git a/Tests/MongoSwiftTests/APMTests.swift b/Tests/MongoSwiftTests/APMTests.swift new file mode 100644 index 000000000..e481a5eac --- /dev/null +++ b/Tests/MongoSwiftTests/APMTests.swift @@ -0,0 +1,201 @@ +#if compiler(>=5.5.2) && canImport(_Concurrency) +import MongoSwift +import Nimble +import NIO +import NIOConcurrencyHelpers +import TestsCommon + +private protocol StreamableEvent { + var type: EventType { get } +} + +extension CommandEvent: StreamableEvent {} + +extension SDAMEvent: StreamableEvent {} + +@available(macOS 10.15, *) +final class APMTests: MongoSwiftTestCase { + func testClientFinishesCommandStreams() async throws { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { elg.syncShutdownOrFail() } + let client = try MongoClient.makeTestClient(eventLoopGroup: elg) + + // Task to create a commandEventStream and should return when the client is closed + let eventTask = Task { () -> Bool in + for try await _ in client.commandEventStream() {} + return true + } + try await client.db("admin").runCommand(["ping": 1]) + try await client.close() + + // Ensure the task returned eventually + _ = try await eventTask.value + } + + func testCommandStreamHasCorrectEvents() async throws { + let commandStr: [String] = ["ping", "ping", "drop", "drop", "endSessions", "endSessions"] + let eventTypes: [EventType] = [ + .commandStartedEvent, + .commandSucceededEvent, + .commandStartedEvent, + .commandSucceededEvent, + .commandStartedEvent, + .commandSucceededEvent + ] + let clientTask = try await self.withTestNamespace { client, db, _ -> Task in + let cmdTask = Task { () -> Int in + var i = 0 + for try await event in client.commandEventStream() { + expect(event.commandName).to(equal(commandStr[i])) + expect(event.type).to(equal(eventTypes[i])) + i += 1 + } + return i + } + + try await db.runCommand(["ping": 1]) + return cmdTask + } + let numEvents = try await clientTask.value + expect(numEvents).to(equal(6)) + } + + func testCommandStreamBufferingPolicy() async throws { + let insertsDone = NIOAtomic.makeAtomic(value: false) + let clientTask = try await self.withTestNamespace { client, _, coll -> Task in + let cmdBufferTask = Task { () -> Int in + var i = 0 + let stream = client.commandEventStream() + try await assertIsEventuallyTrue(description: "inserts done") { + insertsDone.load() + } + + for try await _ in stream { + i += 1 + } + return i + } + for _ in 1...60 { // 120 events + try await coll.insertOne(["hello": "world"]) + } + insertsDone.store(true) + return cmdBufferTask + } + let numEventsBuffer = try await clientTask.value + // Cant check for exactly 100 because of potential load balancer events getting added in during loop + expect(numEventsBuffer).to(beLessThan(120)) + } + + /// Helper that tests kicking off multiple streams concurrently + fileprivate func concurrentStreamTestHelper(f: @escaping (MongoClient) -> T) async throws + where T: AsyncSequence, T.Element: StreamableEvent + { + let taskCounter = NIOAtomic.makeAtomic(value: 0) + let concurrentTaskGroupCorrect = try await withThrowingTaskGroup( + of: [EventType].self, + returning: [[EventType]].self + ) { taskGroup in + // Client used for stream and then closed with taskGroup in scope to ensure all tasks return + try await self.withTestNamespace { client, db, coll in + for _ in 0...4 { + // Add 5 tasks + taskGroup.addTask { + taskCounter.add(1) + var eventArr: [EventType] = [] + for try await event in f(client) { + eventArr.append(event.type) + } + return eventArr + } + } + // Ensure all tasks start, then run commands + try await assertIsEventuallyTrue(description: "each task is started") { + taskCounter.load() == 5 + } + try await db.runCommand(["ping": 1]) + try await coll.insertOne(["hello": "world"]) + } + var taskArr: [[EventType]] = [] + for try await result in taskGroup { + taskArr.append(result) + } + return taskArr + } + + // Expect all tasks received the same number (>0) of events + for i in 0...4 { + let eventArrOutput = concurrentTaskGroupCorrect[i] + expect(eventArrOutput.count).to(beGreaterThan(0)) + } + for i in 1...4 { + let eventArrOld = concurrentTaskGroupCorrect[i] + let eventArrCurr = concurrentTaskGroupCorrect[i] + expect(eventArrOld).to(equal(eventArrCurr)) + } + } + + func testCommandStreamConcurrentStreams() async throws { + try await self.concurrentStreamTestHelper { client in + client.commandEventStream() + } + } + + func testSDAMStreamConcurrentStreams() async throws { + try await self.concurrentStreamTestHelper { client in + client.sdamEventStream() + } + } + + func testClientFinishesSDAMStreams() async throws { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { elg.syncShutdownOrFail() } + let client = try MongoClient.makeTestClient(eventLoopGroup: elg) + + // Task to create a commandEventStream and should return when the client is closed + let eventTask = Task { () -> Bool in + for try await _ in client.sdamEventStream() {} + return true + } + try await client.db("admin").runCommand(["ping": 1]) + try await client.close() + + // Ensure the task returned eventually + _ = try await eventTask.value + } + + func testSDAMEventStreamHasCorrectEvents() async throws { + let taskStarted = NIOAtomic.makeAtomic(value: false) + // Events seen by the regular SDAM handler + var handlerEvents: [EventType] = [] + let clientTask = try await self.withTestNamespace { client, db, coll -> Task<[EventType], Error> in + client.addSDAMEventHandler { event in + if !event.isHeartbeatEvent { + handlerEvents.append(event.type) + } + } + + let sdamTask = Task { () -> [EventType] in + var eventTypeSdam: [EventType] = [] + taskStarted.store(true) + for try await event in client.sdamEventStream() { + if !event.isHeartbeatEvent { + eventTypeSdam.append(event.type) + } + } + return eventTypeSdam + } + // Wait until the event stream has been opened before we start inserting data. + try await assertIsEventuallyTrue(description: "event stream should be started") { + taskStarted.load() + } + + try await db.runCommand(["ping": 1]) + try await coll.insertOne(["hello": "world"]) + return sdamTask + } + let streamEvents = try await clientTask.value + expect(streamEvents.count).to(beGreaterThan(0)) + expect(streamEvents).to(equal(handlerEvents)) + } +} +#endif diff --git a/Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift b/Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift index 0c3296b99..6eb7fbe81 100644 --- a/Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift +++ b/Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift @@ -51,6 +51,41 @@ func assertIsEventuallyTrue( XCTFail("Expected condition \"\(description)\" to be true within \(timeout) seconds, but was not") } +/// Asserts that the provided block returns true within the specified timeout. Nimble's `toEventually` can only be used +/// rom the main testing thread which is too restrictive for our purposes testing the async/await APIs. +@available(macOS 10.15.0, *) +func assertIsEventuallyTrue( + description: String, + timeout: TimeInterval = 5, + sleepInterval: TimeInterval = 0.1, + _ block: @escaping () async throws -> Bool +) async throws { + // Task that does the work as long as its not cancelled + let workTask = Task { () -> Bool in + while !Task.isCancelled { + guard try await block() else { + // Optional bc if task is cancelled, we want to return false and not encounter a `CancellationError` + try? await Task.sleep(seconds: sleepInterval) + continue + } + // task succeeded so we return true + return true + } + // Ran out of time before we succeeded, so return false + return false + } + + // Sleep until the timeout time is reached and then cancel the work + Task { + try await Task.sleep(seconds: timeout) + workTask.cancel() + } + guard try await workTask.value else { + XCTFail("Expected condition \"\(description)\" to be true within \(timeout) seconds, but was not") + return + } +} + @available(macOS 10.15.0, *) extension MongoSwiftTestCase { internal func withTestClient(