-
Notifications
You must be signed in to change notification settings - Fork 65
SWIFT-1469 Provide AsyncSequence APIs for monitoring events #764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
144a8ca
80b5322
5eeec51
a83ba3f
65b2c1e
e79d817
96b9938
81e6ccb
ffd4d8f
f714156
c747cb6
b9887ab
ea0027f
e4c4429
99de8a1
f7171f3
d30aee6
6580240
b963059
8e81f6e
68b5a4b
0f17aa6
3453612
3473dac
257f3d5
e9eb042
f501c66
455cc3b
db7c6a7
a47a4ed
1f5a971
850ac0c
6010451
efe51e3
057aee0
403b286
4265515
ddfea22
14e41a2
4206a6d
f7bbd55
9746118
2617a14
c567588
b25aeb9
f6c6bad
8d5b7dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -277,12 +277,15 @@ public class MongoClient { | |
/// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete. | ||
private var wasClosed = false | ||
|
||
/// Handlers for command monitoring events. | ||
/// Handlers for command monitoring events. Should only be accessed when holding `eventHandlerLock`. | ||
internal var commandEventHandlers: [CommandEventHandler] | ||
|
||
/// Handlers for SDAM monitoring events. | ||
/// Handlers for SDAM monitoring events. Should only be accessed when holding `eventHandlerLock`. | ||
internal var sdamEventHandlers: [SDAMEventHandler] | ||
|
||
/// Lock used to synchronize access to the event handler arrays to prevent data races. | ||
private let eventHandlerLock: Lock = .init() | ||
|
||
/// Counter for generating client _ids. | ||
internal static var clientIDGenerator = NIOAtomic<Int>.makeAtomic(value: 0) | ||
|
||
|
@@ -402,6 +405,124 @@ public class MongoClient { | |
) | ||
} | ||
|
||
#if compiler(>=5.5.2) && canImport(_Concurrency) | ||
@available(macOS 10.15, *) | ||
internal class CmdHandler: CommandEventHandler { | ||
private let continuation: AsyncStream<CommandEvent>.Continuation | ||
internal init(continuation: AsyncStream<CommandEvent>.Continuation) { | ||
self.continuation = continuation | ||
} | ||
|
||
// Satisfies the protocol | ||
internal func handleCommandEvent(_ event: CommandEvent) { | ||
self.continuation.yield(event) | ||
} | ||
|
||
internal func finish() { | ||
self.continuation.finish() | ||
} | ||
} | ||
|
||
@available(macOS 10.15, *) | ||
internal class SDAMHandler: SDAMEventHandler { | ||
private let continuation: AsyncStream<SDAMEvent>.Continuation | ||
internal init(continuation: AsyncStream<SDAMEvent>.Continuation) { | ||
self.continuation = continuation | ||
} | ||
|
||
// Satisfies the protocol | ||
internal func handleSDAMEvent(_ event: SDAMEvent) { | ||
self.continuation.yield(event) | ||
} | ||
|
||
internal func finish() { | ||
self.continuation.finish() | ||
} | ||
} | ||
|
||
/** | ||
* Provides an `AsyncSequence` API for consuming command monitoring events. | ||
* | ||
* Example: printing the command events out would be written as | ||
* ``` | ||
* for try await event in client.commandEventStream() { | ||
* print(event) | ||
* } | ||
* ``` | ||
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`. | ||
* The stream will be ended automatically if the `Task` it is running in is cancelled. | ||
* - Returns: A `CommandEventStream` that implements `AsyncSequence`. | ||
* - Note: Only the most recent 100 events are stored in the stream. | ||
*/ | ||
@available(macOS 10.15, *) | ||
public func commandEventStream() -> CommandEventStream { | ||
var handler: CmdHandler? | ||
let stream = AsyncStream( | ||
CommandEvent.self, | ||
bufferingPolicy: .bufferingNewest(100) | ||
) { con in | ||
let cmdHandler = CmdHandler(continuation: con) | ||
handler = cmdHandler | ||
self.addCommandEventHandler(cmdHandler) | ||
} | ||
|
||
// Ok to force unwrap since handler is set in the closure | ||
// swiftlint:disable force_unwrapping | ||
let commandEvents = CommandEventStream(cmdHandler: handler!, stream: stream) | ||
|
||
return commandEvents | ||
} | ||
|
||
/** | ||
* Provides an `AsyncSequence` API for consuming SDAM monitoring events. | ||
* | ||
* Example: printing the SDAM events out would be written as | ||
* ``` | ||
* for try await event in client.sdamEventStream() { | ||
* print(event) | ||
* } | ||
* ``` | ||
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`. | ||
* The stream will be ended automatically if the `Task` it is running in is cancelled. | ||
* - Returns: An `SDAMEventStream` that implements `AsyncSequence`. | ||
* - Note: Only the most recent 100 events are stored in the stream. | ||
*/ | ||
@available(macOS 10.15, *) | ||
public func sdamEventStream() -> SDAMEventStream { | ||
var handler: SDAMHandler? | ||
let stream = AsyncStream( | ||
SDAMEvent.self, | ||
bufferingPolicy: .bufferingNewest(100) | ||
) { con in | ||
let sdamHandler = SDAMHandler(continuation: con) | ||
handler = sdamHandler | ||
self.addSDAMEventHandler(sdamHandler) | ||
} | ||
// Ok to force unwrap since handler is set just above | ||
// swiftlint:disable force_unwrapping | ||
let sdamEvents = SDAMEventStream(sdamHandler: handler!, stream: stream) | ||
return sdamEvents | ||
} | ||
#endif | ||
|
||
// Check which handlers are assoc. with streams and finish them | ||
private func closeHandlers() { | ||
#if compiler(>=5.5.2) && canImport(_Concurrency) | ||
if #available(macOS 10.15, *) { | ||
for handler in commandEventHandlers { | ||
if let cmdHandler = handler as? WeakEventHandler<CmdHandler> { | ||
cmdHandler.handler?.finish() | ||
} | ||
} | ||
for handler in sdamEventHandlers { | ||
if let sdamHandler = handler as? WeakEventHandler<SDAMHandler> { | ||
sdamHandler.handler?.finish() | ||
} | ||
} | ||
} | ||
#endif | ||
} | ||
|
||
/** | ||
* Closes this `MongoClient`, closing all connections to the server and cleaning up internal state. | ||
* | ||
|
@@ -422,6 +543,7 @@ public class MongoClient { | |
self.operationExecutor.shutdown() | ||
} | ||
closeResult.whenComplete { _ in | ||
self.closeHandlers() | ||
self.wasClosed = true | ||
} | ||
|
||
|
@@ -441,6 +563,7 @@ public class MongoClient { | |
public func syncClose() throws { | ||
try self.connectionPool.close() | ||
try self.operationExecutor.syncShutdown() | ||
self.closeHandlers() | ||
self.wasClosed = true | ||
} | ||
|
||
|
@@ -786,7 +909,9 @@ public class MongoClient { | |
* to continue to receive events. | ||
*/ | ||
public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) { | ||
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler)) | ||
self.eventHandlerLock.withLock { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. related to my comment above re the lock (not something we need to fix today, but for the future): with the number of places in the code we now guard some value behind a lock and just have to remember that it should only be accessed via the lock, it would be a nice improvement to introduce some generic wrapper type like myLockedThing.withLockedValue { value in
// ... do something with value
} The method would acquire the lock, execute your provided closure with the value it guards, then releases the lock. the locked value could be private to the I filed SWIFT-1612 about this. it could be a nice little ticket to pick up if you end up having any free time waiting for reviews in the future There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a cool idea and increasingly relevant with more |
||
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler)) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -796,7 +921,9 @@ public class MongoClient { | |
* strong reference cycle and potentially result in memory leaks. | ||
*/ | ||
public func addCommandEventHandler(_ handlerFunc: @escaping (CommandEvent) -> Void) { | ||
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc)) | ||
self.eventHandlerLock.withLock { | ||
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc)) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -806,7 +933,9 @@ public class MongoClient { | |
* to continue to receive events. | ||
*/ | ||
public func addSDAMEventHandler<T: SDAMEventHandler>(_ handler: T) { | ||
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler)) | ||
self.eventHandlerLock.withLock { | ||
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler)) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -816,7 +945,9 @@ public class MongoClient { | |
* strong reference cycle and potentially result in memory leaks. | ||
*/ | ||
public func addSDAMEventHandler(_ handlerFunc: @escaping (SDAMEvent) -> Void) { | ||
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc)) | ||
self.eventHandlerLock.withLock { | ||
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc)) | ||
} | ||
} | ||
|
||
/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block | ||
|
@@ -882,7 +1013,7 @@ extension CallbackEventHandler: CommandEventHandler where EventType == CommandEv | |
|
||
/// Event handler that stores a weak reference to the underlying handler. | ||
private class WeakEventHandler<T: AnyObject> { | ||
private weak var handler: T? | ||
internal weak var handler: T? | ||
|
||
fileprivate init(referencing handler: T) { | ||
self.handler = handler | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reasoning behind the choice of 100 here? Would there be any benefit in allowing users to set a custom value?