Skip to content

Commit 2d579e2

Browse files
authored
SWIFT-1469 Provide AsyncSequence APIs for monitoring events (#764)
1 parent 48e603d commit 2d579e2

File tree

6 files changed

+593
-62
lines changed

6 files changed

+593
-62
lines changed

Sources/MongoSwift/APM.swift

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,111 @@ private protocol CommandEventProtocol {
117117
var serviceID: BSONObjectID? { get }
118118
}
119119

120+
#if compiler(>=5.5.2) && canImport(_Concurrency)
121+
/// An asynchronous way to monitor command events that uses `AsyncSequence`.
122+
/// Only available for Swift 5.5.2 and higher.
123+
@available(macOS 10.15, *)
124+
// sourcery: skipSyncExport
125+
public struct CommandEventStream {
126+
fileprivate let stream: AsyncStream<CommandEvent>
127+
private let cmdHandler: CommandEventHandler
128+
/// Initialize the stream with a `CommandEventHandler`
129+
internal init(cmdHandler: CommandEventHandler, stream: AsyncStream<CommandEvent>) {
130+
self.cmdHandler = cmdHandler
131+
self.stream = stream
132+
}
133+
}
134+
135+
/// An asynchronous way to monitor SDAM events that uses `AsyncSequence`.
136+
/// Only available for Swift 5.5.2 and higher.
137+
@available(macOS 10.15, *)
138+
// sourcery: skipSyncExport
139+
public struct SDAMEventStream {
140+
fileprivate let stream: AsyncStream<SDAMEvent>
141+
private let sdamHandler: SDAMEventHandler
142+
/// Initialize the stream with an `SDAMEventHandler`
143+
internal init(sdamHandler: SDAMEventHandler, stream: AsyncStream<SDAMEvent>) {
144+
self.sdamHandler = sdamHandler
145+
self.stream = stream
146+
}
147+
}
148+
149+
@available(macOS 10.15, *)
150+
extension CommandEventStream: AsyncSequence {
151+
/// The type of element produced by this `CommandEventStream`.
152+
public typealias Element = CommandEvent
153+
154+
/// The asynchronous iterator of type `CommandEventStreamIterator`
155+
/// that produces elements of this asynchronous sequence.
156+
public typealias AsyncIterator = CommandEventStreamIterator
157+
158+
/// Creates the asynchronous iterator that produces elements of this `CommandEventStream`.
159+
public func makeAsyncIterator() -> CommandEventStreamIterator {
160+
CommandEventStreamIterator(cmdEventStream: self)
161+
}
162+
}
163+
164+
@available(macOS 10.15, *)
165+
extension SDAMEventStream: AsyncSequence {
166+
/// The type of element produced by this `SDAMEventStream`.
167+
public typealias Element = SDAMEvent
168+
169+
/// The asynchronous iterator of type `SDAMEventStreamIterator`
170+
/// that produces elements of this asynchronous sequence.
171+
public typealias AsyncIterator = SDAMEventStreamIterator
172+
173+
/// Creates the asynchronous iterator that produces elements of this `SDAMEventStream`.
174+
public func makeAsyncIterator() -> SDAMEventStreamIterator {
175+
SDAMEventStreamIterator(sdamEventStream: self)
176+
}
177+
}
178+
179+
/// The associated iterator for the `CommandEventStream`.
180+
@available(macOS 10.15, *)
181+
// sourcery: skipSyncExport
182+
public struct CommandEventStreamIterator: AsyncIteratorProtocol {
183+
private var iterator: AsyncStream<CommandEvent>.AsyncIterator
184+
private let cmdEventStream: CommandEventStream
185+
186+
/// Initialize the iterator
187+
internal init(cmdEventStream: CommandEventStream) {
188+
self.iterator = cmdEventStream.stream.makeAsyncIterator()
189+
self.cmdEventStream = cmdEventStream
190+
}
191+
192+
/// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element.
193+
public mutating func next() async -> CommandEvent? {
194+
await self.iterator.next()
195+
}
196+
197+
/// The type of element iterated over by this `CommandEventStreamIterator`.
198+
public typealias Element = CommandEvent
199+
}
200+
201+
/// The associated iterator for the `SDAMEventStream`.
202+
@available(macOS 10.15, *)
203+
// sourcery: skipSyncExport
204+
public struct SDAMEventStreamIterator: AsyncIteratorProtocol {
205+
private var iterator: AsyncStream<SDAMEvent>.AsyncIterator
206+
private let sdamEventStream: SDAMEventStream
207+
208+
/// Initialize the iterator
209+
internal init(sdamEventStream: SDAMEventStream) {
210+
self.iterator = sdamEventStream.stream.makeAsyncIterator()
211+
self.sdamEventStream = sdamEventStream
212+
}
213+
214+
/// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element.
215+
public mutating func next() async -> SDAMEvent? {
216+
await self.iterator.next()
217+
}
218+
219+
/// The type of element iterated over by this `SDAMEventStreamIterator`.
220+
public typealias Element = SDAMEvent
221+
}
222+
223+
#endif
224+
120225
/// An event published when a command starts.
121226
public struct CommandStartedEvent: MongoSwiftEvent, CommandEventProtocol {
122227
/// Wrapper around a `mongoc_apm_command_started_t`.

Sources/MongoSwift/MongoClient.swift

Lines changed: 138 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,15 @@ public class MongoClient {
277277
/// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete.
278278
private var wasClosed = false
279279

280-
/// Handlers for command monitoring events.
280+
/// Handlers for command monitoring events. Should only be accessed when holding `eventHandlerLock`.
281281
internal var commandEventHandlers: [CommandEventHandler]
282282

283-
/// Handlers for SDAM monitoring events.
283+
/// Handlers for SDAM monitoring events. Should only be accessed when holding `eventHandlerLock`.
284284
internal var sdamEventHandlers: [SDAMEventHandler]
285285

286+
/// Lock used to synchronize access to the event handler arrays to prevent data races.
287+
private let eventHandlerLock: Lock = .init()
288+
286289
/// Counter for generating client _ids.
287290
internal static var clientIDGenerator = NIOAtomic<Int>.makeAtomic(value: 0)
288291

@@ -402,6 +405,124 @@ public class MongoClient {
402405
)
403406
}
404407

408+
#if compiler(>=5.5.2) && canImport(_Concurrency)
409+
@available(macOS 10.15, *)
410+
internal class CmdHandler: CommandEventHandler {
411+
private let continuation: AsyncStream<CommandEvent>.Continuation
412+
internal init(continuation: AsyncStream<CommandEvent>.Continuation) {
413+
self.continuation = continuation
414+
}
415+
416+
// Satisfies the protocol
417+
internal func handleCommandEvent(_ event: CommandEvent) {
418+
self.continuation.yield(event)
419+
}
420+
421+
internal func finish() {
422+
self.continuation.finish()
423+
}
424+
}
425+
426+
@available(macOS 10.15, *)
427+
internal class SDAMHandler: SDAMEventHandler {
428+
private let continuation: AsyncStream<SDAMEvent>.Continuation
429+
internal init(continuation: AsyncStream<SDAMEvent>.Continuation) {
430+
self.continuation = continuation
431+
}
432+
433+
// Satisfies the protocol
434+
internal func handleSDAMEvent(_ event: SDAMEvent) {
435+
self.continuation.yield(event)
436+
}
437+
438+
internal func finish() {
439+
self.continuation.finish()
440+
}
441+
}
442+
443+
/**
444+
* Provides an `AsyncSequence` API for consuming command monitoring events.
445+
*
446+
* Example: printing the command events out would be written as
447+
* ```
448+
* for try await event in client.commandEventStream() {
449+
* print(event)
450+
* }
451+
* ```
452+
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`.
453+
* The stream will be ended automatically if the `Task` it is running in is cancelled.
454+
* - Returns: A `CommandEventStream` that implements `AsyncSequence`.
455+
* - Note: Only the most recent 100 events are stored in the stream.
456+
*/
457+
@available(macOS 10.15, *)
458+
public func commandEventStream() -> CommandEventStream {
459+
var handler: CmdHandler?
460+
let stream = AsyncStream(
461+
CommandEvent.self,
462+
bufferingPolicy: .bufferingNewest(100)
463+
) { con in
464+
let cmdHandler = CmdHandler(continuation: con)
465+
handler = cmdHandler
466+
self.addCommandEventHandler(cmdHandler)
467+
}
468+
469+
// Ok to force unwrap since handler is set in the closure
470+
// swiftlint:disable force_unwrapping
471+
let commandEvents = CommandEventStream(cmdHandler: handler!, stream: stream)
472+
473+
return commandEvents
474+
}
475+
476+
/**
477+
* Provides an `AsyncSequence` API for consuming SDAM monitoring events.
478+
*
479+
* Example: printing the SDAM events out would be written as
480+
* ```
481+
* for try await event in client.sdamEventStream() {
482+
* print(event)
483+
* }
484+
* ```
485+
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`.
486+
* The stream will be ended automatically if the `Task` it is running in is cancelled.
487+
* - Returns: An `SDAMEventStream` that implements `AsyncSequence`.
488+
* - Note: Only the most recent 100 events are stored in the stream.
489+
*/
490+
@available(macOS 10.15, *)
491+
public func sdamEventStream() -> SDAMEventStream {
492+
var handler: SDAMHandler?
493+
let stream = AsyncStream(
494+
SDAMEvent.self,
495+
bufferingPolicy: .bufferingNewest(100)
496+
) { con in
497+
let sdamHandler = SDAMHandler(continuation: con)
498+
handler = sdamHandler
499+
self.addSDAMEventHandler(sdamHandler)
500+
}
501+
// Ok to force unwrap since handler is set just above
502+
// swiftlint:disable force_unwrapping
503+
let sdamEvents = SDAMEventStream(sdamHandler: handler!, stream: stream)
504+
return sdamEvents
505+
}
506+
#endif
507+
508+
// Check which handlers are assoc. with streams and finish them
509+
private func closeHandlers() {
510+
#if compiler(>=5.5.2) && canImport(_Concurrency)
511+
if #available(macOS 10.15, *) {
512+
for handler in commandEventHandlers {
513+
if let cmdHandler = handler as? WeakEventHandler<CmdHandler> {
514+
cmdHandler.handler?.finish()
515+
}
516+
}
517+
for handler in sdamEventHandlers {
518+
if let sdamHandler = handler as? WeakEventHandler<SDAMHandler> {
519+
sdamHandler.handler?.finish()
520+
}
521+
}
522+
}
523+
#endif
524+
}
525+
405526
/**
406527
* Closes this `MongoClient`, closing all connections to the server and cleaning up internal state.
407528
*
@@ -422,6 +543,7 @@ public class MongoClient {
422543
self.operationExecutor.shutdown()
423544
}
424545
closeResult.whenComplete { _ in
546+
self.closeHandlers()
425547
self.wasClosed = true
426548
}
427549

@@ -441,6 +563,7 @@ public class MongoClient {
441563
public func syncClose() throws {
442564
try self.connectionPool.close()
443565
try self.operationExecutor.syncShutdown()
566+
self.closeHandlers()
444567
self.wasClosed = true
445568
}
446569

@@ -786,7 +909,9 @@ public class MongoClient {
786909
* to continue to receive events.
787910
*/
788911
public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) {
789-
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler))
912+
self.eventHandlerLock.withLock {
913+
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler))
914+
}
790915
}
791916

792917
/**
@@ -796,7 +921,9 @@ public class MongoClient {
796921
* strong reference cycle and potentially result in memory leaks.
797922
*/
798923
public func addCommandEventHandler(_ handlerFunc: @escaping (CommandEvent) -> Void) {
799-
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc))
924+
self.eventHandlerLock.withLock {
925+
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc))
926+
}
800927
}
801928

802929
/**
@@ -806,7 +933,9 @@ public class MongoClient {
806933
* to continue to receive events.
807934
*/
808935
public func addSDAMEventHandler<T: SDAMEventHandler>(_ handler: T) {
809-
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler))
936+
self.eventHandlerLock.withLock {
937+
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler))
938+
}
810939
}
811940

812941
/**
@@ -816,7 +945,9 @@ public class MongoClient {
816945
* strong reference cycle and potentially result in memory leaks.
817946
*/
818947
public func addSDAMEventHandler(_ handlerFunc: @escaping (SDAMEvent) -> Void) {
819-
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
948+
self.eventHandlerLock.withLock {
949+
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
950+
}
820951
}
821952

822953
/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block
@@ -882,7 +1013,7 @@ extension CallbackEventHandler: CommandEventHandler where EventType == CommandEv
8821013

8831014
/// Event handler that stores a weak reference to the underlying handler.
8841015
private class WeakEventHandler<T: AnyObject> {
885-
private weak var handler: T?
1016+
internal weak var handler: T?
8861017

8871018
fileprivate init(referencing handler: T) {
8881019
self.handler = handler

0 commit comments

Comments
 (0)