Skip to content

Commit ade133c

Browse files
committed
Fix race condition
ConnectionSource and Connection also need to be retained pre kill cursors and released in the kill cursors callback
1 parent 5ff0c8f commit ade133c

File tree

5 files changed

+82
-43
lines changed

5 files changed

+82
-43
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ <R> void executeWithConnection(
296296
if (error instanceof MongoSocketException) {
297297
onCorruptedConnection(connection);
298298
}
299+
connection.release();
299300
callback.onResult(result, error);
300301
});
301302
}
@@ -308,9 +309,9 @@ void doClose() {
308309
}
309310

310311
if (getServerCursor() != null) {
311-
executeWithConnection(conn -> {
312-
releaseServerResources(conn);
313-
conn.release();
312+
executeWithConnection(c -> {
313+
releaseServerResources(c);
314+
c.release();
314315
}, (r, t) -> {
315316
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
316317
unsetServerCursor();
@@ -323,14 +324,16 @@ void doClose() {
323324

324325
@Override
325326
void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final AsyncConnection connection) {
326-
connection
327-
.retain()
328-
.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
329-
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
330-
assertNotNull(getConnectionSource()), (r, t) -> {
331-
connection.release();
332-
releaseClientResources();
333-
});
327+
connection.retain();
328+
AsyncConnectionSource connectionSource = assertNotNull(getConnectionSource()).retain();
329+
connection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
330+
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
331+
connectionSource, (r, t) -> {
332+
connectionSource.release();
333+
connection.release();
334+
releaseClientResources();
335+
});
334336
}
335337
}
338+
336339
}

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,8 @@ void doClose() {
374374
}
375375

376376
@Override
377-
void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final Connection connection) {
378-
connection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
377+
void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor, final Connection localConnection) {
378+
localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
379379
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(getConnectionSource()));
380380
releaseClientResources();
381381
}

driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.locks.Lock;
3232
import java.util.concurrent.locks.StampedLock;
3333
import java.util.function.Consumer;
34+
import java.util.function.Supplier;
3435

3536
import static com.mongodb.assertions.Assertions.assertNotNull;
3637
import static com.mongodb.assertions.Assertions.assertNull;
@@ -139,8 +140,7 @@ boolean operable() {
139140
* @throws IllegalStateException Iff another operation is in progress.
140141
*/
141142
boolean tryStartOperation() throws IllegalStateException {
142-
lock.lock();
143-
try {
143+
return withLock(() -> {
144144
State localState = state;
145145
if (!localState.operable()) {
146146
return false;
@@ -152,30 +152,26 @@ boolean tryStartOperation() throws IllegalStateException {
152152
} else {
153153
throw fail(state.toString());
154154
}
155-
} finally {
156-
lock.unlock();
157-
}
155+
});
158156
}
159157

160158
/**
161159
* Thread-safe.
162160
*/
163161
void endOperation() {
164-
boolean doClose = false;
165-
lock.lock();
166-
try {
162+
boolean doClose = withLock(() -> {
167163
State localState = state;
168164
if (localState == State.OPERATION_IN_PROGRESS) {
169165
state = State.IDLE;
170166
} else if (localState == State.CLOSE_PENDING) {
171167
state = State.CLOSED;
172-
doClose = true;
173-
} else {
168+
return true;
169+
} else if (localState != State.CLOSED) {
174170
fail(localState.toString());
175171
}
176-
} finally {
177-
lock.unlock();
178-
}
172+
return false;
173+
});
174+
179175
if (doClose) {
180176
doClose();
181177
}
@@ -185,19 +181,17 @@ void endOperation() {
185181
* Thread-safe.
186182
*/
187183
void close() {
188-
boolean doClose = false;
189-
lock.lock();
190-
try {
184+
boolean doClose = withLock(() -> {
191185
State localState = state;
192186
if (localState == State.OPERATION_IN_PROGRESS) {
193187
state = State.CLOSE_PENDING;
194188
} else if (localState != State.CLOSED) {
195189
state = State.CLOSED;
196-
doClose = true;
190+
return true;
197191
}
198-
} finally {
199-
lock.unlock();
200-
}
192+
return false;
193+
});
194+
201195
if (doClose) {
202196
doClose();
203197
}
@@ -221,7 +215,9 @@ ServerCursor getServerCursor() {
221215
}
222216

223217
void unsetServerCursor() {
224-
this.serverCursor = null;
218+
withLock(() -> {
219+
this.serverCursor = null;
220+
});
225221
}
226222

227223
void setServerCursor(@Nullable final ServerCursor serverCursor) {
@@ -236,10 +232,11 @@ void setServerCursor(@Nullable final ServerCursor serverCursor) {
236232
}
237233

238234
void releaseServerAndClientResources(final C connection) {
239-
lock.lock();
240-
ServerCursor localServerCursor = serverCursor;
241-
serverCursor = null;
242-
lock.unlock();
235+
ServerCursor localServerCursor = withLockNullable(() -> {
236+
ServerCursor local = serverCursor;
237+
serverCursor = null;
238+
return local;
239+
});
243240
if (localServerCursor != null) {
244241
killServerCursor(namespace, localServerCursor, connection);
245242
} else {
@@ -274,6 +271,34 @@ void releaseClientResources() {
274271
}
275272
}
276273

274+
private void withLock(final Runnable runnable) {
275+
try {
276+
lock.lock();
277+
runnable.run();
278+
} finally {
279+
lock.unlock();
280+
}
281+
}
282+
283+
@Nullable
284+
private <T> T withLockNullable(final Supplier<T> supplier) {
285+
try {
286+
lock.lock();
287+
return supplier.get();
288+
} finally {
289+
lock.unlock();
290+
}
291+
}
292+
293+
private <T> T withLock(final Supplier<T> supplier) {
294+
try {
295+
lock.lock();
296+
return supplier.get();
297+
} finally {
298+
lock.unlock();
299+
}
300+
}
301+
277302
enum State {
278303
IDLE(true, false),
279304
OPERATION_IN_PROGRESS(true, true),

driver-core/src/test/functional/com/mongodb/internal/connection/ServerHelper.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,20 @@ public static void waitForLastRelease(final ServerAddress address, final Cluster
6464
private static void checkPool(final ServerAddress address, final Cluster cluster) {
6565
ConcurrentPool<UsageTrackingInternalConnection> pool = connectionPool(
6666
cluster.selectServer(new ServerAddressSelector(address), new OperationContext()).getServer());
67-
if (pool.getInUseCount() > 0) {
68-
throw new IllegalStateException("Connection pool in use count is " + pool.getInUseCount());
67+
68+
int counter = 0;
69+
while (counter < 5) {
70+
if (pool.getInUseCount() == 0) {
71+
return;
72+
}
73+
try {
74+
Thread.sleep(500);
75+
} catch (InterruptedException e) {
76+
throw new RuntimeException(e);
77+
}
78+
counter++;
6979
}
80+
throw new IllegalStateException("Connection pool in use count is " + pool.getInUseCount());
7081
}
7182

7283
private static ConcurrentPool<UsageTrackingInternalConnection> connectionPool(final Server server) {

driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,14 @@ void theServerCursorShouldNotBeNull() {
116116
@Test
117117
@DisplayName("should get Exceptions for operations on the cursor after closing")
118118
void shouldGetExceptionsForOperationsOnTheCursorAfterClosing() {
119-
BsonDocument commandResult = executeFindCommand();
119+
BsonDocument commandResult = executeFindCommand(5);
120120
cursor = new AsyncCommandBatchCursor<>(getServerAddress(), commandResult, 0, 0, 0, DOCUMENT_DECODER,
121121
null, connectionSource, connection);
122122

123123
cursor.close();
124-
125124
assertDoesNotThrow(() -> cursor.close());
125+
126+
checkReferenceCountReachesTarget(connectionSource, 1);
126127
assertThrows(MongoException.class, this::cursorNext);
127128
assertNull(cursor.getServerCursor());
128129
}
@@ -170,7 +171,6 @@ void testLimitExhaustion(final int limit, final int batchSize, final int expecte
170171
@MethodSource
171172
@DisplayName("should block waiting for next batch on a tailable cursor")
172173
void shouldBlockWaitingForNextBatchOnATailableCursor(final boolean awaitData, final int maxTimeMS) {
173-
174174
getCollectionHelper().create(getCollectionName(), new CreateCollectionOptions().capped(true).sizeInBytes(1000));
175175
getCollectionHelper().insertDocuments(DOCUMENT_DECODER, new Document("_id", 1).append("ts", new BsonTimestamp(5, 0)));
176176

0 commit comments

Comments
 (0)