diff --git a/java/AvroOneTopicManySubjects/.run/all.run.xml b/java/AvroOneTopicManySubjects/.run/all.run.xml
new file mode 100644
index 0000000..a64e42d
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.run/all.run.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml b/java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml
new file mode 100644
index 0000000..ab31e9c
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml
@@ -0,0 +1,17 @@
+
+
+
+
\ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/.run/producer-app.run.xml b/java/AvroOneTopicManySubjects/.run/producer-app.run.xml
new file mode 100644
index 0000000..1c0bf74
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.run/producer-app.run.xml
@@ -0,0 +1,17 @@
+
+
+
+
\ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/README.md b/java/AvroOneTopicManySubjects/README.md
new file mode 100644
index 0000000..ca19a3e
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/README.md
@@ -0,0 +1,57 @@
+## Avro One Topic Many Subjects
+
+* Flink version: 1.20
+* Flink API: DataStream API
+* Language: Java (11)
+* Schema Registry: Confluent Schema Registry
+
+This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject
+types, leveraging Confluent Schema Registry for schema management and evolution.
+
+See [this article](https://martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html) for more information about why it's sometimes essential to store multiple subjects in the same topic.
+
+This sample is based on a hypothetical use case where two sensors fitted in a room emit telemetry data to a single Kafka topic.
+We use Confluent Schema Registry to govern the schema of telemetry messages serialized using Avro.
+
+Schema definitions for the messages are in `.avdl` files located in [./producer-app/src/main/resources/avro](./producer-app/src/main/resources/avro).
+
+This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).
+
+**producer-app** writes random samples of room temperature and air quality records to the `sensor-data` topic.
+**flink-consumer** reads the `sensor-data` topic and processes the records with the `DataStream` API, printing room temperature and air quality readings that exceed a given threshold.
+
+
+## Flink compatibility
+
+**Note:** This project is compatible with Flink 1.20+ and Amazon Managed Service for Apache Flink.
+
+### Flink API compatibility
+This example uses the newer `KafkaSource` (as opposed to `FlinkKafkaConsumer`, which was deprecated with Flink 1.15).
+
+### Avro-generated classes
+
+This project uses classes generated at build time as data objects.
+
+As a best practice, only the Avro schema definitions (IDL `.avdl` files in this case) are included in the project source code.
+
+The Avro Maven plugin generates the Java classes (source code) at build time, during the [`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.
+
+The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with the project source.
+
+This way, the only dependency is on the schema definition file(s). If a change is required, the schema file is modified and the Avro classes are re-generated automatically during the build.
+
+Code generation is supported by all common IDEs, such as IntelliJ. If your IDE does not see the Avro classes (`AirQuality` and `RoomTemperature`) when you import the project for the first time, you may need to run `mvn generate-sources` manually once, or trigger source generation from the IDE, before the classes are resolved.
+
+### Using Avro-generated classes (SpecificRecord) in Apache Flink
+
+Using Avro-generated classes (`SpecificRecord`) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit: Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos) these objects, without risking a fallback to Kryo.
+
+### Running required services locally
+Kafka and Confluent Schema Registry configuration is in [./docker-compose.yml](./docker-compose.yml). Start these services by running the `docker compose up` command.
+
+### Running in IntelliJ
+To start the Flink job in IntelliJ:
+1. Edit the Run/Debug configuration (use the compound configuration to run both applications at the same time).
+2. Enable the option **"Add dependencies with 'provided' scope to the classpath"**.
+
+See this [page](../running-examples-locally.md) for more information.
\ No newline at end of file
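*Illustration (editorial, not part of this change):* the README above says both record types share the `sensor-data` topic. A minimal sketch of how a plain Kafka producer can be configured so that each Avro type is registered under its own subject via `RecordNameStrategy` follows; the class name and helper method are assumptions (not code from `producer-app`), while the topic, ports, and strategy class come from the README, the docker-compose file below, and the consumer's imports later in this diff.

```java
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper, not taken from producer-app.
public class SensorDataProducerSketch {

    // Sends any Avro-generated record (e.g. RoomTemperature or AirQuality) to the shared topic.
    static void send(SpecificRecord record) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8085");
        // RecordNameStrategy derives the subject from the fully-qualified record name,
        // so several subjects can coexist on the single sensor-data topic.
        props.put("value.subject.name.strategy", RecordNameStrategy.class.getName());

        try (KafkaProducer<String, SpecificRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("sensor-data", record));
        }
    }
}
```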
diff --git a/java/AvroOneTopicManySubjects/docker-compose.yml b/java/AvroOneTopicManySubjects/docker-compose.yml
new file mode 100644
index 0000000..3b8b3ea
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/docker-compose.yml
@@ -0,0 +1,43 @@
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 22181:2181
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    depends_on:
+      - zookeeper
+    ports:
+      - 29092:29092
+      - 11001:11001
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_JMX_PORT: 11001
+      KAFKA_JMX_HOSTNAME: kafka
+      JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=11001"
+    links:
+      - zookeeper
+
+  scr:
+    image: confluentinc/cp-schema-registry
+    container_name: scr
+    depends_on:
+      - kafka
+    links:
+      - kafka
+    ports:
+      - 8085:8085
+    environment:
+      CUB_CLASSPATH: '/usr/share/java/confluent-security/schema-registry/*:/usr/share/java/schema-registry/*:/usr/share/java/schema-registry-plugins/*:/usr/share/java/cp-base-new/*'
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
+      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
\ No newline at end of file
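*Illustration (editorial, not part of this change):* once `docker compose up` has started the containers defined above, the Schema Registry is reachable on `http://localhost:8085`. A quick way to confirm it is up, and to see which subjects have been registered (one per record type when `RecordNameStrategy` is used), is a few lines against the Confluent client this project already depends on; the class name below is an assumption.

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Hypothetical smoke test, not part of the repository.
public class SchemaRegistrySmokeTest {
    public static void main(String[] args) throws Exception {
        // Port 8085 matches the "scr" service in docker-compose.yml.
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8085", 10);
        // After producer-app has sent data, this should print one subject per record type.
        client.getAllSubjects().forEach(System.out::println);
    }
}
```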
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/pom.xml b/java/AvroOneTopicManySubjects/flink-consumer/pom.xml
new file mode 100644
index 0000000..0b57790
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/pom.xml
@@ -0,0 +1,252 @@
+
+ 4.0.0
+
+ com.amazonaws
+ flink-consumer
+ 1.0-SNAPSHOT
+
+
+ UTF-8
+ ${project.basedir}/target
+ ${project.name}
+ 11
+ ${target.java.version}
+ com.amazonaws.services.msf.StreamingJob
+
+ 2.12
+ 1.10.2
+ 3.3.0-1.20
+
+ 1.20.0
+ 2.1.0
+ 1.2.0
+
+ 1.7.32
+ 2.17.2
+ 5.9.1
+
+
+
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-runtime-web
+ ${flink.version}
+
+
+
+
+ com.amazonaws
+ aws-kinesisanalytics-runtime
+ ${kda.runtime.version}
+ provided
+
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-connector-kafka
+ ${kafka.clients.version}
+
+
+ com.amazonaws
+ aws-kinesisanalytics-flink
+ ${kda.connectors.version}
+
+
+
+
+ org.apache.flink
+ flink-avro
+ ${flink.version}
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ org.apache.flink
+ flink-avro-confluent-registry
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-connector-files
+ ${flink.version}
+ provided
+
+
+ io.confluent
+ kafka-avro-serializer
+ 7.9.0
+
+
+ io.confluent
+ kafka-schema-registry-client
+ 7.9.0
+
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j.version}
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit.version}
+ test
+
+
+
+
+
+
+ ${buildDirectory}
+ ${jar.finalName}
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ ${target.java.version}
+ ${target.java.version}
+ true
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-sources
+
+ schema
+ protocol
+ idl-protocol
+
+
+
+
+ ${project.basedir}/src/main/resources/avro
+ ${project.basedir}/src/test/resources/avro
+ private
+ String
+ true
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.1.1
+
+
+
+ package
+
+ shade
+
+
+
+
+ org.apache.flink:force-shading
+ com.google.code.findbugs:jsr305
+ org.slf4j:*
+ org.apache.logging.log4j:*
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+ ${main.class}
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
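*Illustration (editorial, not part of this change):* the `flink-avro-confluent-registry` dependency declared above provides `ConfluentRegistryAvroDeserializationSchema`, which is the usual choice when a topic carries a single record type, roughly as sketched below (assuming the Avro-generated `RoomTemperature` class is on the classpath; the wrapper class name is hypothetical). Because `sensor-data` mixes several record types, this project instead defines the custom `OptionDeserializationSchema` that appears later in the diff.

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// Hypothetical single-type variant, for contrast with the multi-subject approach used here.
public class SingleTypeSourceSketch {

    // Works only when every message on the topic is a RoomTemperature record
    // (RoomTemperature is the Avro-generated class referenced in the README; import omitted).
    static KafkaSource<RoomTemperature> buildSource() {
        return KafkaSource.<RoomTemperature>builder()
                .setBootstrapServers("localhost:29092")
                .setTopics("sensor-data")
                .setGroupId("flink-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Binds the deserializer to one SpecificRecord class and one Schema Registry subject.
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                RoomTemperature.class, "http://localhost:8085"))
                .build();
    }
}
```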
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Configuration.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Configuration.java
new file mode 100644
index 0000000..848dc66
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Configuration.java
@@ -0,0 +1,36 @@
+package com.amazonaws.services.msf;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+public class Configuration {
+    private final String bootstrapServers;
+    private final String topic;
+    private final String consumerGroupId;
+    private final String schemaRegistryUrl;
+
+    public Configuration(Properties properties) {
+        this.bootstrapServers = Preconditions.checkNotNull(properties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
+        this.topic = Preconditions.checkNotNull(properties.getProperty("source.topic"), "source.topic not defined");
+        this.consumerGroupId = properties.getProperty("source.consumer.group.id", "flink-consumer");
+        this.schemaRegistryUrl = properties.getProperty("schema.registry.url", "http://localhost:8085");
+    }
+
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getConsumerGroupId() {
+        return consumerGroupId;
+    }
+
+    public String getSchemaRegistryUrl() {
+        return schemaRegistryUrl;
+    }
+}
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Option.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Option.java
new file mode 100644
index 0000000..0072e0f
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Option.java
@@ -0,0 +1,26 @@
+package com.amazonaws.services.msf;
+
+/**
+ * Option type is a generic container to hold deserialized records.
+ * It can be useful to introduce fields into this type to handle partitioning
+ * strategies and event time extraction. However, for those scenarios to work
+ * all subjects should have a standard set of fields.
+ */
+class Option {
+    private Object value;
+
+    public Option() {
+    }
+
+    public Option(Object value) {
+        this.value = value;
+    }
+
+    public Object getValue() {
+        return this.value;
+    }
+
+    public void setValue(Object value) {
+        this.value = value;
+    }
+}
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/OptionDeserializationSchema.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/OptionDeserializationSchema.java
new file mode 100644
index 0000000..5884660
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/OptionDeserializationSchema.java
@@ -0,0 +1,55 @@
+package com.amazonaws.services.msf;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import io.confluent.kafka.serializers.subject.RecordNameStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+// Custom deserialization schema for handling multiple generic Avro record types
+class OptionDeserializationSchema implements KafkaRecordDeserializationSchema