Sample to show how to process different types of Avro subjects in a single topic #98
base: main
Changes from all commits
a050abc
f6469f6
76f6600
e7b40e8
c30178b
1defa0b
d693cf3
fdcbdd8
@@ -0,0 +1,7 @@
<component name="ProjectRunConfigurationManager">
  <configuration default="false" name="all" type="CompoundRunConfigurationType">
    <toRun name="flink-consumer" type="Application" />
    <toRun name="producer-app" type="Application" />
    <method v="2" />
  </configuration>
</component>
@@ -0,0 +1,17 @@
<component name="ProjectRunConfigurationManager">
  <configuration default="false" name="flink-consumer" type="Application" factoryName="Application">
    <option name="ALTERNATIVE_JRE_PATH" value="11" />
    <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
    <option name="INCLUDE_PROVIDED_SCOPE" value="true" />
    <option name="MAIN_CLASS_NAME" value="com.amazonaws.services.msf.StreamingJob" />
    <module name="flink-consumer" />
    <extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
      <option name="credential" />
      <option name="region" />
      <option name="useCurrentConnection" value="false" />
    </extension>
    <method v="2">
      <option name="Make" enabled="true" />
    </method>
  </configuration>
</component>
@@ -0,0 +1,17 @@
<component name="ProjectRunConfigurationManager">
  <configuration default="false" name="producer-app" type="Application" factoryName="Application">
    <option name="ALTERNATIVE_JRE_PATH" value="11" />
    <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
    <option name="INCLUDE_PROVIDED_SCOPE" value="true" />
    <option name="MAIN_CLASS_NAME" value="com.amazonaws.services.msf.Sensors" />
    <module name="producer-app" />
    <extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
      <option name="credential" />
      <option name="region" />
      <option name="useCurrentConnection" value="false" />
    </extension>
    <method v="2">
      <option name="Make" enabled="true" />
    </method>
  </configuration>
</component>
@@ -0,0 +1,57 @@
## AVRO One Topic Many Subjects

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
> **Review comment:** We usually add to the list the connectors used in the example. In this case, it's also important to add that the example uses AVRO Confluent Schema Registry.
>
> **Reply:** ✅
* Schema Registry: Confluent Schema Registry
This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject types, leveraging Confluent Schema Registry for schema management and evolution.

See [this article](https://martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html) for more information about why it is sometimes essential to store multiple subjects in the same topic.
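Storing several record types in one topic means the default subject naming (one subject per topic) no longer fits, because a single subject would have to hold the schemas of all record types. As a minimal sketch, the three Confluent subject naming strategies map a record to a Schema Registry subject roughly as follows (the helper class and method names below are hypothetical, for illustration only; the real logic lives in Confluent's serializers):

```java
// Hypothetical sketch of how Confluent subject naming strategies derive
// the Schema Registry subject for a record's value schema.
public class SubjectNamingSketch {

    // TopicNameStrategy (the default): one subject per topic,
    // regardless of the record type.
    static String topicNameStrategy(String topic) {
        return topic + "-value";
    }

    // RecordNameStrategy: one subject per record type,
    // shared across all topics.
    static String recordNameStrategy(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: one subject per (topic, record type) pair.
    static String topicRecordNameStrategy(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        System.out.println(topicNameStrategy("sensor-data"));
        System.out.println(recordNameStrategy("com.amazonaws.services.msf.RoomTemperature"));
        System.out.println(topicRecordNameStrategy("sensor-data", "com.amazonaws.services.msf.AirQuality"));
    }
}
```

With `RecordNameStrategy` or `TopicRecordNameStrategy`, each record type gets its own subject, so several schemas can evolve independently while sharing one topic.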
This sample is based on a hypothetical use case where two sensors fitted in a room emit telemetry data to a single Kafka topic. We use Confluent Schema Registry to govern the schema of the telemetry messages, which are serialized with Avro.

Schema definitions for the messages are in `.avdl` files located in [./producer-app/src/main/resources/avro](./producer-app/src/main/resources/avro).
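As a hypothetical illustration (the actual schema files live in the repository folder linked above, and the field names below are made up), an Avro IDL definition for one of the sensor records might look like this:

```avdl
// Illustrative sketch only; not the schema shipped in this repo.
@namespace("com.amazonaws.services.msf")
protocol SensorData {
  record RoomTemperature {
    string sensorId;
    long timestamp;
    float temperature;
  }
}
```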
|
||
This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)). | ||
|
||
**producer-app** writes random samples of room temperature and air quality records to sensor-data topic. | ||
**flink-consumer** reads sensor-data topic and processes them using `DataStream` API to print room temperatures and air quality exceeding a given threshold. | ||
|
||
|
||
## Flink compatibility

**Note:** This project is compatible with Flink 1.20+ and Amazon Managed Service for Apache Flink.

### Flink API compatibility
This example uses the newer `KafkaSource` (as opposed to `FlinkKafkaConsumer`, which was deprecated in Flink 1.15).
|
||
### Avro-generated classes | ||
|
||
This project uses classes generated at build-time as data objects. | ||
|
||
As a best practice, only the Avro schema definitions (IDL `.avdl` files in this case) are included in the project source code. | ||
|
||
The Avro Maven plugin generates the Java classes (source code) at build-time, during the [`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase. | ||
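A typical `avro-maven-plugin` configuration for compiling `.avdl` files looks like the sketch below (the version number and directory paths are illustrative and not taken from this PR):

```xml
<!-- Sketch of an avro-maven-plugin setup; version and paths are assumptions -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <!-- idl-protocol compiles .avdl (Avro IDL) files -->
        <goal>idl-protocol</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```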
The generated classes are written into the `./<application-dir>/target/generated-sources/avro` directory and should **not** be committed with the project source.

This way, the only dependency is on the schema definition file(s). If any change is required, the schema file is modified and the Avro classes are regenerated automatically during the build.

Code generation is supported by all common IDEs, like IntelliJ. If your IDE does not see the Avro classes (`AirQuality` and `RoomTemperature`) when you import the project for the first time, you can manually run `mvn generate-sources` once from the IDE to force source code generation.
### Using Avro-generated classes (SpecificRecord) in Apache Flink

Using Avro-generated classes (`SpecificRecord`) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit: Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos) these objects, without risking falling back to Kryo.
### Running required services locally
Kafka and Confluent Schema Registry configuration is in [./docker-compose.yml](./docker-compose.yml). Start these services by running `docker compose up`.

### Running in IntelliJ
To start the Flink job in IntelliJ:
1. Edit the Run/Debug configuration (use the compound configuration to run both applications at the same time).
2. Enable the option **"Add dependencies with 'provided' scope to the classpath"**.

See [this page](../running-examples-locally.md) for more information.
@@ -0,0 +1,43 @@
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 11001:11001
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 11001
      KAFKA_JMX_HOSTNAME: kafka
      JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=11001"
    links:
      - zookeeper

  scr:
    image: confluentinc/cp-schema-registry
    container_name: scr
    depends_on:
      - kafka
    links:
      - kafka
    ports:
      - 8085:8085
    environment:
      CUB_CLASSPATH: '/usr/share/java/confluent-security/schema-registry/*:/usr/share/java/schema-registry/*:/usr/share/java/schema-registry-plugins/*:/usr/share/java/cp-base-new/*'
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
> **Review comment:** @nicusX Do you think this is a good approach to share run configurations with IntelliJ users?