34
34
import software .amazon .awssdk .enhanced .dynamodb .mapper .StaticAttributeTag ;
35
35
import software .amazon .awssdk .enhanced .dynamodb .mapper .StaticTableMetadata ;
36
36
import software .amazon .awssdk .services .dynamodb .model .AttributeValue ;
37
+ import software .amazon .awssdk .utils .Pair ;
37
38
38
39
/**
39
40
* This extension implements optimistic locking on record writes by means of a 'record version number' that is used
@@ -60,8 +61,20 @@ public final class VersionedRecordExtension implements DynamoDbEnhancedClientExt
60
61
private static final Function <String , String > VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER = key -> ":old_" + key + "_value" ;
61
62
private static final String CUSTOM_METADATA_KEY = "VersionedRecordExtension:VersionAttribute" ;
62
63
private static final VersionAttribute VERSION_ATTRIBUTE = new VersionAttribute ();
64
+ private static final AttributeValue DEFAULT_VALUE = AttributeValue .fromNul (Boolean .TRUE );
63
65
64
- private VersionedRecordExtension () {
66
+ private final int startAt ;
67
+ private final int incrementBy ;
68
+
69
+ /**
70
+ * Creates a new {@link VersionedRecordExtension} using the supplied starting and incrementing value.
71
+ *
72
+ * @param startAt the value used to compare if a record is the initial version of a record.
73
+ * @param incrementBy the amount to increment the version by with each subsequent update.
74
+ */
75
+ private VersionedRecordExtension (int startAt , int incrementBy ) {
76
+ this .startAt = startAt ;
77
+ this .incrementBy = incrementBy ;
65
78
}
66
79
67
80
public static Builder builder () {
@@ -75,19 +88,42 @@ private AttributeTags() {
75
88
public static StaticAttributeTag versionAttribute () {
76
89
return VERSION_ATTRIBUTE ;
77
90
}
91
+
92
+ public static StaticAttributeTag versionAttribute (int startAt , int incrementBy ) {
93
+ return new VersionAttribute (startAt , incrementBy );
94
+ }
78
95
}
79
96
80
- private static class VersionAttribute implements StaticAttributeTag {
97
+ private static final class VersionAttribute implements StaticAttributeTag {
98
+ private static final String START_AT_METADATA_KEY = "VersionedRecordExtension:StartAt" ;
99
+ private static final String INCREMENT_BY_METADATA_KEY = "VersionedRecordExtension:IncrementBy" ;
100
+
101
+ private final int startAt ;
102
+ private final int incrementBy ;
103
+
104
+ private VersionAttribute () {
105
+ this .startAt = 0 ;
106
+ this .incrementBy = 1 ;
107
+ }
108
+
109
+ private VersionAttribute (int startAt , int incrementBy ) {
110
+ this .startAt = startAt ;
111
+ this .incrementBy = incrementBy ;
112
+ }
113
+
81
114
@ Override
82
115
public Consumer <StaticTableMetadata .Builder > modifyMetadata (String attributeName ,
83
116
AttributeValueType attributeValueType ) {
84
117
if (attributeValueType != AttributeValueType .N ) {
85
118
throw new IllegalArgumentException (String .format (
86
119
"Attribute '%s' of type %s is not a suitable type to be used as a version attribute. Only type 'N' " +
87
- "is supported." , attributeName , attributeValueType .name ()));
120
+ "is supported." , attributeName , attributeValueType .name ()));
88
121
}
89
122
90
- return metadata -> metadata .addCustomMetadataObject (CUSTOM_METADATA_KEY , attributeName )
123
+ return metadata -> metadata
124
+ .addCustomMetadataObject (CUSTOM_METADATA_KEY , attributeName )
125
+ .addCustomMetadataObject (START_AT_METADATA_KEY , startAt )
126
+ .addCustomMetadataObject (INCREMENT_BY_METADATA_KEY , incrementBy )
91
127
.markAttributeAsKey (attributeName , attributeValueType );
92
128
}
93
129
}
@@ -101,39 +137,14 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
101
137
return WriteModification .builder ().build ();
102
138
}
103
139
104
- Map <String , AttributeValue > itemToTransform = new HashMap <>(context .items ());
105
140
106
- String attributeKeyRef = keyRef (versionAttributeKey .get ());
107
- AttributeValue newVersionValue ;
108
- Expression condition ;
109
- Optional <AttributeValue > existingVersionValue =
110
- Optional .ofNullable (itemToTransform .get (versionAttributeKey .get ()));
111
-
112
- if (!existingVersionValue .isPresent () || isNullAttributeValue (existingVersionValue .get ())) {
113
- // First version of the record
114
- newVersionValue = AttributeValue .builder ().n ("1" ).build ();
115
- condition = Expression .builder ()
116
- .expression (String .format ("attribute_not_exists(%s)" , attributeKeyRef ))
117
- .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey .get ()))
118
- .build ();
119
- } else {
120
- // Existing record, increment version
121
- if (existingVersionValue .get ().n () == null ) {
122
- // In this case a non-null version attribute is present, but it's not an N
123
- throw new IllegalArgumentException ("Version attribute appears to be the wrong type. N is required." );
124
- }
141
+ Pair <AttributeValue , Expression > updates = getRecordUpdates (versionAttributeKey .get (), context );
125
142
126
- int existingVersion = Integer .parseInt (existingVersionValue .get ().n ());
127
- String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER .apply (versionAttributeKey .get ());
128
- newVersionValue = AttributeValue .builder ().n (Integer .toString (existingVersion + 1 )).build ();
129
- condition = Expression .builder ()
130
- .expression (String .format ("%s = %s" , attributeKeyRef , existingVersionValueKey ))
131
- .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey .get ()))
132
- .expressionValues (Collections .singletonMap (existingVersionValueKey ,
133
- existingVersionValue .get ()))
134
- .build ();
135
- }
143
+ // Unpack values from Pair
144
+ AttributeValue newVersionValue = updates .left ();
145
+ Expression condition = updates .right ();
136
146
147
+ Map <String , AttributeValue > itemToTransform = new HashMap <>(context .items ());
137
148
itemToTransform .put (versionAttributeKey .get (), newVersionValue );
138
149
139
150
return WriteModification .builder ()
@@ -142,13 +153,133 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
142
153
.build ();
143
154
}
144
155
156
+ private Pair <AttributeValue , Expression > getRecordUpdates (String versionAttributeKey ,
157
+ DynamoDbExtensionContext .BeforeWrite context ) {
158
+ Map <String , AttributeValue > itemToTransform = context .items ();
159
+
160
+ // Default to NUL if not present to reduce additional checks further along
161
+ AttributeValue existingVersionValue = itemToTransform .getOrDefault (versionAttributeKey , DEFAULT_VALUE );
162
+
163
+ if (isInitialVersion (existingVersionValue , context )) {
164
+ // First version of the record ensure it does not exist
165
+ return createInitialRecord (versionAttributeKey , context );
166
+ }
167
+ // Existing record, increment version
168
+ return updateExistingRecord (versionAttributeKey , existingVersionValue , context );
169
+ }
170
+
171
+ private boolean isInitialVersion (AttributeValue existingVersionValue , DynamoDbExtensionContext .BeforeWrite context ) {
172
+ if (isNullAttributeValue (existingVersionValue )) {
173
+ return true ;
174
+ }
175
+
176
+
177
+ Optional <Integer > versionStartAtFromAnnotation = context .tableMetadata ()
178
+ .customMetadataObject (VersionAttribute .START_AT_METADATA_KEY ,
179
+ Integer .class );
180
+
181
+ return isNullAttributeValue (existingVersionValue )
182
+ || (versionStartAtFromAnnotation .isPresent () &&
183
+ getExistingVersion (existingVersionValue ) == versionStartAtFromAnnotation .get ())
184
+ || (!versionStartAtFromAnnotation .isPresent () &&
185
+ getExistingVersion (existingVersionValue ) == this .startAt );
186
+ }
187
+
188
+ private Pair <AttributeValue , Expression > createInitialRecord (String versionAttributeKey ,
189
+ DynamoDbExtensionContext .BeforeWrite context ) {
190
+ Optional <Integer > versionStartAtFromAnnotation = context .tableMetadata ()
191
+ .customMetadataObject (VersionAttribute .START_AT_METADATA_KEY ,
192
+ Integer .class );
193
+
194
+ AttributeValue newVersionValue = versionStartAtFromAnnotation .isPresent () ?
195
+ incrementVersion (versionStartAtFromAnnotation .get (), context ) :
196
+ incrementVersion (this .startAt , context );
197
+
198
+
199
+ String attributeKeyRef = keyRef (versionAttributeKey );
200
+
201
+ Expression condition = Expression .builder ()
202
+ // Check that the version does not exist before setting the initial value.
203
+ .expression (String .format ("attribute_not_exists(%s)" , attributeKeyRef ))
204
+ .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey ))
205
+ .build ();
206
+
207
+ return Pair .of (newVersionValue , condition );
208
+ }
209
+
210
+ private Pair <AttributeValue , Expression > updateExistingRecord (String versionAttributeKey ,
211
+ AttributeValue existingVersionValue ,
212
+ DynamoDbExtensionContext .BeforeWrite context ) {
213
+ int existingVersion = getExistingVersion (existingVersionValue );
214
+ AttributeValue newVersionValue = incrementVersion (existingVersion , context );
215
+
216
+ String attributeKeyRef = keyRef (versionAttributeKey );
217
+ String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER .apply (versionAttributeKey );
218
+
219
+ Expression condition = Expression .builder ()
220
+ // Check that the version matches the existing value before setting the updated value.
221
+ .expression (String .format ("%s = %s" , attributeKeyRef , existingVersionValueKey ))
222
+ .expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey ))
223
+ .expressionValues (Collections .singletonMap (existingVersionValueKey ,
224
+ existingVersionValue ))
225
+ .build ();
226
+
227
+ return Pair .of (newVersionValue , condition );
228
+ }
229
+
230
+ private int getExistingVersion (AttributeValue existingVersionValue ) {
231
+ if (existingVersionValue .n () == null ) {
232
+ // In this case a non-null version attribute is present, but it's not an N
233
+ throw new IllegalArgumentException ("Version attribute appears to be the wrong type. N is required." );
234
+ }
235
+
236
+ return Integer .parseInt (existingVersionValue .n ());
237
+ }
238
+
239
+ private AttributeValue incrementVersion (int version , DynamoDbExtensionContext .BeforeWrite context ) {
240
+ Optional <Integer > versionIncrementByFromAnnotation = context .tableMetadata ()
241
+ .customMetadataObject (VersionAttribute .INCREMENT_BY_METADATA_KEY ,
242
+ Integer .class );
243
+ if (versionIncrementByFromAnnotation .isPresent ()) {
244
+ return AttributeValue .fromN (Integer .toString (version + versionIncrementByFromAnnotation .get ()));
245
+ }
246
+ return AttributeValue .fromN (Integer .toString (version + this .incrementBy ));
247
+ }
248
+
145
249
@ NotThreadSafe
146
250
public static final class Builder {
251
+ private int startAt = 0 ;
252
+ private int incrementBy = 1 ;
253
+
147
254
private Builder () {
148
255
}
149
256
257
+ /**
258
+ * Sets the startAt used to compare if a record is the initial version of a record.
259
+ * Default value - {@code 0}.
260
+ *
261
+ * @param startAt
262
+ * @return the builder instance
263
+ */
264
+ public Builder startAt (int startAt ) {
265
+ this .startAt = startAt ;
266
+ return this ;
267
+ }
268
+
269
+ /**
270
+ * Sets the amount to increment the version by with each subsequent update.
271
+ * Default value - {@code 1}.
272
+ *
273
+ * @param incrementBy
274
+ * @return the builder instance
275
+ */
276
+ public Builder incrementBy (int incrementBy ) {
277
+ this .incrementBy = incrementBy ;
278
+ return this ;
279
+ }
280
+
150
281
public VersionedRecordExtension build () {
151
- return new VersionedRecordExtension ();
282
+ return new VersionedRecordExtension (this . startAt , this . incrementBy );
152
283
}
153
284
}
154
285
}
0 commit comments