70
70
import com .mongodb .internal .client .model .bulk .UnacknowledgedClientBulkWriteResult ;
71
71
import com .mongodb .internal .connection .Connection ;
72
72
import com .mongodb .internal .connection .DualMessageSequences ;
73
- import com .mongodb .internal .connection .DualMessageSequences .WritersProviderAndLimitsChecker .OrdinaryAndStoredBsonWriters ;
74
73
import com .mongodb .internal .connection .IdHoldingBsonWriter ;
75
74
import com .mongodb .internal .connection .MongoWriteConcernWithResponseException ;
76
75
import com .mongodb .internal .connection .OperationContext ;
81
80
import com .mongodb .internal .validator .UpdateFieldNameValidator ;
82
81
import com .mongodb .lang .Nullable ;
83
82
import org .bson .BsonArray ;
83
+ import org .bson .BsonBinaryWriter ;
84
84
import org .bson .BsonBoolean ;
85
85
import org .bson .BsonDocument ;
86
86
import org .bson .BsonElement ;
109
109
import static com .mongodb .assertions .Assertions .assertNotNull ;
110
110
import static com .mongodb .assertions .Assertions .assertTrue ;
111
111
import static com .mongodb .assertions .Assertions .fail ;
112
+ import static com .mongodb .internal .connection .BsonWriterHelper .decorateWithDocumentSizeChecking ;
112
113
import static com .mongodb .internal .connection .DualMessageSequences .WritersProviderAndLimitsChecker .WriteResult .FAIL_LIMIT_EXCEEDED ;
113
114
import static com .mongodb .internal .connection .DualMessageSequences .WritersProviderAndLimitsChecker .WriteResult .OK_LIMIT_NOT_REACHED ;
114
115
import static com .mongodb .internal .operation .BulkWriteBatch .logWriteModelDoesNotSupportRetries ;
@@ -230,7 +231,8 @@ private Integer executeBatch(
230
231
retryState .attach (AttachmentKeys .maxWireVersion (), connectionDescription .getMaxWireVersion (), true )
231
232
.attach (AttachmentKeys .commandDescriptionSupplier (), () -> BULK_WRITE_COMMAND_NAME , false );
232
233
ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand (
233
- retryState , effectiveRetryWrites , effectiveWriteConcern , sessionContext , unexecutedModels , batchEncoder ,
234
+ retryState , effectiveRetryWrites , effectiveWriteConcern , sessionContext , unexecutedModels ,
235
+ connectionDescription .getMaxDocumentSize (), batchEncoder ,
234
236
() -> retryState .attach (AttachmentKeys .retryableCommandFlag (), true , true ));
235
237
return executeBulkWriteCommandAndExhaustOkResponse (
236
238
retryState , connectionSource , connection , bulkWriteCommand , effectiveWriteConcern , operationContext );
@@ -337,6 +339,7 @@ private ClientBulkWriteCommand createBulkWriteCommand(
337
339
final WriteConcern effectiveWriteConcern ,
338
340
final SessionContext sessionContext ,
339
341
final List <? extends ClientNamespacedWriteModel > unexecutedModels ,
342
+ final int maxStoredDocumentSize ,
340
343
final BatchEncoder batchEncoder ,
341
344
final Runnable retriesEnabler ) {
342
345
BsonDocument commandDocument = new BsonDocument (BULK_WRITE_COMMAND_NAME , new BsonInt32 (1 ))
@@ -353,7 +356,11 @@ private ClientBulkWriteCommand createBulkWriteCommand(
353
356
return new ClientBulkWriteCommand (
354
357
commandDocument ,
355
358
new ClientBulkWriteCommand .OpsAndNsInfo (
356
- effectiveRetryWrites , unexecutedModels , batchEncoder , options ,
359
+ effectiveRetryWrites , unexecutedModels ,
360
+ // we must validate the size only if no response is expected, otherwise we must rely on the server validation
361
+ effectiveWriteConcern .isAcknowledged () ? null : maxStoredDocumentSize ,
362
+ batchEncoder ,
363
+ options ,
357
364
() -> {
358
365
retriesEnabler .run ();
359
366
return retryState .isFirstAttempt ()
@@ -669,19 +676,23 @@ OpsAndNsInfo getOpsAndNsInfo() {
669
676
public static final class OpsAndNsInfo extends DualMessageSequences {
670
677
private final boolean effectiveRetryWrites ;
671
678
private final List <? extends ClientNamespacedWriteModel > models ;
679
+ @ Nullable
680
+ private final Integer maxStoredDocumentSize ;
672
681
private final BatchEncoder batchEncoder ;
673
682
private final ConcreteClientBulkWriteOptions options ;
674
683
private final Supplier <Long > doIfCommandIsRetryableAndAdvanceGetTxnNumber ;
675
684
676
685
OpsAndNsInfo (
677
686
final boolean effectiveRetryWrites ,
678
687
final List <? extends ClientNamespacedWriteModel > models ,
688
+ @ Nullable final Integer maxStoredDocumentSize ,
679
689
final BatchEncoder batchEncoder ,
680
690
final ConcreteClientBulkWriteOptions options ,
681
691
final Supplier <Long > doIfCommandIsRetryableAndAdvanceGetTxnNumber ) {
682
692
super ("ops" , new OpsFieldNameValidator (models ), "nsInfo" , NoOpFieldNameValidator .INSTANCE );
683
693
this .effectiveRetryWrites = effectiveRetryWrites ;
684
694
this .models = models ;
695
+ this .maxStoredDocumentSize = maxStoredDocumentSize ;
685
696
this .batchEncoder = batchEncoder ;
686
697
this .options = options ;
687
698
this .doIfCommandIsRetryableAndAdvanceGetTxnNumber = doIfCommandIsRetryableAndAdvanceGetTxnNumber ;
@@ -704,10 +715,10 @@ public EncodeDocumentsResult encodeDocuments(final WritersProviderAndLimitsCheck
704
715
int namespaceIndexInBatch = indexedNamespaces .computeIfAbsent (namespace , k -> indexedNamespacesSizeBeforeCompute );
705
716
boolean writeNewNamespace = indexedNamespaces .size () != indexedNamespacesSizeBeforeCompute ;
706
717
int finalModelIndexInBatch = modelIndexInBatch ;
707
- writeResult = writersProviderAndLimitsChecker .tryWrite ((opsWriters , nsInfoWriters ) -> {
708
- batchEncoder .encodeWriteModel (opsWriters , namespacedModel .getModel (), finalModelIndexInBatch , namespaceIndexInBatch );
718
+ writeResult = writersProviderAndLimitsChecker .tryWrite ((opsWriter , nsInfoWriter ) -> {
719
+ batchEncoder .encodeWriteModel (opsWriter , maxStoredDocumentSize ,
720
+ namespacedModel .getModel (), finalModelIndexInBatch , namespaceIndexInBatch );
709
721
if (writeNewNamespace ) {
710
- BsonWriter nsInfoWriter = nsInfoWriters .getWriter ();
711
722
nsInfoWriter .writeStartDocument ();
712
723
nsInfoWriter .writeString ("ns" , namespace .getFullName ());
713
724
nsInfoWriter .writeEndDocument ();
@@ -940,7 +951,7 @@ private final class BatchEncoder {
940
951
/**
941
952
* Must be called at most once.
942
953
* Must not be called before calling
943
- * {@link #encodeWriteModel(OrdinaryAndStoredBsonWriters , ClientWriteModel, int, int)} at least once.
954
+ * {@link #encodeWriteModel(BsonBinaryWriter, Integer , ClientWriteModel, int, int)} at least once.
944
955
* Renders {@code this} unusable.
945
956
*/
946
957
EncodedBatchInfo intoEncodedBatchInfo () {
@@ -961,16 +972,16 @@ void reset(final int modelIndexInBatch) {
961
972
}
962
973
963
974
void encodeWriteModel (
964
- final OrdinaryAndStoredBsonWriters writers ,
975
+ final BsonBinaryWriter writer ,
976
+ @ Nullable final Integer maxStoredDocumentSize ,
965
977
final ClientWriteModel model ,
966
978
final int modelIndexInBatch ,
967
979
final int namespaceIndexInBatch ) {
968
980
assertNotNull (encodedBatchInfo ).modelsCount ++;
969
- BsonWriter writer = writers .getWriter ();
970
981
writer .writeStartDocument ();
971
982
if (model instanceof ConcreteClientInsertOneModel ) {
972
983
writer .writeInt32 ("insert" , namespaceIndexInBatch );
973
- encodeWriteModelInternals (writers , (ConcreteClientInsertOneModel ) model , modelIndexInBatch );
984
+ encodeWriteModelInternals (writer , maxStoredDocumentSize , (ConcreteClientInsertOneModel ) model , modelIndexInBatch );
974
985
} else if (model instanceof ConcreteClientUpdateOneModel ) {
975
986
writer .writeInt32 ("update" , namespaceIndexInBatch );
976
987
writer .writeBoolean ("multi" , false );
@@ -981,7 +992,7 @@ void encodeWriteModel(
981
992
encodeWriteModelInternals (writer , (ConcreteClientUpdateManyModel ) model );
982
993
} else if (model instanceof ConcreteClientReplaceOneModel ) {
983
994
writer .writeInt32 ("update" , namespaceIndexInBatch );
984
- encodeWriteModelInternals (writers , (ConcreteClientReplaceOneModel ) model );
995
+ encodeWriteModelInternals (writer , maxStoredDocumentSize , (ConcreteClientReplaceOneModel ) model );
985
996
} else if (model instanceof ConcreteClientDeleteOneModel ) {
986
997
writer .writeInt32 ("delete" , namespaceIndexInBatch );
987
998
writer .writeBoolean ("multi" , false );
@@ -996,12 +1007,16 @@ void encodeWriteModel(
996
1007
writer .writeEndDocument ();
997
1008
}
998
1009
999
- private void encodeWriteModelInternals (final OrdinaryAndStoredBsonWriters writers , final ConcreteClientInsertOneModel model , final int modelIndexInBatch ) {
1000
- writers .getWriter ().writeName ("document" );
1010
+ private void encodeWriteModelInternals (
1011
+ final BsonBinaryWriter writer ,
1012
+ @ Nullable final Integer maxStoredDocumentSize ,
1013
+ final ConcreteClientInsertOneModel model ,
1014
+ final int modelIndexInBatch ) {
1015
+ writer .writeName ("document" );
1001
1016
Object document = model .getDocument ();
1002
1017
assertNotNull (encodedBatchInfo ).insertModelDocumentIds .compute (modelIndexInBatch , (k , knownModelDocumentId ) -> {
1003
1018
IdHoldingBsonWriter documentIdHoldingBsonWriter = new IdHoldingBsonWriter (
1004
- writers . getStoredDocumentWriter ( ),
1019
+ decorateWithDocumentSizeChecking ( writer , maxStoredDocumentSize ),
1005
1020
// Reuse `knownModelDocumentId` if it may have been generated by `IdHoldingBsonWriter` in a previous attempt.
1006
1021
// If its type is not `BsonObjectId`, we know it could not have been generated.
1007
1022
knownModelDocumentId instanceof BsonObjectId ? knownModelDocumentId .asObjectId () : null );
@@ -1040,13 +1055,16 @@ private void encodeWriteModelInternals(final BsonWriter writer, final AbstractCl
1040
1055
options .isUpsert ().ifPresent (value -> writer .writeBoolean ("upsert" , value ));
1041
1056
}
1042
1057
1043
- private void encodeWriteModelInternals (final OrdinaryAndStoredBsonWriters writers , final ConcreteClientReplaceOneModel model ) {
1044
- BsonWriter writer = writers .getWriter ();
1058
+ private void encodeWriteModelInternals (
1059
+ final BsonBinaryWriter writer ,
1060
+ @ Nullable final Integer maxStoredDocumentSize ,
1061
+ final ConcreteClientReplaceOneModel model ) {
1045
1062
writer .writeBoolean ("multi" , false );
1046
1063
writer .writeName ("filter" );
1047
1064
encodeUsingRegistry (writer , model .getFilter ());
1048
1065
writer .writeName ("updateMods" );
1049
- encodeUsingRegistry (writers .getStoredDocumentWriter (), model .getReplacement (), COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT );
1066
+ encodeUsingRegistry (decorateWithDocumentSizeChecking (writer , maxStoredDocumentSize ), model .getReplacement (),
1067
+ COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT );
1050
1068
ConcreteClientReplaceOptions options = model .getOptions ();
1051
1069
options .getCollation ().ifPresent (value -> {
1052
1070
writer .writeName ("collation" );
0 commit comments