Skip to content

Commit 01d7347

Browse files
OSON Kafka Serializers
Signed-off-by: Anders Swanson <anders.swanson@oracle.com>
1 parent 98f2702 commit 01d7347

File tree

7 files changed

+56
-23
lines changed

7 files changed

+56
-23
lines changed

database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
<properties>
4343
<maven.compiler.source>17</maven.compiler.source>
4444
<maven.compiler.target>17</maven.compiler.target>
45-
<txeventq.streambinder.version>0.12.0</txeventq.streambinder.version>
45+
<txeventq.streambinder.version>0.13.0</txeventq.streambinder.version>
4646
<spring.boot.version>3.4.3</spring.boot.version>
4747
<testcontainers.version>1.20.6</testcontainers.version>
4848
</properties>
@@ -86,4 +86,4 @@
8686
<scope>test</scope>
8787
</dependency>
8888
</dependencies>
89-
</project>
89+
</project>

database/starters/oracle-spring-boot-json-data-tools/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@
4848
</scm>
4949

5050
<dependencies>
51+
<!-- Include to enable OKafka Conversion beans -->
52+
<dependency>
53+
<groupId>com.oracle.database.spring</groupId>
54+
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
55+
<version>${project.version}</version>
56+
<optional>true</optional>
57+
</dependency>
5158
<dependency>
5259
<groupId>com.oracle.database.spring</groupId>
5360
<artifactId>oracle-spring-boot-starter-ucp</artifactId>

database/starters/oracle-spring-boot-json-data-tools/src/main/java/com/oracle/spring/json/JsonCollectionsAutoConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
package com.oracle.spring.json;
44

55
import com.oracle.spring.json.jsonb.JSONB;
6+
import com.oracle.spring.json.kafka.OSONKafkaSerializationFactory;
67
import jakarta.json.bind.JsonbBuilder;
78
import oracle.sql.json.OracleJsonFactory;
9+
import org.apache.kafka.common.serialization.Deserializer;
10+
import org.apache.kafka.common.serialization.Serializer;
811
import org.eclipse.yasson.YassonJsonb;
912
import org.springframework.boot.autoconfigure.AutoConfiguration;
1013
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -29,4 +32,13 @@ YassonJsonb yassonJsonb() {
2932
public JSONB jsonb(OracleJsonFactory oracleJsonFactory, YassonJsonb yassonJsonb) {
3033
return new JSONB(oracleJsonFactory, yassonJsonb);
3134
}
35+
36+
@Bean
37+
@ConditionalOnClass(value = {
38+
Deserializer.class,
39+
Serializer.class
40+
})
41+
public OSONKafkaSerializationFactory osonSerializationFactory(JSONB jsonb) {
42+
return new OSONKafkaSerializationFactory(jsonb);
43+
}
3244
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
// Copyright (c) 2024, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3-
package com.oracle.database.spring.jsonevents.serde;
3+
package com.oracle.spring.json.kafka;
44

55
import java.nio.ByteBuffer;
66

77
import com.oracle.spring.json.jsonb.JSONB;
88
import org.apache.kafka.common.serialization.Deserializer;
99

1010
/**
11-
* The JSONBDeserializer converts JSONB byte arrays to java objects.
11+
* The OSONDeserializer converts JSONB byte arrays to java objects.
1212
* @param <T> deserialization type
1313
*/
14-
public class JSONBDeserializer<T> implements Deserializer<T> {
14+
public class OSONDeserializer<T> implements Deserializer<T> {
1515
private final JSONB jsonb;
1616
private final Class<T> clazz;
1717

18-
public JSONBDeserializer(JSONB jsonb, Class<T> clazz) {
18+
public OSONDeserializer(JSONB jsonb, Class<T> clazz) {
1919
this.jsonb = jsonb;
2020
this.clazz = clazz;
2121
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.spring.json.kafka;
4+
5+
import com.oracle.spring.json.jsonb.JSONB;
6+
7+
public class OSONKafkaSerializationFactory {
8+
private final JSONB jsonb;
9+
10+
public OSONKafkaSerializationFactory(JSONB jsonb) {
11+
this.jsonb = jsonb;
12+
}
13+
14+
public <T> OSONDeserializer<T> createDeserializer(Class<T> clazz) {
15+
return new OSONDeserializer<>(jsonb, clazz);
16+
}
17+
18+
public <T> OSONSerializer<T> createSerializer() {
19+
return new OSONSerializer<>(jsonb);
20+
}
21+
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
// Copyright (c) 2024, Oracle and/or its affiliates.
1+
// Copyright (c) 2025, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3-
package com.oracle.database.spring.jsonevents.serde;
3+
package com.oracle.spring.json.kafka;
44

55
import com.oracle.spring.json.jsonb.JSONB;
66
import org.apache.kafka.common.serialization.Serializer;
77

88
/**
9-
* The JSONBSerializer converts java objects to a JSONB byte array.
9+
* The OSONSerializer converts java objects to a JSONB byte array.
1010
* @param <T> serialization type.
1111
*/
12-
public class JSONBSerializer<T> implements Serializer<T> {
12+
public class OSONSerializer<T> implements Serializer<T> {
1313
private final JSONB jsonb;
1414

15-
public JSONBSerializer(JSONB jsonb) {
15+
public OSONSerializer(JSONB jsonb) {
1616
this.jsonb = jsonb;
1717
}
1818

database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-events/src/main/java/com/oracle/database/spring/jsonevents/OKafkaConfiguration.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
import java.util.Properties;
66

77
import com.oracle.database.spring.jsonevents.model.Sensor;
8-
import com.oracle.database.spring.jsonevents.serde.JSONBDeserializer;
9-
import com.oracle.database.spring.jsonevents.serde.JSONBSerializer;
10-
import com.oracle.spring.json.jsonb.JSONB;
8+
import com.oracle.spring.json.kafka.OSONKafkaSerializationFactory;
119
import org.apache.kafka.clients.consumer.Consumer;
1210
import org.apache.kafka.clients.producer.Producer;
1311
import org.apache.kafka.common.serialization.Deserializer;
@@ -26,7 +24,6 @@
2624
*/
2725
@Configuration
2826
public class OKafkaConfiguration {
29-
private final JSONB jsonb;
3027

3128
@Value("${app.ojdbcPath}")
3229
private String ojdbcPath;
@@ -46,10 +43,6 @@ public class OKafkaConfiguration {
4643
@Value("${app.consumerGroup:SensorEvents}")
4744
private String consumerGroup;
4845

49-
public OKafkaConfiguration(JSONB jsonb) {
50-
this.jsonb = jsonb;
51-
}
52-
5346
@Bean
5447
@Qualifier("okafkaProperties")
5548
public Properties okafkaProperties() {
@@ -64,26 +57,26 @@ public Properties okafkaProperties() {
6457

6558
@Bean
6659
@Qualifier("okafkaConsumer")
67-
public Consumer<String, Sensor> okafkaConsumer() {
60+
public Consumer<String, Sensor> okafkaConsumer(OSONKafkaSerializationFactory osonKafkaSerializationFactory) {
6861
Properties props = okafkaProperties();
6962
props.put("group.id", consumerGroup);
7063
props.put("enable.auto.commit","false");
7164
props.put("max.poll.records", 2000);
7265
props.put("auto.offset.reset", "earliest");
7366

7467
Deserializer<String> keyDeserializer = new StringDeserializer();
75-
Deserializer<Sensor> valueDeserializer = new JSONBDeserializer<>(jsonb, Sensor.class);
68+
Deserializer<Sensor> valueDeserializer = osonKafkaSerializationFactory.createDeserializer(Sensor.class);
7669
return new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
7770
}
7871

7972
@Bean
8073
@Qualifier("okafkaProducer")
81-
public Producer<String, Sensor> okafkaProducer() {
74+
public Producer<String, Sensor> okafkaProducer(OSONKafkaSerializationFactory osonKafkaSerializationFactory) {
8275
Properties props = okafkaProperties();
8376
props.put("enable.idempotence", "true");
8477

8578
Serializer<String> keySerializer = new StringSerializer();
86-
Serializer<Sensor> valueSerializer = new JSONBSerializer<>(jsonb);
79+
Serializer<Sensor> valueSerializer = osonKafkaSerializationFactory.createSerializer();
8780
return new KafkaProducer<>(props, keySerializer, valueSerializer);
8881
}
8982
}

0 commit comments

Comments
 (0)