The microprofile-reactive-messaging-kafka quickstart demonstrates the use of the MicroProfile Reactive Messaging specification backed by Apache Kafka in WildFly.

What is it?

MicroProfile Reactive Messaging is a framework for building event-driven, data streaming, and event-sourcing applications using CDI. It lets your application interact using messaging technologies such as Apache Kafka.

The implementation of MicroProfile Reactive Messaging is built on top of MicroProfile Reactive Streams Operators. Reactive Streams Operators extends the Reactive Streams specification, by adding operators such as map(), filter() and others.

Architecture

In this quickstart, we have a CDI bean that demonstrates the functionality of the MicroProfile Reactive Messaging specification. Currently, we support Reactive Messaging 1.0. Connections to external messaging systems such as Apache Kafka are configured via MicroProfile Config. We will also use Reactive Streams Operators to modify the data relayed in these streams in the methods handling the Reactive Messaging streams.

System Requirements

The application this project produces is designed to be run on WildFly Application Server 33 or later.

All you need to build this project is Java 11.0 (Java SDK 11) or later and Maven 3.6.0 or later. See Configure Maven to Build and Deploy the Quickstarts to make sure you are configured correctly for testing the quickstarts.

Use of the WILDFLY_HOME and QUICKSTART_HOME Variables

In the following instructions, replace WILDFLY_HOME with the actual path to your WildFly installation. The installation path is described in detail here: Use of WILDFLY_HOME and JBOSS_HOME Variables.

When you see the replaceable variable QUICKSTART_HOME, replace it with the path to the root directory of all of the quickstarts.

Start the WildFly Standalone Server

  1. Open a terminal and navigate to the root of the WildFly directory.

  2. Start the WildFly server with the MicroProfile profile by typing the following command.

    $ WILDFLY_HOME/bin/standalone.sh -c standalone-microprofile.xml
    Note
    For Windows, use the WILDFLY_HOME\bin\standalone.bat script.

Solution

We recommend that you follow the instructions that create the application step by step. However, you can also go right to the completed example which is available in this directory.

Caution
Kafka must be running before attempting to deploy the Quickstart application. See Running the Apache Kafka Service for how to do this in your local environment.

Build and Deploy the Quickstart

  1. Make sure WildFly server is started.

  2. Open a terminal and navigate to the root directory of this quickstart.

  3. Type the following command to build the quickstart.

    $ mvn clean package
  4. Type the following command to deploy the quickstart.

    $ mvn wildfly:deploy

This deploys the microprofile-reactive-messaging-kafka/target/microprofile-reactive-messaging-kafka.war to the running instance of the server.

You should see a message in the server log indicating that the archive deployed successfully.

Undeploy the Quickstart

When you are finished testing the quickstart, follow these steps to undeploy the archive.

  1. Make sure WildFly server is started.

  2. Open a terminal and navigate to the root directory of this quickstart.

  3. Type this command to undeploy the archive:

    $ mvn wildfly:undeploy

Run the Integration Tests

This quickstart includes integration tests, which are located under the src/test/ directory. The integration tests verify that the quickstart runs correctly when deployed on the server.

Follow these steps to run the integration tests.

  1. Make sure WildFly server is started.

  2. Make sure the quickstart is deployed.

  3. Type the following command to run the verify goal with the integration-testing profile activated.

    $ mvn verify -Pintegration-testing 

Run the Quickstart in Red Hat CodeReady Studio or Eclipse

You can also start the server and deploy the quickstarts or run the Arquillian tests in Red Hat CodeReady Studio or from Eclipse using JBoss tools. For general information about how to import a quickstart, add a WildFly server, and build and deploy a quickstart, see Use Red Hat CodeReady Studio or Eclipse to Run the Quickstarts.

Creating the Maven Project

mvn archetype:generate \
    -DgroupId=org.wildfly.quickstarts \
    -DartifactId=microprofile-reactive-messaging-kafka \
    -DinteractiveMode=false \
    -DarchetypeGroupId=org.apache.maven.archetypes \
    -DarchetypeArtifactId=maven-archetype-webapp
cd microprofile-reactive-messaging-kafka

Open the project in your favourite IDE.

The project needs to be updated to use Java 8 as the minimum:

<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

Next set up our dependencies. Add the following section to your pom.xml:

<dependencyManagement>
  <dependencies>
    <!-- importing the ee-with-tools BOM adds specs and other useful artifacts as managed dependencies -->
    <dependency>
        <groupId>org.wildfly.bom</groupId>
        <artifactId>wildfly-ee-with-tools</artifactId>
        <version>33.0.0.Final</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

    <!-- importing the microprofile BOM adds MicroProfile specs -->
    <dependency>
        <groupId>org.wildfly.bom</groupId>
        <artifactId>wildfly-microprofile</artifactId>
        <version>33.0.0.Final</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

By using boms the majority of dependencies used within this quickstart align with the version used by the application server.

Now we need to add the dependencies which are needed by what is the focus of this QuickStart (CDI, MicroProfile Reactive Messaging, Reactive Streams Operators, Reactive Streams and the Apache Kafka Client). Additionally we need to add dependencies for 'supporting' functionality (JPA, JTA and JAX-RS):

<dependencies>
    <!-- Core dependencies -->

    <!-- Import the CDI API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>jakarta.enterprise</groupId>
        <artifactId>jakarta.enterprise.cdi-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the Kafka Client API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the SmallRye Kafka API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>io.smallrye.reactive</groupId>
        <artifactId>smallrye-reactive-messaging-kafka-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the Reactive Messaging API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
        <artifactId>microprofile-reactive-messaging-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the Reactive Streams Operators API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
        <artifactId>microprofile-reactive-streams-operators-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the Reactive Streams Operators API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>org.reactivestreams</groupId>
        <artifactId>reactive-streams</artifactId>
        <scope>provided</scope>
    </dependency>

    <!-- 'Supporting' dependencies -->

    <!-- Import the Persistence API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>jakarta.persistence</groupId>
        <artifactId>jakarta.persistence-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the Annotations API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>jakarta.annotation</groupId>
        <artifactId>jakarta.annotation-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the Persistence API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>jakarta.transaction</groupId>
        <artifactId>jakarta.transaction-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the JAX-RS API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>jakarta.ws.rs</groupId>
        <artifactId>jakarta.ws.rs-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Import the resteasy-jaxrs API, we use provided scope as the API is included in WildFly -->
    <dependency>
        <groupId>org.jboss.resteasy</groupId>
        <artifactId>resteasy-jaxrs</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

All dependencies have the 'provided' scope.

As we are going to be deploying this application to the WildFly server, let’s also add a maven plugin that will simplify the deployment operations (you can replace the generated build section):

<build>
  <!-- Set the name of the archive -->
  <finalName>${project.artifactId}</finalName>
  <plugins>
    <!-- Allows to use mvn wildfly:deploy -->
    <plugin>
      <groupId>org.wildfly.plugins</groupId>
      <artifactId>wildfly-maven-plugin</artifactId>
    </plugin>
  </plugins>
</build>

Setup the required Maven repositories (if you don’t have them set up in Maven global settings):

<repositories>
    <repository>
        <id>jboss-public-maven-repository</id>
        <name>JBoss Public Maven Repository</name>
        <url>https://repository.jboss.org/nexus/content/groups/public</url>
        <layout>default</layout>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
    <repository>
        <id>redhat-ga-maven-repository</id>
        <name>Red Hat GA Maven Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
        <layout>default</layout>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>jboss-public-maven-repository</id>
        <name>JBoss Public Maven Repository</name>
        <url>https://repository.jboss.org/nexus/content/groups/public</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </pluginRepository>
    <pluginRepository>
        <id>redhat-ga-maven-repository</id>
        <name>Red Hat GA Maven Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </pluginRepository>
</pluginRepositories>

Now we are ready to start working with MicroProfile Reactive Messaging.

Preparing the Application

This will walk you through the steps to write our application. They are:

  • Create a generator for the generated messages. We will create something which mocks a call to an asynchnronous resource.

  • Add a data object which wraps the generated messages and adds a timestamp. We will use JPA annotations on this to make it persistable.

  • Create our CDI bean interfacing with the Kafka streams via annotated methods. For the streams that are managed by Kafka we will provide a MicroProfile Config file to configure how to interact with Kafka. It will log, filter and store entries to a RDBMS.

  • Create a CDI bean that will be used to store and retrieve entries from a RDBMS.

  • Create a JAX-RS endpoint to return the data that was stored in the RDBMS to the user.

Adding our Data Generator

Copy across the microprofile-reactive-messaging-kafka/src/main/java/org/wildfly/quickstarts/microprofile/reactive/messaging/MockExternalAsyncResource.java file to your project. This class mocks a call to an asynchronous external resource. The details of how it is implemented are not important for this QuickStart.

MockExternalAsyncResource has one callable method:

public CompletionStage<String> getNextValue()

The CompletionStage returned by this method will complete with a String when ready. A String is ready every two seconds. It will emit the following Strings in the given order:

  • Hello

  • World

  • Reactive

  • Messaging

  • with

  • Kafka

After this initial sequence the returned CompletionStage will complete with a random entry from the above list. A new entry is available every two seconds.

Adding a Data Object

Later we will wrap the strings in a TimedEntry object which contains the String and a timestamp. Since we will be storing it in a database, we add JPA annotations to it:

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.sql.Timestamp;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;

@Entity
public class TimedEntry {
    private Long id;
    private Timestamp time;
    private String message;

    public TimedEntry() {

    }

    public TimedEntry(Timestamp time, String message) {
        this.time = time;
        this.message = message;
    }

    @Id
    @GeneratedValue
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Timestamp getTime() {
        return time;
    }

    public void setTime(Timestamp time) {
        this.time = time;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        String s = "TimedEntry{";
        if (id != null) {
            s += "id=" + id + ", ";
        }
        s += "time=" + time +
                ", message='" + message + '\'' +
                '}';
        return s;
    }
}

Adding our Messaging CDI Bean

MicroProfile Reactive Messaging is based on CDI, so all interaction with the MicroProfile Reactive Messaging streams must happen from a CDI beans. Note: The beans must have either the @ApplicationScoped or @Dependent scopes.

Then within these beans we have a set of methods using the @Incoming and @Outgoing annotations from the MicroProfile Reactive Messaging specification to map the underlying streams. For an @Outgoing annotation its value specifies the Reactive Messaging stream to send data to, and for an @Incoming annotation its value specifies the Reactive Messaging stream to read data from. Although in this QuickStart we are putting all these methods into one CDI bean class, they could be spread over several beans.

The MicroProfile Reactive Messaging specification contains a full list of all the valid method signatures for such @Incoming/@Outging methods. We will use a few of them, and see how they make different use-cases easier.

Our bean looks as follows, and this is the main focus for the functionality in this QuickStart. We will also be using some other technologies, but they are secondary to this section. Explanations of each method will be given in line.

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.sql.Timestamp;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;


@ApplicationScoped
public class MessagingBean {

First we inject our mock asynchronous external data producer, which produces a string every two seconds. We explained this class in a previous section.

    @Inject
    MockExternalAsyncResource producer;

We inject a bean that will be used to persist entries to a RDBMS later on.

    @Inject
    DatabaseBean dbBean;

Now we get to the reactive messaging part.

Our first method is a 'producer' method, since is annotated with the @Outgoing annotation. It simply relays the output of our mock producer bean to the reactive messaging system. It uses the channel source (as indicated in the @Outgoing annotation’s value) as the target stream to send the data to. You can think of 'producer' methods as the entry point to push data into the reactive messaging streams.

    @Outgoing("source")
    public CompletionStage<String> sendInVm() {
        return producer.getNextValue();
    }

Next we have a 'processor' method. It is annotated with both @Incoming and @Outgoing annotations so it gets data from the reactive messaging streams and then pushes it to another stream. Essentially it simply relays data.

In this case we get data from the source stream, so it will receive the entries made available by the sendInVm() method above, and forwards everything onto the filter stream.

In this case, since we just want to log the strings that were emitted, we are using a simple method signature receiving and returning the raw string provided. The Reactive Messaging implementation has unwrapped the CompletionStage from the previous method for us.

Note that there is no Kafka involved yet. Since the @Incoming and @Outgoing values match up, Reactive Messaging will use internal, in-memory streams.

    @Incoming("source")
    @Outgoing("filter")
    public String logAllMessages(String message) {
        System.out.println("Received " + message);
        return message;
    }

Now we have another 'processor' method. We get the data from the filter stream (what was relayed by the previous logAllMessages() method) and forward it on to the sender stream.

In this method we want to do something a bit more advanced, namely apply a filter to the messages. We use a method receiving and returning a Reactive Stream, in this case we use PublisherBuilder from the MicroProfile Reactive Streams Operators specification. PublisherBuilder extends the Publisher interface from the Reactive Streams specification, and provides us with the filter() methods we will use here.

Again the Reactive Messaging implementation does all the wrapping for us.

Our filter method tells us to only relay messages that match Hello or Kafka, and drop everything else. In other words, later methods in the stream will only receive occurrences of Hello or Kafka.

    @Incoming("filter")
    @Outgoing("sender")
    public PublisherBuilder<String> filter(PublisherBuilder<String> messages) {
        return messages
                .filter(s -> s.equals("Hello") || s.equals("Kafka"));
    }

Next we have another 'processor' method, which receives data from the sender stream and forwards it on to the to-kafka stream. It’s parameter is a simple String, MicroProfile Reactive Messaging will unwrap the stream from the PublisherBuilder returned in the previous method and call this next method with the individual entries.

In this method we want to wrap up the data into the TimedEntry class we defined earlier, so we have tuple of the message and a timestamp.

Additionally we want to set a Kafka key for the entries so that we can take advantage of Kafka’s querying capabilities (not done in this quickstart). In order to do this, we do the following steps:

  • Create an instance of Message from the MicroProfile Reactive Messaging API. A Message is a simple wrapper around the payload. In our case we use the TimedEntry instance we created.

  • We create the key for the TimedEntry. In this case we just use a hash of the message.

  • Use the OutgoingKafkaRecordMetadata builder to create an instance of OutgoingKafkaRecordMetadata with the key

  • Next we call KafkaMetadataUtil.writeOutgoingKafkaMetadata() to augment the Message with the OutgoingKafkaRecordMetadata. Note that the Message passed in to KafkaMetadataUtil.writeOutgoingKafkaMetadata() is not modified, we need the returned one.

  • We return the augmented Message to the stream which is backed by Kafka

    @Incoming("sender")
    @Outgoing("to-kafka")
    public Message<TimedEntry> sendToKafka(String msg) {
        // Simpler example for debugging
        TimedEntry te = new TimedEntry(new Timestamp(System.currentTimeMillis()), msg);
        Message<TimedEntry> m = Message.of(te);
        // Just use the hash as the Kafka key for this example
        int key = te.getMessage().hashCode();

        // Create Metadata containing the Kafka key
        OutgoingKafkaRecordMetadata<Integer> md = OutgoingKafkaRecordMetadata
                .<Integer>builder()
                .withKey(key)
                .build();

        // The returned message will have the metadata added
        return KafkaMetadataUtil.writeOutgoingKafkaMetadata(m, md);
    }

Our final method is a 'consumer' method, as it has only an @Incoming annotation. You can think of this as a 'final destination' for the data in your application. We are using a Message<TimedEntry> as our method parameter. We are using this signature since we want to access the IncomingKafkaRecordMetadata, which contains the key we added in the previous method and additional information such as the Kafka partition and topic the message was sent on. Since we are using the signature taking a Message as the parameter, we need to ack() the message and return the resulting CompletionStage<Void>. (If we don’t want to ack the receipt of the message and are not interested in the IncomingKafkaRecordMetadata, we could have used a simpler signature such as void receiveFromKafka(TimedEntry message).)

The methid calls through to our dbBean to store the received data in a RDBMS. We will look at this briefly later.

    @Incoming("from-kafka")
    public CompletionStage<Void> receiveFromKafka(Message<TimedEntry> message) {
        TimedEntry payload = message.getPayload();

        IncomingKafkaRecordMetadata<Integer, TimedEntry> md =
            KafkaMetadataUtil.readIncomingKafkaMetadata(message).get();
        String msg =
                "Received from Kafka, storing it in database\n" +
                "\t%s\n" +
                "\tkey: %d; partition: %d, topic: %s";
        msg = String.format(msg, payload, md.getKey(), md.getPartition(), md.getTopic());
        System.out.println(msg);
        dbBean.store(payload);
        return message.ack();
    }} // MessagingBean - END

You might have noticed that up to, and including, the sendToKafka() method the @Incoming.value() matches the @Outgoing.value() of the prior method. This indicates that these streams (source, filter and sender) are handled in memory by the Reactive Messaging implementation.

For the last two methods this is different and there is no such pairing. The sendToKafka() method sends its data to the to-kafka stream:

    ...
    @Outgoing("to-kafka")
    public Publisher<TimedEntry> sendToKafka(Publisher<String> messages) {
       ...
    }

However, there are no methods annotated with @Incoming("to-kafka).

And the receiveFromKafka() method is expecting to receive data from the from-kafka stream:

    @Incoming("from-kafka")
    public void receiveFromKafka(TimedEntry message) {
       ...
    }

Again, there are no methods annotated with @Outgoing("from-kafka").

These 'unpaired' sets of methods indicate that we do not want to use an in-memory stream, and want to use an external system for these streams. If we were try to deploy the MessagingBean in this state the application would fail to deploy. To fix this, and tell it what to map onto, we need to provide some configuration.

Note
IncomingKafkaRecordMetadata is only available on incoming streams coming from Kafka. OutgoingKafkaRecordMetadata will only have effect on outgoing streams going to Kafka.

Mapping Streams to Kafka using MicroProfile Config

To map 'unpaired' streams onto Kafka we need to add a MicroProfile Config file to configure these streams.

Create a file called src/main/resources/META-INF/microprofile-config.properties and add the following:

mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=testing
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.value.serializer=org.wildfly.quickstarts.microprofile.reactive.messaging.TimedEntrySerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=testing
mp.messaging.incoming.from-kafka.value.deserializer=org.wildfly.quickstarts.microprofile.reactive.messaging.TimedEntryDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

The MicroProfile Reactive Messaging specification mandates the following pre-fixes:

  • mp.messaging.connector. - used to set overall configuration for your application.

  • mp.messaging.outgoing. - used to configure streams we are writing to from methods annotated with @Outgoing. The next element determines the name of the stream as identified in the @Outgoing annotation so all the properties starting with mp.messaging.outgoing.to-kafka are used to configure the writing done by the sendToKafka() method which is annotated with @Outgoing("to-kafka").

  • mp.messaging.incoming. - used to configure streams we are reading from in methods annotated with @Incoming. The next element determines the name of the stream as identified in the @Incoming annotation so all the properties starting with mp.messaging.incoming.from-kafka are used to configure the reading done by the receiveFromKafka() method which is annotated with @Incoming("from-kafka").

What comes after these prefixes is vendor dependent. We use the SmallRye implementation of MicroProfile Reactive Messaging.

At the application level, the mp.messaging.connector.smallrye-kafka.bootstrap.servers property says that all conections to Kafka in this application should go to localhost:9092. This is not strictly necessary, since this value is the default that would be used if not specified. If we wanted to override this for say the @Outgoing("to-kafka") annotated sendToKafka() method we could specify this with a property such as:

mp.messaging.outgoing.to-kafka.bootstrap.servers=otherhost:9092

For the incoming and outgoing properties we can see that they all specify that they should use the smallrye-kafka connector and that the outgoing one writes to the same topic, testing, as the incoming one reads from.

We see that the outgoing configuration uses a TimedEntrySerializer while the incoming one uses TimedEntryDeserializer for the values. Kafka just deals with byte streams, so we need to tell it how to serialize the data we are sending and how to deserialize the data we are receiving. As seen is configured with properties of the form mp.messaging.outgoing.<stream name>.value.serializer and mp.messaging.incoming.<stream name>.value.deserializer. The org.apache.kafka.common.serialization package contains implementations of serializers and deserializers for simple data types and constructs such as maps.

Finally, since the Kafka key is an Integer, we use IntegerSerializer and IntegerDeserializer for the keys. The concept is exactly the same as for the value (de)serializers, but is instead configured with the properties mp.messaging.outgoing.<stream name>.key.serializer and mp.messaging.incoming.<stream name>.key.deserializer.

Custom (De)Serializers

In our case the data we are sending to and receiving from Kafka is not a simple object. It is an object of a class defined in our application, so we need to define our own serialization and deserialization. Luckily, this is easy. We just need to implement the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces.

Here is our TimedEntrySerializer:

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

import org.apache.kafka.common.serialization.Serializer;

public class TimedEntrySerializer implements Serializer<TimedEntry> {
    @Override
    public byte[] serialize(String topic, TimedEntry data) {
        if (data == null) {
            return null;
        }

        try {
            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bout);
            out.writeLong(data.getTime().getTime());
            out.writeUTF(data.getMessage());
            out.close();
            return bout.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

    }
}

And here is our TimedEntryDeserializer:

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Timestamp;

import org.apache.kafka.common.serialization.Deserializer;

public class TimedEntryDeserializer implements Deserializer<TimedEntry> {

    @Override
    public TimedEntry deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))){
            Timestamp time = new Timestamp(in.readLong());
            String message = in.readUTF();
            return new TimedEntry(time, message);
        } catch (IOException e){
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

As you can see the serializer writes the time as a Long, and the message as a string, and the deserializer reads them in the same order. Then in our microprofile-config.properties above we saw how to make Kafka use our classes for serialization and deserialization.

Storing Data in an RDBMS

We have covered all the reactive messaging parts, but have missed out how the MessagingBean.receiveFromKafka() stores data via the DatabaseBean. This is not the focus of this QuickStart, so we will just mention how this works quickly.

This is the definition of DatabaseBean:

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.TypedQuery;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class DatabaseBean {

    @PersistenceContext(unitName = "test")
    EntityManager em;

    @Transactional
    public void store(Object entry) {
        em.persist(entry);
    }

    public List<TimedEntry> loadAllTimedEntries() {
        TypedQuery<TimedEntry> query = em.createQuery("SELECT t from TimedEntry t", TimedEntry.class);
        List<TimedEntry> result = query.getResultList();
        return result;
    }
}

It injects an EntityManager for the persitence context test, which is defined in the src/main/resources/META-INF/persistence.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.2"
             xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="
        http://xmlns.jcp.org/xml/ns/persistence
        http://xmlns.jcp.org/xml/ns/persistence/persistence_2_2.xsd">
    <persistence-unit name="test">
        <jta-data-source>java:jboss/datasources/ExampleDS</jta-data-source>
        <properties>
            <property name="hibernate.hbm2ddl.auto" value="create-drop"/>
        </properties>
    </persistence-unit>
</persistence>

The DatabaseBean.store() method saves the TimedEntry and the DatabaseBean.loadAllTimedEntries() method loads all the ones we stored.

It is worth pointing out that the @Incoming and @Outgoing annotated methods called by the Reactive Messaging implementation (such as MessagingBean.receiveFromKafka()) happen outside of user space, so there is no @Transaction associated with them. So we need to annotated the DatabaseBean.store() method with @Transactional in order to save our entry to the database.

Viewing the Data Stored in the RDBMS

Finally, we would like to be able to view the data that was stored in the database. To do this we will add a JAX-RS endpoint that queries the database by calling DatabaseBean.loadAllTimedEntries().

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.util.List;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/")
public class RootResource {

    @Inject
    DatabaseBean dbBean;

    @GET
    @Path("/db")
    @Produces(MediaType.TEXT_PLAIN)
    public String getDatabaseEntries() {
        List<TimedEntry> entries = dbBean.loadAllTimedEntries();
        StringBuffer sb = new StringBuffer();
        for (TimedEntry t : entries) {
            sb.append(t);
            sb.append("\n");
        }
        return sb.toString();
    }
}

We expose our JAX-RS application at the context path:

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import jakarta.ws.rs.ApplicationPath;
import jakarta.ws.rs.core.Application;

@ApplicationPath("/")
public class JaxRsApplication extends Application {
}

Interaction with User Initiated Code

So far what we have seen is really happening in the back-end with little user interaction. The MicroProfile Reactive Messaging 2.0 specification adds a @Channel annotation and an Emitter interface which makes it easier to send data to MicroProfile Reactive Messaging streams from user initiated code and to receive data from Reactive Messaging streams.

To showcase this functionality we add another CDI bean called UserMessagingBean:

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import java.util.concurrent.CopyOnWriteArraySet;

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.FormParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.core.Response;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@ApplicationScoped
public class UserMessagingBean {

    @Inject
    @Channel("user")
    private Emitter<String> emitter;

    private BroadcastPublisher<String> broadcastPublisher;

    public UserMessagingBean() {
        //Needed for CDI spec compliance
        //The @Inject annotated one will be used
    }

    @Inject
    public UserMessagingBean(@Channel("user") Publisher<String> receiver) {
        this.broadcastPublisher = new BroadcastPublisher(receiver);
    }

    @PreDestroy
    public void destroy() {
        broadcastPublisher.close();
    }

    public Response send(String value) {
        System.out.println("Sending " + value);
        emitter.send(value);
        return Response.accepted().build();
    }

    public Publisher<String> getPublisher() {
        return broadcastPublisher;
    }

    private class BroadcastPublisher<T> implements Publisher<T> {
        // See source code for more details
    }
}

Looking at this in more detail, the following field

    @Inject
    @Channel("user")
    private Emitter<String> emitter;

is used to send data to the MicroProfile Reactive Messagin stream called user, which is done in the following method

    public Response send(String value) {
        System.out.println("Sending " + value);
        emitter.send(value);
        return Response.accepted().build();
    }

@Inject @Channel on an Emitter can be considered similiar to an @Outgoing annotated method but with the data coming from code paths invoked by user interaction. In this case we are not using Kafka to back the stream but if we wanted to, for this example, the MicroProfile Config properties would be prefixed with the mp.messaging.outgoing.user. prefix.

Next we have the constructor where we inject a Publisher (We could also have used a MicroProfile Reactive Streams Operators PublisherBuiilder) to define the receiving side.

    @Inject
    public UserMessagingBean(@Channel("user") Publisher<String> receiver) {
        this.broadcastPublisher = new BroadcastPublisher(receiver);
    }

We store this injected Publisher in the broadcastPublisher field. We will come back to why we are wrapping it in a BroadcastPublisher in a second. So an @Inject @Channel on a Publisher (or PublisherBuilder) can be considered equivalent to use of the @Incoming annotation. In this case we are listening to the user in memory stream so messages sent via the Emitter will be received on this Publisher. If instead we wanted to configure it to send via Kafka we would use MicroProfile Config properties prefixed with the mp.messaging.incoming.user. prefix.

There are a few caveats on this mechanism though:

  1. There must be an active Subscription (from the Reactive Streams specification) on the channel before the Emitter.send() method is called.

  2. There can only be one Subscription on the injected Publisher. This means that we cannot simply return this Publisher as is via an asynchronous JAX-RS endpoint as each client request would result in a separate Subscription.

The above two points will hopefully be fixed in a future version of the specification. For the purposes of this quickstart we are bypassing the above limitations by creating the BroadcastPublisher class and wrapping the original Publisher in that instead. Note that implementing Publisher is hard, and that BroadcastPublisher should be considered a 'proof of concept'. For more details about the BroadcastPublisher see the source code of the UserMessagingBean class. In a nutshell what it does is:

  • Its constructor subscribes to the injected Publisher to avoid the first problem

  • When code subscribes to it, it handles the Subscription on its own level, and forwards on code received on the ' a separate level and forwards on data received from the base Subsciption created in the constructor.

Finally we have a JAX-RS endpoint

package org.wildfly.quickstarts.microprofile.reactive.messaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.jboss.resteasy.annotations.Stream;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Path("/user")
@Produces(MediaType.TEXT_PLAIN)
public class UserResource {
    @Inject
    UserMessagingBean bean;

    @POST
    @Path("{value}")
    @Consumes(MediaType.TEXT_PLAIN)
    public Response send(@PathParam("value") String value) {
        bean.send(value);
        return Response.ok().build();
    }

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @Stream
    public Publisher<String> get() {
        return bean.getPublisher();
    }
}

It simply delegates the values received from POST requests under /user/<value> onto the bean which sends them via the Emitter.

Then GET requests for /user return the BroadCastPublisher to the user who will then receive data received on the MicroProfile Reactive Messaging channel.


Now you should be able to compile the application and prepare it for deployment.

Running the Application on a Standalone WildFly Server

We need to first start Apache Kafka. Then we will run the WildFly server and deploy our application to the server.

Running the Apache Kafka Service

To run Apache Kafka locally you will need to download it first. Extract the zip to a location, and enter that directory in a terminal window. Then enter the following command to start Apache Zookeeper:

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

Then open a new terminal window and go to the same directory. In this terminal, to start a Kafka instance, enter the command:

$ ./bin/kafka-server-start.sh config/server.properties
Note
Zookeeper and Kafka must be left running, so use a new terminal for other commands outlined in this quickstart.
Note
Kafka is not supported on Microsoft Windows, but Docker may be used to start a Kafka service on it.

Build and Deploy the Initial Application

Let’s check that our application works!

  1. Make sure the WildFly server is started as described above.

  2. WildFly ships with all the modules to run MicroProfile Reactive Messaging applications with Kafka, but the functionality is not enabled out of the box, so we need to enable it. Run: $ WILDFLY_HOME/bin/jboss-cli.sh --connect --file=enable-reactive-messaging.cli to set everything up. The enable-reactive-messaging.cli file used can be found here. NOTE: This is only required if running against the downloaded server. If the microprofile-reactive-messaging-kafka layer is provisioned, as is done by the openshift and bootable-jar maven profiles, the Kafka functionality is there

  3. Open new terminal and navigate to the root directory of your project.

  4. Type the following command to build and deploy the project:

$ mvn clean package wildfly:deploy

Now we should see output in the server console. First, we see output for the ones in the determined order:

14:24:39,197 INFO  [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:39,197 INFO  [stdout] (vert.x-eventloop-thread-0) 	TimedEntry{time=2021-08-06 14:24:39.183, message='Hello'}
14:24:39,197 INFO  [stdout] (vert.x-eventloop-thread-0) 	key: 69609650; partition: 0, topic: testing
14:24:41,185 INFO  [stdout] (pool-22-thread-1) Received world
14:24:43,183 INFO  [stdout] (pool-22-thread-1) Received Reactive
14:24:45,183 INFO  [stdout] (pool-22-thread-1) Received Messaging
14:24:47,183 INFO  [stdout] (pool-22-thread-1) Received with
14:24:49,182 INFO  [stdout] (pool-22-thread-1) Received Kafka
14:24:49,188 INFO  [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:49,188 INFO  [stdout] (vert.x-eventloop-thread-0) 	TimedEntry{time=2021-08-06 14:24:49.183, message='Kafka'}
14:24:49,188 INFO  [stdout] (vert.x-eventloop-thread-0) 	key: 72255238; partition: 0, topic: testing
14:24:51,184 INFO  [stdout] (pool-22-thread-1) Received Kafka

Then we get another section where it is using the randomised order

14:24:51,184 INFO  [stdout] (pool-22-thread-1) Received Kafka
14:24:51,190 INFO  [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:51,190 INFO  [stdout] (vert.x-eventloop-thread-0) 	TimedEntry{time=2021-08-06 14:24:51.184, message='Kafka'}
14:24:51,190 INFO  [stdout] (vert.x-eventloop-thread-0) 	key: 72255238; partition: 0, topic: testing
14:24:53,184 INFO  [stdout] (pool-22-thread-1) Received world
14:24:55,184 INFO  [stdout] (pool-22-thread-1) Received world
14:24:57,184 INFO  [stdout] (pool-22-thread-1) Received Reactive
14:24:59,181 INFO  [stdout] (pool-22-thread-1) Received Hello
14:24:59,187 INFO  [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:59,187 INFO  [stdout] (vert.x-eventloop-thread-0) 	TimedEntry{time=2021-08-06 14:24:59.182, message='Hello'}
14:24:59,187 INFO  [stdout] (vert.x-eventloop-thread-0) 	key: 69609650; partition: 0, topic: testing

In both parts of the log we see that all messages reach the logAllMessages() method, while only Hello and Kafka reach the receiveFromKafka() method which saves them to the RDBMS.

To inspect what was stored in the database, go to http://localhost:8080/microprofile-reactive-messaging-kafka/db in your browser and you should see something like:

TimedEntry{id=1, time=2021-08-06 14:24:39.183, message='Hello'}
TimedEntry{id=2, time=2021-08-06 14:24:49.183, message='Kafka'}
TimedEntry{id=3, time=2021-08-06 14:24:51.184, message='Kafka'}
TimedEntry{id=4, time=2021-08-06 14:24:59.182, message='Hello'}

The timestamps of the entries in the browser match the ones we saw in the server logs.

Interaction with User Initiated Code

With the application still running, open two terminal windows. Enter the following curl command in both of them

$curl -N http://localhost:8080/microprofile-reactive-messaging-kafka/user

The -N option keeps the connection open, so we receive data as it becomes available on the publisher.

In a third terminal window enter the commands:

$curl -X POST http://localhost:8080/microprofile-reactive-messaging-kafka/user/one
$curl -X POST http://localhost:8080/microprofile-reactive-messaging-kafka/user/two
$curl -X POST http://localhost:8080/microprofile-reactive-messaging-kafka/user/three

In the first two terminal windows you should see these entries appear as they are posted:

data: one

data: two

data: three

Building and running the quickstart application in a bootable JAR

You can use the WildFly JAR Maven plug-in to build a WildFly bootable JAR to run this quickstart.

The quickstart pom.xml file contains a Maven profile named bootable-jar which configures the bootable JAR building:

      <profile>
          <id>bootable-jar</id>
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.wildfly.plugins</groupId>
                      <artifactId>wildfly-maven-plugin</artifactId>
                      <configuration>
                          <discover-provisioning-info>
                              <version>${version.server}</version>
                          </discover-provisioning-info>
                          <bootable-jar>true</bootable-jar>
                          <!--
                            Rename the output war to ROOT.war before adding it to the server, so that the
                            application is deployed in the root web context.
                          -->
                          <name>ROOT.war</name>
                          <add-ons>...</add-ons>
                      </configuration>
                      <executions>
                          <execution>
                              <goals>
                                  <goal>package</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
                  ...
              </plugins>
          </build>
      </profile>

The plugin uses WildFly Glow to discover the feature packs and layers required to run the application, and provisions a server containing those layers.

If you get an error or the server is missing some functionality which cannot be auto-discovered, you can download the WildFly Glow CLI and run the following command to see more information about what add-ons are available:

wildfly-glow show-add-ons
Procedure
  1. Build the quickstart bootable JAR with the following command:

    $ mvn clean package -Pbootable-jar
  2. Run the quickstart application contained in the bootable JAR:

    $ java -jar target/microprofile-reactive-messaging-kafka-bootable.jar
  3. You can now interact with the quickstart application.

Note

After the quickstart application is deployed, the bootable JAR includes the application in the root context. Therefore, any URLs related to the application should not have the /microprofile-reactive-messaging-kafka path segment after HOST:PORT.

Run the Integration Tests with a bootable jar

The integration tests included with this quickstart, which verify that the quickstart runs correctly, may also be run with a bootable jar.

Follow these steps to run the integration tests.

  1. Make sure the bootable jar is provisioned.

    $ mvn clean package -Pbootable-jar
  2. Start the WildFly bootable jar, this time using the WildFly Maven Jar Plugin, which is recommend for testing due to simpler automation.

    $ mvn wildfly:start-jar
  3. Type the following command to run the verify goal with the integration-testing profile activated, and specifying the quickstart’s URL using the server.host system property, which for a bootable jar by default is http://localhost:8080.

    $ mvn verify -Pintegration-testing -Dserver.host=http://localhost:8080
  4. Shutdown the WildFly bootable jar, this time using the WildFly Maven Jar Plugin too.

    $ mvn wildfly:shutdown
Note

The following warning message may be seen when starting the WildFly bootable jar, due to the topic missing in Kafka:

WARN  [org.apache.kafka.clients.NetworkClient] (smallrye-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-from-kafka, groupId="microprofile-reactive-messaging-kafka-group-id"] Error while fetching metadata with correlation id 2 : {testing=LEADER_NOT_AVAILABLE}

You may ignore this warning, yet to avoid it the topic may be manually created in advance, with the following command:

bin/kafka-topics.sh --create --topic testing --bootstrap-server localhost:9092

Building and running the quickstart application with OpenShift

Build the WildFly Source-to-Image (S2I) Quickstart to OpenShift with Helm Charts

On OpenShift, the S2I build with Apache Maven uses an openshift Maven profile to provision a WildFly server, deploy and run the quickstart in OpenShift environment.

The server provisioning functionality is provided by the WildFly Maven Plugin, and you may find its configuration in the quickstart pom.xml:

        <profile>
            <id>openshift</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.wildfly.plugins</groupId>
                        <artifactId>wildfly-maven-plugin</artifactId>
                        <configuration>
                            <discover-provisioning-info>
                                <version>${version.server}</version>
                                <context>cloud</context>
                            </discover-provisioning-info>
                            <!--
                                The parent POM's 'openshift' profile renames the output archive to ROOT.war so that the
                                application is deployed in the root web context. Add ROOT.war to the server.
                            -->
                            <filename>ROOT.war</filename>
                            <add-ons>...</add-ons>
                        </configuration>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>package</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    ...
                </plugins>
            </build>
        </profile>

You may note that unlike the provisioned-server profile it uses the cloud context which enables a configuration tuned for OpenShift environment.

The plugin uses WildFly Glow to discover the feature packs and layers required to run the application, and provisions a server containing those layers.

If you get an error or the server is missing some functionality which cannot be auto-discovered, you can download the WildFly Glow CLI and run the following command to see more information about what add-ons are available:

wildfly-glow show-add-ons

Getting Started with WildFly for OpenShift and Helm Charts

This section contains the basic instructions to build and deploy this quickstart to WildFly for OpenShift or WildFly for OpenShift Online using Helm Charts.

Prerequisites

  • You must be logged in OpenShift and have an oc client to connect to OpenShift

  • Helm must be installed to deploy the backend on OpenShift.

Once you have installed Helm, you need to add the repository that provides Helm Charts for WildFly.

$ helm repo add wildfly https://docs.wildfly.org/wildfly-charts/
"wildfly" has been added to your repositories
$ helm search repo wildfly
NAME                    CHART VERSION   APP VERSION     DESCRIPTION
wildfly/wildfly         ...             ...            Build and Deploy WildFly applications on OpenShift
wildfly/wildfly-common  ...             ...            A library chart for WildFly-based applications
Install AMQ Streams on OpenShift

The functionality of this quickstart depends on a running instance of the AMQ Streams Operator. AMQ Streams is a Red Hat project based on Apache Kafka. To deploy AMQ Streams in the Openshift environment:

  1. Log in into the Openshift console as kubeadmin user (or any cluster administrator).

  2. Install the Red Hat Streams for Apache Kafka operator

  3. Create an instance of Red Hat Streams for Apache Kafka

  4. Create a topic in the Red Hat Streams for Apache Kafka

Install it with the default values and wait for the message telling you it has been installed and is ready for use.

In your terminal, run the following command to subscribe the Red Hat Streams for Apache Kafka operator.

$ oc apply -f ./charts/amq-operator-on-openshift.yaml --wait --timeout=10m0s

To verify the operator is ready to use, you can run the following command. You should see the following output. The PHASE needs to be Succeeded

$ oc get ClusterServiceVersion

NAME                  DISPLAY                            VERSION   REPLACES              PHASE
amqstreams.v2.7.0-1   Red Hat Streams for Apache Kafka   2.7.0-1   amqstreams.v2.7.0-0   Succeeded

Then you can set up a Kafka cluster called my-cluster with topic testing in your project:

$ oc apply -f ./charts/kafka-on-openshift.yaml --wait --timeout=10m0s

Although the above commands will return pretty immediately, your AMQ Streams instance will not be available until its entity operator is up and running. The name of the pod will be of the format my-cluster-entity-operator-xxxxxxxxx-yyyyy.

To be on the safe side, wait until this pod is ready, as shown in this example:

 oc get pods -w

NAME                     READY   STATUS    RESTARTS   AGE
my-cluster-zookeeper-0   0/1     Running   0          29s
...
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     Pending             0          0s
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     ContainerCreating   0          0s
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     ContainerCreating   0          0s
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     Running             0          1s
my-cluster-entity-operator-cbdbffd4d-m7fzh   1/2     Running             0          20s
my-cluster-entity-operator-cbdbffd4d-m7fzh   2/2     Running             0          21s

Deploy the WildFly Source-to-Image (S2I) Quickstart to OpenShift with Helm Charts

Log in to your OpenShift instance using the oc login command. The backend will be built and deployed on OpenShift with a Helm Chart for WildFly.

Navigate to the root directory of this quickstart and run the following command:

$ helm install mp-rm-qs -f charts/helm.yaml wildfly/wildfly --wait --timeout=10m0s 
NAME: mp-rm-qs
...
STATUS: deployed
REVISION: 1

This command will return once the application has successfully deployed. In case of a timeout, you can check the status of the application with the following command in another terminal:

oc get deployment mp-rm-qs

The Helm Chart for this quickstart contains all the information to build an image from the source code using S2I on Java 17:

# TODO Update to point to the released quickstarts and image
# Will need a diff commit in the upstream-to-product repository
build:
  uri: https://github.com/wildfly/quickstart.git
  ref: main
  contextDir: microprofile-reactive-messaging-kafka
deploy:
  replicas: 1
  env:
    - name: MP_MESSAGING_CONNECTOR_SMALLRYE_KAFKA_BOOTSTRAP_SERVERS
      value: my-cluster-kafka-bootstrap:9092

This will create a new deployment on OpenShift and deploy the application.

If you want to see all the configuration elements to customize your deployment you can use the following command:

$ helm show readme wildfly/wildfly

Get the URL of the route to the deployment.

$ oc get route mp-rm-qs -o jsonpath="{.spec.host}"

Access the application in your web browser using the displayed URL.

Note

The Maven profile named openshift is used by the Helm chart to provision the server with the quickstart deployed on the root web context, and thus the application should be accessed with the URL without the /microprofile-reactive-messaging-kafka path segment after HOST:PORT.

Run the Integration Tests with OpenShift

The integration tests included with this quickstart, which verify that the quickstart runs correctly, may also be run with the quickstart running on OpenShift.

Note

The integration tests expect a deployed application, so make sure you have deployed the quickstart on OpenShift before you begin.

Run the integration tests using the following command to run the verify goal with the integration-testing profile activated and the proper URL:

$ mvn verify -Pintegration-testing -Dserver.host=https://$(oc get route mp-rm-qs --template='{{ .spec.host }}') 
Note

The tests are using SSL to connect to the quickstart running on OpenShift. So you need the certificates to be trusted by the machine the tests are run from.

Undeploy the WildFly Source-to-Image (S2I) Quickstart from OpenShift with Helm Charts

$ helm uninstall mp-rm-qs
$ oc delete -f ./charts/kafka-on-openshift.yaml --wait --timeout=10m0s
$ oc delete -f ./charts/amq-operator-on-openshift.yaml --wait --timeout=10m0s

Building and running the quickstart application with Kubernetes

Build the WildFly Quickstart to Kubernetes with Helm Charts

For Kubernetes, the build with Apache Maven uses an openshift Maven profile to provision a WildFly server, suitable for running on Kubernetes.

The server provisioning functionality is provided by the WildFly Maven Plugin, and you may find its configuration in the quickstart pom.xml:

        <profile>
            <id>openshift</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.wildfly.plugins</groupId>
                        <artifactId>wildfly-maven-plugin</artifactId>
                        <configuration>
                            <discover-provisioning-info>
                                <version>${version.server}</version>
                                <context>cloud</context>
                            </discover-provisioning-info>
                            <!--
                                The parent POM's 'openshift' profile renames the output archive to ROOT.war so that the
                                application is deployed in the root web context. Add ROOT.war to the server.
                            -->
                            <filename>ROOT.war</filename>
                            <add-ons>...</add-ons>
                        </configuration>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>package</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    ...
                </plugins>
            </build>
        </profile>

You may note that unlike the provisioned-server profile it uses the cloud context which enables a configuration tuned for Kubernetes environment.

The plugin uses WildFly Glow to discover the feature packs and layers required to run the application, and provisions a server containing those layers.

If you get an error or the server is missing some functionality which cannot be auto-discovered, you can download the WildFly Glow CLI and run the following command to see more information about what add-ons are available:

wildfly-glow show-add-ons

Getting Started with Kubernetes and Helm Charts

This section contains the basic instructions to build and deploy this quickstart to Kubernetes using Helm Charts.

Install Kubernetes

In this example we are using Minikube as our Kubernetes provider. See the Minikube Getting Started guide for how to install it. After installing it, we start it with 4GB of memory.

minikube start --memory='4gb'

The above command should work if you have Docker installed on your machine. If, you are using Podman instead of Docker, you will also need to pass in --driver=podman, as covered in the Minikube documentation.

Once Minikube has started, we need to enable its registry since that is where we will push the image needed to deploy the quickstart, and where we will tell the Helm charts to download it from.

minikube addons enable registry

In order to be able to push images to the registry we need to make it accessible from outside Kubernetes. How we do this depends on your operating system. All the below examples will expose it at localhost:5000

# On Mac:
docker run --rm -it --network=host alpine ash -c "apk add socat && socat TCP-LISTEN:5000,reuseaddr,fork TCP:$(minikube ip):5000"

# On Linux:
kubectl port-forward --namespace kube-system service/registry 5000:80 &

# On Windows:
kubectl port-forward --namespace kube-system service/registry 5000:80
docker run --rm -it --network=host alpine ash -c "apk add socat && socat TCP-LISTEN:5000,reuseaddr,fork TCP:host.docker.internal:5000"

Prerequisites

  • Helm must be installed to deploy the backend on Kubernetes.

Once you have installed Helm, you need to add the repository that provides Helm Charts for WildFly.

$ helm repo add wildfly https://docs.wildfly.org/wildfly-charts/
"wildfly" has been added to your repositories
$ helm search repo wildfly
NAME                    CHART VERSION   APP VERSION     DESCRIPTION
wildfly/wildfly         ...             ...            Build and Deploy WildFly applications on OpenShift
wildfly/wildfly-common  ...             ...            A library chart for WildFly-based applications
Install Strimzi on Kubernetes

The functionality of this quickstart depends on a running instance of the Strimzi Operator. Strimzi is an open source implementation providing Kafka on Kubernetes. To deploy Strimzi on Kubernetes there are a few steps needed. We will cover them in this section.

First of all, since the Strimzi operator installs quite a lot of things, we create a new namespace, and tell kubectl to use that. In this case we tell it to use the name kafka for this namespace, since the charts/strimzi-on-kubernetes.yaml file specified this namespace when downloading it. The comment at the start of the yaml file contains more information.

# Get the name of the current namespace
old_namespace="$(kubectl config view --minify --output 'jsonpath={..namespace}'; echo)"

# Create the 'kafka' namespace
kubectl create namespace kafka

# Tell kubectl to use the 'kafka' namespace when issuing commands
kubectl config set-context --current --namespace="kafka"

Now we install charts/strimzi-on-kubernetes.yaml which will add all the Kubernetes resources for the Strimzi operator:

kubectl apply -f charts/strimzi-on-kubernetes.yaml --wait --timeout=10m0s

Next we create a Kafka cluster called my-cluster and an associated topic called testing in that cluster. The following command might fail if everything that happens after issuing the above command has not completed yet. In this case, just wait a few seconds and try again until it succeeds:

kubectl apply -f ./charts/kafka-on-kubernetes.yaml --wait --timeout=10m0s

The above will install a deployment called strimzi-cluster-operator, as well as its associated pods. These will be visible immediately when you execute kubectl get all, although it will take some time until they reach the Ready state.

However, for the cluster to be usable we need to wait for its entity operator is installed up and running. The name of the pod will be of the format my-cluster-entity-operator-xxxxxxxxx-yyyyy.

To be on the safe side, wait until this pod is ready, as shown in this example:

 oc get pods -w
NAME                     READY   STATUS    RESTARTS   AGE
my-cluster-zookeeper-0   0/1     Running   0          29s
...
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     Pending             0          0s
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     ContainerCreating   0          0s
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     ContainerCreating   0          0s
my-cluster-entity-operator-cbdbffd4d-m7fzh   0/2     Running             0          1s
my-cluster-entity-operator-cbdbffd4d-m7fzh   1/2     Running             0          20s
my-cluster-entity-operator-cbdbffd4d-m7fzh   2/2     Running             0          21s
Cleaning up
Note
The steps in this section should happen after you have finished with your experiments and are no longer using the Kafka instance from your application!

First we remove the Kubernetes objects created by the steps to install Strimzi:

kubectl delete -f charts/kafka-on-kubernetes.yaml --wait --timeout=10m0s
kubectl delete -f charts/strimzi-on-kubernetes.yaml --wait --timeout=10m0s

Next we tell kubectl to use the namespace we were using before installing everything:

kubectl config set-context --current --namespace="${old_namespace}"

Finally, we delete the kafka namespace we created:

kubectl delete namespace kafka

Deploy the WildFly Source-to-Image (S2I) Quickstart to Kubernetes with Helm Charts

The backend will be built and deployed on Kubernetes with a Helm Chart for WildFly.

Navigate to the root directory of this quickstart and run the following commands:

mvn -Popenshift package wildfly:image

This will use the openshift Maven profile we saw earlier to build the application, and create a Docker image containing the WildFly server with the application deployed. The name of the image will be microprofile-reactive-messaging-kafka.

Next we need to tag the image and make it available to Kubernetes. You can push it to a registry like quay.io. In this case we tag as localhost:5000/microprofile-reactive-messaging-kafka:latest and push it to the internal registry in our Kubernetes instance:

# Tag the image
docker tag microprofile-reactive-messaging-kafka localhost:5000/microprofile-reactive-messaging-kafka:latest
# Push the image to the registry
docker push localhost:5000/microprofile-reactive-messaging-kafka:latest

In the below call to helm install which deploys our application to Kubernetes, we are passing in some extra arguments to tweak the Helm build:

  • --set build.enabled=false - This turns off the s2i build for the Helm chart since Kubernetes, unlike OpenShift, does not have s2i. Instead, we are providing the image to use.

  • --set deploy.route.enabled=false - This disables route creation normally performed by the Helm chart. On Kubernetes we will use port-forwards instead to access our application, since routes are an OpenShift specific concept and thus not available on Kubernetes.

  • --set image.name="localhost:5000/microprofile-reactive-messaging-kafka" - This tells the Helm chart to use the image we built, tagged and pushed to Kubernetes' internal registry above.

$ helm install mp-rm-qs -f charts/helm.yaml wildfly/wildfly --wait --timeout=10m0s --set build.enabled=false --set deploy.route.enabled=false --set image.name="localhost:5000/microprofile-reactive-messaging-kafka"
NAME: mp-rm-qs
...
STATUS: deployed
REVISION: 1

This command will return once the application has successfully deployed. In case of a timeout, you can check the status of the application with the following command in another terminal:

kubectl get deployment mp-rm-qs

The Helm Chart for this quickstart contains all the information to build an image from the source code using S2I on Java 17:

# TODO Update to point to the released quickstarts and image
# Will need a diff commit in the upstream-to-product repository
build:
  uri: https://github.com/wildfly/quickstart.git
  ref: main
  contextDir: microprofile-reactive-messaging-kafka
deploy:
  replicas: 1
  env:
    - name: MP_MESSAGING_CONNECTOR_SMALLRYE_KAFKA_BOOTSTRAP_SERVERS
      value: my-cluster-kafka-bootstrap:9092

This will create a new deployment on Kubernetes and deploy the application.

If you want to see all the configuration elements to customize your deployment you can use the following command:

$ helm show readme wildfly/wildfly

To be able to connect to our application running in Kubernetes from outside, we need to set up a port-forward to the mp-rm-qs service created for us by the Helm chart.

This service will run on port 8080, and we set up the port forward to also run on port 8080:

kubectl port-forward service/mp-rm-qs 8080:8080

The server can now be accessed via http://localhost:8080 from outside Kubernetes. Note that the command to create the port-forward will not return, so it is easiest to run this in a separate terminal.

Note

The Maven profile named openshift is used by the Helm chart to provision the server with the quickstart deployed on the root web context, and thus the application should be accessed with the URL without the /microprofile-reactive-messaging-kafka path segment after HOST:PORT.

Run the Integration Tests with Kubernetes

The integration tests included with this quickstart, which verify that the quickstart runs correctly, may also be run with the quickstart running on Kubernetes.

Note

The integration tests expect a deployed application, so make sure you have deployed the quickstart on Kubernetes before you begin.

Run the integration tests using the following command to run the verify goal with the integration-testing profile activated and the proper URL:

$ mvn verify -Pintegration-testing -Dserver.host=http://localhost:8080 

Undeploy the WildFly Source-to-Image (S2I) Quickstart from Kubernetes with Helm Charts

$ helm uninstall mp-rm-qs

To stop the port forward you created earlier use:

$ kubectl port-forward service/mp-rm-qs 8080:8080

Conclusion

MicroProfile Reactive Messaging and Reactive Streams Operators allow you to publish to, process and consume streams, optionally backed by Kafka, by adding @Incoming and @Outgoing annotations to your methods. 'Paired' streams work in memory. To map 'un-paired' streams to be backed by Kafka you need to provide configuration via MicroProfile Config.