Sample to show how to process different types of Avro subjects in a single topic #98
Conversation
Many thanks for the contribution.
I would suggest some changes to make it simpler to understand, and also to avoid hinting at any not-so-good practices.
The `.gitignore` in the subfolder is not required.
There is a `.gitignore` at the top level; if anything is missing there, please update that one instead.
* Flink API: DataStream API
* Language: Java (11)

This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject types.
Explain that this is specific to Confluent Schema Registry.
✅
* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
We usually add the connectors used in the example to this list. In this case, it's also important to note that the example uses Avro with Confluent Schema Registry.
✅
This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).

A `KafkaSource` produces a stream of Avro data objects (`SpecificRecord`), fetching the writer's schema from AWS Glue Schema Registry. The Avro Kafka message value must have been serialized using AWS Glue Schema Registry.
I think you mean Confluent Schema Registry?
✅
```java
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.enableCheckpointing(60000);
}
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
```
This is not really required; STREAMING is already the default runtime mode.
✅
```java
env.execute("avro-one-topic-many-subjects");
}

private static void setupAirQualityGenerator(String bootstrapServers, String sourceTopic, String schemaRegistryUrl, Map<String, Object> schemaRegistryConfig, StreamExecutionEnvironment env) {
```
Even though having the data generator within the same Flink app works, we deliberately avoid doing it in any of the examples, because building jobs with multiple dataflows is strongly discouraged. We avoid any bad practice in the examples, so as not to suggest it may be a good idea.
I reckon it's more complicated, but you can add a separate module with a standalone Java application which generates data, similar to what we do in this example, even though in that case it's Kinesis.
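As a rough illustration of the standalone-generator pattern suggested here, a minimal sketch of a plain Java producer publishing Avro records through Confluent Schema Registry. The `DataGenerator` class name, the `AirQuality` field names, and the endpoints are hypothetical, not taken from the PR:

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class DataGenerator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical endpoint
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081"); // hypothetical endpoint
        // One subject per record type, so several Avro types can share the topic
        props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            while (true) {
                // AirQuality is the Avro-generated class; these field names are made up
                AirQuality record = AirQuality.newBuilder()
                        .setRoom("kitchen")
                        .setCo2(400 + (int) (Math.random() * 100))
                        .setTimestamp(System.currentTimeMillis())
                        .build();
                producer.send(new ProducerRecord<>("source-topic", "kitchen", record));
                Thread.sleep(1000);
            }
        }
    }
}
```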
✅
```java
 * strategies and event time extraction. However, for those scenarios to work
 * all subjects should have a standard set of fields.
 */
class Option {
```
Maybe you can use `org.apache.flink.types.SerializableOptional<T>`, which comes with Flink.
The `Option` type in this PR is a container type to hold any possible deserialized value. `SerializableOptional<T>` is for optional values, so I guess it would not be the right choice here? Am I missing something? 👀 🙏🏾

BTW: I could have used `Object` instead of creating an `Option` type with an `Object`-typed `value` field. However, having the `Option` type helps if we want to generate watermarks in the source operator using a common timestamp field.
Is there a better way to do this?
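For context, a minimal sketch of the container type being discussed, assuming the `Object`-typed `value` field and a common timestamp field as described (names are illustrative):

```java
import java.io.Serializable;

// Container for whichever record type was deserialized from the topic, plus a
// common event-time field that all subjects are assumed to share. Public fields
// and the no-arg constructor keep the type a Flink POJO.
public class Option implements Serializable {
    public Object value;
    public long timestamp;

    public Option() {}

    public Option(Object value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}
```

A single watermark strategy could then be applied to the mixed stream, e.g. `WatermarkStrategy.<Option>forMonotonousTimestamps().withTimestampAssigner((opt, ts) -> opt.timestamp)`.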
```java
}

// Custom deserialization schema for handling multiple generic Avro record types
class OptionDeserializationSchema implements KafkaRecordDeserializationSchema<Option> {
```
Please move this to a top-level class for readability.
✅
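For context, a hedged sketch of what such a top-level deserialization schema might look like with Confluent's deserializer; an illustration under the assumptions above (the `Option` container, a serializable config map), not the PR's exact code:

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.Map;

public class OptionDeserializationSchema implements KafkaRecordDeserializationSchema<Option> {
    // Must contain schema.registry.url; use a serializable map such as HashMap
    private final Map<String, Object> registryConfig;
    private transient KafkaAvroDeserializer deserializer;

    public OptionDeserializationSchema(Map<String, Object> registryConfig) {
        this.registryConfig = registryConfig;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        deserializer = new KafkaAvroDeserializer();
        // With specific.avro.reader=true the deserializer returns the generated
        // SpecificRecord class matching whichever subject wrote the message
        deserializer.configure(registryConfig, false);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Option> out) throws IOException {
        Object value = deserializer.deserialize(record.topic(), record.value());
        out.collect(new Option(value, record.timestamp()));
    }

    @Override
    public TypeInformation<Option> getProducedType() {
        return TypeInformation.of(Option.class);
    }
}
```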
```java
    }
}

class RecordNameSerializer<T> implements KafkaRecordSerializationSchema<T>
```
Move this to a top-level class as well.
✅
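Similarly, a sketch of a top-level `RecordNameSerializer` built on Confluent's `RecordNameStrategy`, which is what lets each record type have its own subject within a single topic (an illustration, not the PR's exact code):

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;

public class RecordNameSerializer<T> implements KafkaRecordSerializationSchema<T> {
    private final String topic;
    // Must contain schema.registry.url and, for one subject per record type,
    // value.subject.name.strategy = io.confluent.kafka.serializers.subject.RecordNameStrategy
    private final Map<String, Object> registryConfig;
    private transient KafkaAvroSerializer serializer;

    public RecordNameSerializer(String topic, Map<String, Object> registryConfig) {
        this.topic = topic;
        this.registryConfig = registryConfig;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) {
        if (serializer == null) { // lazy init: the Confluent serializer is not serializable
            serializer = new KafkaAvroSerializer();
            serializer.configure(registryConfig, false);
        }
        return new ProducerRecord<>(topic, null, timestamp, null, serializer.serialize(topic, element));
    }
}
```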
```java
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP);
String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
```
The code building the dataflow is a bit hard to follow.
I would suggest doing what we tend to do in other examples (a sketch following these patterns appears after this comment):

- In the runtime configuration, use a PropertyGroup for each source and sink, even if some configurations are repeated.
- Instantiate each Source and Sink in a local method, passing the `Properties` which contain all the configuration for that specific component. Extract specific properties, like the topic name, within the method rather than directly in `main()`.
- Build the dataflow by just attaching the operators one after the other, using intermediate stream variables only when it helps readability.
- Avoid methods that attach operators to the dataflow. Practically, any method which expects a `DataStream` or `StreamExecutionEnvironment` as a parameter should be avoided.
- If an operator implementation, like a map or a filter, is simple, try using a lambda and inlining it. If the operator implementation is complex, externalize it to a separate class.

See examples here.

We are not following these patterns in all examples, but we are trying to converge as much as possible.
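To make the suggested structure concrete, a hedged sketch of how the job's `main()` could look under these patterns. It assumes the PR's `loadApplicationProperties` helper plus the `Option`, `OptionDeserializationSchema`, and `RecordNameSerializer` classes sketched earlier; the property-group and key names are illustrative:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One PropertyGroup per component, even if some values repeat
        Map<String, Properties> appProps = loadApplicationProperties(env);
        KafkaSource<Option> source = createSource(appProps.get("InputTopic"));
        KafkaSink<Object> sink = createSink(appProps.get("OutputTopic"));

        // Attach operators one after the other; simple logic stays inline
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Avro source")
                .map(option -> option.value) // unwrap the container before writing out
                .sinkTo(sink);

        env.execute("avro-one-topic-many-subjects");
    }

    private static KafkaSource<Option> createSource(Properties props) {
        // Component-specific properties are extracted here, not in main()
        Map<String, Object> registryConfig = new HashMap<>();
        registryConfig.put("schema.registry.url", props.getProperty("schema.registry.url"));
        registryConfig.put("specific.avro.reader", true);
        return KafkaSource.<Option>builder()
                .setBootstrapServers(props.getProperty("bootstrap.servers"))
                .setTopics(props.getProperty("topic"))
                .setDeserializer(new OptionDeserializationSchema(registryConfig))
                .build();
    }

    private static KafkaSink<Object> createSink(Properties props) {
        Map<String, Object> registryConfig = new HashMap<>();
        registryConfig.put("schema.registry.url", props.getProperty("schema.registry.url"));
        registryConfig.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.RecordNameStrategy");
        return KafkaSink.<Object>builder()
                .setBootstrapServers(props.getProperty("bootstrap.servers"))
                .setRecordSerializer(new RecordNameSerializer<>(props.getProperty("topic"), registryConfig))
                .build();
    }
}
```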
✅
@nicusX Thanks for reviewing this PR. I've addressed your points. Could you please take another look? 🙏🏾
@nicusX Do you think this is a good approach to share run configurations with IntelliJ users?