16
16
17
17
package org .springframework .kafka .retrytopic ;
18
18
19
- import java .util .Arrays ;
20
19
import java .util .List ;
21
20
import java .util .function .BiPredicate ;
22
- import java .util .stream .Collectors ;
23
21
import java .util .stream .IntStream ;
24
22
25
23
import org .springframework .classify .BinaryExceptionClassifier ;
36
34
* @author Tomaz Fernandes
37
35
* @author Gary Russell
38
36
* @author João Lima
37
+ * @author Wang Zhiyang
39
38
* @since 2.7
40
39
*
41
40
*/
@@ -47,19 +46,21 @@ public class DestinationTopicPropertiesFactory {
47
46
48
47
private final List <Long > backOffValues ;
49
48
50
- private final BinaryExceptionClassifier exceptionClassifier ;
51
-
52
49
private final int numPartitions ;
53
50
54
51
private final int maxAttempts ;
55
52
56
- private final KafkaOperations <?, ?> kafkaOperations ;
53
+ private final boolean isSameIntervalReuse ;
57
54
58
- private final DltStrategy dltStrategy ;
55
+ private final boolean isFixedDelay ;
56
+
57
+ private final int retryTopicsAmount ;
58
+
59
+ private final BiPredicate <Integer , Throwable > shouldRetryOn ;
59
60
60
- private final TopicSuffixingStrategy topicSuffixingStrategy ;
61
+ private final KafkaOperations <?, ?> kafkaOperations ;
61
62
62
- private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy ;
63
+ private final DltStrategy dltStrategy ;
63
64
64
65
private final long timeout ;
65
66
@@ -90,15 +91,19 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
90
91
91
92
this .dltStrategy = dltStrategy ;
92
93
this .kafkaOperations = kafkaOperations ;
93
- this .exceptionClassifier = exceptionClassifier ;
94
94
this .numPartitions = numPartitions ;
95
- this .topicSuffixingStrategy = topicSuffixingStrategy ;
96
- this .sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy ;
97
95
this .timeout = timeout ;
98
96
this .destinationTopicSuffixes = new DestinationTopicSuffixes (retryTopicSuffix , dltSuffix );
99
97
this .backOffValues = backOffValues ;
100
- // Max Attempts include the initial try.
101
- this .maxAttempts = this .backOffValues .size () + 1 ;
98
+ // Max Attempts to include the initial try.
99
+ int backOffValuesSize = this .backOffValues .size ();
100
+ this .maxAttempts = backOffValuesSize + 1 ;
101
+ this .shouldRetryOn = (attempt , throwable ) -> attempt < this .maxAttempts
102
+ && exceptionClassifier .classify (throwable );
103
+ this .isSameIntervalReuse = SameIntervalTopicReuseStrategy .SINGLE_TOPIC .equals (sameIntervalTopicReuseStrategy );
104
+ this .retryTopicsAmount = backOffValuesSize - reusableTopicAttempts ();
105
+ this .isFixedDelay = TopicSuffixingStrategy .SUFFIX_WITH_INDEX_VALUE .equals (topicSuffixingStrategy )
106
+ || backOffValuesSize > 1 && backOffValues .stream ().distinct ().count () == 1 ;
102
107
}
103
108
104
109
/**
@@ -113,71 +118,26 @@ public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean a
113
118
}
114
119
115
120
public List <DestinationTopic .Properties > createProperties () {
116
- return isSingleTopicFixedDelay ()
117
- ? createPropertiesForFixedDelaySingleTopic ()
118
- : createPropertiesForDefaultTopicStrategy ();
119
- }
120
-
121
- private List <DestinationTopic .Properties > createPropertiesForFixedDelaySingleTopic () {
122
- return isNoDltStrategy ()
123
- ? Arrays .asList (createMainTopicProperties (),
124
- createRetryProperties (1 , getShouldRetryOn ()))
125
- : Arrays .asList (createMainTopicProperties (),
126
- createRetryProperties (1 , getShouldRetryOn ()),
127
- createDltProperties ());
128
- }
129
-
130
- private boolean isSingleTopicFixedDelay () {
131
- return (this .backOffValues .size () == 1 || isFixedDelay ()) && isSingleTopicSameIntervalTopicReuseStrategy ();
132
- }
133
-
134
- private boolean isSingleTopicSameIntervalTopicReuseStrategy () {
135
- return SameIntervalTopicReuseStrategy .SINGLE_TOPIC .equals (this .sameIntervalTopicReuseStrategy );
136
- }
137
-
138
- private List <DestinationTopic .Properties > createPropertiesForDefaultTopicStrategy () {
139
-
140
- int retryTopicsAmount = retryTopicsAmount ();
141
-
142
- return IntStream .rangeClosed (0 , isNoDltStrategy ()
143
- ? retryTopicsAmount
144
- : retryTopicsAmount + 1 )
145
- .mapToObj (this ::createTopicProperties )
146
- .collect (Collectors .toList ());
147
- }
148
-
149
- int retryTopicsAmount () {
150
- return this .backOffValues .size () - reusableTopicAttempts ();
151
- }
152
-
153
- private int reusableTopicAttempts () {
154
- return this .backOffValues .size () > 0
155
- ? !isFixedDelay ()
156
- ? isSingleTopicSameIntervalTopicReuseStrategy ()
157
- // Assuming that duplicates are always in
158
- // the end of the list.
159
- ? amountOfDuplicates (this .backOffValues .get (this .backOffValues .size () - 1 )) - 1
160
- : 0
161
- : 0
162
- : 0 ;
163
- }
164
-
165
- private boolean isNoDltStrategy () {
166
- return DltStrategy .NO_DLT .equals (this .dltStrategy );
121
+ int topicAmount = DltStrategy .NO_DLT .equals (this .dltStrategy )
122
+ ? this .retryTopicsAmount
123
+ : this .retryTopicsAmount + 1 ;
124
+ return IntStream
125
+ .rangeClosed (0 , topicAmount )
126
+ .mapToObj (this ::createTopicProperties )
127
+ .toList ();
167
128
}
168
129
169
130
private DestinationTopic .Properties createTopicProperties (int index ) {
170
- BiPredicate <Integer , Throwable > shouldRetryOn = getShouldRetryOn ();
171
131
return index == 0
172
132
? createMainTopicProperties ()
173
- : ( index <= this .retryTopicsAmount ())
174
- ? createRetryProperties (index , shouldRetryOn )
133
+ : index <= this .retryTopicsAmount
134
+ ? createRetryProperties (index )
175
135
: createDltProperties ();
176
136
}
177
137
178
138
private DestinationTopic .Properties createMainTopicProperties () {
179
139
return new DestinationTopic .Properties (0 , MAIN_TOPIC_SUFFIX , DestinationTopic .Type .MAIN , this .maxAttempts ,
180
- this .numPartitions , this .dltStrategy , this .kafkaOperations , getShouldRetryOn () , this .timeout );
140
+ this .numPartitions , this .dltStrategy , this .kafkaOperations , this . shouldRetryOn , this .timeout );
181
141
}
182
142
183
143
private DestinationTopic .Properties createDltProperties () {
@@ -186,49 +146,42 @@ private DestinationTopic.Properties createDltProperties() {
186
146
this .kafkaOperations , (a , e ) -> false , this .timeout , this .autoStartDltHandler );
187
147
}
188
148
189
- private BiPredicate <Integer , Throwable > getShouldRetryOn () {
190
- return (attempt , throwable ) -> attempt < this .maxAttempts && this .exceptionClassifier .classify (throwable );
191
- }
192
-
193
- private DestinationTopic .Properties createRetryProperties (int index ,
194
- BiPredicate <Integer , Throwable > shouldRetryOn ) {
195
-
149
+ private DestinationTopic .Properties createRetryProperties (int index ) {
196
150
int indexInBackoffValues = index - 1 ;
197
- Long thisBackOffValue = this .backOffValues .get (indexInBackoffValues );
198
- DestinationTopic .Type topicTypeToUse = isDelayWithReusedTopic (thisBackOffValue )
199
- ? Type .REUSABLE_RETRY_TOPIC
200
- : Type .RETRY ;
201
- return createProperties (topicTypeToUse , shouldRetryOn , indexInBackoffValues ,
202
- getTopicSuffix (indexInBackoffValues , thisBackOffValue ));
203
- }
204
-
205
- private String getTopicSuffix (int indexInBackoffValues , Long thisBackOffValue ) {
206
- return isSingleTopicFixedDelay ()
207
- ? this .destinationTopicSuffixes .getRetrySuffix ()
208
- : isSuffixWithIndexStrategy () || isFixedDelay ()
209
- ? joinWithRetrySuffix (indexInBackoffValues )
210
- : hasDuplicates (thisBackOffValue )
211
- ? joinWithRetrySuffix (thisBackOffValue )
212
- .concat (suffixForRepeatedInterval (indexInBackoffValues , thisBackOffValue ))
213
- : joinWithRetrySuffix (thisBackOffValue );
214
- }
215
-
216
- private String suffixForRepeatedInterval (int indexInBackoffValues , Long thisBackOffValue ) {
217
- return isSingleTopicSameIntervalTopicReuseStrategy ()
218
- ? ""
219
- : "-" + getIndexInBackoffValues (indexInBackoffValues , thisBackOffValue );
151
+ long thisBackOffValue = this .backOffValues .get (indexInBackoffValues );
152
+ return createProperties (thisBackOffValue , getTopicSuffix (indexInBackoffValues , thisBackOffValue ));
220
153
}
221
154
222
- private boolean isDelayWithReusedTopic (Long backoffValue ) {
223
- return hasDuplicates (backoffValue ) && isSingleTopicSameIntervalTopicReuseStrategy ();
155
+ private String getTopicSuffix (int indexInBackoffValues , long thisBackOffValue ) {
156
+ if (this .isSameIntervalReuse && this .retryTopicsAmount == 1 ) {
157
+ return this .destinationTopicSuffixes .getRetrySuffix ();
158
+ }
159
+ else if (this .isFixedDelay ) {
160
+ return joinWithRetrySuffix (indexInBackoffValues );
161
+ }
162
+ else {
163
+ String retrySuffix = joinWithRetrySuffix (thisBackOffValue );
164
+ if (!this .isSameIntervalReuse && hasDuplicates (thisBackOffValue )) {
165
+ return retrySuffix .concat ("-" + getIndexInBackoffValues (indexInBackoffValues , thisBackOffValue ));
166
+ }
167
+ return retrySuffix ;
168
+ }
224
169
}
225
170
226
171
private int getIndexInBackoffValues (int indexInBackoffValues , Long thisBackOffValue ) {
227
172
return indexInBackoffValues - this .backOffValues .indexOf (thisBackOffValue );
228
173
}
229
174
230
- private boolean isSuffixWithIndexStrategy () {
231
- return TopicSuffixingStrategy .SUFFIX_WITH_INDEX_VALUE .equals (this .topicSuffixingStrategy );
175
+ private DestinationTopic .Type getDestinationTopicType (Long backOffValue ) {
176
+ return this .isSameIntervalReuse && hasDuplicates (backOffValue ) ? Type .REUSABLE_RETRY_TOPIC : Type .RETRY ;
177
+ }
178
+
179
+ private int reusableTopicAttempts () {
180
+ if (this .isSameIntervalReuse && this .backOffValues .size () > 1 ) {
181
+ // Assuming that duplicates are always at the end of the list.
182
+ return amountOfDuplicates (this .backOffValues .get (this .backOffValues .size () - 1 )) - 1 ;
183
+ }
184
+ return 0 ;
232
185
}
233
186
234
187
private boolean hasDuplicates (Long thisBackOffValue ) {
@@ -238,22 +191,15 @@ private boolean hasDuplicates(Long thisBackOffValue) {
238
191
private int amountOfDuplicates (Long thisBackOffValue ) {
239
192
return Long .valueOf (this .backOffValues
240
193
.stream ()
241
- .filter (value -> value .equals (thisBackOffValue ))
242
- .count ()).intValue ();
243
- }
244
-
245
- private DestinationTopic .Properties createProperties (DestinationTopic .Type topicType ,
246
- BiPredicate <Integer , Throwable > shouldRetryOn ,
247
- int indexInBackoffValues ,
248
- String suffix ) {
249
- return new DestinationTopic .Properties (this .backOffValues .get (indexInBackoffValues ), suffix ,
250
- topicType , this .maxAttempts , this .numPartitions , this .dltStrategy ,
251
- this .kafkaOperations , shouldRetryOn , this .timeout );
194
+ .filter (thisBackOffValue ::equals )
195
+ .count ())
196
+ .intValue ();
252
197
}
253
198
254
- private boolean isFixedDelay () {
255
- // If all values are the same, such as in NoBackOffPolicy and FixedBackoffPolicy
256
- return this .backOffValues .size () > 1 && this .backOffValues .stream ().distinct ().count () == 1 ;
199
+ private DestinationTopic .Properties createProperties (long delayMs , String suffix ) {
200
+ return new DestinationTopic .Properties (delayMs , suffix , getDestinationTopicType (delayMs ),
201
+ this .maxAttempts , this .numPartitions , this .dltStrategy , this .kafkaOperations , this .shouldRetryOn ,
202
+ this .timeout );
257
203
}
258
204
259
205
private String joinWithRetrySuffix (long parameter ) {
0 commit comments