diff --git a/.evergreen/config.yml b/.evergreen/config.yml index 352e674f9..1ef3b821e 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -1510,6 +1510,8 @@ functions: include_expansions_in_env: - PROJECT_DIRECTORY - OPENSSL + - SINGLE_MONGOS_LB_URI + - MULTI_MONGOS_LB_URI - MONGODB_URI - MONGODB_API_VERSION - PATH diff --git a/src/action/bulk_write.rs b/src/action/bulk_write.rs index 326cf61b8..49ed5a665 100644 --- a/src/action/bulk_write.rs +++ b/src/action/bulk_write.rs @@ -148,8 +148,7 @@ where &self.models[total_attempted..], total_attempted, self.options.as_ref(), - ) - .await; + ); let result = self .client .execute_operation::>( diff --git a/src/client/executor.rs b/src/client/executor.rs index cd91769a1..937d78ba9 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -158,7 +158,7 @@ impl Client { .execute_operation_with_details(op.borrow_mut(), None) .await?; let pinned = - self.pin_connection_for_cursor(&details.output, &mut details.connection)?; + self.pin_connection_for_cursor(&details.output, &mut details.connection, None)?; Ok(Cursor::new( self.clone(), details.output, @@ -181,8 +181,11 @@ impl Client { .execute_operation_with_details(op.borrow_mut(), &mut *session) .await?; - let pinned = - self.pin_connection_for_session(&details.output, &mut details.connection, session)?; + let pinned = self.pin_connection_for_cursor( + &details.output, + &mut details.connection, + Some(session), + )?; Ok(SessionCursor::new(self.clone(), details.output, pinned)) } @@ -194,25 +197,16 @@ impl Client { &self, spec: &CursorSpecification, conn: &mut PooledConnection, + session: Option<&mut ClientSession>, ) -> Result> { - if self.is_load_balanced() && spec.info.id != 0 { - Ok(Some(conn.pin()?)) - } else { - Ok(None) - } - } - - fn pin_connection_for_session( - &self, - spec: &CursorSpecification, - conn: &mut PooledConnection, - session: &mut ClientSession, - ) -> Result> { - if let Some(handle) = session.transaction.pinned_connection() { + if let Some(handle) = session.and_then(|s| s.transaction.pinned_connection()) { // Cursor operations on a transaction share the same pinned connection. Ok(Some(handle.replicate())) + } else if self.is_load_balanced() && spec.info.id != 0 { + // Cursor operations on load balanced topologies always pin connections. + Ok(Some(conn.pin()?)) } else { - self.pin_connection_for_cursor(spec, conn) + Ok(None) } } @@ -245,7 +239,8 @@ impl Client { details.implicit_session = Some(session); } let (cursor_spec, cs_data) = details.output; - let pinned = self.pin_connection_for_cursor(&cursor_spec, &mut details.connection)?; + let pinned = + self.pin_connection_for_cursor(&cursor_spec, &mut details.connection, None)?; let cursor = Cursor::new(self.clone(), cursor_spec, details.implicit_session, pinned); Ok(ChangeStream::new(cursor, args, cs_data)) @@ -277,8 +272,11 @@ impl Client { .execute_operation_with_details(&mut op, &mut *session) .await?; let (cursor_spec, cs_data) = details.output; - let pinned = - self.pin_connection_for_session(&cursor_spec, &mut details.connection, session)?; + let pinned = self.pin_connection_for_cursor( + &cursor_spec, + &mut details.connection, + Some(session), + )?; let cursor = SessionCursor::new(self.clone(), cursor_spec, pinned); Ok(SessionChangeStream::new(cursor, args, cs_data)) @@ -1063,6 +1061,7 @@ struct ExecutionDetails { implicit_session: Option, } +#[derive(Debug)] struct ExecutionRetry { prior_txn_number: Option, first_error: Error, diff --git a/src/client/session.rs b/src/client/session.rs index 9dc3a3daf..071bceb5d 100644 --- a/src/client/session.rs +++ b/src/client/session.rs @@ -143,6 +143,11 @@ impl Transaction { self.recovery_token = None; } + #[cfg(test)] + pub(crate) fn is_pinned(&self) -> bool { + self.pinned.is_some() + } + pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> { match &self.pinned { Some(TransactionPin::Mongos(s)) => Some(s), diff --git a/src/cmap/conn.rs b/src/cmap/conn.rs index f1ddc5910..681824471 100644 --- a/src/cmap/conn.rs +++ b/src/cmap/conn.rs @@ -327,16 +327,26 @@ impl PinnedConnectionHandle { } } - /// Retrieve the pinned connection, blocking until it's available for use. Will fail if the - /// connection has been unpinned. + /// Retrieve the pinned connection. Will fail if the connection has been unpinned or is still in + /// use. pub(crate) async fn take_connection(&self) -> Result { + use tokio::sync::mpsc::error::TryRecvError; let mut receiver = self.receiver.lock().await; - let mut connection = receiver.recv().await.ok_or_else(|| { - Error::internal(format!( - "cannot take connection after unpin (id={})", - self.id - )) - })?; + let mut connection = match receiver.try_recv() { + Ok(conn) => conn, + Err(TryRecvError::Disconnected) => { + return Err(Error::internal(format!( + "cannot take connection after unpin (id={})", + self.id + ))) + } + Err(TryRecvError::Empty) => { + return Err(Error::internal(format!( + "cannot take in-use connection (id={})", + self.id + ))) + } + }; connection.mark_pinned_in_use(); Ok(connection) } diff --git a/src/operation/bulk_write.rs b/src/operation/bulk_write.rs index 67459c9cb..dee2ad412 100644 --- a/src/operation/bulk_write.rs +++ b/src/operation/bulk_write.rs @@ -51,7 +51,7 @@ impl<'a, R> BulkWrite<'a, R> where R: BulkWriteResult, { - pub(crate) async fn new( + pub(crate) fn new( client: Client, models: &'a [WriteModel], offset: usize, @@ -260,7 +260,7 @@ where fn handle_response_async<'b>( &'b self, response: RawCommandResponse, - context: ExecutionContext<'b>, + mut context: ExecutionContext<'b>, ) -> BoxFuture<'b, Result> { async move { let response: WriteResponseBody = response.body()?; @@ -292,9 +292,12 @@ where None, self.options.and_then(|options| options.comment.clone()), ); - let pinned_connection = self - .client - .pin_connection_for_cursor(&specification, context.connection)?; + + let pinned_connection = self.client.pin_connection_for_cursor( + &specification, + context.connection, + context.session.as_deref_mut(), + )?; let iteration_result = match context.session { Some(session) => { let mut session_cursor = diff --git a/src/test/bulk_write.rs b/src/test/bulk_write.rs index 6ebe62a0b..802e20c72 100644 --- a/src/test/bulk_write.rs +++ b/src/test/bulk_write.rs @@ -173,6 +173,11 @@ async fn write_error_batches() { log_uncaptured("skipping write_error_batches: bulkWrite requires 8.0+"); return; } + // TODO RUST-2131 + if client.is_load_balanced() { + log_uncaptured("skipping write_error_batches: load-balanced topology"); + return; + } let max_write_batch_size = client.server_info.max_write_batch_size.unwrap() as usize; @@ -230,6 +235,11 @@ async fn successful_cursor_iteration() { log_uncaptured("skipping successful_cursor_iteration: bulkWrite requires 8.0+"); return; } + // TODO RUST-2131 + if client.is_load_balanced() { + log_uncaptured("skipping successful_cursor_iteration: load-balanced topology"); + return; + } let max_bson_object_size = client.server_info.max_bson_object_size as usize; @@ -271,6 +281,11 @@ async fn cursor_iteration_in_a_transaction() { ); return; } + // TODO RUST-2131 + if client.is_load_balanced() { + log_uncaptured("skipping cursor_iteration_in_a_transaction: load-balanced topology"); + return; + } let max_bson_object_size = client.server_info.max_bson_object_size as usize; @@ -321,6 +336,11 @@ async fn failed_cursor_iteration() { log_uncaptured("skipping failed_cursor_iteration: bulkWrite requires 8.0+"); return; } + // TODO RUST-2131 + if client.is_load_balanced() { + log_uncaptured("skipping failed_cursor_iteration: load-balanced topology"); + return; + } let max_bson_object_size = client.server_info.max_bson_object_size as usize; diff --git a/src/test/spec/unified_runner/operation/session.rs b/src/test/spec/unified_runner/operation/session.rs index a82a82aae..0eb18da4b 100644 --- a/src/test/spec/unified_runner/operation/session.rs +++ b/src/test/spec/unified_runner/operation/session.rs @@ -78,7 +78,7 @@ impl TestOperation for AssertSessionPinned { async move { let is_pinned = with_mut_session!(test_runner, self.session.as_str(), |session| async { - session.transaction.pinned_mongos().is_some() + session.transaction.is_pinned() }) .await; assert!(is_pinned);