In this scenario, you will create a Quarkus application that uses the MicroProfile Reactive Messaging extension to send events to Apache Kafka.
Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data
The Quarkus extension uses SmallRye Reactive Messaging to implement the connectors to Kafka. SmallRye Reactive Messaging is a framework for building event-driven, data streaming, and event-sourcing applications using Contexts and Dependency Injection (CDI) for Java.
Channels and Streams
When dealing with an event-driven or data streaming application, there are a few concepts and terms we need to understand.
In the application, messages flow on a channel. A channel is a virtual destination identified by a name. SmallRye connects each component to the channel it reads from and to the channel it populates. The resulting structure is a stream: messages flow between components through channels.
An application interacts with an event broker, which transmits messages using connectors. A connector is a piece of code that connects to a broker to:
- Receive messages from the event broker and propagate them to the application
- Send messages provided by the application to the broker
To achieve this, connectors are configured to map incoming messages to a specific channel (consumed by the application), and to collect outgoing messages sent to a specific channel by the application.
Each connector has a name. This name is referenced by the application to indicate that a specific channel is managed by this connector.
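To make the vocabulary concrete, the following is a minimal plain-Java sketch of a connector bridging named channels and a broker. It is not how SmallRye is implemented: InMemoryBroker, SketchConnector, and ChannelSketch are hypothetical names invented for this illustration, the broker is an in-memory stand-in rather than Kafka, and the sketch assumes the channel name doubles as the broker destination.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for an event broker; this is not a Kafka client.
final class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String destination, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(subscriber);
    }

    void publish(String destination, String payload) {
        for (Consumer<String> subscriber : subscribers.getOrDefault(destination, List.of())) {
            subscriber.accept(payload);
        }
    }
}

// Hypothetical connector: bridges named application channels and the broker.
// Assumption: the channel name is used directly as the broker destination.
final class SketchConnector {
    private final InMemoryBroker broker;

    SketchConnector(InMemoryBroker broker) {
        this.broker = broker;
    }

    // Outgoing side: collect a message the application sent to a channel
    // and hand it to the broker.
    void sendFromChannel(String channel, String payload) {
        broker.publish(channel, payload);
    }

    // Incoming side: propagate broker messages to the application code
    // that reads from the channel.
    void receiveOnChannel(String channel, Consumer<String> applicationHandler) {
        broker.subscribe(channel, applicationHandler);
    }
}

final class ChannelSketch {
    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        SketchConnector connector = new SketchConnector(broker);

        List<String> received = new ArrayList<>();
        connector.receiveOnChannel("uber", received::add);
        connector.sendFromChannel("uber", "ride-requested");

        System.out.println(received); // prints [ride-requested]
    }
}
```

In a real Quarkus application you do not write this plumbing yourself. The connector is selected through configuration, and application code only refers to channels by name.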
Apache Kafka Connector
A Kafka connector adds support for Kafka to SmallRye. With it, you can receive Kafka records as well as write messages to Kafka.
The Kafka Connector is based on the Vert.x Kafka Client.
Quarkus is a full-stack, Kubernetes-native Java framework made for Java virtual machines (JVMs) and native compilation, optimizing Java specifically for containers and enabling it to become an effective platform for serverless, cloud, and Kubernetes environments.
This scenario has shown you how to develop with Quarkus to connect to Apache Kafka, using SmallRye Reactive Messaging to build data streaming applications.
To learn more and get started, see:
Apache Kafka with Reactive Messaging
Adding a Quarkus Reactive Messaging Extension
You start this scenario with a basic Maven-based application, which was created using the Quarkus Maven plugin.
Add an extension to integrate with Kafka
The current project does not yet include the extension that integrates Quarkus with Apache Kafka, so you need to add it.
Change to the project folder:
Install the extension into the project with the following command:
mvn quarkus:add-extension -Dextension="quarkus-smallrye-reactive-messaging-kafka"
The first time you add the extension, the command takes longer because Maven downloads new dependencies.
This adds the necessary entries to your pom.xml to bring in the Kafka extension. You should see a fragment similar to this around line 50:
...
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
...
Configure a channel to integrate with the event broker
Next, we need to configure the application to define how we are going to connect to the event broker.
The MicroProfile Reactive Messaging properties follow the pattern mp.messaging.[outgoing|incoming].[channel-name].[attribute]=[value].
The channel-name segment must match the value set in the @Incoming and @Outgoing annotations. To indicate that a channel is managed by the Kafka connector, set the channel's connector attribute to smallrye-kafka.
Open the src/main/resources/application.properties file and add the following configuration:
# Configuration file
kafka.bootstrap.servers=my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
mp.messaging.outgoing.uber.connector=smallrye-kafka
mp.messaging.outgoing.uber.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.uber.value.serializer=org.apache.kafka.common.serialization.StringSerializer
This configuration sets the Kafka bootstrap server hostname and port, which tell the application where to find the broker, and configures a channel named uber. The key and value serializers are part of the Kafka producer configuration. They encode the message key and payload before the record is sent.
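To see how the channel-scoped keys in this configuration break down, here is a small plain-Java sketch that splits a key into its direction, channel name, and attribute. It is only an illustration of the naming pattern, not the parser the framework uses. ChannelProperty is a hypothetical class, and the sketch assumes channel names contain no dots.

```java
import java.util.Optional;

// Hypothetical helper that splits a key such as
// "mp.messaging.outgoing.uber.key.serializer" into its three segments.
// Assumption: channel names contain no dots; attributes may (e.g. "key.serializer").
final class ChannelProperty {
    static final String PREFIX = "mp.messaging.";

    final String direction; // "outgoing" or "incoming"
    final String channel;   // e.g. "uber"
    final String attribute; // e.g. "connector" or "key.serializer"

    private ChannelProperty(String direction, String channel, String attribute) {
        this.direction = direction;
        this.channel = channel;
        this.attribute = attribute;
    }

    static Optional<ChannelProperty> parse(String key) {
        if (!key.startsWith(PREFIX)) {
            return Optional.empty(); // e.g. kafka.bootstrap.servers is not channel-scoped
        }
        // Limit of 3 keeps any further dots inside the attribute segment.
        String[] parts = key.substring(PREFIX.length()).split("\\.", 3);
        if (parts.length < 3 || parts[1].isEmpty() || parts[2].isEmpty()) {
            return Optional.empty();
        }
        if (!parts[0].equals("outgoing") && !parts[0].equals("incoming")) {
            return Optional.empty();
        }
        return Optional.of(new ChannelProperty(parts[0], parts[1], parts[2]));
    }

    public static void main(String[] args) {
        String[] keys = {
            "kafka.bootstrap.servers",
            "mp.messaging.outgoing.uber.connector",
            "mp.messaging.outgoing.uber.key.serializer"
        };
        for (String key : keys) {
            Optional<ChannelProperty> parsed = parse(key);
            if (parsed.isPresent()) {
                ChannelProperty p = parsed.get();
                System.out.println(key + " -> direction=" + p.direction
                        + ", channel=" + p.channel + ", attribute=" + p.attribute);
            } else {
                System.out.println(key + " -> not a channel property");
            }
        }
    }
}
```

Reading the keys this way shows that all three mp.messaging entries configure the same outgoing channel, uber, while kafka.bootstrap.servers is not tied to any single channel.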
You don’t need to set the Kafka topic. By default, the connector uses the channel name (uber). You can, however, configure the topic attribute to override it.
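The fallback rule can be sketched in a few lines of plain Java. This is an illustration of the behavior described here, not the connector's actual code. TopicResolver is a hypothetical helper, and uber-rides is a made-up topic name used only to show the override.

```java
import java.util.Map;

// Hypothetical helper mirroring the rule described above: use the channel's
// "topic" attribute when it is set, otherwise fall back to the channel name.
final class TopicResolver {
    static String resolveTopic(Map<String, String> config, String channel) {
        String key = "mp.messaging.outgoing." + channel + ".topic";
        return config.getOrDefault(key, channel);
    }

    public static void main(String[] args) {
        // No topic attribute: the channel name is used.
        System.out.println(resolveTopic(Map.of(), "uber")); // prints uber

        // Topic attribute set: it overrides the channel name.
        // "uber-rides" is a made-up topic name for this example.
        Map<String, String> overridden =
                Map.of("mp.messaging.outgoing.uber.topic", "uber-rides");
        System.out.println(resolveTopic(overridden, "uber")); // prints uber-rides
    }
}
```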