Skip to content

Commit cb7fbd5

Browse files
authored
RUST-2125 Run tests using LB URI when present (#1279)
1 parent 60b0727 commit cb7fbd5

File tree

8 files changed

+75
-37
lines changed

8 files changed

+75
-37
lines changed

.evergreen/config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,6 +1531,8 @@ functions:
15311531
include_expansions_in_env:
15321532
- PROJECT_DIRECTORY
15331533
- OPENSSL
1534+
- SINGLE_MONGOS_LB_URI
1535+
- MULTI_MONGOS_LB_URI
15341536
- MONGODB_URI
15351537
- MONGODB_API_VERSION
15361538
- PATH

src/action/bulk_write.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,7 @@ where
148148
&self.models[total_attempted..],
149149
total_attempted,
150150
self.options.as_ref(),
151-
)
152-
.await;
151+
);
153152
let result = self
154153
.client
155154
.execute_operation::<BulkWriteOperation<R>>(

src/client/executor.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl Client {
158158
.execute_operation_with_details(op.borrow_mut(), None)
159159
.await?;
160160
let pinned =
161-
self.pin_connection_for_cursor(&details.output, &mut details.connection)?;
161+
self.pin_connection_for_cursor(&details.output, &mut details.connection, None)?;
162162
Ok(Cursor::new(
163163
self.clone(),
164164
details.output,
@@ -181,8 +181,11 @@ impl Client {
181181
.execute_operation_with_details(op.borrow_mut(), &mut *session)
182182
.await?;
183183

184-
let pinned =
185-
self.pin_connection_for_session(&details.output, &mut details.connection, session)?;
184+
let pinned = self.pin_connection_for_cursor(
185+
&details.output,
186+
&mut details.connection,
187+
Some(session),
188+
)?;
186189
Ok(SessionCursor::new(self.clone(), details.output, pinned))
187190
}
188191

@@ -194,25 +197,16 @@ impl Client {
194197
&self,
195198
spec: &CursorSpecification,
196199
conn: &mut PooledConnection,
200+
session: Option<&mut ClientSession>,
197201
) -> Result<Option<PinnedConnectionHandle>> {
198-
if self.is_load_balanced() && spec.info.id != 0 {
199-
Ok(Some(conn.pin()?))
200-
} else {
201-
Ok(None)
202-
}
203-
}
204-
205-
fn pin_connection_for_session(
206-
&self,
207-
spec: &CursorSpecification,
208-
conn: &mut PooledConnection,
209-
session: &mut ClientSession,
210-
) -> Result<Option<PinnedConnectionHandle>> {
211-
if let Some(handle) = session.transaction.pinned_connection() {
202+
if let Some(handle) = session.and_then(|s| s.transaction.pinned_connection()) {
212203
// Cursor operations on a transaction share the same pinned connection.
213204
Ok(Some(handle.replicate()))
205+
} else if self.is_load_balanced() && spec.info.id != 0 {
206+
// Cursor operations on load balanced topologies always pin connections.
207+
Ok(Some(conn.pin()?))
214208
} else {
215-
self.pin_connection_for_cursor(spec, conn)
209+
Ok(None)
216210
}
217211
}
218212

@@ -245,7 +239,8 @@ impl Client {
245239
details.implicit_session = Some(session);
246240
}
247241
let (cursor_spec, cs_data) = details.output;
248-
let pinned = self.pin_connection_for_cursor(&cursor_spec, &mut details.connection)?;
242+
let pinned =
243+
self.pin_connection_for_cursor(&cursor_spec, &mut details.connection, None)?;
249244
let cursor = Cursor::new(self.clone(), cursor_spec, details.implicit_session, pinned);
250245

251246
Ok(ChangeStream::new(cursor, args, cs_data))
@@ -277,8 +272,11 @@ impl Client {
277272
.execute_operation_with_details(&mut op, &mut *session)
278273
.await?;
279274
let (cursor_spec, cs_data) = details.output;
280-
let pinned =
281-
self.pin_connection_for_session(&cursor_spec, &mut details.connection, session)?;
275+
let pinned = self.pin_connection_for_cursor(
276+
&cursor_spec,
277+
&mut details.connection,
278+
Some(session),
279+
)?;
282280
let cursor = SessionCursor::new(self.clone(), cursor_spec, pinned);
283281

284282
Ok(SessionChangeStream::new(cursor, args, cs_data))
@@ -1063,6 +1061,7 @@ struct ExecutionDetails<T: Operation> {
10631061
implicit_session: Option<ClientSession>,
10641062
}
10651063

1064+
#[derive(Debug)]
10661065
struct ExecutionRetry {
10671066
prior_txn_number: Option<i64>,
10681067
first_error: Error,

src/client/session.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ impl Transaction {
143143
self.recovery_token = None;
144144
}
145145

146+
#[cfg(test)]
147+
pub(crate) fn is_pinned(&self) -> bool {
148+
self.pinned.is_some()
149+
}
150+
146151
pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
147152
match &self.pinned {
148153
Some(TransactionPin::Mongos(s)) => Some(s),

src/cmap/conn.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,16 +327,26 @@ impl PinnedConnectionHandle {
327327
}
328328
}
329329

330-
/// Retrieve the pinned connection, blocking until it's available for use. Will fail if the
331-
/// connection has been unpinned.
330+
/// Retrieve the pinned connection. Will fail if the connection has been unpinned or is still in
331+
/// use.
332332
pub(crate) async fn take_connection(&self) -> Result<PooledConnection> {
333+
use tokio::sync::mpsc::error::TryRecvError;
333334
let mut receiver = self.receiver.lock().await;
334-
let mut connection = receiver.recv().await.ok_or_else(|| {
335-
Error::internal(format!(
336-
"cannot take connection after unpin (id={})",
337-
self.id
338-
))
339-
})?;
335+
let mut connection = match receiver.try_recv() {
336+
Ok(conn) => conn,
337+
Err(TryRecvError::Disconnected) => {
338+
return Err(Error::internal(format!(
339+
"cannot take connection after unpin (id={})",
340+
self.id
341+
)))
342+
}
343+
Err(TryRecvError::Empty) => {
344+
return Err(Error::internal(format!(
345+
"cannot take in-use connection (id={})",
346+
self.id
347+
)))
348+
}
349+
};
340350
connection.mark_pinned_in_use();
341351
Ok(connection)
342352
}

src/operation/bulk_write.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl<'a, R> BulkWrite<'a, R>
5151
where
5252
R: BulkWriteResult,
5353
{
54-
pub(crate) async fn new(
54+
pub(crate) fn new(
5555
client: Client,
5656
models: &'a [WriteModel],
5757
offset: usize,
@@ -260,7 +260,7 @@ where
260260
fn handle_response_async<'b>(
261261
&'b self,
262262
response: RawCommandResponse,
263-
context: ExecutionContext<'b>,
263+
mut context: ExecutionContext<'b>,
264264
) -> BoxFuture<'b, Result<Self::O>> {
265265
async move {
266266
let response: WriteResponseBody<Response> = response.body()?;
@@ -292,9 +292,12 @@ where
292292
None,
293293
self.options.and_then(|options| options.comment.clone()),
294294
);
295-
let pinned_connection = self
296-
.client
297-
.pin_connection_for_cursor(&specification, context.connection)?;
295+
296+
let pinned_connection = self.client.pin_connection_for_cursor(
297+
&specification,
298+
context.connection,
299+
context.session.as_deref_mut(),
300+
)?;
298301
let iteration_result = match context.session {
299302
Some(session) => {
300303
let mut session_cursor =

src/test/bulk_write.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ async fn write_error_batches() {
173173
log_uncaptured("skipping write_error_batches: bulkWrite requires 8.0+");
174174
return;
175175
}
176+
// TODO RUST-2131
177+
if client.is_load_balanced() {
178+
log_uncaptured("skipping write_error_batches: load-balanced topology");
179+
return;
180+
}
176181

177182
let max_write_batch_size = client.server_info.max_write_batch_size.unwrap() as usize;
178183

@@ -230,6 +235,11 @@ async fn successful_cursor_iteration() {
230235
log_uncaptured("skipping successful_cursor_iteration: bulkWrite requires 8.0+");
231236
return;
232237
}
238+
// TODO RUST-2131
239+
if client.is_load_balanced() {
240+
log_uncaptured("skipping successful_cursor_iteration: load-balanced topology");
241+
return;
242+
}
233243

234244
let max_bson_object_size = client.server_info.max_bson_object_size as usize;
235245

@@ -271,6 +281,11 @@ async fn cursor_iteration_in_a_transaction() {
271281
);
272282
return;
273283
}
284+
// TODO RUST-2131
285+
if client.is_load_balanced() {
286+
log_uncaptured("skipping cursor_iteration_in_a_transaction: load-balanced topology");
287+
return;
288+
}
274289

275290
let max_bson_object_size = client.server_info.max_bson_object_size as usize;
276291

@@ -321,6 +336,11 @@ async fn failed_cursor_iteration() {
321336
log_uncaptured("skipping failed_cursor_iteration: bulkWrite requires 8.0+");
322337
return;
323338
}
339+
// TODO RUST-2131
340+
if client.is_load_balanced() {
341+
log_uncaptured("skipping failed_cursor_iteration: load-balanced topology");
342+
return;
343+
}
324344

325345
let max_bson_object_size = client.server_info.max_bson_object_size as usize;
326346

src/test/spec/unified_runner/operation/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl TestOperation for AssertSessionPinned {
7878
async move {
7979
let is_pinned =
8080
with_mut_session!(test_runner, self.session.as_str(), |session| async {
81-
session.transaction.pinned_mongos().is_some()
81+
session.transaction.is_pinned()
8282
})
8383
.await;
8484
assert!(is_pinned);

0 commit comments

Comments
 (0)