-
Notifications
You must be signed in to change notification settings - Fork 65
SWIFT-1469 Provide AsyncSequence APIs for monitoring events #764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good start! some minor comments as well as a few design-related questions
.commandStartedEvent, | ||
.commandSucceededEvent | ||
] | ||
Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't await the result or completion of this Task
so I don't think we have any guarantee it has finished executing and making all of its assertions by the time the test finishes.
one suggestion is to have the task return i
and await
and assert on that value below. something like:
let eventsTask = Task {
// ...
return i
}
try await client.db("admin").runCommand(["ping": 1])
let taskResult = try await eventTask.result
expect(taskResult).to(equal(4))
and assert it's the right value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
loop never exits so we never return. All relates to the fact that we dont save the continuation, which not sure how to do given the current initialization. Preliminarily pushing all minor commits and will look at this
try await self.withTestClient { client in | ||
Task { | ||
var i = 0 | ||
var eventHandler: [EventType] = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we give this a more descriptive name like eventTypes
?
I think the thread sanitizer is likely to complain about this because the callback appending to this array isn't always necessarily going to execute on the same thread and so there might be data races as a result. one way to fix this would be using a Lock
that synchronizes access to the array. you could look at TestCommandMonitor
in APMUtils.swift
for an example of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Named it eventTypes
. Looked into NioConcurrencyHelpers.Lock
that APMUtils.swift
uses and I locked the appending to the array (since you cant pass in an async function into the .withLock{ ... }
) and left associated comments.
|
||
func testSDAMEventStreamClient() async throws { | ||
try await self.withTestClient { client in | ||
Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the same concern as above applies where we are not awaiting completion of the Task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep will address
] | ||
Task { | ||
var i = 0 | ||
for try await event in client.commandEvents { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given my comment above about never "finishing" the continuation, do we ever break out of this loop / does the task ever actually finish?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do not, need to cache as commented above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After caching the continuation, I am able to break the loop and end the task by doing something like
let output = client.commandEventStream() ... output.finish()
Although I can only close the loop at a specific time if I call .finish()
inside the loop. For example, if I want to check that 4 events happened, I'll call .finish()
via if i == 4
. Feels a little bit circular but I can confirm that we break out of the loop at expected time. Not sure if you had a different idea about how to test for ending the task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few comments now but given the discussed design change to allow multiple streams I'll hold off on re-reviewing the rest as I think a lot of the logic is likely to change
Sources/MongoSwift/MongoClient.swift
Outdated
if self._commandEvents == nil { | ||
self._commandEvents = CommandEventStream(stream: | ||
AsyncStream { con in | ||
self.addCommandEventHandler { event in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the client currently owns the handler, users will just get the same one next time they invoke the computed property, even if they drop it.
ah right, yeah I think I wrote that comment before realizing in my next comment below that there was only ever one of each stream.
yeah, I think overall as long as there aren't any technical limitations to it, the on-demand/factory approach would be a bit nicer so we can allow multiple streams, and can support stopping/restarting monitoring at will.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I mostly have little testing nits here and there
Sources/MongoSwift/APM.swift
Outdated
public typealias AsyncIterator = EventStreamIterator<T> | ||
|
||
/// Creates the asynchronous iterator that produces elements of this `EventStream`. | ||
public func makeAsyncIterator() -> EventStreamIterator<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with using AsyncStream<T>.AsyncIterator
directly is that if we ever want to change the internals of how CommandEventStream
or SDAMEventStream
are implemented, we'll still need to somehow create an AsyncStream
iterator. If we have a wrapper type, we're free to change the internals of our event stream types as needed.
We also looked into having the event stream types be their own iterators, but that would prevent users from being able to iterate over the same stream from different AsyncIterators
for not much benefit, so we decided to just keep it as is.
Tests/MongoSwiftTests/APMTests.swift
Outdated
try await coll.insertOne(["hello": "world"]) | ||
} | ||
// Tasks start and finish | ||
try await assertIsEventuallyTrue(description: "each task is started") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this statement doesn't verify the tasks have actually started, since the condition will always be true once this part is hit. To verify they've started, you'll need an atomic counter or something that each task can increment once they start executing.
Also, it's important that this assertion happens before we start generating the events (i.e. before the ping
command) so as to ensure each task receives all the events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see - since the update happens in the loop but outside the task it basically runs synchronously. I think we have to return to using (a much simpler) actor for updating things async. Since the tasks dont kick off in the same order, using atomic counters often ends up updating an outdated version itself (ex. on the 4th iter of the loop, the load()
function sees the 3rd iter-version of the counter meaning the counter never sees a value of 5
). Using
actor TaskCounter {
var taskCounter = 0
func incr() {
taskCounter += 1
}
}
seems to work best. Also moved commands to below the check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the neat things about atomics is that you can also increment them atomically. So instead of doing a load
+ store
, you can do atomic.add(1)
, and it'll avoid the problems you were seeing.
That being said, this actor
approach also works, so we can just leave it as-is.
Sources/MongoSwift/MongoClient.swift
Outdated
} | ||
|
||
/// Provides an `AsyncSequence` API for consuming command monitoring events. | ||
/// Example: printing the command events out would be written as |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a block comment for the example code would be a bit more readable than just inline.
e.g.
for await event in client.commandEventStream() {
print(event)
}
Ditto for the SDAM event example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took this opportunity to beef up the docstring and make sure all the elements are in the right sections of the documentation (summary/discussion/etc.). Docstring is now
/**
* Provides an `AsyncSequence` API for consuming SDAM monitoring events.
*
* Example: printing the SDAM events out would be written as
for try await event in client.sdamEventStream() {
print(event)
}
* Wrapping in a `Task { ... }` may be desired for asynchronicity.
* - Returns: An `SDAMEventStream` that implements `AsyncSequence`.
* - Note: Only the most recent 100 events are stored in the stream.
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in order the code to be formatted as code, you need the triple backticks (```) like in markdown. I think you also need to keep prepending the line with *
. For an existing example, check out ClientSession.swift.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per discussion, this method works but used the backticks method for codebase consistency
Failed bc waiting on SWIFT-1604 to be merged in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good! I just have one minor docstring comment
Tests/MongoSwiftTests/APMTests.swift
Outdated
try await coll.insertOne(["hello": "world"]) | ||
} | ||
// Tasks start and finish | ||
try await assertIsEventuallyTrue(description: "each task is started") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the neat things about atomics is that you can also increment them atomically. So instead of doing a load
+ store
, you can do atomic.add(1)
, and it'll avoid the problems you were seeing.
That being said, this actor
approach also works, so we can just leave it as-is.
Sources/MongoSwift/MongoClient.swift
Outdated
} | ||
|
||
/// Provides an `AsyncSequence` API for consuming command monitoring events. | ||
/// Example: printing the command events out would be written as |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in order the code to be formatted as code, you need the triple backticks (```) like in markdown. I think you also need to keep prepending the line with *
. For an existing example, check out ClientSession.swift.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is looking really close!! I just have a few minor comments and questions.
@@ -786,7 +907,9 @@ public class MongoClient { | |||
* to continue to receive events. | |||
*/ | |||
public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) { | |||
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler)) | |||
self.eventHandlerLock.withLock { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
related to my comment above re the lock (not something we need to fix today, but for the future): with the number of places in the code we now guard some value behind a lock and just have to remember that it should only be accessed via the lock, it would be a nice improvement to introduce some generic wrapper type like Locked<T>
that stores a value of type T
plus its corresponding Lock
and has a method that lets you do something like
myLockedThing.withLockedValue { value in
// ... do something with value
}
The method would acquire the lock, execute your provided closure with the value it guards, then releases the lock. the locked value could be private to the Locked
type so it's impossible to access it other than when you hold the lock, which would make it harder for us to forget to acquire the lock.
I filed SWIFT-1612 about this. it could be a nice little ticket to pick up if you end up having any free time waiting for reviews in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a cool idea and increasingly relevant with more async
usage in the driver. Def a cool idea to pick up in some downtime.
let workTask = Task { () -> Bool in | ||
while !Task.isCancelled { | ||
guard try await block() else { | ||
try? await Task.sleep(seconds: sleepInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to try?
here instead of just try
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per patricks prev comment,
I think we'll want to use a
try?
here so that aCancellationError
isn't throw, otherwise ourXCTFail
below won't be executed.
Basically if the task gets cancelled, we dont want to throw a cancellation error, we want to hit the continue
, then break out of the while !Task.iscancelled()
and return false to hit the XCT condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oooo ok. can we add a comment saying that this is to ignore CancellationError
s then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a couple formatting suggestions and one very minor comment! LGTM otherwise
let workTask = Task { () -> Bool in | ||
while !Task.isCancelled { | ||
guard try await block() else { | ||
try? await Task.sleep(seconds: sleepInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oooo ok. can we add a comment saying that this is to ignore CancellationError
s then?
Co-authored-by: Kaitlin Mahar <kaitlinmahar@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! 🎉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good! just one question and a small typo fix
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`. | ||
* The stream will be ended automatically if the `Task` it is running in is cancelled. | ||
* - Returns: A `CommandEventStream` that implements `AsyncSequence`. | ||
* - Note: Only the most recent 100 events are stored in the stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reasoning behind the choice of 100 here? Would there be any benefit in allowing users to set a custom value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm after our discussion in standup (mod the one typo fix)!
Co-authored-by: Isabel Atkinson <isabelatkinson@gmail.com>
Need to write local tests