Skip to content

Commit 5000d32

Browse files
authored
RUST-1505 3.0 event API: command events (#1004)
1 parent 472bd3c commit 5000d32

File tree

18 files changed

+215
-531
lines changed

18 files changed

+215
-531
lines changed

src/client.rs

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
concern::{ReadConcern, WriteConcern},
3333
db::Database,
3434
error::{Error, ErrorKind, Result},
35-
event::command::{handle_command_event, CommandEvent},
35+
event::command::CommandEvent,
3636
id_set::IdSet,
3737
options::{
3838
ClientOptions,
@@ -44,6 +44,7 @@ use crate::{
4444
},
4545
sdam::{server_selection, SelectedServer, Topology},
4646
tracking_arc::TrackingArc,
47+
BoxFuture,
4748
ClientSession,
4849
};
4950

@@ -257,27 +258,8 @@ impl Client {
257258
}
258259
}
259260

260-
#[cfg(not(feature = "tracing-unstable"))]
261-
pub(crate) async fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
262-
let handler = self.inner.options.command_event_handler.as_ref();
263-
let test_channel = self.test_command_event_channel();
264-
if handler.is_none() && test_channel.is_none() {
265-
return;
266-
}
267-
268-
let event = generate_event();
269-
if let Some(tx) = test_channel {
270-
let (msg, ack) = crate::runtime::AcknowledgedMessage::package(event.clone());
271-
let _ = tx.send(msg).await;
272-
ack.wait_for_acknowledgment().await;
273-
}
274-
if let Some(handler) = handler {
275-
handle_command_event(handler.as_ref(), event);
276-
}
277-
}
278-
279-
#[cfg(feature = "tracing-unstable")]
280261
pub(crate) async fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
262+
#[cfg(feature = "tracing-unstable")]
281263
let tracing_emitter = if trace_or_log_enabled!(
282264
target: COMMAND_TRACING_EVENT_TARGET,
283265
TracingOrLogLevel::Debug
@@ -289,9 +271,11 @@ impl Client {
289271
} else {
290272
None
291273
};
292-
let apm_event_handler = self.inner.options.command_event_handler.as_ref();
293274
let test_channel = self.test_command_event_channel();
294-
if !(tracing_emitter.is_some() || apm_event_handler.is_some() || test_channel.is_some()) {
275+
let should_send = test_channel.is_some() || self.options().command_event_handler.is_some();
276+
#[cfg(feature = "tracing-unstable")]
277+
let should_send = should_send || tracing_emitter.is_some();
278+
if !should_send {
295279
return;
296280
}
297281

@@ -301,15 +285,12 @@ impl Client {
301285
let _ = tx.send(msg).await;
302286
ack.wait_for_acknowledgment().await;
303287
}
304-
if let (Some(event_handler), Some(ref tracing_emitter)) =
305-
(apm_event_handler, &tracing_emitter)
306-
{
307-
handle_command_event(event_handler.as_ref(), event.clone());
308-
handle_command_event(tracing_emitter, event);
309-
} else if let Some(event_handler) = apm_event_handler {
310-
handle_command_event(event_handler.as_ref(), event);
311-
} else if let Some(ref tracing_emitter) = tracing_emitter {
312-
handle_command_event(tracing_emitter, event);
288+
#[cfg(feature = "tracing-unstable")]
289+
if let Some(ref tracing_emitter) = tracing_emitter {
290+
tracing_emitter.handle(event.clone());
291+
}
292+
if let Some(handler) = &self.options().command_event_handler {
293+
handler.handle(event);
313294
}
314295
}
315296

@@ -693,7 +674,6 @@ impl Client {
693674
.ok()
694675
}
695676

696-
#[cfg(test)]
697677
pub(crate) fn options(&self) -> &ClientOptions {
698678
&self.inner.options
699679
}
@@ -731,6 +711,3 @@ impl AsyncDropToken {
731711
Self { tx: self.tx.take() }
732712
}
733713
}
734-
735-
// TODO: merge this with other BoxFuture defs
736-
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

src/client/auth/oidc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::{
1717
},
1818
cmap::Connection,
1919
error::{Error, Result},
20+
BoxFuture,
2021
};
2122

2223
use super::{sasl::SaslContinue, Credential, MONGODB_OIDC_STR};
@@ -27,8 +28,6 @@ pub struct Callbacks {
2728
inner: Arc<CallbacksInner>,
2829
}
2930

30-
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
31-
3231
impl Callbacks {
3332
/// Create a new instance with a token request callback.
3433
pub fn new<F>(on_request: F) -> Self

src/client/options.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
compression::Compressor,
3333
concern::{Acknowledgment, ReadConcern, WriteConcern},
3434
error::{Error, ErrorKind, Result},
35-
event::{cmap::CmapEventHandler, command::CommandEventHandler, sdam::SdamEventHandler},
35+
event::{cmap::CmapEventHandler, sdam::SdamEventHandler},
3636
options::ReadConcernLevel,
3737
sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY},
3838
selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
@@ -416,9 +416,10 @@ pub struct ClientOptions {
416416
///
417417
/// Note that monitoring command events may incur a performance penalty.
418418
#[derivative(Debug = "ignore", PartialEq = "ignore")]
419-
#[builder(default)]
419+
#[builder(default, setter(strip_option))]
420420
#[serde(skip)]
421-
pub command_event_handler: Option<Arc<dyn CommandEventHandler>>,
421+
pub command_event_handler:
422+
Option<crate::event::EventHandler<crate::event::command::CommandEvent>>,
422423

423424
/// The connect timeout passed to each underlying TcpStream when attemtping to connect to the
424425
/// server.

src/client/session.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::{
2020
options::{SessionOptions, TransactionOptions},
2121
sdam::{ServerInfo, TransactionSupportStatus},
2222
selection_criteria::SelectionCriteria,
23+
BoxFuture,
2324
Client,
2425
};
2526
pub use cluster_time::ClusterTime;
@@ -691,8 +692,6 @@ impl ClientSession {
691692
}
692693
}
693694

694-
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
695-
696695
struct DroppedClientSession {
697696
cluster_time: Option<ClusterTime>,
698697
server_session: ServerSession,

src/client/session/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ async fn cluster_time_in_commands() {
305305
let handler = Arc::new(EventHandler::new());
306306
let mut options = get_client_options().await.clone();
307307
options.heartbeat_freq = Some(Duration::from_secs(1000));
308-
options.command_event_handler = Some(handler.clone());
308+
options.command_event_handler = Some(handler.clone().into());
309309
options.sdam_event_handler = Some(handler.clone());
310310

311311
// Ensure we only connect to one server so the monitor checks from other servers

src/event.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,102 @@
33
pub mod cmap;
44
pub mod command;
55
pub mod sdam;
6+
7+
use std::sync::Arc;
8+
9+
use futures_core::future::BoxFuture;
10+
11+
use crate::event::command::CommandEvent;
12+
13+
/// A destination for events. Allows implicit conversion via [`From`] for concrete types for
14+
/// convenience with [`crate::options::ClientOptions`] construction:
15+
///
16+
/// ```rust
17+
/// # use mongodb::options::ClientOptions;
18+
/// # fn example() {
19+
/// let (tx, mut rx) = tokio::sync::mpsc::channel(100);
20+
/// tokio::spawn(async move {
21+
/// while let Some(ev) = rx.recv().await {
22+
/// println!("{:?}", ev);
23+
/// }
24+
/// });
25+
/// let options = ClientOptions::builder()
26+
/// .command_event_handler(tx)
27+
/// .build();
28+
/// # }
29+
/// ```
30+
///
31+
/// or explicit construction for `Fn` traits:
32+
///
33+
/// ```rust
34+
/// # use mongodb::options::ClientOptions;
35+
/// # use mongodb::event::EventHandler;
36+
/// # fn example() {
37+
/// let options = ClientOptions::builder()
38+
/// .command_event_handler(EventHandler::callback(|ev| println!("{:?}", ev)))
39+
/// .build();
40+
/// # }
41+
/// ```
42+
#[derive(Clone)]
43+
#[non_exhaustive]
44+
pub enum EventHandler<T> {
45+
/// A callback.
46+
Callback(Arc<dyn Fn(T) + Sync + Send>),
47+
/// An async callback.
48+
AsyncCallback(Arc<dyn Fn(T) -> BoxFuture<'static, ()> + Sync + Send>),
49+
/// A `tokio` channel sender.
50+
TokioMpsc(tokio::sync::mpsc::Sender<T>),
51+
}
52+
53+
impl<T> std::fmt::Debug for EventHandler<T> {
54+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55+
f.debug_tuple("EventHandler").finish()
56+
}
57+
}
58+
59+
impl<T> From<tokio::sync::mpsc::Sender<T>> for EventHandler<T> {
60+
fn from(value: tokio::sync::mpsc::Sender<T>) -> Self {
61+
Self::TokioMpsc(value)
62+
}
63+
}
64+
65+
#[allow(deprecated)]
66+
impl<T: crate::event::command::CommandEventHandler + 'static> From<Arc<T>>
67+
for EventHandler<CommandEvent>
68+
{
69+
fn from(value: Arc<T>) -> Self {
70+
Self::callback(move |ev| match ev {
71+
CommandEvent::Started(e) => value.handle_command_started_event(e),
72+
CommandEvent::Succeeded(e) => value.handle_command_succeeded_event(e),
73+
CommandEvent::Failed(e) => value.handle_command_failed_event(e),
74+
})
75+
}
76+
}
77+
78+
impl<T: Send + Sync + 'static> EventHandler<T> {
79+
/// Construct a new event handler with a callback.
80+
pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self {
81+
Self::Callback(Arc::new(f))
82+
}
83+
84+
/// Construct a new event handler with an async callback.
85+
pub fn async_callback(f: impl Fn(T) -> BoxFuture<'static, ()> + Send + Sync + 'static) -> Self {
86+
Self::AsyncCallback(Arc::new(f))
87+
}
88+
89+
pub(crate) fn handle(&self, event: T) {
90+
match self {
91+
// TODO RUST-1731 Use tokio's spawn_blocking
92+
Self::Callback(cb) => (cb)(event),
93+
Self::AsyncCallback(cb) => {
94+
crate::runtime::spawn((cb)(event));
95+
}
96+
Self::TokioMpsc(sender) => {
97+
let sender = sender.clone();
98+
crate::runtime::spawn(async move {
99+
let _ = sender.send(event).await;
100+
});
101+
}
102+
}
103+
}
104+
}

src/event/command.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ pub struct CommandFailedEvent {
9393
pub service_id: Option<ObjectId>,
9494
}
9595

96+
/// Usage of this trait is deprecated. Applications should use the simpler
97+
/// [`EventHandler`](crate::event::EventHandler) API.
98+
///
9699
/// Applications can implement this trait to specify custom logic to run on each command event sent
97100
/// by the driver.
98101
///
@@ -121,7 +124,7 @@ pub struct CommandFailedEvent {
121124
/// }
122125
///
123126
/// # fn do_stuff() -> Result<()> {
124-
/// let handler: Arc<dyn CommandEventHandler> = Arc::new(FailedCommandLogger);
127+
/// let handler = Arc::new(FailedCommandLogger);
125128
/// let options = ClientOptions::builder()
126129
/// .command_event_handler(handler)
127130
/// .build();
@@ -131,6 +134,7 @@ pub struct CommandFailedEvent {
131134
/// # Ok(())
132135
/// # }
133136
/// ```
137+
#[deprecated]
134138
pub trait CommandEventHandler: Send + Sync {
135139
/// A [`Client`](../../struct.Client.html) will call this method on each registered handler
136140
/// whenever a database command is initiated.
@@ -146,18 +150,11 @@ pub trait CommandEventHandler: Send + Sync {
146150
}
147151

148152
#[derive(Clone, Debug, Serialize)]
153+
#[allow(missing_docs)]
149154
#[serde(untagged)]
150-
pub(crate) enum CommandEvent {
155+
#[non_exhaustive]
156+
pub enum CommandEvent {
151157
Started(CommandStartedEvent),
152158
Succeeded(CommandSucceededEvent),
153159
Failed(CommandFailedEvent),
154160
}
155-
156-
/// Passes the specified event to the corresponding method on the provided handler.
157-
pub(crate) fn handle_command_event(handler: &dyn CommandEventHandler, event: CommandEvent) {
158-
match event {
159-
CommandEvent::Started(event) => handler.handle_command_started_event(event),
160-
CommandEvent::Succeeded(event) => handler.handle_command_succeeded_event(event),
161-
CommandEvent::Failed(event) => handler.handle_command_failed_event(event),
162-
}
163-
}

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,11 @@ pub use crate::{
363363
gridfs::{GridFsBucket, GridFsDownloadStream, GridFsUploadStream},
364364
};
365365

366-
pub use {client::session::ClusterTime, coll::Namespace, index::IndexModel, sdam::public::*, search_index::SearchIndexModel};
366+
pub use {client::action, client::session::ClusterTime, coll::Namespace, index::IndexModel, sdam::public::*, search_index::SearchIndexModel};
367+
368+
/// A boxed future.
369+
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
367370

368-
pub use client::action;
369371

370372
#[cfg(all(feature = "tokio-runtime", feature = "sync",))]
371373
compile_error!(

src/test/csfle.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,7 @@ use tokio::net::TcpListener;
3232
use crate::{
3333
client_encryption::{ClientEncryption, EncryptKey, MasterKey, RangeOptions},
3434
error::{ErrorKind, WriteError, WriteFailure},
35-
event::command::{
36-
CommandEventHandler,
37-
CommandFailedEvent,
38-
CommandStartedEvent,
39-
CommandSucceededEvent,
40-
},
35+
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
4136
options::{
4237
CollectionOptions,
4338
CreateCollectionOptions,
@@ -568,7 +563,7 @@ async fn bson_size_limits() -> Result<()> {
568563
let mut opts = get_client_options().await.clone();
569564
let handler = Arc::new(EventHandler::new());
570565
let mut events = handler.subscribe();
571-
opts.command_event_handler = Some(handler.clone());
566+
opts.command_event_handler = Some(handler.clone().into());
572567
let client_encrypted =
573568
Client::encrypted_builder(opts, KV_NAMESPACE.clone(), LOCAL_KMS.clone())?
574569
.extra_options(EXTRA_OPTIONS.clone())
@@ -1627,7 +1622,7 @@ impl DeadlockTestCase {
16271622
let mut encrypted_events = event_handler.subscribe();
16281623
let mut opts = get_client_options().await.clone();
16291624
opts.max_pool_size = Some(self.max_pool_size);
1630-
opts.command_event_handler = Some(event_handler.clone());
1625+
opts.command_event_handler = Some(event_handler.clone().into());
16311626
opts.sdam_event_handler = Some(event_handler.clone());
16321627
let client_encrypted =
16331628
Client::encrypted_builder(opts, KV_NAMESPACE.clone(), LOCAL_KMS.clone())?
@@ -2701,7 +2696,7 @@ impl DecryptionEventsTestdata {
27012696
let ev_handler = DecryptionEventsHandler::new();
27022697
let mut opts = get_client_options().await.clone();
27032698
opts.retry_reads = Some(false);
2704-
opts.command_event_handler = Some(ev_handler.clone());
2699+
opts.command_event_handler = Some(ev_handler.clone().into());
27052700
let encrypted_client =
27062701
Client::encrypted_builder(opts, KV_NAMESPACE.clone(), LOCAL_KMS.clone())?
27072702
.extra_options(EXTRA_OPTIONS.clone())
@@ -2737,7 +2732,8 @@ impl DecryptionEventsHandler {
27372732
}
27382733
}
27392734

2740-
impl CommandEventHandler for DecryptionEventsHandler {
2735+
#[allow(deprecated)]
2736+
impl crate::event::command::CommandEventHandler for DecryptionEventsHandler {
27412737
fn handle_command_succeeded_event(&self, event: CommandSucceededEvent) {
27422738
if event.command_name == "aggregate" {
27432739
*self.succeeded.lock().unwrap() = Some(event);

0 commit comments

Comments
 (0)