Skip to content

OSON Kafka Serializers #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Copyright (c) 2024, Oracle and/or its affiliates. -->
<!-- Copyright (c) 2024, 2025, Oracle and/or its affiliates. -->
<!-- Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.oracle.database.spring.cloud-stream-binder</groupId>
Expand Down Expand Up @@ -42,7 +42,7 @@
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<txeventq.streambinder.version>0.12.0</txeventq.streambinder.version>
<txeventq.streambinder.version>0.13.0</txeventq.streambinder.version>
<spring.boot.version>3.4.3</spring.boot.version>
<testcontainers.version>1.20.6</testcontainers.version>
</properties>
Expand Down Expand Up @@ -86,4 +86,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
7 changes: 7 additions & 0 deletions database/starters/oracle-spring-boot-json-data-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
</scm>

<dependencies>
<!-- Include to enable OKafka Conversion beans -->
<dependency>
<groupId>com.oracle.database.spring</groupId>
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.oracle.database.spring</groupId>
<artifactId>oracle-spring-boot-starter-ucp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Copyright (c) 2024, 2025, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.spring.json;

import com.oracle.spring.json.jsonb.JSONB;
import com.oracle.spring.json.kafka.OSONKafkaSerializationFactory;
import jakarta.json.bind.JsonbBuilder;
import oracle.sql.json.OracleJsonFactory;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.eclipse.yasson.YassonJsonb;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
Expand All @@ -29,4 +32,13 @@ YassonJsonb yassonJsonb() {
public JSONB jsonb(OracleJsonFactory oracleJsonFactory, YassonJsonb yassonJsonb) {
return new JSONB(oracleJsonFactory, yassonJsonb);
}

@Bean
@ConditionalOnClass(value = {
Deserializer.class,
Serializer.class
})
public OSONKafkaSerializationFactory osonSerializationFactory(JSONB jsonb) {
return new OSONKafkaSerializationFactory(jsonb);
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Copyright (c) 2025, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.jsonevents.serde;
package com.oracle.spring.json.kafka;

import java.nio.ByteBuffer;

import com.oracle.spring.json.jsonb.JSONB;
import org.apache.kafka.common.serialization.Deserializer;

/**
* The JSONBDeserializer converts JSONB byte arrays to java objects.
* The OSONDeserializer converts JSONB byte arrays to java objects.
* @param <T> deserialization type
*/
public class JSONBDeserializer<T> implements Deserializer<T> {
public class OSONDeserializer<T> implements Deserializer<T> {
private final JSONB jsonb;
private final Class<T> clazz;

public JSONBDeserializer(JSONB jsonb, Class<T> clazz) {
public OSONDeserializer(JSONB jsonb, Class<T> clazz) {
this.jsonb = jsonb;
this.clazz = clazz;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2025, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.spring.json.kafka;

import com.oracle.spring.json.jsonb.JSONB;

public class OSONKafkaSerializationFactory {
private final JSONB jsonb;

public OSONKafkaSerializationFactory(JSONB jsonb) {
this.jsonb = jsonb;
}

public <T> OSONDeserializer<T> createDeserializer(Class<T> clazz) {
return new OSONDeserializer<>(jsonb, clazz);
}

public <T> OSONSerializer<T> createSerializer() {
return new OSONSerializer<>(jsonb);
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Copyright (c) 2025, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.jsonevents.serde;
package com.oracle.spring.json.kafka;

import com.oracle.spring.json.jsonb.JSONB;
import org.apache.kafka.common.serialization.Serializer;

/**
* The JSONBSerializer converts java objects to a JSONB byte array.
* The OSONSerializer converts java objects to a JSONB byte array.
* @param <T> serialization type.
*/
public class JSONBSerializer<T> implements Serializer<T> {
public class OSONSerializer<T> implements Serializer<T> {
private final JSONB jsonb;

public JSONBSerializer(JSONB jsonb) {
public OSONSerializer(JSONB jsonb) {
this.jsonb = jsonb;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Copyright (c) 2024, 2025, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.jsonevents;

import java.util.Properties;

import com.oracle.database.spring.jsonevents.model.Sensor;
import com.oracle.database.spring.jsonevents.serde.JSONBDeserializer;
import com.oracle.database.spring.jsonevents.serde.JSONBSerializer;
import com.oracle.spring.json.jsonb.JSONB;
import com.oracle.spring.json.kafka.OSONKafkaSerializationFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -26,7 +24,6 @@
*/
@Configuration
public class OKafkaConfiguration {
private final JSONB jsonb;

@Value("${app.ojdbcPath}")
private String ojdbcPath;
Expand All @@ -46,10 +43,6 @@ public class OKafkaConfiguration {
@Value("${app.consumerGroup:SensorEvents}")
private String consumerGroup;

public OKafkaConfiguration(JSONB jsonb) {
this.jsonb = jsonb;
}

@Bean
@Qualifier("okafkaProperties")
public Properties okafkaProperties() {
Expand All @@ -64,26 +57,26 @@ public Properties okafkaProperties() {

@Bean
@Qualifier("okafkaConsumer")
public Consumer<String, Sensor> okafkaConsumer() {
public Consumer<String, Sensor> okafkaConsumer(OSONKafkaSerializationFactory osonKafkaSerializationFactory) {
Properties props = okafkaProperties();
props.put("group.id", consumerGroup);
props.put("enable.auto.commit","false");
props.put("max.poll.records", 2000);
props.put("auto.offset.reset", "earliest");

Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<Sensor> valueDeserializer = new JSONBDeserializer<>(jsonb, Sensor.class);
Deserializer<Sensor> valueDeserializer = osonKafkaSerializationFactory.createDeserializer(Sensor.class);
return new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
}

@Bean
@Qualifier("okafkaProducer")
public Producer<String, Sensor> okafkaProducer() {
public Producer<String, Sensor> okafkaProducer(OSONKafkaSerializationFactory osonKafkaSerializationFactory) {
Properties props = okafkaProperties();
props.put("enable.idempotence", "true");

Serializer<String> keySerializer = new StringSerializer();
Serializer<Sensor> valueSerializer = new JSONBSerializer<>(jsonb);
Serializer<Sensor> valueSerializer = osonKafkaSerializationFactory.createSerializer();
return new KafkaProducer<>(props, keySerializer, valueSerializer);
}
}
Loading