34
34
import software .amazon .awssdk .enhanced .dynamodb .mapper .StaticAttributeTag ;
35
35
import software .amazon .awssdk .enhanced .dynamodb .mapper .StaticTableMetadata ;
36
36
import software .amazon .awssdk .services .dynamodb .model .AttributeValue ;
37
- import software .amazon .awssdk .utils .Pair ;
37
+ import software .amazon .awssdk .utils .Validate ;
38
38
39
39
/**
40
40
* This extension implements optimistic locking on record writes by means of a 'record version number' that is used
@@ -61,20 +61,19 @@ public final class VersionedRecordExtension implements DynamoDbEnhancedClientExt
61
61
private static final Function <String , String > VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER = key -> ":old_" + key + "_value" ;
62
62
private static final String CUSTOM_METADATA_KEY = "VersionedRecordExtension:VersionAttribute" ;
63
63
private static final VersionAttribute VERSION_ATTRIBUTE = new VersionAttribute ();
64
- private static final AttributeValue DEFAULT_VALUE = AttributeValue .fromNul (Boolean .TRUE );
65
-
66
- private final int startAt ;
67
- private final int incrementBy ;
68
-
69
- /**
70
- * Creates a new {@link VersionedRecordExtension} using the supplied starting and incrementing value.
71
- *
72
- * @param startAt the value used to compare if a record is the initial version of a record.
73
- * @param incrementBy the amount to increment the version by with each subsequent update.
74
- */
75
- private VersionedRecordExtension (int startAt , int incrementBy ) {
76
- this .startAt = startAt ;
77
- this .incrementBy = incrementBy ;
64
+
65
+ private final long startAt ;
66
+ private final long incrementBy ;
67
+
68
+ private VersionedRecordExtension (Long startAt , Long incrementBy ) {
69
+ Validate .isNotNegativeOrNull (startAt , "startAt" );
70
+
71
+ if (incrementBy != null && incrementBy < 1 ) {
72
+ throw new IllegalArgumentException ("IncrementBy must be greater than 0." );
73
+ }
74
+
75
+ this .startAt = startAt != null ? startAt : 0L ;
76
+ this .incrementBy = incrementBy != null ? incrementBy : 1L ;
78
77
}
79
78
80
79
public static Builder builder () {
@@ -89,7 +88,7 @@ public static StaticAttributeTag versionAttribute() {
89
88
return VERSION_ATTRIBUTE ;
90
89
}
91
90
92
- public static StaticAttributeTag versionAttribute (Integer startAt , Integer incrementBy ) {
91
+ public static StaticAttributeTag versionAttribute (Long startAt , Long incrementBy ) {
93
92
return new VersionAttribute (startAt , incrementBy );
94
93
}
95
94
}
@@ -98,15 +97,15 @@ private static final class VersionAttribute implements StaticAttributeTag {
98
97
private static final String START_AT_METADATA_KEY = "VersionedRecordExtension:StartAt" ;
99
98
private static final String INCREMENT_BY_METADATA_KEY = "VersionedRecordExtension:IncrementBy" ;
100
99
101
- private final Integer startAt ;
102
- private final Integer incrementBy ;
100
+ private final Long startAt ;
101
+ private final Long incrementBy ;
103
102
104
103
private VersionAttribute () {
105
104
this .startAt = null ;
106
105
this .incrementBy = null ;
107
106
}
108
107
109
- private VersionAttribute (Integer startAt , Integer incrementBy ) {
108
+ private VersionAttribute (Long startAt , Long incrementBy ) {
110
109
this .startAt = startAt ;
111
110
this .incrementBy = incrementBy ;
112
111
}
@@ -120,16 +119,13 @@ public Consumer<StaticTableMetadata.Builder> modifyMetadata(String attributeName
120
119
"is supported." , attributeName , attributeValueType .name ()));
121
120
}
122
121
123
- if (startAt != null && startAt < 0 ) {
124
- throw new IllegalArgumentException ("StartAt cannot be negative." );
125
- }
122
+ Validate .isNotNegativeOrNull (startAt , "startAt" );
126
123
127
124
if (incrementBy != null && incrementBy < 1 ) {
128
125
throw new IllegalArgumentException ("IncrementBy must be greater than 0." );
129
126
}
130
127
131
- return metadata -> metadata
132
- .addCustomMetadataObject (CUSTOM_METADATA_KEY , attributeName )
128
+ return metadata -> metadata .addCustomMetadataObject (CUSTOM_METADATA_KEY , attributeName )
133
129
.addCustomMetadataObject (START_AT_METADATA_KEY , startAt )
134
130
.addCustomMetadataObject (INCREMENT_BY_METADATA_KEY , incrementBy )
135
131
.markAttributeAsKey (attributeName , attributeValueType );
@@ -145,109 +141,69 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
145
141
return WriteModification .builder ().build ();
146
142
}
147
143
148
- Pair <AttributeValue , Expression > updates = getRecordUpdates (versionAttributeKey .get (), context );
149
-
150
- AttributeValue newVersionValue = updates .left ();
151
- Expression condition = updates .right ();
152
-
153
144
Map <String , AttributeValue > itemToTransform = new HashMap <>(context .items ());
154
- itemToTransform .put (versionAttributeKey .get (), newVersionValue );
155
-
156
- return WriteModification .builder ()
157
- .transformedItem (Collections .unmodifiableMap (itemToTransform ))
158
- .additionalConditionalExpression (condition )
159
- .build ();
160
- }
161
-
162
- private Pair <AttributeValue , Expression > getRecordUpdates (String versionAttributeKey ,
163
- DynamoDbExtensionContext .BeforeWrite context ) {
164
- Map <String , AttributeValue > itemToTransform = context .items ();
165
-
166
- // Default to NUL if not present to reduce additional checks further along
167
- AttributeValue existingVersionValue = itemToTransform .getOrDefault (versionAttributeKey , DEFAULT_VALUE );
168
145
169
- if (isInitialVersion (existingVersionValue , context )) {
170
- return createInitialRecord (versionAttributeKey , context );
171
- }
172
- // Existing record, increment version
173
- return updateExistingRecord (versionAttributeKey , existingVersionValue , context );
174
- }
175
-
176
- private boolean isInitialVersion (AttributeValue existingVersionValue , DynamoDbExtensionContext .BeforeWrite context ) {
177
- Optional <Integer > versionStartAtFromAnnotation = context .tableMetadata ()
178
- .customMetadataObject (VersionAttribute .START_AT_METADATA_KEY ,
179
- Integer .class );
180
-
181
- return isNullAttributeValue (existingVersionValue )
182
- || (versionStartAtFromAnnotation .isPresent ()
183
- && getExistingVersion (existingVersionValue ) == versionStartAtFromAnnotation .get ())
184
- || getExistingVersion (existingVersionValue ) == this .startAt ;
185
- }
186
-
187
- private Pair <AttributeValue , Expression > createInitialRecord (String versionAttributeKey ,
188
- DynamoDbExtensionContext .BeforeWrite context ) {
189
- Optional <Integer > versionStartAtFromAnnotation = context .tableMetadata ()
190
- .customMetadataObject (VersionAttribute .START_AT_METADATA_KEY ,
191
- Integer .class );
192
-
193
- AttributeValue newVersionValue = versionStartAtFromAnnotation .isPresent () ?
194
- incrementVersion (versionStartAtFromAnnotation .get (), context ) :
195
- incrementVersion (this .startAt , context );
196
-
197
-
198
- String attributeKeyRef = keyRef (versionAttributeKey );
199
-
200
- Expression condition = Expression .builder ()
201
- // Check that the version does not exist before setting the initial value.
202
- .expression (String .format ("attribute_not_exists(%s)" , attributeKeyRef ))
203
- .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey ))
204
- .build ();
205
-
206
- return Pair .of (newVersionValue , condition );
207
- }
208
-
209
- private Pair <AttributeValue , Expression > updateExistingRecord (String versionAttributeKey ,
210
- AttributeValue existingVersionValue ,
211
- DynamoDbExtensionContext .BeforeWrite context ) {
212
- int existingVersion = getExistingVersion (existingVersionValue );
213
- AttributeValue newVersionValue = incrementVersion (existingVersion , context );
214
-
215
- String attributeKeyRef = keyRef (versionAttributeKey );
216
- String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER .apply (versionAttributeKey );
146
+ String attributeKeyRef = keyRef (versionAttributeKey .get ());
147
+ AttributeValue newVersionValue ;
148
+ Expression condition ;
149
+ Optional <AttributeValue > existingVersionValue =
150
+ Optional .ofNullable (itemToTransform .get (versionAttributeKey .get ()));
151
+
152
+ Optional <Long > versionStartAtFromAnnotation = context .tableMetadata ()
153
+ .customMetadataObject (VersionAttribute .START_AT_METADATA_KEY ,
154
+ Long .class );
155
+
156
+ Optional <Long > versionIncrementByFromAnnotation = context .tableMetadata ()
157
+ .customMetadataObject (VersionAttribute .INCREMENT_BY_METADATA_KEY ,
158
+ Long .class );
159
+
160
+ if (!existingVersionValue .isPresent () || isNullAttributeValue (existingVersionValue .get ()) ||
161
+ (existingVersionValue .get ().n () != null &&
162
+ ((versionStartAtFromAnnotation .isPresent () &&
163
+ Long .parseLong (existingVersionValue .get ().n ()) == versionStartAtFromAnnotation .get ()) ||
164
+ Long .parseLong (existingVersionValue .get ().n ()) == this .startAt ))) {
165
+
166
+ long startValue = versionStartAtFromAnnotation .orElse (this .startAt );
167
+ long increment = versionIncrementByFromAnnotation .orElse (this .incrementBy );
168
+
169
+ newVersionValue = AttributeValue .builder ().n (Long .toString (startValue + increment )).build ();
170
+ condition = Expression .builder ()
171
+ .expression (String .format ("attribute_not_exists(%s)" , attributeKeyRef ))
172
+ .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey .get ()))
173
+ .build ();
174
+ } else {
175
+ // Existing record, increment version
176
+ if (existingVersionValue .get ().n () == null ) {
177
+ // In this case a non-null version attribute is present, but it's not an N
178
+ throw new IllegalArgumentException ("Version attribute appears to be the wrong type. N is required." );
179
+ }
217
180
218
- Expression condition = Expression .builder ()
219
- // Check that the version matches the existing value before setting the updated value.
220
- .expression (String .format ("%s = %s" , attributeKeyRef , existingVersionValueKey ))
221
- .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey ))
222
- .expressionValues (Collections .singletonMap (existingVersionValueKey ,
223
- existingVersionValue ))
224
- .build ();
181
+ long existingVersion = Long .parseLong (existingVersionValue .get ().n ());
182
+ String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER .apply (versionAttributeKey .get ());
225
183
226
- return Pair . of ( newVersionValue , condition );
227
- }
184
+ long increment = versionIncrementByFromAnnotation . orElse ( this . incrementBy );
185
+ newVersionValue = AttributeValue . builder (). n ( Long . toString ( existingVersion + increment )). build ();
228
186
229
- private int getExistingVersion (AttributeValue existingVersionValue ) {
230
- if (existingVersionValue .n () == null ) {
231
- throw new IllegalArgumentException ("Version attribute appears to be the wrong type. N is required." );
187
+ condition = Expression .builder ()
188
+ .expression (String .format ("%s = %s" , attributeKeyRef , existingVersionValueKey ))
189
+ .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey .get ()))
190
+ .expressionValues (Collections .singletonMap (existingVersionValueKey ,
191
+ existingVersionValue .get ()))
192
+ .build ();
232
193
}
233
194
234
- return Integer .parseInt (existingVersionValue .n ());
235
- }
195
+ itemToTransform .put (versionAttributeKey .get (), newVersionValue );
236
196
237
- private AttributeValue incrementVersion (int version , DynamoDbExtensionContext .BeforeWrite context ) {
238
- Optional <Integer > versionIncrementByFromAnnotation = context .tableMetadata ()
239
- .customMetadataObject (VersionAttribute .INCREMENT_BY_METADATA_KEY ,
240
- Integer .class );
241
- if (versionIncrementByFromAnnotation .isPresent ()) {
242
- return AttributeValue .fromN (Integer .toString (version + versionIncrementByFromAnnotation .get ()));
243
- }
244
- return AttributeValue .fromN (Integer .toString (version + this .incrementBy ));
197
+ return WriteModification .builder ()
198
+ .transformedItem (Collections .unmodifiableMap (itemToTransform ))
199
+ .additionalConditionalExpression (condition )
200
+ .build ();
245
201
}
246
202
247
203
@ NotThreadSafe
248
204
public static final class Builder {
249
- private int startAt = 0 ;
250
- private int incrementBy = 1 ;
205
+ private Long startAt = 0L ;
206
+ private Long incrementBy = 1L ;
251
207
252
208
private Builder () {
253
209
}
@@ -259,10 +215,7 @@ private Builder() {
259
215
* @param startAt
260
216
* @return the builder instance
261
217
*/
262
- public Builder startAt (int startAt ) {
263
- if (startAt < 0 ) {
264
- throw new IllegalArgumentException ("StartAt cannot be negative." );
265
- }
218
+ public Builder startAt (Long startAt ) {
266
219
this .startAt = startAt ;
267
220
return this ;
268
221
}
@@ -274,10 +227,7 @@ public Builder startAt(int startAt) {
274
227
* @param incrementBy
275
228
* @return the builder instance
276
229
*/
277
- public Builder incrementBy (int incrementBy ) {
278
- if (incrementBy < 1 ) {
279
- throw new IllegalArgumentException ("IncrementBy must be greater than 0." );
280
- }
230
+ public Builder incrementBy (Long incrementBy ) {
281
231
this .incrementBy = incrementBy ;
282
232
return this ;
283
233
}
0 commit comments