diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index d3cd2eab867..c1593851d2d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -546,7 +546,7 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d } private T getCommandResult(final Decoder decoder, final ResponseBuffers responseBuffers, final int messageId) { - T result = new ReplyMessage<>(responseBuffers, decoder, messageId).getDocuments().get(0); + T result = new ReplyMessage<>(responseBuffers, decoder, messageId).getDocument(); MongoException writeConcernBasedError = createSpecialWriteConcernException(responseBuffers, description.getServerAddress()); if (writeConcernBasedError != null) { throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result); diff --git a/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java b/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java index 2588db6e312..f1b723778a7 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java @@ -40,17 +40,10 @@ public final class ReplyHeader { */ public static final int TOTAL_REPLY_HEADER_LENGTH = REPLY_HEADER_LENGTH + MESSAGE_HEADER_LENGTH; - private static final int CURSOR_NOT_FOUND_RESPONSE_FLAG = 1; - private static final int QUERY_FAILURE_RESPONSE_FLAG = 2; - private final int messageLength; private final int requestId; private final int responseTo; - private final int responseFlags; - private final long cursorId; - private final int startingFrom; - private final int numberReturned; - private final int opMsgFlagBits; + private final boolean hasMoreToCome; ReplyHeader(final ByteBuf header, final MessageHeader messageHeader) { this(messageHeader.getMessageLength(), messageHeader.getOpCode(), messageHeader, header); @@ -66,27 +59,23 @@ private ReplyHeader(final int messageLength, final int opCode, final MessageHead this.requestId = messageHeader.getRequestId(); this.responseTo = messageHeader.getResponseTo(); if (opCode == OP_MSG.getValue()) { - responseFlags = 0; - cursorId = 0; - startingFrom = 0; - numberReturned = 1; - - opMsgFlagBits = header.getInt(); - header.get(); // ignore payload type + int flagBits = header.getInt(); + hasMoreToCome = (flagBits & (1 << 1)) != 0; + header.get(); // ignored payload type } else if (opCode == OP_REPLY.getValue()) { if (messageLength < TOTAL_REPLY_HEADER_LENGTH) { - throw new MongoInternalException(format("The reply message length %d is less than the mimimum message length %d", + throw new MongoInternalException(format("The reply message length %d is less than the minimum message length %d", messageLength, TOTAL_REPLY_HEADER_LENGTH)); } + hasMoreToCome = false; - responseFlags = header.getInt(); - cursorId = header.getLong(); - startingFrom = header.getInt(); - numberReturned = header.getInt(); - opMsgFlagBits = 0; + header.getInt(); // ignored responseFlags + header.getLong(); // ignored cursorId + header.getInt(); // ignored startingFrom + int numberReturned = header.getInt(); - if (numberReturned < 0) { - throw new MongoInternalException(format("The reply message number of returned documents, %d, is less than 0", + if (numberReturned != 1) { + throw new MongoInternalException(format("The reply message number of returned documents, %d, is expected to be 1", numberReturned)); } } else { @@ -123,78 +112,7 @@ public int getResponseTo() { return responseTo; } - /** - * Gets additional information about the response. - *
    - *
  • 0 - CursorNotFound: Set when getMore is called but the cursor id is not valid at the server. Returned with zero - * results.
  • - *
  • 1 - QueryFailure: Set when query failed. Results consist of one document containing an "$err" field describing the - * failure. - *
  • 2 - ShardConfigStale: Drivers should ignore this. Only mongos will ever see this set, in which case, - * it needs to update config from the server. - *
  • 3 - AwaitCapable: Set when the server supports the AwaitData Query option. If it doesn't, - * a client should sleep a little between getMore's of a Tailable cursor. Mongod version 1.6 supports AwaitData and thus always - * sets AwaitCapable. - *
  • 4-31 - Reserved: Ignore - *
- * - * @return bit vector - see details above - */ - public int getResponseFlags() { - return responseFlags; - } - - /** - * Gets the cursor ID that this response is a part of. If there are no more documents to fetch from the server, the cursor ID will be 0. - * This cursor ID must be used in any messages used to get more data, and also must be closed by the client when no longer needed. - * - * @return cursor ID to use if the client needs to fetch more from the server - */ - public long getCursorId() { - return cursorId; - } - - /** - * Returns the position in the cursor that is the start point of this reply. - * - * @return where in the cursor this reply is starting - */ - public int getStartingFrom() { - return startingFrom; - } - - /** - * Gets the number of documents to expect in the body of this reply. - * - * @return number of documents in the reply - */ - public int getNumberReturned() { - return numberReturned; - } - - /** - * Gets whether this query was performed with a cursor ID that was not valid on the server. - * - * @return true if this reply indicates the request to get more data was performed with a cursor ID that's not valid on the server - */ - public boolean isCursorNotFound() { - return (responseFlags & CURSOR_NOT_FOUND_RESPONSE_FLAG) == CURSOR_NOT_FOUND_RESPONSE_FLAG; - } - - /** - * Gets whether the query failed or not. - * - * @return true if this reply indicates the query failed. - */ - public boolean isQueryFailure() { - return (responseFlags & QUERY_FAILURE_RESPONSE_FLAG) == QUERY_FAILURE_RESPONSE_FLAG; - } - - public int getOpMsgFlagBits() { - return opMsgFlagBits; - } - public boolean hasMoreToCome() { - return (opMsgFlagBits & (1 << 1)) != 0; + return hasMoreToCome; } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java b/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java index 624b909a76a..68af818281e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java @@ -23,9 +23,6 @@ import org.bson.io.BsonInput; import org.bson.io.ByteBufferBsonInput; -import java.util.ArrayList; -import java.util.List; - import static java.lang.String.format; /** @@ -35,50 +32,24 @@ */ public class ReplyMessage { - private final ReplyHeader replyHeader; - private final List documents; + private final T document; public ReplyMessage(final ResponseBuffers responseBuffers, final Decoder decoder, final long requestId) { - this(responseBuffers.getReplyHeader(), requestId); - - if (replyHeader.getNumberReturned() > 0) { - try (BsonInput bsonInput = new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer().duplicate())) { - while (documents.size() < replyHeader.getNumberReturned()) { - try (BsonBinaryReader reader = new BsonBinaryReader(bsonInput)) { - documents.add(decoder.decode(reader, DecoderContext.builder().build())); - } - } - } finally { - responseBuffers.reset(); - } - } - } - - ReplyMessage(final ReplyHeader replyHeader, final long requestId) { - if (requestId != replyHeader.getResponseTo()) { + if (requestId != responseBuffers.getReplyHeader().getResponseTo()) { throw new MongoInternalException(format("The responseTo (%d) in the response does not match the requestId (%d) in the " - + "request", replyHeader.getResponseTo(), requestId)); + + "request", responseBuffers.getReplyHeader().getResponseTo(), requestId)); } - this.replyHeader = replyHeader; - - documents = new ArrayList<>(replyHeader.getNumberReturned()); - } - /** - * Gets the reply header. - * - * @return the reply header - */ - public ReplyHeader getReplyHeader() { - return replyHeader; + try (BsonInput bsonInput = new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer().duplicate())) { + try (BsonBinaryReader reader = new BsonBinaryReader(bsonInput)) { + document = decoder.decode(reader, DecoderContext.builder().build()); + } + } finally { + responseBuffers.reset(); + } } - /** - * Gets the documents. - * - * @return the documents - */ - public List getDocuments() { - return documents; + public T getDocument() { + return document; } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ResponseBuffers.java b/driver-core/src/main/com/mongodb/internal/connection/ResponseBuffers.java index d10b594ef6f..e984862fe0f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ResponseBuffers.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ResponseBuffers.java @@ -49,7 +49,7 @@ public ReplyHeader getReplyHeader() { T getResponseDocument(final int messageId, final Decoder decoder) { ReplyMessage replyMessage = new ReplyMessage<>(this, decoder, messageId); reset(); - return replyMessage.getDocuments().get(0); + return replyMessage.getDocument(); } /** diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/ReplyHeaderSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/ReplyHeaderSpecification.groovy index 9436559e910..0407baeca8a 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/ReplyHeaderSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/ReplyHeaderSpecification.groovy @@ -35,7 +35,7 @@ class ReplyHeaderSpecification extends Specification { writeInt(responseFlags) writeLong(9000) writeInt(4) - writeInt(30) + writeInt(1) } def byteBuf = outputBuffer.byteBuffers.get(0) @@ -46,12 +46,6 @@ class ReplyHeaderSpecification extends Specification { replyHeader.messageLength == 186 replyHeader.requestId == 45 replyHeader.responseTo == 23 - replyHeader.responseFlags == responseFlags - replyHeader.cursorId == 9000 - replyHeader.startingFrom == 4 - replyHeader.numberReturned == 30 - replyHeader.cursorNotFound == cursorNotFound - replyHeader.queryFailure == queryFailure where: responseFlags << [0, 1, 2, 3] @@ -72,7 +66,7 @@ class ReplyHeaderSpecification extends Specification { writeInt(responseFlags) writeLong(9000) writeInt(4) - writeInt(30) + writeInt(1) } def byteBuf = outputBuffer.byteBuffers.get(0) def compressedHeader = new CompressedHeader(byteBuf, new MessageHeader(byteBuf, getDefaultMaxMessageSize())) @@ -84,12 +78,6 @@ class ReplyHeaderSpecification extends Specification { replyHeader.messageLength == 274 replyHeader.requestId == 45 replyHeader.responseTo == 23 - replyHeader.responseFlags == responseFlags - replyHeader.cursorId == 9000 - replyHeader.startingFrom == 4 - replyHeader.numberReturned == 30 - replyHeader.cursorNotFound == cursorNotFound - replyHeader.queryFailure == queryFailure where: responseFlags << [0, 1, 2, 3] @@ -138,7 +126,7 @@ class ReplyHeaderSpecification extends Specification { then: def ex = thrown(MongoInternalException) - ex.getMessage() == 'The reply message length 35 is less than the mimimum message length 36' + ex.getMessage() == 'The reply message length 35 is less than the minimum message length 36' } def 'should throw MongoInternalException on message size > max message size'() { @@ -182,7 +170,7 @@ class ReplyHeaderSpecification extends Specification { then: def ex = thrown(MongoInternalException) - ex.getMessage() == 'The reply message number of returned documents, -1, is less than 0' + ex.getMessage() == 'The reply message number of returned documents, -1, is expected to be 1' } def 'should throw MongoInternalException on num documents < 0 with compressed header'() { @@ -208,6 +196,6 @@ class ReplyHeaderSpecification extends Specification { then: def ex = thrown(MongoInternalException) - ex.getMessage() == 'The reply message number of returned documents, -1, is less than 0' + ex.getMessage() == 'The reply message number of returned documents, -1, is expected to be 1' } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy index 6449d202f1b..12d22e31fd1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy @@ -28,7 +28,6 @@ import com.mongodb.internal.session.SessionContext import com.mongodb.internal.validator.NoOpFieldNameValidator import org.bson.BsonArray import org.bson.BsonBinary -import org.bson.BsonBinaryReader import org.bson.BsonDocument import org.bson.BsonInt32 import org.bson.BsonMaximumSizeExceededException @@ -37,10 +36,7 @@ import org.bson.BsonTimestamp import org.bson.ByteBuf import org.bson.ByteBufNIO import org.bson.codecs.BsonDocumentCodec -import org.bson.codecs.DecoderContext import org.bson.io.BasicOutputBuffer -import org.bson.io.BsonInput -import org.bson.io.ByteBufferBsonInput import spock.lang.Specification import java.nio.ByteBuffer @@ -63,7 +59,7 @@ class CommandMessageSpecification extends Specification { .serverType(serverType as ServerType) .sessionSupported(true) .build(), - responseExpected, exhaustAllowed, null, null, clusterConnectionMode, null) + responseExpected, null, null, clusterConnectionMode, null) def output = new BasicOutputBuffer() when: @@ -76,8 +72,7 @@ class CommandMessageSpecification extends Specification { messageHeader.opCode == OpCode.OP_MSG.value replyHeader.requestId < RequestMessage.currentGlobalId replyHeader.responseTo == 0 - ((replyHeader.opMsgFlagBits & (1 << 16)) != 0) == exhaustAllowed - ((replyHeader.opMsgFlagBits & (1 << 1)) == 0) == responseExpected + replyHeader.hasMoreToCome() != responseExpected def expectedCommandDocument = command.clone() .append('$db', new BsonString(namespace.databaseName)) @@ -97,7 +92,7 @@ class CommandMessageSpecification extends Specification { getCommandDocument(byteBuf, replyHeader) == expectedCommandDocument where: - [readPreference, serverType, clusterConnectionMode, sessionContext, responseExpected, exhaustAllowed] << [ + [readPreference, serverType, clusterConnectionMode, sessionContext, responseExpected] << [ [ReadPreference.primary(), ReadPreference.secondary()], [ServerType.REPLICA_SET_PRIMARY, ServerType.SHARD_ROUTER], [ClusterConnectionMode.SINGLE, ClusterConnectionMode.MULTIPLE], @@ -126,7 +121,6 @@ class CommandMessageSpecification extends Specification { getReadConcern() >> ReadConcern.DEFAULT } ], - [true, false], [true, false] ].combinations() } @@ -372,12 +366,6 @@ class CommandMessageSpecification extends Specification { } private static BsonDocument getCommandDocument(ByteBufNIO byteBuf, ReplyHeader replyHeader) { - new ReplyMessage(new ResponseBuffers(replyHeader, byteBuf), new BsonDocumentCodec(), 0).documents.get(0) - } - - private static BsonDocument getCommandDocument(ByteBufNIO byteBuf) { - BsonInput bsonInput = new ByteBufferBsonInput(byteBuf) - BsonBinaryReader reader = new BsonBinaryReader(bsonInput) - new BsonDocumentCodec().decode(reader, DecoderContext.builder().build()) + new ReplyMessage(new ResponseBuffers(replyHeader, byteBuf), new BsonDocumentCodec(), 0).document } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/MessageHelper.java b/driver-core/src/test/unit/com/mongodb/internal/connection/MessageHelper.java index c98351bf793..2ef3c59cb95 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/MessageHelper.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/MessageHelper.java @@ -55,24 +55,15 @@ public static ResponseBuffers buildSuccessfulReply(final int responseTo, final S return buildReply(responseTo, json, 0); } - public static ResponseBuffers buildFailedReply(final String json) { - return buildFailedReply(0, json); - } - - public static ResponseBuffers buildFailedReply(final int responseTo, final String json) { - return buildReply(responseTo, json, 2); - } - public static ResponseBuffers buildReply(final int responseTo, final String json, final int responseFlags) { ByteBuf body = encodeJson(json); body.flip(); - ReplyHeader header = buildReplyHeader(responseTo, 1, body.remaining(), responseFlags); + ReplyHeader header = buildReplyHeader(responseTo, body.remaining(), responseFlags); return new ResponseBuffers(header, body); } - private static ReplyHeader buildReplyHeader(final int responseTo, final int numDocuments, final int documentsSize, - final int responseFlags) { + private static ReplyHeader buildReplyHeader(final int responseTo, final int documentsSize, final int responseFlags) { ByteBuffer headerByteBuffer = ByteBuffer.allocate(36); headerByteBuffer.order(ByteOrder.LITTLE_ENDIAN); headerByteBuffer.putInt(36 + documentsSize); // length @@ -82,7 +73,7 @@ private static ReplyHeader buildReplyHeader(final int responseTo, final int numD headerByteBuffer.putInt(responseFlags); // responseFlags headerByteBuffer.putLong(0); // cursorId headerByteBuffer.putInt(0); // startingFrom - headerByteBuffer.putInt(numDocuments); //numberReturned + headerByteBuffer.putInt(1); //numberReturned ((Buffer) headerByteBuffer).flip(); ByteBufNIO buffer = new ByteBufNIO(headerByteBuffer); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ReplyMessageTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ReplyMessageTest.java index 7432ad713e9..8f454a30168 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ReplyMessageTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ReplyMessageTest.java @@ -17,58 +17,22 @@ package com.mongodb.internal.connection; import com.mongodb.MongoInternalException; -import org.bson.ByteBufNIO; -import org.bson.Document; -import org.junit.Test; +import org.bson.codecs.BsonDocumentCodec; +import org.junit.jupiter.api.Test; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static com.mongodb.connection.ConnectionDescription.getDefaultMaxMessageSize; +import static com.mongodb.internal.connection.MessageHelper.buildReply; +import static org.junit.jupiter.api.Assertions.assertThrows; public class ReplyMessageTest { - @Test(expected = MongoInternalException.class) + @Test public void shouldThrowExceptionIfRequestIdDoesNotMatchResponseTo() { int badResponseTo = 34565; int expectedResponseTo = 5; - ByteBuffer headerByteBuffer = ByteBuffer.allocate(36); - headerByteBuffer.order(ByteOrder.LITTLE_ENDIAN); - headerByteBuffer.putInt(36); - headerByteBuffer.putInt(2456); - headerByteBuffer.putInt(badResponseTo); - headerByteBuffer.putInt(1); - headerByteBuffer.putInt(0); - headerByteBuffer.putLong(0); - headerByteBuffer.putInt(0); - headerByteBuffer.putInt(0); - ((Buffer) headerByteBuffer).flip(); - - ByteBufNIO byteBuf = new ByteBufNIO(headerByteBuffer); - ReplyHeader replyHeader = new ReplyHeader(byteBuf, new MessageHeader(byteBuf, getDefaultMaxMessageSize())); - new ReplyMessage(replyHeader, expectedResponseTo); - } - - @Test(expected = MongoInternalException.class) - public void shouldThrowExceptionIfOpCodeIsIncorrect() { - int badOpCode = 2; - - ByteBuffer headerByteBuffer = ByteBuffer.allocate(36); - headerByteBuffer.order(ByteOrder.LITTLE_ENDIAN); - headerByteBuffer.putInt(36); - headerByteBuffer.putInt(2456); - headerByteBuffer.putInt(5); - headerByteBuffer.putInt(badOpCode); - headerByteBuffer.putInt(0); - headerByteBuffer.putLong(0); - headerByteBuffer.putInt(0); - headerByteBuffer.putInt(0); - ((Buffer) headerByteBuffer).flip(); + ResponseBuffers responseBuffers = buildReply(badResponseTo, "{ok: 1}", 0); - ByteBufNIO byteBuf = new ByteBufNIO(headerByteBuffer); - ReplyHeader replyHeader = new ReplyHeader(byteBuf, new MessageHeader(byteBuf, getDefaultMaxMessageSize())); - new ReplyMessage(replyHeader, 5); + assertThrows(MongoInternalException.class, () -> + new ReplyMessage<>(responseBuffers, new BsonDocumentCodec(), expectedResponseTo)); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java index ce8b109cd52..e8003f692a9 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java @@ -177,7 +177,7 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod throw getCommandFailureException(getResponseDocument(responseBuffers, message, new BsonDocumentCodec()), description.getServerAddress()); } - return new ReplyMessage<>(responseBuffers, decoder, message.getId()).getDocuments().get(0); + return new ReplyMessage<>(responseBuffers, decoder, message.getId()).getDocument(); } } @@ -200,7 +200,7 @@ private T getResponseDocument(final ResponseBuffers res final CommandMessage commandMessage, final Decoder decoder) { ReplyMessage replyMessage = new ReplyMessage<>(responseBuffers, decoder, commandMessage.getId()); responseBuffers.reset(); - return replyMessage.getDocuments().get(0); + return replyMessage.getDocument(); } @Override @@ -222,10 +222,10 @@ private ReplyHeader replaceResponseTo(final ReplyHeader header, final int respon headerByteBuffer.putInt(header.getRequestId()); headerByteBuffer.putInt(responseTo); headerByteBuffer.putInt(1); - headerByteBuffer.putInt(header.getResponseFlags()); - headerByteBuffer.putLong(header.getCursorId()); - headerByteBuffer.putInt(header.getStartingFrom()); - headerByteBuffer.putInt(header.getNumberReturned()); + headerByteBuffer.putInt(0); + headerByteBuffer.putLong(0); + headerByteBuffer.putInt(0); + headerByteBuffer.putInt(1); ((Buffer) headerByteBuffer).flip(); ByteBufNIO buffer = new ByteBufNIO(headerByteBuffer);