|
44 | 44 | import java.util.List;
|
45 | 45 | import java.util.concurrent.atomic.AtomicBoolean;
|
46 | 46 | import java.util.concurrent.atomic.AtomicInteger;
|
47 |
| -import java.util.concurrent.atomic.AtomicReference; |
48 | 47 | import java.util.concurrent.locks.Lock;
|
49 | 48 | import java.util.concurrent.locks.ReentrantLock;
|
50 | 49 | import java.util.function.Consumer;
|
@@ -78,10 +77,10 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
|
78 | 77 |
|
79 | 78 | private final AtomicInteger count = new AtomicInteger();
|
80 | 79 | private final AtomicBoolean processedInitial = new AtomicBoolean();
|
81 |
| - private final AtomicReference<CommandCursorResult<T>> commandCursorResult = new AtomicReference<>(); |
82 | 80 | private final int maxWireVersion;
|
83 | 81 | private final boolean firstBatchEmpty;
|
84 | 82 | private final ResourceManager resourceManager;
|
| 83 | + private volatile CommandCursorResult<T> commandCursorResult; |
85 | 84 | private int batchSize;
|
86 | 85 |
|
87 | 86 | AsyncCommandBatchCursor(
|
@@ -131,7 +130,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
|
131 | 130 | boolean cursorClosed = localServerCursor == null;
|
132 | 131 | List<T> batchResults = emptyList();
|
133 | 132 | if (!processedInitial.getAndSet(true) && !firstBatchEmpty) {
|
134 |
| - batchResults = commandCursorResult.get().getResults(); |
| 133 | + batchResults = commandCursorResult.getResults(); |
135 | 134 | }
|
136 | 135 |
|
137 | 136 | if (cursorClosed || !batchResults.isEmpty()) {
|
@@ -167,12 +166,12 @@ public void close() {
|
167 | 166 |
|
168 | 167 | @Override
|
169 | 168 | public BsonDocument getPostBatchResumeToken() {
|
170 |
| - return commandCursorResult.get().getPostBatchResumeToken(); |
| 169 | + return commandCursorResult.getPostBatchResumeToken(); |
171 | 170 | }
|
172 | 171 |
|
173 | 172 | @Override
|
174 | 173 | public BsonTimestamp getOperationTime() {
|
175 |
| - return commandCursorResult.get().getOperationTime(); |
| 174 | + return commandCursorResult.getOperationTime(); |
176 | 175 | }
|
177 | 176 |
|
178 | 177 | @Override
|
@@ -249,7 +248,7 @@ private CommandCursorResult<T> initFromCommandCursorDocument(
|
249 | 248 | LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", cursorResult.getResults().size(),
|
250 | 249 | cursorResult.getCursorId(), cursorResult.getServerAddress()));
|
251 | 250 | }
|
252 |
| - this.commandCursorResult.set(cursorResult); |
| 251 | + this.commandCursorResult = cursorResult; |
253 | 252 | return cursorResult;
|
254 | 253 | }
|
255 | 254 |
|
|
0 commit comments