Skip to content

Commit 3ee58fb

Browse files
authored
Implementation deserialize() for ByteBuffer
adapt apache kafka feature, reduce `CompletedFetch#parseRecord()` memory copy, details see https://issues.apache.org/jira/browse/KAFKA-14944
1 parent 69f201d commit 3ee58fb

File tree

10 files changed

+200
-100
lines changed

10 files changed

+200
-100
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.nio.ByteBuffer;
1920
import java.util.Map;
2021
import java.util.regex.Pattern;
2122

@@ -26,6 +27,8 @@
2627
* A {@link Deserializer} that delegates to other deserializers based on the topic name.
2728
*
2829
* @author Gary Russell
30+
* @author Wang Zhiyang
31+
*
2932
* @since 2.8
3033
*
3134
*/
@@ -75,4 +78,9 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
7578
return findDelegate(topic).deserialize(topic, headers, data);
7679
}
7780

81+
@Override
82+
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
83+
return findDelegate(topic).deserialize(topic, headers, data);
84+
}
85+
7886
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.regex.Pattern;
2727

2828
import org.springframework.core.log.LogAccessor;
29+
import org.springframework.lang.NonNull;
2930
import org.springframework.lang.Nullable;
3031
import org.springframework.util.Assert;
3132
import org.springframework.util.ClassUtils;
@@ -37,6 +38,8 @@
3738
* @param <T> the type.
3839
*
3940
* @author Gary Russell
41+
* @author Wang Zhiyang
42+
*
4043
* @since 2.8
4144
*
4245
*/
@@ -110,14 +113,14 @@ protected void configure(Map<String, ?> configs, boolean isKey) {
110113
}
111114
this.forKeys = isKey;
112115
Object insensitive = configs.get(CASE_SENSITIVE);
113-
if (insensitive instanceof String) {
114-
this.cased = Boolean.parseBoolean((String) insensitive);
116+
if (insensitive instanceof String insensitiveString) {
117+
this.cased = Boolean.parseBoolean(insensitiveString);
115118
}
116-
else if (insensitive instanceof Boolean) {
117-
this.cased = (Boolean) insensitive;
119+
else if (insensitive instanceof Boolean insensitiveBoolean) {
120+
this.cased = insensitiveBoolean;
118121
}
119122
String configKey = defaultKey();
120-
if (configKey != null && configs.containsKey(configKey)) {
123+
if (configs.containsKey(configKey)) {
121124
buildDefault(configs, configKey, isKey, configs.get(configKey));
122125
}
123126
configKey = configKey();
@@ -128,15 +131,16 @@ else if (insensitive instanceof Boolean) {
128131
else if (value instanceof Map) {
129132
processMap(configs, isKey, configKey, (Map<Object, Object>) value);
130133
}
131-
else if (value instanceof String) {
132-
this.delegates.putAll(createDelegates((String) value, configs, isKey));
134+
else if (value instanceof String mappings) {
135+
this.delegates.putAll(createDelegates(mappings, configs, isKey));
133136
}
134137
else {
135138
throw new IllegalStateException(
136139
configKey + " must be a map or String, not " + value.getClass());
137140
}
138141
}
139142

143+
@NonNull
140144
private String defaultKey() {
141145
return this.forKeys ? KEY_SERIALIZATION_TOPIC_DEFAULT : VALUE_SERIALIZATION_TOPIC_DEFAULT;
142146
}
@@ -163,11 +167,11 @@ protected void build(Map<String, ?> configs, boolean isKey, String configKey, Ob
163167
this.delegates.put(pattern, (T) delegate);
164168
configureDelegate(configs, isKey, (T) delegate);
165169
}
166-
else if (delegate instanceof Class) {
167-
instantiateAndConfigure(configs, isKey, this.delegates, pattern, (Class<?>) delegate);
170+
else if (delegate instanceof Class<?> clazz) {
171+
instantiateAndConfigure(configs, isKey, this.delegates, pattern, clazz);
168172
}
169-
else if (delegate instanceof String) {
170-
createInstanceAndConfigure(configs, isKey, this.delegates, pattern, (String) delegate);
173+
else if (delegate instanceof String className) {
174+
createInstanceAndConfigure(configs, isKey, this.delegates, pattern, className);
171175
}
172176
else {
173177
throw new IllegalStateException(configKey
@@ -181,11 +185,11 @@ protected void buildDefault(Map<String, ?> configs, String configKey, boolean is
181185
if (isInstance(delegate)) {
182186
this.defaultDelegate = configureDelegate(configs, isKey, (T) delegate);
183187
}
184-
else if (delegate instanceof Class) {
185-
this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, (Class<?>) delegate);
188+
else if (delegate instanceof Class<?> clazz) {
189+
this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, clazz);
186190
}
187-
else if (delegate instanceof String) {
188-
this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, (String) delegate);
191+
else if (delegate instanceof String className) {
192+
this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, className);
189193
}
190194
else {
191195
throw new IllegalStateException(configKey
@@ -236,15 +240,15 @@ private T createInstanceAndConfigure(Map<String, ?> configs, boolean isKey,
236240
}
237241

238242
private Pattern obtainPattern(Object key) {
239-
if (key instanceof Pattern) {
240-
return (Pattern) key;
243+
if (key instanceof Pattern pattern) {
244+
return pattern;
241245
}
242-
else if (key instanceof String) {
246+
else if (key instanceof String regex) {
243247
if (this.cased) {
244-
return Pattern.compile(((String) key).trim());
248+
return Pattern.compile(regex.trim());
245249
}
246250
else {
247-
return Pattern.compile(((String) key).trim(), Pattern.CASE_INSENSITIVE);
251+
return Pattern.compile(regex.trim(), Pattern.CASE_INSENSITIVE);
248252
}
249253
}
250254
else {
@@ -287,7 +291,6 @@ public T removeDelegate(Pattern pattern) {
287291
* @param topic the topic.
288292
* @return the delegate.
289293
*/
290-
@SuppressWarnings(UNCHECKED)
291294
protected T findDelegate(String topic) {
292295
T delegate = null;
293296
for (Entry<Pattern, T> entry : this.delegates.entrySet()) {

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.nio.ByteBuffer;
1920
import java.util.HashMap;
2021
import java.util.Map;
2122
import java.util.concurrent.ConcurrentHashMap;
@@ -37,12 +38,14 @@
3738
* {@link Serdes}.
3839
*
3940
* @author Gary Russell
41+
* @author Wang Zhiyang
42+
*
4043
* @since 2.3
4144
*
4245
*/
4346
public class DelegatingDeserializer implements Deserializer<Object> {
4447

45-
private final Map<String, Deserializer<? extends Object>> delegates = new ConcurrentHashMap<>();
48+
private final Map<String, Deserializer<?>> delegates = new ConcurrentHashMap<>();
4649

4750
private final Map<String, Object> autoConfigs = new HashMap<>();
4851

@@ -81,24 +84,24 @@ public void configure(Map<String, ?> configs, boolean isKey) {
8184
}
8285
if (value instanceof Map) {
8386
((Map<String, Object>) value).forEach((selector, deser) -> {
84-
if (deser instanceof Deserializer) {
85-
this.delegates.put(selector, (Deserializer<?>) deser);
86-
((Deserializer<?>) deser).configure(configs, isKey);
87+
if (deser instanceof Deserializer<?> clazz) {
88+
this.delegates.put(selector, clazz);
89+
clazz.configure(configs, isKey);
8790
}
88-
else if (deser instanceof Class) {
89-
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) deser);
91+
else if (deser instanceof Class<?> clazz) {
92+
instantiateAndConfigure(configs, isKey, this.delegates, selector, clazz);
9093
}
91-
else if (deser instanceof String) {
92-
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser);
94+
else if (deser instanceof String className) {
95+
createInstanceAndConfigure(configs, isKey, this.delegates, selector, className);
9396
}
9497
else {
9598
throw new IllegalStateException(configKey
9699
+ " map entries must be Serializers or class names, not " + value.getClass());
97100
}
98101
});
99102
}
100-
else if (value instanceof String) {
101-
this.delegates.putAll(createDelegates((String) value, configs, isKey));
103+
else if (value instanceof String mappings) {
104+
this.delegates.putAll(createDelegates(mappings, configs, isKey));
102105
}
103106
else {
104107
throw new IllegalStateException(configKey + " must be a map or String, not " + value.getClass());
@@ -165,6 +168,17 @@ public Object deserialize(String topic, byte[] data) {
165168

166169
@Override
167170
public Object deserialize(String topic, Headers headers, byte[] data) {
171+
Deserializer<?> deserializer = getDeserializerByHeaders(headers);
172+
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
173+
}
174+
175+
@Override
176+
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
177+
Deserializer<?> deserializer = getDeserializerByHeaders(headers);
178+
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
179+
}
180+
181+
private Deserializer<?> getDeserializerByHeaders(Headers headers) {
168182
byte[] value = null;
169183
String selectorKey = selectorKey();
170184
Header header = headers.lastHeader(selectorKey);
@@ -175,16 +189,11 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
175189
throw new IllegalStateException("No '" + selectorKey + "' header present");
176190
}
177191
String selector = new String(value).replaceAll("\"", "");
178-
Deserializer<? extends Object> deserializer = this.delegates.get(selector);
192+
Deserializer<?> deserializer = this.delegates.get(selector);
179193
if (deserializer == null) {
180194
deserializer = trySerdes(selector);
181195
}
182-
if (deserializer == null) {
183-
return data;
184-
}
185-
else {
186-
return deserializer.deserialize(topic, headers, data);
187-
}
196+
return deserializer;
188197
}
189198

190199
private String selectorKey() {
@@ -197,11 +206,11 @@ private String selectorKey() {
197206
* Package for testing.
198207
*/
199208
@Nullable
200-
Deserializer<? extends Object> trySerdes(String key) {
209+
Deserializer<?> trySerdes(String key) {
201210
try {
202211
Class<?> clazz = ClassUtils.forName(key, ClassUtils.getDefaultClassLoader());
203-
Serde<? extends Object> serdeFrom = Serdes.serdeFrom(clazz);
204-
Deserializer<? extends Object> deserializer = serdeFrom.deserializer();
212+
Serde<?> serdeFrom = Serdes.serdeFrom(clazz);
213+
Deserializer<?> deserializer = serdeFrom.deserializer();
205214
deserializer.configure(this.autoConfigs, this.forKeys);
206215
this.delegates.put(key, deserializer);
207216
return deserializer;

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
5252
* @author Artem Bilan
5353
* @author Gary Russell
5454
* @author Elliot Kennedy
55+
* @author Wang Zhiyang
5556
*/
5657
public class JsonSerializer<T> implements Serializer<T> {
5758

@@ -156,20 +157,19 @@ public synchronized void configure(Map<String, ?> configs, boolean isKey) {
156157
setUseTypeMapperForKey(isKey);
157158
if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) {
158159
Object config = configs.get(ADD_TYPE_INFO_HEADERS);
159-
if (config instanceof Boolean) {
160-
this.addTypeInfo = (Boolean) config;
160+
if (config instanceof Boolean configBoolean) {
161+
this.addTypeInfo = configBoolean;
161162
}
162-
else if (config instanceof String) {
163-
this.addTypeInfo = Boolean.valueOf((String) config);
163+
else if (config instanceof String configString) {
164+
this.addTypeInfo = Boolean.parseBoolean(configString);
164165
}
165166
else {
166167
throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String");
167168
}
168169
}
169170
if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
170-
&& this.typeMapper instanceof AbstractJavaTypeMapper) {
171-
((AbstractJavaTypeMapper) this.typeMapper)
172-
.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
171+
&& this.typeMapper instanceof AbstractJavaTypeMapper abstractJavaTypeMapper) {
172+
abstractJavaTypeMapper.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
173173
}
174174
this.configured = true;
175175
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.nio.ByteBuffer;
1920
import java.nio.charset.Charset;
2021
import java.nio.charset.StandardCharsets;
2122
import java.util.Map;
@@ -24,6 +25,7 @@
2425

2526
import org.apache.kafka.common.header.Headers;
2627
import org.apache.kafka.common.serialization.Deserializer;
28+
import org.apache.kafka.common.utils.Utils;
2729

2830
import org.springframework.util.Assert;
2931

@@ -35,6 +37,8 @@
3537
*
3638
* @author Alexei Klenin
3739
* @author Gary Russell
40+
* @author Wang Zhiyang
41+
*
3842
* @since 2.5
3943
*/
4044
public class ParseStringDeserializer<T> implements Deserializer<T> {
@@ -105,6 +109,23 @@ public T deserialize(String topic, Headers headers, byte[] data) {
105109
return this.parser.apply(data == null ? null : new String(data, this.charset), headers);
106110
}
107111

112+
@Override
113+
public T deserialize(String topic, Headers headers, ByteBuffer data) {
114+
String value = deserialize(data);
115+
return this.parser.apply(value, headers);
116+
}
117+
118+
private String deserialize(ByteBuffer data) {
119+
if (data == null) {
120+
return null;
121+
}
122+
123+
if (data.hasArray()) {
124+
return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), this.charset);
125+
}
126+
return new String(Utils.toArray(data), this.charset);
127+
}
128+
108129
/**
109130
* Set a charset to use when converting byte[] to {@link String}. Default UTF-8.
110131
* @param charset the charset.

0 commit comments

Comments
 (0)