|
20 | 20 | import com.mongodb.MongoOperationTimeoutException;
|
21 | 21 | import com.mongodb.ReadConcern;
|
22 | 22 | import com.mongodb.ReadPreference;
|
| 23 | +import com.mongodb.WriteConcern; |
| 24 | +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; |
23 | 25 | import com.mongodb.connection.ClusterConnectionMode;
|
24 | 26 | import com.mongodb.connection.ServerType;
|
| 27 | +import com.mongodb.internal.IgnorableRequestContext; |
25 | 28 | import com.mongodb.internal.TimeoutContext;
|
| 29 | +import com.mongodb.internal.TimeoutSettings; |
| 30 | +import com.mongodb.internal.client.model.bulk.ConcreteClientBulkWriteOptions; |
26 | 31 | import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences;
|
| 32 | +import com.mongodb.internal.operation.ClientBulkWriteOperation; |
| 33 | +import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand.OpsAndNsInfo; |
27 | 34 | import com.mongodb.internal.session.SessionContext;
|
28 | 35 | import com.mongodb.internal.validator.NoOpFieldNameValidator;
|
| 36 | +import org.bson.BsonArray; |
| 37 | +import org.bson.BsonBoolean; |
29 | 38 | import org.bson.BsonDocument;
|
| 39 | +import org.bson.BsonInt32; |
30 | 40 | import org.bson.BsonString;
|
31 | 41 | import org.bson.BsonTimestamp;
|
32 | 42 | import org.junit.jupiter.api.Test;
|
33 | 43 |
|
| 44 | +import java.util.List; |
| 45 | +import java.util.stream.Collectors; |
| 46 | +import java.util.stream.IntStream; |
| 47 | + |
| 48 | +import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry; |
| 49 | +import static com.mongodb.client.model.bulk.ClientBulkWriteOptions.clientBulkWriteOptions; |
34 | 50 | import static com.mongodb.internal.mockito.MongoMockito.mock;
|
35 | 51 | import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION;
|
| 52 | +import static com.mongodb.internal.operation.ServerVersionHelper.LATEST_WIRE_VERSION; |
| 53 | +import static java.util.Arrays.asList; |
| 54 | +import static java.util.Collections.singletonList; |
| 55 | +import static org.junit.jupiter.api.Assertions.assertEquals; |
36 | 56 | import static org.junit.jupiter.api.Assertions.assertThrows;
|
37 | 57 | import static org.mockito.ArgumentMatchers.any;
|
38 | 58 | import static org.mockito.Mockito.doThrow;
|
@@ -105,4 +125,49 @@ void encodeShouldNotAddExtraElementsFromTimeoutContextWhenConnectedToMongoCrypt(
|
105 | 125 | verifyNoInteractions(timeoutContext);
|
106 | 126 | }
|
107 | 127 | }
|
| 128 | + |
| 129 | + @Test |
| 130 | + void getCommandDocumentFromClientBulkWrite() { |
| 131 | + MongoNamespace ns = new MongoNamespace("db", "test"); |
| 132 | + boolean retryWrites = false; |
| 133 | + BsonDocument command = new BsonDocument("bulkWrite", new BsonInt32(1)) |
| 134 | + .append("errorsOnly", BsonBoolean.valueOf(false)) |
| 135 | + .append("ordered", BsonBoolean.valueOf(true)); |
| 136 | + List<BsonDocument> documents = IntStream.range(0, 2).mapToObj(i -> new BsonDocument("_id", new BsonInt32(i))) |
| 137 | + .collect(Collectors.toList()); |
| 138 | + List<ClientNamespacedWriteModel> writeModels = asList( |
| 139 | + ClientNamespacedWriteModel.insertOne(ns, documents.get(0)), |
| 140 | + ClientNamespacedWriteModel.insertOne(ns, documents.get(1))); |
| 141 | + OpsAndNsInfo opsAndNsInfo = new OpsAndNsInfo( |
| 142 | + retryWrites, |
| 143 | + writeModels, |
| 144 | + null, |
| 145 | + new ClientBulkWriteOperation( |
| 146 | + writeModels, |
| 147 | + clientBulkWriteOptions(), |
| 148 | + WriteConcern.MAJORITY, |
| 149 | + retryWrites, |
| 150 | + getDefaultCodecRegistry() |
| 151 | + ).new BatchEncoder(), |
| 152 | + (ConcreteClientBulkWriteOptions) clientBulkWriteOptions(), |
| 153 | + () -> 1L); |
| 154 | + BsonDocument expectedCommandDocument = command.clone() |
| 155 | + .append("$db", new BsonString(ns.getDatabaseName())) |
| 156 | + .append("ops", new BsonArray(asList( |
| 157 | + new BsonDocument("insert", new BsonInt32(0)).append("document", documents.get(0)), |
| 158 | + new BsonDocument("insert", new BsonInt32(0)).append("document", documents.get(1))))) |
| 159 | + .append("nsInfo", new BsonArray(singletonList(new BsonDocument("ns", new BsonString(ns.toString()))))); |
| 160 | + CommandMessage commandMessage = new CommandMessage( |
| 161 | + ns, command, NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), |
| 162 | + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), true, opsAndNsInfo, ClusterConnectionMode.MULTIPLE, null); |
| 163 | + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { |
| 164 | + commandMessage.encode( |
| 165 | + output, |
| 166 | + new OperationContext( |
| 167 | + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, |
| 168 | + new TimeoutContext(TimeoutSettings.DEFAULT), null)); |
| 169 | + BsonDocument actualCommandDocument = commandMessage.getCommandDocument(output); |
| 170 | + assertEquals(expectedCommandDocument, actualCommandDocument); |
| 171 | + } |
| 172 | + } |
108 | 173 | }
|
0 commit comments