Skip to content

Commit 1a9d63d

Browse files
committed
Polish DestinationTopicPropertiesFactory and DestinationTopicPropertiesFactoryTests
1 parent f6b766e commit 1a9d63d

File tree

2 files changed

+80
-149
lines changed

2 files changed

+80
-149
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java

Lines changed: 62 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
package org.springframework.kafka.retrytopic;
1818

19-
import java.util.Arrays;
2019
import java.util.List;
2120
import java.util.function.BiPredicate;
22-
import java.util.stream.Collectors;
2321
import java.util.stream.IntStream;
2422

2523
import org.springframework.classify.BinaryExceptionClassifier;
@@ -36,6 +34,7 @@
3634
* @author Tomaz Fernandes
3735
* @author Gary Russell
3836
* @author João Lima
37+
* @author Wang Zhiyang
3938
* @since 2.7
4039
*
4140
*/
@@ -47,19 +46,21 @@ public class DestinationTopicPropertiesFactory {
4746

4847
private final List<Long> backOffValues;
4948

50-
private final BinaryExceptionClassifier exceptionClassifier;
51-
5249
private final int numPartitions;
5350

5451
private final int maxAttempts;
5552

56-
private final KafkaOperations<?, ?> kafkaOperations;
53+
private final boolean isSameIntervalReuse;
5754

58-
private final DltStrategy dltStrategy;
55+
private final boolean isFixedDelay;
56+
57+
private final int retryTopicsAmount;
58+
59+
private final BiPredicate<Integer, Throwable> shouldRetryOn;
5960

60-
private final TopicSuffixingStrategy topicSuffixingStrategy;
61+
private final KafkaOperations<?, ?> kafkaOperations;
6162

62-
private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy;
63+
private final DltStrategy dltStrategy;
6364

6465
private final long timeout;
6566

@@ -90,15 +91,19 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
9091

9192
this.dltStrategy = dltStrategy;
9293
this.kafkaOperations = kafkaOperations;
93-
this.exceptionClassifier = exceptionClassifier;
9494
this.numPartitions = numPartitions;
95-
this.topicSuffixingStrategy = topicSuffixingStrategy;
96-
this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy;
9795
this.timeout = timeout;
9896
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
9997
this.backOffValues = backOffValues;
100-
// Max Attempts include the initial try.
101-
this.maxAttempts = this.backOffValues.size() + 1;
98+
// Max Attempts to include the initial try.
99+
int backOffValuesSize = this.backOffValues.size();
100+
this.maxAttempts = backOffValuesSize + 1;
101+
this.shouldRetryOn = (attempt, throwable) -> attempt < this.maxAttempts
102+
&& exceptionClassifier.classify(throwable);
103+
this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy);
104+
this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts();
105+
this.isFixedDelay = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(topicSuffixingStrategy)
106+
|| backOffValuesSize > 1 && backOffValues.stream().distinct().count() == 1;
102107
}
103108

104109
/**
@@ -113,71 +118,26 @@ public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean a
113118
}
114119

115120
public List<DestinationTopic.Properties> createProperties() {
116-
return isSingleTopicFixedDelay()
117-
? createPropertiesForFixedDelaySingleTopic()
118-
: createPropertiesForDefaultTopicStrategy();
119-
}
120-
121-
private List<DestinationTopic.Properties> createPropertiesForFixedDelaySingleTopic() {
122-
return isNoDltStrategy()
123-
? Arrays.asList(createMainTopicProperties(),
124-
createRetryProperties(1, getShouldRetryOn()))
125-
: Arrays.asList(createMainTopicProperties(),
126-
createRetryProperties(1, getShouldRetryOn()),
127-
createDltProperties());
128-
}
129-
130-
private boolean isSingleTopicFixedDelay() {
131-
return (this.backOffValues.size() == 1 || isFixedDelay()) && isSingleTopicSameIntervalTopicReuseStrategy();
132-
}
133-
134-
private boolean isSingleTopicSameIntervalTopicReuseStrategy() {
135-
return SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(this.sameIntervalTopicReuseStrategy);
136-
}
137-
138-
private List<DestinationTopic.Properties> createPropertiesForDefaultTopicStrategy() {
139-
140-
int retryTopicsAmount = retryTopicsAmount();
141-
142-
return IntStream.rangeClosed(0, isNoDltStrategy()
143-
? retryTopicsAmount
144-
: retryTopicsAmount + 1)
145-
.mapToObj(this::createTopicProperties)
146-
.collect(Collectors.toList());
147-
}
148-
149-
int retryTopicsAmount() {
150-
return this.backOffValues.size() - reusableTopicAttempts();
151-
}
152-
153-
private int reusableTopicAttempts() {
154-
return this.backOffValues.size() > 0
155-
? !isFixedDelay()
156-
? isSingleTopicSameIntervalTopicReuseStrategy()
157-
// Assuming that duplicates are always in
158-
// the end of the list.
159-
? amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1
160-
: 0
161-
: 0
162-
: 0;
163-
}
164-
165-
private boolean isNoDltStrategy() {
166-
return DltStrategy.NO_DLT.equals(this.dltStrategy);
121+
int topicAmount = DltStrategy.NO_DLT.equals(this.dltStrategy)
122+
? this.retryTopicsAmount
123+
: this.retryTopicsAmount + 1;
124+
return IntStream
125+
.rangeClosed(0, topicAmount)
126+
.mapToObj(this::createTopicProperties)
127+
.toList();
167128
}
168129

169130
private DestinationTopic.Properties createTopicProperties(int index) {
170-
BiPredicate<Integer, Throwable> shouldRetryOn = getShouldRetryOn();
171131
return index == 0
172132
? createMainTopicProperties()
173-
: (index <= this.retryTopicsAmount())
174-
? createRetryProperties(index, shouldRetryOn)
133+
: index <= this.retryTopicsAmount
134+
? createRetryProperties(index)
175135
: createDltProperties();
176136
}
177137

178138
private DestinationTopic.Properties createMainTopicProperties() {
179139
return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts,
180-
this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout);
140+
this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, this.timeout);
181141
}
182142

183143
private DestinationTopic.Properties createDltProperties() {
@@ -186,49 +146,42 @@ private DestinationTopic.Properties createDltProperties() {
186146
this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler);
187147
}
188148

189-
private BiPredicate<Integer, Throwable> getShouldRetryOn() {
190-
return (attempt, throwable) -> attempt < this.maxAttempts && this.exceptionClassifier.classify(throwable);
191-
}
192-
193-
private DestinationTopic.Properties createRetryProperties(int index,
194-
BiPredicate<Integer, Throwable> shouldRetryOn) {
195-
149+
private DestinationTopic.Properties createRetryProperties(int index) {
196150
int indexInBackoffValues = index - 1;
197-
Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues);
198-
DestinationTopic.Type topicTypeToUse = isDelayWithReusedTopic(thisBackOffValue)
199-
? Type.REUSABLE_RETRY_TOPIC
200-
: Type.RETRY;
201-
return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues,
202-
getTopicSuffix(indexInBackoffValues, thisBackOffValue));
203-
}
204-
205-
private String getTopicSuffix(int indexInBackoffValues, Long thisBackOffValue) {
206-
return isSingleTopicFixedDelay()
207-
? this.destinationTopicSuffixes.getRetrySuffix()
208-
: isSuffixWithIndexStrategy() || isFixedDelay()
209-
? joinWithRetrySuffix(indexInBackoffValues)
210-
: hasDuplicates(thisBackOffValue)
211-
? joinWithRetrySuffix(thisBackOffValue)
212-
.concat(suffixForRepeatedInterval(indexInBackoffValues, thisBackOffValue))
213-
: joinWithRetrySuffix(thisBackOffValue);
214-
}
215-
216-
private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBackOffValue) {
217-
return isSingleTopicSameIntervalTopicReuseStrategy()
218-
? ""
219-
: "-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue);
151+
long thisBackOffValue = this.backOffValues.get(indexInBackoffValues);
152+
return createProperties(thisBackOffValue, getTopicSuffix(indexInBackoffValues, thisBackOffValue));
220153
}
221154

222-
private boolean isDelayWithReusedTopic(Long backoffValue) {
223-
return hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy();
155+
private String getTopicSuffix(int indexInBackoffValues, long thisBackOffValue) {
156+
if (this.isSameIntervalReuse && this.retryTopicsAmount == 1) {
157+
return this.destinationTopicSuffixes.getRetrySuffix();
158+
}
159+
else if (this.isFixedDelay) {
160+
return joinWithRetrySuffix(indexInBackoffValues);
161+
}
162+
else {
163+
String retrySuffix = joinWithRetrySuffix(thisBackOffValue);
164+
if (!this.isSameIntervalReuse && hasDuplicates(thisBackOffValue)) {
165+
return retrySuffix.concat("-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue));
166+
}
167+
return retrySuffix;
168+
}
224169
}
225170

226171
private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) {
227172
return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue);
228173
}
229174

230-
private boolean isSuffixWithIndexStrategy() {
231-
return TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(this.topicSuffixingStrategy);
175+
private DestinationTopic.Type getDestinationTopicType(Long backOffValue) {
176+
return this.isSameIntervalReuse && hasDuplicates(backOffValue) ? Type.REUSABLE_RETRY_TOPIC : Type.RETRY;
177+
}
178+
179+
private int reusableTopicAttempts() {
180+
if (this.isSameIntervalReuse && this.backOffValues.size() > 1) {
181+
// Assuming that duplicates are always at the end of the list.
182+
return amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1;
183+
}
184+
return 0;
232185
}
233186

234187
private boolean hasDuplicates(Long thisBackOffValue) {
@@ -238,22 +191,15 @@ private boolean hasDuplicates(Long thisBackOffValue) {
238191
private int amountOfDuplicates(Long thisBackOffValue) {
239192
return Long.valueOf(this.backOffValues
240193
.stream()
241-
.filter(value -> value.equals(thisBackOffValue))
242-
.count()).intValue();
243-
}
244-
245-
private DestinationTopic.Properties createProperties(DestinationTopic.Type topicType,
246-
BiPredicate<Integer, Throwable> shouldRetryOn,
247-
int indexInBackoffValues,
248-
String suffix) {
249-
return new DestinationTopic.Properties(this.backOffValues.get(indexInBackoffValues), suffix,
250-
topicType, this.maxAttempts, this.numPartitions, this.dltStrategy,
251-
this.kafkaOperations, shouldRetryOn, this.timeout);
194+
.filter(thisBackOffValue::equals)
195+
.count())
196+
.intValue();
252197
}
253198

254-
private boolean isFixedDelay() {
255-
// If all values are the same, such as in NoBackOffPolicy and FixedBackoffPolicy
256-
return this.backOffValues.size() > 1 && this.backOffValues.stream().distinct().count() == 1;
199+
private DestinationTopic.Properties createProperties(long delayMs, String suffix) {
200+
return new DestinationTopic.Properties(delayMs, suffix, getDestinationTopicType(delayMs),
201+
this.maxAttempts, this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn,
202+
this.timeout);
257203
}
258204

259205
private String joinWithRetrySuffix(long parameter) {

0 commit comments

Comments
 (0)