Skip to content

RUST-2125 Run tests using LB URI when present #1279

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,8 @@ functions:
include_expansions_in_env:
- PROJECT_DIRECTORY
- OPENSSL
- SINGLE_MONGOS_LB_URI
- MULTI_MONGOS_LB_URI
- MONGODB_URI
- MONGODB_API_VERSION
- PATH
Expand Down
3 changes: 1 addition & 2 deletions src/action/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ where
&self.models[total_attempted..],
total_attempted,
self.options.as_ref(),
)
.await;
);
let result = self
.client
.execute_operation::<BulkWriteOperation<R>>(
Expand Down
41 changes: 20 additions & 21 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Client {
.execute_operation_with_details(op.borrow_mut(), None)
.await?;
let pinned =
self.pin_connection_for_cursor(&details.output, &mut details.connection)?;
self.pin_connection_for_cursor(&details.output, &mut details.connection, None)?;
Ok(Cursor::new(
self.clone(),
details.output,
Expand All @@ -181,8 +181,11 @@ impl Client {
.execute_operation_with_details(op.borrow_mut(), &mut *session)
.await?;

let pinned =
self.pin_connection_for_session(&details.output, &mut details.connection, session)?;
let pinned = self.pin_connection_for_cursor(
&details.output,
&mut details.connection,
Some(session),
)?;
Ok(SessionCursor::new(self.clone(), details.output, pinned))
}

Expand All @@ -194,25 +197,16 @@ impl Client {
&self,
spec: &CursorSpecification,
conn: &mut PooledConnection,
session: Option<&mut ClientSession>,
) -> Result<Option<PinnedConnectionHandle>> {
if self.is_load_balanced() && spec.info.id != 0 {
Ok(Some(conn.pin()?))
} else {
Ok(None)
}
}

fn pin_connection_for_session(
&self,
spec: &CursorSpecification,
conn: &mut PooledConnection,
session: &mut ClientSession,
) -> Result<Option<PinnedConnectionHandle>> {
if let Some(handle) = session.transaction.pinned_connection() {
if let Some(handle) = session.and_then(|s| s.transaction.pinned_connection()) {
// Cursor operations on a transaction share the same pinned connection.
Ok(Some(handle.replicate()))
} else if self.is_load_balanced() && spec.info.id != 0 {
// Cursor operations on load balanced topologies always pin connections.
Ok(Some(conn.pin()?))
} else {
self.pin_connection_for_cursor(spec, conn)
Ok(None)
}
}

Expand Down Expand Up @@ -245,7 +239,8 @@ impl Client {
details.implicit_session = Some(session);
}
let (cursor_spec, cs_data) = details.output;
let pinned = self.pin_connection_for_cursor(&cursor_spec, &mut details.connection)?;
let pinned =
self.pin_connection_for_cursor(&cursor_spec, &mut details.connection, None)?;
let cursor = Cursor::new(self.clone(), cursor_spec, details.implicit_session, pinned);

Ok(ChangeStream::new(cursor, args, cs_data))
Expand Down Expand Up @@ -277,8 +272,11 @@ impl Client {
.execute_operation_with_details(&mut op, &mut *session)
.await?;
let (cursor_spec, cs_data) = details.output;
let pinned =
self.pin_connection_for_session(&cursor_spec, &mut details.connection, session)?;
let pinned = self.pin_connection_for_cursor(
&cursor_spec,
&mut details.connection,
Some(session),
)?;
let cursor = SessionCursor::new(self.clone(), cursor_spec, pinned);

Ok(SessionChangeStream::new(cursor, args, cs_data))
Expand Down Expand Up @@ -1063,6 +1061,7 @@ struct ExecutionDetails<T: Operation> {
implicit_session: Option<ClientSession>,
}

#[derive(Debug)]
struct ExecutionRetry {
prior_txn_number: Option<i64>,
first_error: Error,
Expand Down
5 changes: 5 additions & 0 deletions src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ impl Transaction {
self.recovery_token = None;
}

#[cfg(test)]
pub(crate) fn is_pinned(&self) -> bool {
self.pinned.is_some()
}

pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
match &self.pinned {
Some(TransactionPin::Mongos(s)) => Some(s),
Expand Down
26 changes: 18 additions & 8 deletions src/cmap/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,26 @@ impl PinnedConnectionHandle {
}
}

/// Retrieve the pinned connection, blocking until it's available for use. Will fail if the
/// connection has been unpinned.
/// Retrieve the pinned connection. Will fail if the connection has been unpinned or is still in
/// use.
pub(crate) async fn take_connection(&self) -> Result<PooledConnection> {
use tokio::sync::mpsc::error::TryRecvError;
let mut receiver = self.receiver.lock().await;
let mut connection = receiver.recv().await.ok_or_else(|| {
Error::internal(format!(
"cannot take connection after unpin (id={})",
self.id
))
})?;
let mut connection = match receiver.try_recv() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using try_recv rather than recv means the re-entrant case becomes an error rather than a deadlock, which seems much better behavior to me. The hypothetical downside is that it could fail in a race condition, but since any individual pinned connection is used in a strictly linear sequence that doesn't apply here.

Ok(conn) => conn,
Err(TryRecvError::Disconnected) => {
return Err(Error::internal(format!(
"cannot take connection after unpin (id={})",
self.id
)))
}
Err(TryRecvError::Empty) => {
return Err(Error::internal(format!(
"cannot take in-use connection (id={})",
self.id
)))
}
};
connection.mark_pinned_in_use();
Ok(connection)
}
Expand Down
13 changes: 8 additions & 5 deletions src/operation/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<'a, R> BulkWrite<'a, R>
where
R: BulkWriteResult,
{
pub(crate) async fn new(
pub(crate) fn new(
client: Client,
models: &'a [WriteModel],
offset: usize,
Expand Down Expand Up @@ -260,7 +260,7 @@ where
fn handle_response_async<'b>(
&'b self,
response: RawCommandResponse,
context: ExecutionContext<'b>,
mut context: ExecutionContext<'b>,
) -> BoxFuture<'b, Result<Self::O>> {
async move {
let response: WriteResponseBody<Response> = response.body()?;
Expand Down Expand Up @@ -292,9 +292,12 @@ where
None,
self.options.and_then(|options| options.comment.clone()),
);
let pinned_connection = self
.client
.pin_connection_for_cursor(&specification, context.connection)?;

let pinned_connection = self.client.pin_connection_for_cursor(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a distinct bug from RUST-2131, where the original code should have been conditionally using pin_connection_for_session when a session was present; not doing so caused tests of bulk write on load balancer running inside a transaction to fail with an internal "already pinned" error. I merged the two pin_connection_for_* methods to prevent this kind of easy mistake in the future and make it easier to reason about exactly when a connection for a cursor gets pinned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! combining the pinning methods makes sense

&specification,
context.connection,
context.session.as_deref_mut(),
)?;
let iteration_result = match context.session {
Some(session) => {
let mut session_cursor =
Expand Down
20 changes: 20 additions & 0 deletions src/test/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ async fn write_error_batches() {
log_uncaptured("skipping write_error_batches: bulkWrite requires 8.0+");
return;
}
// TODO RUST-2131
if client.is_load_balanced() {
log_uncaptured("skipping write_error_batches: load-balanced topology");
return;
}

let max_write_batch_size = client.server_info.max_write_batch_size.unwrap() as usize;

Expand Down Expand Up @@ -230,6 +235,11 @@ async fn successful_cursor_iteration() {
log_uncaptured("skipping successful_cursor_iteration: bulkWrite requires 8.0+");
return;
}
// TODO RUST-2131
if client.is_load_balanced() {
log_uncaptured("skipping successful_cursor_iteration: load-balanced topology");
return;
}

let max_bson_object_size = client.server_info.max_bson_object_size as usize;

Expand Down Expand Up @@ -271,6 +281,11 @@ async fn cursor_iteration_in_a_transaction() {
);
return;
}
// TODO RUST-2131
if client.is_load_balanced() {
log_uncaptured("skipping cursor_iteration_in_a_transaction: load-balanced topology");
return;
}

let max_bson_object_size = client.server_info.max_bson_object_size as usize;

Expand Down Expand Up @@ -321,6 +336,11 @@ async fn failed_cursor_iteration() {
log_uncaptured("skipping failed_cursor_iteration: bulkWrite requires 8.0+");
return;
}
// TODO RUST-2131
if client.is_load_balanced() {
log_uncaptured("skipping failed_cursor_iteration: load-balanced topology");
return;
}

let max_bson_object_size = client.server_info.max_bson_object_size as usize;

Expand Down
2 changes: 1 addition & 1 deletion src/test/spec/unified_runner/operation/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl TestOperation for AssertSessionPinned {
async move {
let is_pinned =
with_mut_session!(test_runner, self.session.as_str(), |session| async {
session.transaction.pinned_mongos().is_some()
session.transaction.is_pinned()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a distinct bug, just missing that there are now more cases than pinned_mongos :)

})
.await;
assert!(is_pinned);
Expand Down