Skip to content

Commit 1cb3353

Browse files
OKafka Sample
Signed-off-by: Anders Swanson <anders.swanson@oracle.com>
1 parent b7051be commit 1cb3353

File tree

14 files changed

+593
-0
lines changed

14 files changed

+593
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Oracle Database Spring Boot Samples
2+
3+
The Oracle Database Spring Boot Samples module provides a suite of comprehensive Spring Boot sample applications designed to enable developers with example code for application development.
4+
5+
### [Oracle UCP with JPA Sample](./oracle-spring-boot-sample-ucp-jpa/README.md)
6+
7+
The Oracle UCP with JPA sample application demonstrates how to use the Oracle Spring Boot Starter UCP with Spring Data JPA, connecting your Oracle Database with powerful ORM abstractions that facilitate rapid development, all while using the best connection pooling library for Oracle Database with Spring Boot.
8+
9+
### [JSON Relational Duality Views Sample](./oracle-spring-boot-sample-json-duality/README.md)
10+
11+
The JSON Relational Duality Views sample application demonstrates how to use the Oracle Spring Boot Starter JSON Collections with [JSON Relational Duality Views](https://docs.oracle.com/en/database/oracle/oracle-database/23/jsnvu/overview-json-relational-duality-views.html). JSON Relational Duality Views layer the advantages of JSON document-style database over existing relational data structures — Powerful JSON views with full CRUD capabilities can be created on relational database schemas, nesting related data into a single document with unified access.
12+
13+
### [OKafka Sample](./oracle-spring-boot-starter-okafka/README.md)
14+
15+
This sample application demonstrates how to use the Oracle Spring Boot Starter OKafka with the [Kafka Java Client for Oracle Transactional Event Queues](https://github.com/oracle/okafka)
16+
17+
Using an in-database message broker like TxEventQ eliminates the need for external message brokers, reduces overall network traffic, simplifying your overall application architecture — and the OKafka library enables developers to create applications for TxEventQ using familiar Kafka APIs for messaging.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Oracle Spring Boot Sample for OKafka
2+
3+
With Oracle Database 23ai, powerful Kafka APIs are easily used to read and write data backed by [Transactional Event Queues (TxEventQ)](https://docs.oracle.com/en/database/oracle/oracle-database/21/adque/aq-introduction.html).
4+
5+
If you’re unfamiliar with TxEventQ, it’s a a robust, real-time message broker that runs within Oracle Database, designed for high throughput — TxEventQ can handle approximately [100 billion messages per day](https://www.oracle.com/database/advanced-queuing/) on an 8-node Oracle RAC cluster.
6+
7+
This sample application demonstrates how to use the Oracle Spring Boot Starter OKafka with the [Kafka Java Client for Oracle Transactional Event Queues](https://github.com/oracle/okafka)
8+
9+
The Spring Boot OKafka sample application includes the following components to demonstrate application development using Kafka APIs for Oracle Transactional Event Queues:
10+
11+
- Sample OKafka Producers and Consumers
12+
- Connection properties for OKafka with Oracle Database
13+
- Topic management using OKafka admin client
14+
- Spring Boot configuration for OKafka
15+
- A comprehensive test using Spring Boot that produces and consumes data from Transactional Event Queues using OKafka
16+
17+
## Run the sample application
18+
19+
The sample application creates a temporary Oracle Free container database, and requires a docker runtime environment.
20+
21+
To run the test application, run the following command:
22+
23+
```shell
24+
mvn test
25+
```
26+
27+
## Configure your project to use OKafka
28+
29+
To use OKafka from your Spring Boot application, add the following Maven dependency to your project:
30+
31+
```xml
32+
<dependency>
33+
<groupId>com.oracle.database.spring</groupId>
34+
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
35+
</dependency>
36+
```
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- Copyright (c) 2024, Oracle and/or its affiliates. -->
3+
<!-- Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -->
4+
<project xmlns="http://maven.apache.org/POM/4.0.0"
5+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
6+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
7+
<modelVersion>4.0.0</modelVersion>
8+
<parent>
9+
<artifactId>oracle-spring-boot-starter-samples</artifactId>
10+
<groupId>com.oracle.database.spring</groupId>
11+
<version>24.2.0</version>
12+
<relativePath>../pom.xml</relativePath>
13+
</parent>
14+
15+
<artifactId>oracle-spring-boot-sample-okafka</artifactId>
16+
<version>24.2.0</version>
17+
18+
<name>Oracle Spring Boot Starter - OKafka Sample</name>
19+
<description>Oracle Spring Boot Starter Sample for OKafka</description>
20+
21+
<organization>
22+
<name>Oracle America, Inc.</name>
23+
<url>https://www.oracle.com</url>
24+
</organization>
25+
26+
<developers>
27+
<developer>
28+
<name>Oracle</name>
29+
<email>obaas_ww at oracle.com</email>
30+
<organization>Oracle America, Inc.</organization>
31+
<organizationUrl>https://www.oracle.com</organizationUrl>
32+
</developer>
33+
</developers>
34+
35+
<licenses>
36+
<license>
37+
<name>The Universal Permissive License (UPL), Version 1.0</name>
38+
<url>https://oss.oracle.com/licenses/upl/</url>
39+
<distribution>repo</distribution>
40+
</license>
41+
</licenses>
42+
43+
<scm>
44+
<url>https://github.com/oracle/spring-cloud-oracle</url>
45+
<connection>scm:git:https://github.com/oracle/spring-cloud-oracle.git</connection>
46+
<developerConnection>scm:git:git@github.com:oracle/spring-cloud-oracle.git</developerConnection>
47+
</scm>
48+
49+
<dependencies>
50+
<dependency>
51+
<groupId>com.oracle.database.spring</groupId>
52+
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
53+
<version>${project.version}</version>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.apache.kafka</groupId>
57+
<artifactId>kafka-clients</artifactId>
58+
<version>3.7.1</version>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-starter</artifactId>
63+
</dependency>
64+
65+
<!-- Test Dependencies-->
66+
<dependency>
67+
<groupId>org.springframework.boot</groupId>
68+
<artifactId>spring-boot-starter-test</artifactId>
69+
<version>${spring-boot-dependencies.version}</version>
70+
<scope>test</scope>
71+
</dependency>
72+
73+
<dependency>
74+
<groupId>org.testcontainers</groupId>
75+
<artifactId>junit-jupiter</artifactId>
76+
<scope>test</scope>
77+
</dependency>
78+
79+
<dependency>
80+
<groupId>org.testcontainers</groupId>
81+
<artifactId>testcontainers</artifactId>
82+
<scope>test</scope>
83+
</dependency>
84+
85+
<dependency>
86+
<groupId>org.testcontainers</groupId>
87+
<artifactId>oracle-free</artifactId>
88+
<scope>test</scope>
89+
</dependency>
90+
</dependencies>
91+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.okafka;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.Properties;
8+
import java.util.concurrent.ExecutionException;
9+
import java.util.concurrent.Future;
10+
11+
import jakarta.annotation.PostConstruct;
12+
import org.apache.kafka.clients.admin.NewTopic;
13+
import org.springframework.beans.factory.annotation.Qualifier;
14+
import org.springframework.core.task.AsyncTaskExecutor;
15+
import org.springframework.stereotype.Component;
16+
17+
import static com.oracle.database.spring.okafka.OKafkaConfiguration.TOPIC_NAME;
18+
19+
@Component
20+
public class OKafkaComponent {
21+
private final AsyncTaskExecutor taskExecutor;
22+
private final Properties kafkaProperties;
23+
private final SampleProducer<String> sampleProducer;
24+
private final SampleConsumer<String> sampleConsumer;
25+
26+
// The tasks list is used to track and wait for consumer/producer execution.
27+
private final List<Future<?>> tasks = new ArrayList<>();
28+
29+
public OKafkaComponent(@Qualifier("applicationTaskExecutor") AsyncTaskExecutor taskExecutor,
30+
@Qualifier("kafkaProperties") Properties kafkaProperties,
31+
@Qualifier("sampleProducer") SampleProducer<String> sampleProducer,
32+
@Qualifier("sampleConsumer") SampleConsumer<String> sampleConsumer) {
33+
this.taskExecutor = taskExecutor;
34+
this.kafkaProperties = kafkaProperties;
35+
this.sampleProducer = sampleProducer;
36+
this.sampleConsumer = sampleConsumer;
37+
}
38+
39+
@PostConstruct
40+
public void init() {
41+
// Create a new OKafka topic
42+
NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
43+
OKafkaUtil.createTopicIfNotExists(kafkaProperties, topic);
44+
45+
// Start the producer and consumer
46+
tasks.add(taskExecutor.submit(sampleProducer));
47+
tasks.add(taskExecutor.submit(sampleConsumer));
48+
}
49+
50+
public void await() throws ExecutionException, InterruptedException {
51+
for (Future<?> task : tasks) {
52+
task.get();
53+
}
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.okafka;
4+
5+
import java.io.File;
6+
import java.io.IOException;
7+
import java.nio.file.Files;
8+
import java.util.Properties;
9+
import java.util.stream.Stream;
10+
11+
import org.apache.kafka.clients.consumer.Consumer;
12+
import org.oracle.okafka.clients.consumer.KafkaConsumer;
13+
import org.oracle.okafka.clients.producer.KafkaProducer;
14+
import org.springframework.beans.factory.annotation.Qualifier;
15+
import org.springframework.beans.factory.annotation.Value;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.context.annotation.Configuration;
18+
19+
@Configuration
20+
public class OKafkaConfiguration {
21+
public static final String TOPIC_NAME = "OKAFKA_SAMPLE";
22+
23+
@Value("${ojdbc.path}")
24+
private String ojdbcPath;
25+
26+
@Value("${bootstrap.servers}")
27+
private String bootstrapServers;
28+
29+
// We use the default 23ai Free service name
30+
@Value("${service.name:freepdb1}")
31+
private String serviceName;
32+
33+
// We use plaintext for a containerized, local database.
34+
// Use SSL for wallet connections, like Autonomous Database.
35+
@Value("${security.protocol:PLAINTEXT}")
36+
private String securityProtocol;
37+
38+
@Value("${producer.stream.file}")
39+
private String producerFile;
40+
41+
@Value("${expected.messages:50}")
42+
private int expectedMessages;
43+
44+
45+
@Bean
46+
@Qualifier("okafkaProperties")
47+
public Properties kafkaProperties() {
48+
return OKafkaUtil.getConnectionProperties(ojdbcPath,
49+
bootstrapServers,
50+
securityProtocol,
51+
serviceName);
52+
}
53+
54+
@Bean
55+
@Qualifier("sampleProducer")
56+
public SampleProducer<String> sampleProducer() throws IOException {
57+
// Create the OKafka Producer.
58+
Properties props = kafkaProperties();
59+
props.put("enable.idempotence", "true");
60+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
61+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
62+
// Note the use of the org.oracle.okafka.clients.producer.KafkaProducer class, for Oracle TxEventQ.
63+
KafkaProducer<String, String> okafkaProducer = new KafkaProducer<>(props);
64+
// We create a stream of a data from a file to give the producer input messages.
65+
Stream<String> producerData = Files.lines(new File(producerFile).toPath());
66+
return new SampleProducer<>(okafkaProducer, TOPIC_NAME, producerData);
67+
}
68+
69+
@Bean
70+
@Qualifier("sampleConsumer")
71+
public SampleConsumer<String> sampleConsumer() {
72+
// Create the OKafka Consumer.
73+
Properties props = kafkaProperties();
74+
props.put("group.id" , "MY_CONSUMER_GROUP");
75+
props.put("enable.auto.commit","false");
76+
props.put("max.poll.records", 2000);
77+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
78+
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
79+
// Note the use of the org.oracle.okafka.clients.producer.KafkaConsumer class, for Oracle TxEventQ.
80+
Consumer<String, String> okafkaConsumer = new KafkaConsumer<>(props);
81+
return new SampleConsumer<>(okafkaConsumer, TOPIC_NAME, expectedMessages);
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.okafka;
4+
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
import org.springframework.scheduling.annotation.EnableAsync;
8+
9+
@SpringBootApplication
10+
@EnableAsync
11+
public class OKafkaSampleApp {
12+
public static void main(String[] args) {
13+
SpringApplication.run(OKafkaSampleApp.class, args);
14+
}
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.okafka;
4+
5+
import java.util.Collections;
6+
import java.util.Properties;
7+
import java.util.concurrent.ExecutionException;
8+
9+
import org.apache.kafka.clients.admin.Admin;
10+
import org.apache.kafka.clients.admin.NewTopic;
11+
import org.apache.kafka.common.errors.TopicExistsException;
12+
import org.oracle.okafka.clients.admin.AdminClient;
13+
public class OKafkaUtil {
14+
public static Properties getConnectionProperties(String ojdbcPath,
15+
String bootstrapServers,
16+
String securityProtocol,
17+
String serviceName) {
18+
Properties props = new Properties();
19+
props.put("oracle.service.name", serviceName);
20+
props.put("security.protocol", securityProtocol);
21+
props.put("bootstrap.servers", bootstrapServers);
22+
// If using Oracle Database wallet, pass wallet directory
23+
props.put("oracle.net.tns_admin", ojdbcPath);
24+
return props;
25+
}
26+
27+
public static void createTopicIfNotExists(Properties okafkaProperties,
28+
NewTopic newTopic) {
29+
try (Admin admin = AdminClient.create(okafkaProperties)) {
30+
admin.createTopics(Collections.singletonList(newTopic))
31+
.all()
32+
.get();
33+
} catch (ExecutionException | InterruptedException e) {
34+
if (e.getCause() instanceof TopicExistsException) {
35+
System.out.println("Topic already exists, skipping creation");
36+
} else {
37+
throw new RuntimeException(e);
38+
}
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)