API for more information and configuration of Kafka messages
Overview
With the current offering of Reactive Messaging, introduced in WildFly 23 and carried into WildFly 24, there is no way to get Kafka-related information when receiving a message, nor is it possible to set Kafka-specific information when sending a message. The most important thing we are missing is the key of the Kafka record being sent/received.
SmallRye Reactive Messaging has had this functionality for a long time in its Kafka connector module, but as this mixes implementation details with public API, with no clear separation of what is what, it was decided not to introduce it until a suitable API was extracted.
SmallRye Reactive Messaging has been enhanced to extract a Kafka user API in https://github.com/smallrye/smallrye-reactive-messaging/pull/1272. Note that this is a starting point, and more constructs are likely to end up in the Kafka API. As this is an external project, we have influence over what we want to add (as in I will add), but we don’t have influence to stop them from adding what they see fit to add.
Issue Metadata
Issue
Related Issues
Dev Contacts
QE Contacts
-
Jan Stourac (jstourac@redhat.com)
Testing By
-
Engineering
-
QE
Affected Projects or Components
-
WildFly - will need adjusting to add the new Kafka API maven module to the module containing the Kafka Connector
-
WildFly BOMs - the new API artifact will be added to these
-
SmallRye Reactive Messaging - contains the new API Maven module
-
WildFly Quickstarts - The Reactive Messaging Quickstart should be enhanced to show the Kafka API
Other Interested Projects
Relevant Installation Types
-
Traditional standalone server (unzipped or provisioned by Galleon)
-
Managed domain
-
OpenShift s2i
-
Bootable jar
Requirements
Hard Requirements
The Kafka user API contains the following classes. The list below shows what each of them allows you to access:
-
KafkaMetadataUtil - contains utility methods to set OutgoingKafkaRecordMetadata in a Reactive Messaging Message, to configure how messages are sent, and to read IncomingKafkaRecordMetadata from a Message received from Kafka, to get information relating to Kafka. This utility class is present to wrap usage of Metadata, which has made its way into the spec interfaces in the SmallRye Reactive Messaging API Maven module while not yet being part of the MicroProfile Reactive Messaging specification.
-
IncomingKafkaRecordMetadata contains methods to read information relating to Kafka from an incoming Message. This information includes:
  -
  key - the key of the Kafka record represented by the Message.
  -
  topic - the Kafka topic the Message was received on.
  -
  partition - the Kafka partition the Message was received on.
  -
  timestamp - the timestamp of the Message.
  -
  timeStampType - the org.apache.kafka.common.record.TimestampType of the message.
  -
  offset - the Kafka offset of the Kafka record represented by the Message.
  -
  headers - an instance of org.apache.kafka.common.header.Headers. Headers have no meaning to the Kafka broker, which just forwards them on to the consumers. They can be thought of as attachments/metadata on the Message, carrying additional information from the producer to the consumer, and are dealt with on the application level. For example, a header could indicate which data centre was the origin of a message, and consumers are free to deal with this information or ignore it.
-
OutgoingKafkaRecordMetadata contains a builder to set data to influence how Kafka sends a Message. This includes:
  -
  key - the key of the Kafka record represented by the Message.
  -
  topic - generally users should configure the Kafka topic for a stream via the application MicroProfile Config. However, in some cases they may want to specify the topic dynamically, for example depending on the contents of the data in the message that is being sent.
  -
  partition - the Kafka partition the Message will be sent to. Generally you should let the partitioner do this work for you. Still, SmallRye Reactive Messaging allows for this, probably due to a customer request in Quarkus.
  -
  timestamp - the timestamp of the Message.
  -
  headers - allows us to set org.apache.kafka.common.header.Headers, as explained above.
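As a rough illustration of the precedence described for topic and partition above, the sketch below resolves where a record would end up. It uses only JDK types: OutgoingSettings and SendTargetSketch are hypothetical stand-ins and not the SmallRye OutgoingKafkaRecordMetadata class, and the key-hash fallback is a simplification, not Kafka's real partitioner.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for per-message send settings; not the SmallRye class. */
final class OutgoingSettings {
    final String key;
    final Optional<String> topic;       // empty: use the topic from configuration
    final Optional<Integer> partition;  // empty: let the partitioner choose

    OutgoingSettings(String key, String topic, Integer partition) {
        this.key = key;
        this.topic = Optional.ofNullable(topic);
        this.partition = Optional.ofNullable(partition);
    }
}

final class SendTargetSketch {
    /** A topic set on the message wins over the configured one. */
    static String resolveTopic(String configuredTopic, OutgoingSettings s) {
        return s.topic.orElse(configuredTopic);
    }

    /** An explicit partition wins; otherwise fall back to a simplified key hash. */
    static int resolvePartition(OutgoingSettings s, int partitionCount) {
        return s.partition.orElseGet(
                () -> Math.floorMod(Objects.hashCode(s.key), partitionCount));
    }

    public static void main(String[] args) {
        OutgoingSettings plain = new OutgoingSettings("order-1", null, null);
        OutgoingSettings routed = new OutgoingSettings("order-2", "priority-orders", 1);
        System.out.println(resolveTopic("orders", plain));
        System.out.println(resolveTopic("orders", routed) + "/" + resolvePartition(routed, 3));
    }
}
```

In an application the same decision would be expressed by building the outgoing metadata and attaching it to the Message through KafkaMetadataUtil, leaving topic and partition unset unless there is a reason to override them.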
Nice-to-Have Requirements
-
If the SmallRye Reactive Messaging Kafka User API evolves in such a way that it brings in things we don’t want in WildFly, it would be good to come up with a mechanism to give an error if a user tries to use them. We will not be able to do this with compile-time checks, so we would need to do it at runtime and document it.
-
I would like to look into porting the following to the SmallRye Reactive Messaging Kafka user API (time permitting, and in order of priority):
-
DeserializationFailureHandler - Seems good to handle issues with deserialization for records. Was requested by a customer for Quarkus.
-
KafkaRecord - A simple wrapper around the message payload and the Kafka key, which is easier to use than the IncomingKafkaRecordMetadata and OutgoingKafkaRecordMetadata in the hard requirements. I think this will make it into the Kafka user API regardless of whether we want it or not.
Non-Requirements
-
At runtime all classes from the SmallRye Reactive Messaging Kafka connector are available, so we cannot stop users from compiling against them if they choose to side-step the BOMs. As was done for WFLY-14798, the mechanism to control what users use is the BOMs (WFLY-14800).
-
To hide Kafka classes exposed via the API. We need to document which classes we allow users to access. WFLY-14798 already exposes a few (e.g. classes/interfaces from the org.apache.kafka.common.serialization package).
Test Plan
Tests will be added to the WildFly testsuite to make sure that the data set in the OutgoingKafkaRecordMetadata takes effect, and is represented in the IncomingKafkaRecordMetadata on the receiving end. Tests will include checking that the topic and the partition can be overridden in the OutgoingKafkaRecordMetadata, and that this affects where the data is sent, which in turn can be checked in the IncomingKafkaRecordMetadata.
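The shape of such a round-trip check can be sketched with an in-memory stand-in for the broker. Everything here is hypothetical and JDK-only: RoundTripSketch and Received are illustrative names that model just the topic, partition, key and offset of a record, while the real tests would run against Kafka through the WildFly testsuite.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a broker; not a Kafka or SmallRye class. */
final class RoundTripSketch {

    /** What the receiving side would see for one record. */
    static final class Received {
        final String topic;
        final int partition;
        final long offset;
        final String key;
        final String payload;

        Received(String topic, int partition, long offset, String key, String payload) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.payload = payload;
        }
    }

    // One append-only log per topic-partition.
    private final Map<String, List<Received>> logs = new HashMap<>();

    /** Appends a record; the offset is its position in the topic-partition log. */
    Received send(String topic, int partition, String key, String payload) {
        List<Received> log = logs.computeIfAbsent(topic + "-" + partition, k -> new ArrayList<>());
        Received record = new Received(topic, partition, log.size(), key, payload);
        log.add(record);
        return record;
    }

    /** Returns everything stored for the given topic-partition, in offset order. */
    List<Received> poll(String topic, int partition) {
        return new ArrayList<>(logs.getOrDefault(topic + "-" + partition, new ArrayList<>()));
    }
}
```

A test in this style sends records with an overridden topic and partition, then asserts that they show up only on that topic-partition and that the key read on the receiving side matches the key that was set.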
Apache Kafka Client classes exposed
As well as the classes from the org.apache.kafka.common.serialization package of the Apache Kafka Client jar mentioned in WFLY-14798, this RFE also exposes the following classes from said jar to users:
-
To deal with headers:
  -
  org.apache.kafka.common.header.Header
  -
  org.apache.kafka.common.header.Headers
  -
  org.apache.kafka.common.header.internals.RecordHeader
  -
  org.apache.kafka.common.header.internals.RecordHeaders
-
org.apache.kafka.common.record.TimestampType
-
org.apache.kafka.clients.consumer.ConsumerRecord
Community Documentation
The functionality is already documented in the SmallRye documentation. However, it will be mentioned in the Reactive Messaging subsystem documentation, and the WFLY-14800 - Quickstart will be updated to show some of the functionality.
Release Note Content
The MicroProfile Reactive Messaging subsystem with SmallRye now supports additional configuration of messages sent to Kafka, and provides means of getting information from Kafka on the receiving end.