API for more information and configuration of Kafka messages

In  microprofile

Overview

With the current offering of Reactive Messaging introduced in WildFLy 23, and in WildFly 24, There is currently no way to get Kafka related information when receiving a message, nor is it possible to set to Kafka when sending a message. The most important thing we are missing is the key of the Kafka record being sent/received.

SmallRye Reactive Messaging has had this functionality for a long time in its Kafka connector module, but as this mixes implementation details with public API, with no clear separation of what is what, it was decided not to introduce it until a suitable API was extracted.

SmallRye Reactive Messaging has been enhanced to extract a Kafka user API in https://github.com/smallrye/smallrye-reactive-messaging/pull/1272. Note that this is a starting point, and more constructs are likely to end up in the Kafka API. As this is an external project, we have influence over what we want to add (as in I will add), but we don’t have influence to stop them from adding what they see fit to add.

Issue Metadata

Issue

Dev Contacts

QE Contacts

Testing By

  • Engineering

  • QE

Affected Projects or Components

  • WildFly - will need adjusting to add the new Kafka API maven module to the module containing the Kafka Connector

  • WildFly BOMs - the new API artifact will be added to these

  • SmallRye Reactive Messaging - contains the new API Maven module

  • WildFly Quickstarts - The Reactive Messaging Quickstart should be enhanced to show the Kafka API

Other Interested Projects

Relevant Installation Types

  • Traditional standalone server (unzipped or provisioned by Galleon)

  • Managed domain

  • OpenShift s2i

  • Bootable jar

Requirements

Hard Requirements

The Kafka user API contains the following classes, the below shows what they allow you to access

  • KafkaMetadataUtil - contains utility methods to set OutgoingKafkaRecordMetadata in a Reactive Messaging Message to configure how messages are sent, and to read IncomingKafkaRecordMetadata from a Message received from Kafka to get information relating to Kafka. This utility class is present to wrap usage of Metadata which has made its way into the spec interfaces in the SmallRye Reactive Messaging API Maven module, while not yet part of the Microprofile Reactive Messaging specification.

  • IncomingKafkaRecordMetadata contains methods to read information relating to Kafka from an incoming Message. This information includes:

    • key - the key of the Kafka record represented by the Message.

    • topic - the Kafka topic the Message was received on.

    • partition - the Kafka partition the Message was received on.

    • timestamp - the timestamp of the Message.

    • timeStampType - the org.apache.kafka.common.record.TimestampType of the message.

    • offset - the Kafka offset of the Kafka record represented by the Message.

    • headers - An instance of org.apache.kafka.common.header.Headers. They have no meaning to the Kafka broker, which just forwards them on to the consumers. These can be thought of attachments/metadata on the Message to be used as additional information in the producer to consumer, and are dealt with on the application level. For example, a header could indicate which data centre was the origin of a message, and consumers are free to deal with this information or ignore it.

  • OutgoingKafkaRecordMetadata contains a builder to set data to influence how Kafka sends a Message> This includes:

    • key - the key of the Kafka record represented by the Message.

    • topic - generally users should configure the Kafka topic for a stream via the application MicroProfile Config. However, in some cases they may want to specify the topic dynamically, for example depending on the contents of the data in the message that is being sent.

    • partition - the Kafka partition the Message will be sent to. Generally you should let the partitioner do this work for you. Still SmallRye Reactive Messaging allows for this, probably due to a customer request in Quarkus.

    • timestamp - the timestamp of the Message.

    • timeStampType - the org.apache.kafka.common.record.TimestampType of the message.

    • offset - the Kafka offset of the Kafka record represented by the Message.

    • headers - Allows us to set org.apache.kafka.common.header.Headers as explained above.

Nice-to-Have Requirements

  • If the SmallRye Reactive Messaging Kafka User API evolves in such a way that it brings in things we don’t want in WildFly, it would be good to come up with a mechanism to give an error if a user tries to use them. We will not be able to do this as compile time checks, so we would need to do this at runtime and document.

  • I would like to look into porting the following to the SmallRye Reactive Messaging Kafka user API (time permitting, and in order of priority):

    • DeserializationFailureHandler - Seems good to handle issues with deserialization for records. Was requested by a customer for Quarkus.

    • KafkaRecord - A simple wrapper around message payload and the Kafka key, and easier to use than using the IncomingKafkaRecordMetadata and OutgoingKafkaRecordMetadata in the hard requirements. I think this will make it into the Kafka user API regardless of if we want it or not.

Non-Requirements

  • At runtime all classes from the SmallRye Reactive Messaging Kafka connector are available, we cannot stop users from compiling against that if they choose to side-step using the BOMs. As done for WFLY-14798 the mechanism to control what the users use is done via the BOMs (WFLY-14800).

  • To hide Kafka classes exposed via the API. We need to document which classes we allow users to access. WFLY-14798 already exposes a few (e.g. classes/interfaces from the org.apache.kafka.common.serialization package)

Test Plan

Tests will be added to the WildFly testsuite to make sure that the data set in the OutgoingKafkaRecordMetadata takes effect, and is represented in the IncomingKafkaRecordMetadata on the receiving end. Tests will include checking that the topic and the partition can be overridden in the OutgoingKafkaRecordMetadata, and has an effect on where the data is sent, which in turn can be checked in the IncomingKafkaRecordMetadata.

Apache Kafka Client classes exposed

As well as the classes from the org.apache.kafka.common.serialization package of the Apache Kafka Client jar mentioned in WFLY-14798, this RFE also exposes the following classes from said jar to users:

  • To deal with headers:

    • org.apache.kafka.common.header.Header

    • org.apache.kafka.common.header.Headers

    • org.apache.kafka.common.header.internals.RecordHeader

    • org.apache.kafka.common.header.internals.RecordHeaders

  • org.apache.kafka.common.record.TimestampType

  • org.apache.kafka.clients.consumer.ConsumerRecord

Community Documentation

The functionality is already documented in the SmallRye documentation. However, it will be mentioned in the Reactive Messaging subsystem documentation, and the WFLY-14800 - Quickstart will be updated to show some of the functionality.

Release Note Content

The MicroProfile Reactive Messaging subsystem with SmallRye now supports additional configuration of messages sent to Kafka, and provides means of getting information from Kafka on the receiving end.