Apache Kafka®, Kafka Streams, and ksqlDB to demonstrate real use cases - 3 (message-ordering)
How to maintain message ordering and prevent message duplication
Question:
How can I maintain the order of messages and prevent message duplication in a Kafka topic partition?
Example use case:
If your application needs to maintain ordering of messages with no duplication, you can enable your Apache Kafka producer for idempotency. An idempotent producer has a unique producer ID and uses sequence IDs for each message, which allows the broker to ensure it is committing ordered messages with no duplication, on a per partition basis.
Short Answer
Set the ProducerConfig
configuration parameters relevant to the idempotent producer:
enable.idempotence=true
acks=all
To get started, make a new directory anywhere you'd like for this project, and change into it:

mkdir message-ordering && cd message-ordering
Make the following directories to set up its structure:
mkdir src test
Next, create the following docker-compose.yml
file to obtain Confluent Platform (this tutorial uses just ZooKeeper and the Kafka broker):
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
And launch it by running:
docker-compose up -d
Create the Kafka topic myTopic
with 2 partitions:
docker-compose exec broker kafka-topics --bootstrap-server broker:9092 --topic myTopic --create --replication-factor 1 --partitions 2
Describe the properties of the topic that you just created.
docker-compose exec broker kafka-topics --bootstrap-server broker:9092 --topic myTopic --describe
The output should be the following. Notice that myTopic
has two partitions numbered 0 and 1.
Topic: myTopic   PartitionCount: 2   ReplicationFactor: 1   Configs:
    Topic: myTopic   Partition: 0   Leader: 101   Replicas: 101   Isr: 101
    Topic: myTopic   Partition: 1   Leader: 101   Replicas: 101   Isr: 101
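As an aside, if you prefer to create topics from application code rather than the CLI, the Java AdminClient can do the same thing. This is not part of the tutorial's steps, just a rough sketch; the class name CreateTopic and the hard-coded connection details are illustrative only:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

// Illustrative class name; creates myTopic with 2 partitions and replication factor 1,
// matching the CLI command above.
public class CreateTopic {

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        // 29092 is the host-mapped listener defined in docker-compose.yml
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            final NewTopic topic = new NewTopic("myTopic", 2, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}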
Create the following Gradle build file for the project, named build.gradle:
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.7.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
            "Main-Class": "io.confluent.developer.KafkaProducerApplication"
        )
    }
}

shadowJar {
    archiveBaseName = "message-ordering-standalone"
    archiveClassifier = ''
}
Run the following command to obtain the Gradle wrapper:
gradle wrapper
Next, create a directory for configuration data:
mkdir configuration
Create a development properties file at configuration/dev.properties:
bootstrap.servers=localhost:29092

# Properties below this line are specific to code in this application
output.topic.name=myTopic
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
Before you create your full application code, let’s highlight some of the most important ProducerConfig
configuration parameters relevant to the idempotent producer:
enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
retries=2147483647
The following parameter is required to be explicitly configured:
- enable.idempotence: when set to true, it enables an idempotent producer, which ensures that exactly one copy of each message is written to the brokers, and in order. The default value is enable.idempotence=false, so you must explicitly set it to enable.idempotence=true.
The other parameters may not be required to be explicitly set, but there are some noteworthy caveats:
- acks: the KafkaProducer uses the acks configuration to tell the leader broker how many acknowledgments to wait for before considering a produce request complete. This value must be acks=all for the idempotent producer to work, otherwise the producer cannot guarantee idempotence. The default value is acks=1, so you have two choices: (a) do not explicitly set it in the configuration and allow the producer to automatically override it, or (b) explicitly set it to acks=all. The producer will fail to start if enable.idempotence=true and your application configures this to anything but acks=all.
- max.in.flight.requests.per.connection: the maximum number of unacknowledged requests the producer sends on a single connection before blocking. The idempotent producer still maintains message order even with pipelining (i.e., max.in.flight.requests.per.connection can be greater than 1), and the maximum value supported with idempotency is 5. The default value is already max.in.flight.requests.per.connection=5, so no change is required for the idempotent producer.
- retries: setting a value greater than zero causes the producer to resend any record whose send fails with a potentially transient error. The only requirement for idempotency is that this is greater than zero. The default value is already retries=2147483647, so no change is required for the idempotent producer.
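If you prefer to pin all four of these settings explicitly rather than rely on the client defaults, the snippet below shows one way to do it. This is a sketch only: the helper class name is made up for illustration, and the last two properties merely restate the defaults described above.

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

// Hypothetical helper class, shown only to illustrate the four settings discussed above.
public class IdempotentProducerConfig {

    public static Properties idempotentProps() {
        final Properties props = new Properties();
        // Required: turns on the idempotent producer
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Must be "all" (or left unset so the producer overrides it automatically)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // The next two simply restate the client defaults
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        return props;
    }
}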
This is only a small subset of producer configuration parameters focused on idempotent producer semantics. For further reading, please see KIP-98. This KIP also discusses other elements of exactly-once semantics (EOS), including transactional guarantees, which enable applications to produce to multiple partitions atomically, i.e., all writes across multiple partitions succeed or fail as a unit.
Now let’s create the application source code at src/main/java/io/confluent/developer/KafkaProducerApplication.java.
package io.confluent.developer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value
                                + "\twritten to topic[partition] " + recordMetadata.topic()
                                + "[" + recordMetadata.partition() + "] at offset "
                                + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static Properties loadProperties(String fileName) throws IOException {
        final Properties envProps = new Properties();
        final FileInputStream input = new FileInputStream(fileName);
        envProps.load(input);
        input.close();
        return envProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "This program takes two arguments: the path to an environment configuration file and"
                            + "the path to the file with records to send");
        }

        final Properties props = KafkaProducerApplication.loadProperties(args[0]);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        final String topic = props.getProperty("output.topic.name");
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = args[1];
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream()
                    .filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}
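If you would like a quick sanity check of produce() without a running broker, a unit test against MockProducer is one option. The sketch below is not part of the tutorial's steps; the test class name and its location under src/test/java/io/confluent/developer are assumptions. It verifies that messages are split into key and value and handed to the producer in the order they were submitted. JUnit and Hamcrest are already declared in build.gradle.

package io.confluent.developer;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class KafkaProducerApplicationTest {

    @Test
    public void produceSplitsKeyFromValueAndPreservesSendOrder() {
        // MockProducer records every send in memory, so no broker is needed
        final MockProducer<String, String> mockProducer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        final KafkaProducerApplication app =
                new KafkaProducerApplication(mockProducer, "myTopic");

        app.produce("a-1");
        app.produce("b-2");
        app.produce("a-3");

        final List<String> sent = mockProducer.history().stream()
                .map(record -> record.key() + "-" + record.value())
                .collect(Collectors.toList());

        // The records should come back parsed into key/value and in submission order
        assertThat(sent, equalTo(Arrays.asList("a-1", "b-2", "a-3")));
    }
}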
Create the following file input.txt
in the base directory of the tutorial. The letter before the -
will be the key and the number after it will be the value.
a-1
b-2
c-3
d-4
a-5
b-6
c-7
d-8
a-9
b-10
c-11
d-12
In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar for the KafkaProducerApplication, you can launch it locally.
java -jar build/libs/message-ordering-standalone-0.0.1.jar configuration/dev.properties input.txt
After you run the previous command, the application will process the file and you should see some log messages like this on the console:
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=myApp] Instantiated an idempotent producer.
....
[kafka-producer-network-thread | myApp] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=myApp] ProducerId set to 0 with epoch 0
Notice two things in this log output: the first line confirms the producer is configured for idempotency, and the last line shows that this run of the app has been assigned ProducerId=0. (If you were to run the app again, the ProducerId would increase to 1.)
You should then see the output from the producer application, which uses a Callback
lambda expression to report the partition and offset at which each record was written:
Offsets and timestamps committed in batch from input.txt
key/value a/1    written to topic[partition] myTopic[0] at offset 0
key/value b/2    written to topic[partition] myTopic[0] at offset 1
key/value c/3    written to topic[partition] myTopic[0] at offset 2
key/value a/5    written to topic[partition] myTopic[0] at offset 3
key/value b/6    written to topic[partition] myTopic[0] at offset 4
key/value c/7    written to topic[partition] myTopic[0] at offset 5
key/value a/9    written to topic[partition] myTopic[0] at offset 6
key/value b/10   written to topic[partition] myTopic[0] at offset 7
key/value c/11   written to topic[partition] myTopic[0] at offset 8
key/value d/4    written to topic[partition] myTopic[1] at offset 0
key/value d/8    written to topic[partition] myTopic[1] at offset 1
key/value d/12   written to topic[partition] myTopic[1] at offset 2
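Why did keys a, b, and c land on partition 0 and d on partition 1? When a record has a non-null key, the producer's default partitioner picks the partition by hashing the serialized key (murmur2) modulo the partition count, so a given key always maps to the same partition. The sketch below reproduces that computation for our four keys; it is a standalone illustration, not part of the tutorial's application.

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

// Standalone illustration: prints which partition of a 2-partition topic each key maps to,
// using the same hash-and-modulo computation the default partitioner applies to keyed records.
public class KeyToPartition {

    public static void main(String[] args) {
        final int numPartitions = 2; // myTopic was created with 2 partitions
        for (final String key : new String[]{"a", "b", "c", "d"}) {
            final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer encodes as UTF-8
            final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key " + key + " -> partition " + partition);
        }
    }
}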
Test it
Run a console consumer to read all the messages from myTopic
to confirm the producer published the expected records.
docker-compose exec broker kafka-console-consumer --topic myTopic \
    --bootstrap-server broker:9092 \
    --from-beginning \
    --property print.key=true \
    --property key.separator=" : "
The output from the consumer should look something like below. Notice that the messages are not in order; this is expected! It illustrates that, for a consumer, message order is maintained only within each partition, not across topic partitions (as we will see in the next few steps).
a : 1
b : 2
c : 3
a : 5
b : 6
c : 7
a : 9
b : 10
c : 11
d : 4
d : 8
d : 12
Close the consumer with CTRL+C.
Consume data from the Kafka topic, specifying only to read from partition 0.
docker-compose exec broker kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic myTopic \
    --property print.key=true \
    --property key.separator=, \
    --partition 0 \
    --from-beginning
You should see only some of the records in this partition.
a,1
b,2
c,3
a,5
b,6
c,7
a,9
b,10
c,11
Processed a total of 9 messages
Close the consumer by entering CTRL+C.
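The --partition flag on the console consumer corresponds to manually assigning a partition in the Java consumer API (assign rather than subscribe, so there is no consumer group coordination). Below is a minimal sketch; the class name and the single five-second poll are choices made purely for illustration.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Illustrative class name; reads only partition 0 of myTopic from the beginning.
public class PartitionZeroConsumer {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition partition0 = new TopicPartition("myTopic", 0);
            consumer.assign(Collections.singletonList(partition0));          // manual assignment, no group
            consumer.seekToBeginning(Collections.singletonList(partition0)); // read from offset 0

            // A single poll is enough for this small data set; a real consumer would loop.
            for (final ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.println(record.key() + "," + record.value());
            }
        }
    }
}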
Now let’s look at the Kafka broker’s log segment files using the kafka-dump-log administrative tool. First, examine partition 0, indicated by the 0 in myTopic-0.
docker-compose exec broker kafka-dump-log \
    --print-data-log \
    --files '/var/lib/kafka/data/myTopic-0/00000000000000000000.log' \
    --deep-iteration
You should see:
Dumping /var/lib/kafka/data/myTopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 8 count: 9 baseSequence: 0 lastSequence: 8 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1602295941754 size: 144 magic: 2 compresscodec: NONE crc: 1009740801 isvalid: true
| offset: 0 CreateTime: 1602295941743 keysize: 1 valuesize: 1 sequence: 0 headerKeys: [] key: a payload: 1
| offset: 1 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 1 headerKeys: [] key: b payload: 2
| offset: 2 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 2 headerKeys: [] key: c payload: 3
| offset: 3 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 3 headerKeys: [] key: a payload: 5
| offset: 4 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 4 headerKeys: [] key: b payload: 6
| offset: 5 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 5 headerKeys: [] key: c payload: 7
| offset: 6 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 6 headerKeys: [] key: a payload: 9
| offset: 7 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 7 headerKeys: [] key: b payload: 10
| offset: 8 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 8 headerKeys: [] key: c payload: 11
Note the familiar producerId: 0, which corresponds to the earlier log output from the producer application run. (If the producer were not configured to be idempotent, this would show producerId: -1.)
Also observe that each message has a unique sequence number, from sequence: 0 through sequence: 8; the sequence numbers are not duplicated and are all in order. The broker checks the sequence number to ensure idempotency per partition, so that if a producer experiences a retriable exception and resends a message, sequence numbers will not be duplicated or out of order in the committed log. (If the producer were not configured to be idempotent, the messages would show sequence: -1.)
Consume data from the Kafka topic, specifying only to read from partition 1.
docker-compose exec broker kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic myTopic \
    --property print.key=true \
    --property key.separator=, \
    --partition 1 \
    --from-beginning
You should see only some of the records in this partition.
d,4
d,8
d,12
Processed a total of 3 messages
Close the consumer by entering CTRL+C.
Use the kafka-dump-log administrative tool again to examine partition 1, indicated by the 1 in myTopic-1.
docker-compose exec broker kafka-dump-log \
    --print-data-log \
    --files '/var/lib/kafka/data/myTopic-1/00000000000000000000.log' \
    --deep-iteration
You should see:
Dumping /var/lib/kafka/data/myTopic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: 0 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1602295941754 size: 89 magic: 2 compresscodec: NONE crc: 308733939 isvalid: true
| offset: 0 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 0 headerKeys: [] key: d payload: 4
| offset: 1 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 1 headerKeys: [] key: d payload: 8
| offset: 2 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 2 headerKeys: [] key: d payload: 12
The producerId is the same as shown in the log output from the previous partition, because it is the same producer application with the same producer ID. The sequence numbers in this partition are unique and unrelated to the other partition, so these records have sequence: 0 through sequence: 2.