Sample to show how to process different types of Avro subjects in a single topic #98


Open · wants to merge 8 commits into `main`
7 changes: 7 additions & 0 deletions java/AvroOneTopicManySubjects/.run/all.run.xml
Author: @nicusX Do you think this is a good approach to share run configurations with IntelliJ users?

@@ -0,0 +1,7 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="all" type="CompoundRunConfigurationType">
<toRun name="flink-consumer" type="Application" />
<toRun name="producer-app" type="Application" />
<method v="2" />
</configuration>
</component>
17 changes: 17 additions & 0 deletions java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml
@@ -0,0 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="flink-consumer" type="Application" factoryName="Application">
<option name="ALTERNATIVE_JRE_PATH" value="11" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="com.amazonaws.services.msf.StreamingJob" />
<module name="flink-consumer" />
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
17 changes: 17 additions & 0 deletions java/AvroOneTopicManySubjects/.run/producer-app.run.xml
@@ -0,0 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="producer-app" type="Application" factoryName="Application">
<option name="ALTERNATIVE_JRE_PATH" value="11" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="com.amazonaws.services.msf.Sensors" />
<module name="producer-app" />
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
57 changes: 57 additions & 0 deletions java/AvroOneTopicManySubjects/README.md
@@ -0,0 +1,57 @@
## AVRO One Topic Many Subjects

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
Contributor:

We usually add the connectors used in the example to this list. In this case, it's also important to note that the example uses the AVRO Confluent Schema Registry.

* Schema Registry: Confluent Schema Registry

This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject
types, leveraging Confluent Schema Registry for schema management and evolution.

See [this article](https://martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html) for more information about why it's sometimes essential to store multiple subjects in the same topic.

This sample is based on a hypothetical use case where two sensors fitted in a room emit telemetry data to a single Kafka topic.
We use Confluent Schema Registry to govern the schema of telemetry messages serialized using Avro.
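
The key to storing multiple subjects in one topic is the *subject name strategy*. As a minimal sketch (not this PR's actual code; the endpoints assume the `docker-compose.yml` below), the producer-side serializer could be configured like this:

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerConfigSketch {

    static Properties producerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8085");
        // Derive the subject from topic *and* record name
        // (e.g. "sensor-data-<fully-qualified record name>") instead of the topic
        // alone, so records with different schemas can share one topic.
        props.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        return props;
    }
}
```

With the default `TopicNameStrategy` there would be a single `sensor-data-value` subject, and registering the second record type would typically be rejected as an incompatible schema change.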

Schema definitions for the messages are in `.avdl` files located in [./producer-app/src/main/resources/avro](./producer-app/src/main/resources/avro).
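
For illustration, one of those IDL files might look roughly like this; only the record name `RoomTemperature` is taken from this README, while the namespace and fields are assumptions:

```
// Hypothetical sketch of an .avdl schema definition; fields are illustrative only.
@namespace("com.amazonaws.services.msf.domain")
protocol SensorProtocol {
  record RoomTemperature {
    string sensorId;
    long timestamp;
    float temperature;
  }
}
```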

This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).

**producer-app** writes random samples of room temperature and air quality records to the `sensor-data` topic.
**flink-consumer** reads from the `sensor-data` topic and processes the records using the `DataStream` API, printing room temperature and air quality readings that exceed a given threshold (a deserialization sketch follows below).
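
A hedged sketch of that deserialization step, assuming the local Schema Registry endpoint; the class name is hypothetical and this is not necessarily how the PR implements it:

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Deserializes records of any subject registered for the topic into their
// Avro-generated classes, using the schema id embedded in each message.
public class MultiTypeAvroDeserializationSchema
        implements KafkaRecordDeserializationSchema<SpecificRecord> {

    private transient KafkaAvroDeserializer deserializer;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", "http://localhost:8085");
        config.put("specific.avro.reader", true); // return generated classes, not GenericRecord
        deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, false); // false = configure as a value deserializer
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<SpecificRecord> out)
            throws IOException {
        out.collect((SpecificRecord) deserializer.deserialize(record.topic(), record.value()));
    }

    @Override
    public TypeInformation<SpecificRecord> getProducedType() {
        return TypeInformation.of(SpecificRecord.class);
    }
}
```

Downstream operators can then dispatch on the concrete type, e.g. with `instanceof RoomTemperature`.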


## Flink compatibility

**Note:** This project is compatible with Flink 1.20+ and Amazon Managed Service for Apache Flink.

### Flink API compatibility
This example uses the newer `KafkaSource` (as opposed to `FlinkKafkaConsumer`, which was deprecated in Flink 1.15); a wiring sketch follows.
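
For illustration, wiring a `KafkaSource` to the deserializer sketched above might look like this (endpoints and names are assumptions consistent with the rest of this README, not the PR's actual code):

```java
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<SpecificRecord> source = KafkaSource.<SpecificRecord>builder()
                .setBootstrapServers("localhost:29092")
                .setTopics("sensor-data")
                .setGroupId("flink-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new MultiTypeAvroDeserializationSchema()) // see sketch above
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "sensor-data source")
                .print();

        env.execute("flink-consumer");
    }
}
```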

### Avro-generated classes

This project uses classes generated at build-time as data objects.

As a best practice, only the Avro schema definitions (IDL `.avdl` files in this case) are included in the project source code.

The Avro Maven plugin generates the Java classes (source code) at build-time, during the [`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.

The generated classes are written into the `./<application-dir>/target/generated-sources/avro` directory and should **not** be committed with the project source.

This way, the only dependency is on the schema definition file(s). If any change is required, the schema file is modified, and the Avro classes are re-generated automatically during the build.
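
A plugin configuration along these lines in each module's `pom.xml` would produce that layout; the version number is an assumption, and `idl-protocol` is the goal that processes `.avdl` files:

```xml
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```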

Code generation is supported by all common IDEs, like IntelliJ. If your IDE does not see the Avro classes (`AirQuality` and `RoomTemperature`) when you import the project for the first time, you may need to run `mvn generate-sources` manually once, from the IDE or the command line, to force code generation.

### Using Avro-generated classes (SpecificRecord) in Apache Flink

Using Avro-generated classes (`SpecificRecord`) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit: Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos) these objects, without risking a fallback to Kryo.
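
If you want a hard guarantee rather than trusting inspection, Flink can be told to fail fast whenever a type would fall back to Kryo; this is a general Flink facility, not necessarily something this example enables:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoGuardSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throws at job construction time if any type in the pipeline would be
        // serialized with Kryo, e.g. because a class was not recognized as a
        // POJO or Avro SpecificRecord.
        env.getConfig().disableGenericTypes();
    }
}
```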

### Running required services locally
Kafka and Confluent Schema Registry configuration is in [./docker-compose.yml](./docker-compose.yml). Start these services by running the `docker compose up` command.
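
Once the containers are up, you can check that the Schema Registry is reachable and, after running the producer, list the registered subjects (the port comes from `docker-compose.yml`):

```sh
curl http://localhost:8085/subjects
```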

### Running in IntelliJ
To start the Flink job in IntelliJ:
1. Edit the Run/Debug configuration (use the compound configuration to run both applications at the same time).
2. Enable the option **"Add dependencies with 'provided' scope to the classpath"**.

See this [page](../running-examples-locally.md) for more information.
43 changes: 43 additions & 0 deletions java/AvroOneTopicManySubjects/docker-compose.yml
@@ -0,0 +1,43 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
- 11001:11001
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 11001
KAFKA_JMX_HOSTNAME: kafka
JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=11001"
links:
- zookeeper

scr:
image: confluentinc/cp-schema-registry
container_name: scr
depends_on:
- kafka
links:
- kafka
ports:
- 8085:8085
environment:
CUB_CLASSPATH: '/usr/share/java/confluent-security/schema-registry/*:/usr/share/java/schema-registry/*:/usr/share/java/schema-registry-plugins/*:/usr/share/java/cp-base-new/*'
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"