# Sample to show how to process different types of Avro subjects in a single topic (#98)

`.gitignore` @@ -0,0 +1,22 @@

**/.idea/
.bsp
**/target
**/.DS_Store
**/dependency-reduced-pom.xml
**/*.iml
**/*.jar
**/*.log
.env
.venv
env/
venv/
.java-version
/pyflink/
*.p12
*.key
*.pem
*.crt
*.csr
*.der
*.jks

`README.md` @@ -0,0 +1,52 @@

## AVRO One Topic Many Subjects

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)

> **Review comment:** We usually add to the list the connectors used in the example. In this case, it's also important to add that the example uses AVRO Confluent Schema Registry.
> **Reply:** ✅

This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject types.

> **Review comment:** Explain this is specific to Confluent Schema Registry.
> **Reply:** ✅

See [this article](https://martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html) for more information about why it's sometimes essential to store multiple subjects in the same topic.

This sample is based on a hypothetical use case where two sensors fitted in a room emit telemetry data to a single Kafka topic. We use Confluent Schema Registry to govern the schema of telemetry messages serialized with Avro.

Schema definitions for the messages are in `.avdl` files located in [./src/main/resources/avro](./src/main/resources/avro).

This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).
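
Purely as an illustration, creating a telemetry record with the generated builder API might look like the sketch below. `RoomTemperature` is one of the generated classes named in this README, but the package and field names here are hypothetical placeholders; the real ones come from the `.avdl` definitions.

```java
// Hypothetical package: the actual one is set by the namespace in the .avdl file.
import com.example.avro.RoomTemperature;

public class BuildRecordExample {
    public static void main(String[] args) {
        // Hypothetical fields; the actual fields are declared in the .avdl schema.
        RoomTemperature reading = RoomTemperature.newBuilder()
                .setRoomId("room-1")
                .setTemperature(21.5)
                .setTimestamp(System.currentTimeMillis())
                .build();
        System.out.println(reading);
    }
}
```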

A `KafkaSource` produces a stream of Avro data objects (`SpecificRecord`), fetching the writer's schema from AWS Glue Schema Registry. The Avro Kafka message value must have been serialized using AWS Glue Schema Registry.

> **Review comment:** I think you mean Confluent Schema Registry?
> **Reply:** ✅
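
A minimal sketch of what such a deserialization schema could look like, assuming it wraps Confluent's `KafkaAvroDeserializer` with `specific.avro.reader=true` (the class name is hypothetical, and this is not necessarily how the sample implements it):

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.Map;

/**
 * Hypothetical value deserializer backed by Confluent Schema Registry.
 * With specific.avro.reader=true the deserializer looks up the writer's schema
 * by the ID embedded in each message and returns the matching Avro-generated
 * class (e.g. RoomTemperature or AirQuality).
 */
public class MultiTypeAvroDeserializationSchema
        implements KafkaRecordDeserializationSchema<SpecificRecord> {

    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer deserializer;

    public MultiTypeAvroDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        deserializer = new KafkaAvroDeserializer();
        deserializer.configure(Map.of(
                        KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl,
                        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true),
                false); // false = value (not key) deserializer
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<SpecificRecord> out)
            throws IOException {
        out.collect((SpecificRecord) deserializer.deserialize(record.topic(), record.value()));
    }

    @Override
    public TypeInformation<SpecificRecord> getProducedType() {
        // Note: a generic TypeInformation like this can itself fall back to Kryo
        // downstream; a real job would typically split the stream by concrete type.
        return TypeInformation.of(SpecificRecord.class);
    }
}
```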

A `KafkaSink` writes Avro data objects as Kafka message values, and a `String`, converted to UTF-8 bytes, as the Kafka message key.
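
On the producing side, here is a sketch of a value serializer, assuming Confluent's `TopicRecordNameStrategy` so that each record type is registered under its own subject within the shared topic (again, the class name is hypothetical and this is one possible implementation, not necessarily the sample's):

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Hypothetical value serializer: the topic-record-name subject strategy lets
 * multiple Avro types share one topic, each under its own registry subject.
 */
public class MultiTypeAvroSerializationSchema
        implements KafkaRecordSerializationSchema<SpecificRecord> {

    private final String topic;
    private final String schemaRegistryUrl;
    private transient KafkaAvroSerializer serializer;

    public MultiTypeAvroSerializationSchema(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) {
        serializer = new KafkaAvroSerializer();
        serializer.configure(Map.of(
                        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl,
                        KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
                        TopicRecordNameStrategy.class.getName()),
                false); // false = value (not key) serializer
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SpecificRecord element, KafkaSinkContext context, Long timestamp) {
        // Mirror the README: a String key converted to UTF-8 bytes (here, the record type name).
        byte[] key = element.getSchema().getName().getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, key, serializer.serialize(topic, element));
    }
}
```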

## Flink compatibility

**Note:** This project is compatible with Flink 1.15+ and Amazon Managed Service for Apache Flink.

> **Review comment:** ...but requires updating the dependencies (note that connector versioning has changed after 1.15, so the version of some dependencies for 1.15 is completely different from 1.20).

### Flink API compatibility

This example uses the newer `KafkaSource` and `KafkaSink` (as opposed to `FlinkKafkaConsumer` and `FlinkKafkaProducer`, which were deprecated with Flink 1.15).
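
For instance, wiring up a `KafkaSource` with the hypothetical deserializer sketched earlier might look like this (the broker and registry addresses match the docker-compose file below; the topic and group id are made up):

```java
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class SourceWiringExample {
    public static void main(String[] args) {
        KafkaSource<SpecificRecord> source = KafkaSource.<SpecificRecord>builder()
                .setBootstrapServers("localhost:29092")  // host listener from docker-compose.yml
                .setTopics("sensor-data")                // hypothetical topic name
                .setGroupId("avro-one-topic-sample")     // hypothetical group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new MultiTypeAvroDeserializationSchema("http://localhost:8085"))
                .build();
        // The source would then be attached with env.fromSource(source, watermarkStrategy, "Kafka source").
    }
}
```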

### Avro-generated classes

This project uses classes generated at build-time as data objects.

As a best practice, only the Avro schema definitions (IDL `.avdl` files in this case) are included in the project source code.

The Avro Maven plugin generates the Java classes (source code) at build-time, during the [`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.

The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with the project source.

This way, the only dependency is on the schema definition file(s). If any change is required, the schema file is modified, and the Avro classes are regenerated automatically during the build.

Code generation is supported by all common IDEs, like IntelliJ. If your IDE does not see the Avro classes (`AirQuality` and `RoomTemperature`) when you import the project for the first time, you may need to run `mvn generate-sources` manually once to force code generation from the IDE.

### Using Avro-generated classes (SpecificRecord) in Apache Flink

Using Avro-generated classes (`SpecificRecord`) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit: Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos) these objects, without risking falling back to Kryo.
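
As an optional safeguard (a sketch, not something the sample necessarily does), you can make the job fail fast if any type in the pipeline would fall back to Kryo:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoGuardExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throws at job-graph build time if any data type would be serialized with Kryo.
        env.getConfig().disableGenericTypes();
        // ... define the pipeline here, then call env.execute("...")
    }
}
```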

### Running required services locally

Kafka and Confluent Schema Registry configuration is in [./docker-compose.yml](./docker-compose.yml). Start these services by running `docker compose up`.

### Running in IntelliJ

To start the Flink job in IntelliJ:
1. Edit the Run/Debug configuration.
2. Enable the option **"Add dependencies with 'provided' scope to the classpath"**.

> **Review comment:** You can add the link to the shared MD at ../running-examples-locally.md
> **Reply:** ✅

`docker-compose.yml` @@ -0,0 +1,43 @@

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 11001:11001
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 11001
      KAFKA_JMX_HOSTNAME: kafka
      JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=11001"
    links:
      - zookeeper

  scr:
    image: confluentinc/cp-schema-registry
    container_name: scr
    depends_on:
      - kafka
    links:
      - kafka
    ports:
      - 8085:8085
    environment:
      CUB_CLASSPATH: '/usr/share/java/confluent-security/schema-registry/*:/usr/share/java/schema-registry/*:/usr/share/java/schema-registry-plugins/*:/usr/share/java/cp-base-new/*'
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"

`pom.xml` @@ -0,0 +1,250 @@

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>avro-one-topic-many-subjects</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <main.class>com.amazonaws.services.msf.StreamingJob</main.class>

        <scala.binary.version>2.12</scala.binary.version>

> **Review comment:** Not required.

        <avro.version>1.10.2</avro.version>
        <kafka.clients.version>3.3.0-1.20</kafka.clients.version>

        <flink.version>1.20.0</flink.version>
        <kda.connectors.version>2.1.0</kda.connectors.version>

> **Review comment:** I don't think this dependency is required. Please check and remove this variable and the dependency.

        <kda.runtime.version>1.2.0</kda.runtime.version>

        <slf4j.version>1.7.32</slf4j.version>

> **Review comment:** Please use newer versions.

        <log4j.version>2.17.2</log4j.version>
        <junit.version>5.9.1</junit.version>
    </properties>
|
||
<repositories> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this really required? |
||
<!-- Registry is required to add avro serde dependencies --> | ||
<repository> | ||
<id>confluent</id> | ||
<url>https://packages.confluent.io/maven/</url> | ||
</repository> | ||
</repositories> | ||

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

> **Review comment:** Should be `provided`.

        <!-- Amazon Managed Service for Apache Flink (formerly Kinesis Analytics) runtime -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${kafka.clients.version}</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-flink</artifactId>
            <version>${kda.connectors.version}</version>
        </dependency>

> **Review comment:** Is this dependency required?

        <!-- AVRO -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

> **Review comment:** You are not using Table API. This is not required.

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

> **Review comment:** You are not using the File System connector; this is not required.

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.9.0</version>
        </dependency>

> **Review comment:** Define a variable for the CSR AVRO dependencies version.

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>7.9.0</version> <!-- Use the version compatible with your Kafka version -->
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <directory>${buildDirectory}</directory>
        <finalName>${jar.finalName}</finalName>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                    <forceJavacCompilerUse>true</forceJavacCompilerUse>
                </configuration>
            </plugin>

            <!-- AVRO source generator -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
                            <fieldVisibility>private</fieldVisibility>
                            <stringType>String</stringType>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main.class}</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>

> **Review comment:** The .gitignore in the subfolder is not required. There is a .gitignore at top level; if anything is missing there, please update that one.