Skip to content

SWIFT-1469 Provide AsyncSequence APIs for monitoring events #764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 47 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
144a8ca
EventStream struct + client field members
rchhaya Jun 27, 2022
80b5322
handing api differences using extensions
rchhaya Jun 27, 2022
5eeec51
comment debugging
rchhaya Jun 28, 2022
a83ba3f
comment debugging
rchhaya Jun 29, 2022
65b2c1e
setting up iterators and streams with directives
rchhaya Jun 29, 2022
e79d817
Fixing version issues via computed properties
rchhaya Jun 29, 2022
96b9938
visibility modifiers and docstrings
rchhaya Jul 1, 2022
81e6ccb
commenting force cast and clean ups
rchhaya Jul 1, 2022
ffd4d8f
Prelim tests
rchhaya Jul 1, 2022
f714156
prelim test
rchhaya Jul 5, 2022
c747cb6
writing async tests for command/sdam
rchhaya Jul 5, 2022
b9887ab
Improving SDAM test and creating extension
rchhaya Jul 5, 2022
ea0027f
Fixing SDAM test, cleaning up comments
rchhaya Jul 6, 2022
e4c4429
sourcery updates
rchhaya Jul 6, 2022
99de8a1
skip swiftlint for sync exports
rchhaya Jul 6, 2022
f7171f3
sourcery compiler directive issues
rchhaya Jul 7, 2022
d30aee6
sourcery skip before available
rchhaya Jul 7, 2022
6580240
lint
rchhaya Jul 7, 2022
b963059
removing comments
rchhaya Jul 7, 2022
8e81f6e
Fixing comments, tests, locks, annotations
rchhaya Jul 8, 2022
68b5a4b
Playing around with caching continuation + buffers
rchhaya Jul 8, 2022
0f17aa6
Fixing client.finish
rchhaya Jul 8, 2022
3453612
Fixing client.close()
rchhaya Jul 8, 2022
3473dac
Skipping client.close() for now
rchhaya Jul 8, 2022
257f3d5
On-demand approach, no cache,strong handler
rchhaya Jul 11, 2022
e9eb042
Caching continuation, .finish() for cmd test
rchhaya Jul 11, 2022
f501c66
Weak handler, only works with outputter not .cmdstream
rchhaya Jul 12, 2022
455cc3b
Modified compiler directives+documentation
rchhaya Jul 12, 2022
db7c6a7
Annotations on test, good but lot of extra comments
rchhaya Jul 12, 2022
a47a4ed
Removing commented code/prints, beefing up docstring
rchhaya Jul 12, 2022
1f5a971
Removing old task code
rchhaya Jul 12, 2022
850ac0c
Removing generic types, new tests, peer programming
rchhaya Jul 13, 2022
6010451
Jagged array check
rchhaya Jul 14, 2022
efe51e3
Locking handler access when appending + fixing test
rchhaya Jul 14, 2022
057aee0
Using concurrent streams helper for command+SDAM tests
rchhaya Jul 14, 2022
403b286
Accounting for load balancer events
rchhaya Jul 14, 2022
4265515
Removing a forgotten print statement
rchhaya Jul 14, 2022
ddfea22
Removing no-longer-necessary struct wrappers
rchhaya Jul 14, 2022
14e41a2
Minor test changes, change unwrapping procedures, docs, comments
rchhaya Jul 15, 2022
4206a6d
Returning task from clients, removing need for actors, comments
rchhaya Jul 18, 2022
f7bbd55
Fixing tests + docstrings
rchhaya Jul 25, 2022
9746118
Adding drop expectation bc of namespace in cmd tests
rchhaya Jul 25, 2022
2617a14
Adding backticks to docstring
rchhaya Jul 26, 2022
c567588
Minor variable/comment/doc changes + taskGroup for concurrent stream
rchhaya Jul 27, 2022
b25aeb9
Apply suggestions from code review
rchhaya Jul 27, 2022
f6c6bad
Comment explaining optional try for cancellationerror
rchhaya Jul 27, 2022
8d5b7dd
Update Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift
rchhaya Jul 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions Sources/MongoSwift/APM.swift
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,111 @@ private protocol CommandEventProtocol {
var serviceID: BSONObjectID? { get }
}

#if compiler(>=5.5.2) && canImport(_Concurrency)
/// An asynchronous way to monitor command events that uses `AsyncSequence`.
/// Only available for Swift 5.5.2 and higher.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct CommandEventStream {
fileprivate let stream: AsyncStream<CommandEvent>
private let cmdHandler: CommandEventHandler
/// Initialize the stream with a `CommandEventHandler`
internal init(cmdHandler: CommandEventHandler, stream: AsyncStream<CommandEvent>) {
self.cmdHandler = cmdHandler
self.stream = stream
}
}

/// An asynchronous way to monitor SDAM events that uses `AsyncSequence`.
/// Only available for Swift 5.5.2 and higher.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct SDAMEventStream {
fileprivate let stream: AsyncStream<SDAMEvent>
private let sdamHandler: SDAMEventHandler
/// Initialize the stream with an `SDAMEventHandler`
internal init(sdamHandler: SDAMEventHandler, stream: AsyncStream<SDAMEvent>) {
self.sdamHandler = sdamHandler
self.stream = stream
}
}

@available(macOS 10.15, *)
extension CommandEventStream: AsyncSequence {
/// The type of element produced by this `CommandEventStream`.
public typealias Element = CommandEvent

/// The asynchronous iterator of type `CommandEventStreamIterator`
/// that produces elements of this asynchronous sequence.
public typealias AsyncIterator = CommandEventStreamIterator

/// Creates the asynchronous iterator that produces elements of this `CommandEventStream`.
public func makeAsyncIterator() -> CommandEventStreamIterator {
CommandEventStreamIterator(cmdEventStream: self)
}
}

@available(macOS 10.15, *)
extension SDAMEventStream: AsyncSequence {
/// The type of element produced by this `SDAMEventStream`.
public typealias Element = SDAMEvent

/// The asynchronous iterator of type `SDAMEventStreamIterator`
/// that produces elements of this asynchronous sequence.
public typealias AsyncIterator = SDAMEventStreamIterator

/// Creates the asynchronous iterator that produces elements of this `SDAMEventStream`.
public func makeAsyncIterator() -> SDAMEventStreamIterator {
SDAMEventStreamIterator(sdamEventStream: self)
}
}

/// The associated iterator for the `CommandEventStream`.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct CommandEventStreamIterator: AsyncIteratorProtocol {
private var iterator: AsyncStream<CommandEvent>.AsyncIterator
private let cmdEventStream: CommandEventStream

/// Initialize the iterator
internal init(cmdEventStream: CommandEventStream) {
self.iterator = cmdEventStream.stream.makeAsyncIterator()
self.cmdEventStream = cmdEventStream
}

/// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element.
public mutating func next() async -> CommandEvent? {
await self.iterator.next()
}

/// The type of element iterated over by this `CommandEventStreamIterator`.
public typealias Element = CommandEvent
}

/// The associated iterator for the `SDAMEventStream`.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct SDAMEventStreamIterator: AsyncIteratorProtocol {
private var iterator: AsyncStream<SDAMEvent>.AsyncIterator
private let sdamEventStream: SDAMEventStream

/// Initialize the iterator
internal init(sdamEventStream: SDAMEventStream) {
self.iterator = sdamEventStream.stream.makeAsyncIterator()
self.sdamEventStream = sdamEventStream
}

/// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element.
public mutating func next() async -> SDAMEvent? {
await self.iterator.next()
}

/// The type of element iterated over by this `SDAMEventStreamIterator`.
public typealias Element = SDAMEvent
}

#endif

/// An event published when a command starts.
public struct CommandStartedEvent: MongoSwiftEvent, CommandEventProtocol {
/// Wrapper around a `mongoc_apm_command_started_t`.
Expand Down
145 changes: 138 additions & 7 deletions Sources/MongoSwift/MongoClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,15 @@ public class MongoClient {
/// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete.
private var wasClosed = false

/// Handlers for command monitoring events.
/// Handlers for command monitoring events. Should only be accessed when holding `eventHandlerLock`.
internal var commandEventHandlers: [CommandEventHandler]

/// Handlers for SDAM monitoring events.
/// Handlers for SDAM monitoring events. Should only be accessed when holding `eventHandlerLock`.
internal var sdamEventHandlers: [SDAMEventHandler]

/// Lock used to synchronize access to the event handler arrays to prevent data races.
private let eventHandlerLock: Lock = .init()

/// Counter for generating client _ids.
internal static var clientIDGenerator = NIOAtomic<Int>.makeAtomic(value: 0)

Expand Down Expand Up @@ -402,6 +405,124 @@ public class MongoClient {
)
}

#if compiler(>=5.5.2) && canImport(_Concurrency)
@available(macOS 10.15, *)
internal class CmdHandler: CommandEventHandler {
private let continuation: AsyncStream<CommandEvent>.Continuation
internal init(continuation: AsyncStream<CommandEvent>.Continuation) {
self.continuation = continuation
}

// Satisfies the protocol
internal func handleCommandEvent(_ event: CommandEvent) {
self.continuation.yield(event)
}

internal func finish() {
self.continuation.finish()
}
}

@available(macOS 10.15, *)
internal class SDAMHandler: SDAMEventHandler {
private let continuation: AsyncStream<SDAMEvent>.Continuation
internal init(continuation: AsyncStream<SDAMEvent>.Continuation) {
self.continuation = continuation
}

// Satisfies the protocol
internal func handleSDAMEvent(_ event: SDAMEvent) {
self.continuation.yield(event)
}

internal func finish() {
self.continuation.finish()
}
}

/**
* Provides an `AsyncSequence` API for consuming command monitoring events.
*
* Example: printing the command events out would be written as
* ```
* for try await event in client.commandEventStream() {
* print(event)
* }
* ```
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`.
* The stream will be ended automatically if the `Task` it is running in is cancelled.
* - Returns: A `CommandEventStream` that implements `AsyncSequence`.
* - Note: Only the most recent 100 events are stored in the stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind the choice of 100 here? Would there be any benefit in allowing users to set a custom value?

*/
@available(macOS 10.15, *)
public func commandEventStream() -> CommandEventStream {
var handler: CmdHandler?
let stream = AsyncStream(
CommandEvent.self,
bufferingPolicy: .bufferingNewest(100)
) { con in
let cmdHandler = CmdHandler(continuation: con)
handler = cmdHandler
self.addCommandEventHandler(cmdHandler)
}

// Ok to force unwrap since handler is set in the closure
// swiftlint:disable force_unwrapping
let commandEvents = CommandEventStream(cmdHandler: handler!, stream: stream)

return commandEvents
}

/**
* Provides an `AsyncSequence` API for consuming SDAM monitoring events.
*
* Example: printing the SDAM events out would be written as
* ```
* for try await event in client.sdamEventStream() {
* print(event)
* }
* ```
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`.
* The stream will be ended automatically if the `Task` it is running in is cancelled.
* - Returns: An `SDAMEventStream` that implements `AsyncSequence`.
* - Note: Only the most recent 100 events are stored in the stream.
*/
@available(macOS 10.15, *)
public func sdamEventStream() -> SDAMEventStream {
var handler: SDAMHandler?
let stream = AsyncStream(
SDAMEvent.self,
bufferingPolicy: .bufferingNewest(100)
) { con in
let sdamHandler = SDAMHandler(continuation: con)
handler = sdamHandler
self.addSDAMEventHandler(sdamHandler)
}
// Ok to force unwrap since handler is set just above
// swiftlint:disable force_unwrapping
let sdamEvents = SDAMEventStream(sdamHandler: handler!, stream: stream)
return sdamEvents
}
#endif

// Check which handlers are assoc. with streams and finish them
private func closeHandlers() {
#if compiler(>=5.5.2) && canImport(_Concurrency)
if #available(macOS 10.15, *) {
for handler in commandEventHandlers {
if let cmdHandler = handler as? WeakEventHandler<CmdHandler> {
cmdHandler.handler?.finish()
}
}
for handler in sdamEventHandlers {
if let sdamHandler = handler as? WeakEventHandler<SDAMHandler> {
sdamHandler.handler?.finish()
}
}
}
#endif
}

/**
* Closes this `MongoClient`, closing all connections to the server and cleaning up internal state.
*
Expand All @@ -422,6 +543,7 @@ public class MongoClient {
self.operationExecutor.shutdown()
}
closeResult.whenComplete { _ in
self.closeHandlers()
self.wasClosed = true
}

Expand All @@ -441,6 +563,7 @@ public class MongoClient {
public func syncClose() throws {
try self.connectionPool.close()
try self.operationExecutor.syncShutdown()
self.closeHandlers()
self.wasClosed = true
}

Expand Down Expand Up @@ -786,7 +909,9 @@ public class MongoClient {
* to continue to receive events.
*/
public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) {
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler))
self.eventHandlerLock.withLock {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to my comment above re the lock (not something we need to fix today, but for the future): with the number of places in the code we now guard some value behind a lock and just have to remember that it should only be accessed via the lock, it would be a nice improvement to introduce some generic wrapper type like Locked<T> that stores a value of type T plus its corresponding Lock and has a method that lets you do something like

myLockedThing.withLockedValue { value in
    // ... do something with value
}

The method would acquire the lock, execute your provided closure with the value it guards, then releases the lock. the locked value could be private to the Locked type so it's impossible to access it other than when you hold the lock, which would make it harder for us to forget to acquire the lock.

I filed SWIFT-1612 about this. it could be a nice little ticket to pick up if you end up having any free time waiting for reviews in the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a cool idea and increasingly relevant with more async usage in the driver. Def a cool idea to pick up in some downtime.

self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler))
}
}

/**
Expand All @@ -796,7 +921,9 @@ public class MongoClient {
* strong reference cycle and potentially result in memory leaks.
*/
public func addCommandEventHandler(_ handlerFunc: @escaping (CommandEvent) -> Void) {
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc))
self.eventHandlerLock.withLock {
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc))
}
}

/**
Expand All @@ -806,7 +933,9 @@ public class MongoClient {
* to continue to receive events.
*/
public func addSDAMEventHandler<T: SDAMEventHandler>(_ handler: T) {
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler))
self.eventHandlerLock.withLock {
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler))
}
}

/**
Expand All @@ -816,7 +945,9 @@ public class MongoClient {
* strong reference cycle and potentially result in memory leaks.
*/
public func addSDAMEventHandler(_ handlerFunc: @escaping (SDAMEvent) -> Void) {
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
self.eventHandlerLock.withLock {
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
}
}

/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block
Expand Down Expand Up @@ -882,7 +1013,7 @@ extension CallbackEventHandler: CommandEventHandler where EventType == CommandEv

/// Event handler that stores a weak reference to the underlying handler.
private class WeakEventHandler<T: AnyObject> {
private weak var handler: T?
internal weak var handler: T?

fileprivate init(referencing handler: T) {
self.handler = handler
Expand Down
2 changes: 1 addition & 1 deletion Sources/MongoSwiftSync/Exports.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated using Sourcery 1.6.0 — https://github.com/krzysztofzablocki/Sourcery
// Generated using Sourcery 1.6.1 — https://github.com/krzysztofzablocki/Sourcery
// DO NOT EDIT

// Re-export the BSON library
Expand Down
Loading