Apache Kafka®, Kafka Streams, and ksqlDB to demonstrate real use cases - 3 (message-ordering)

How to maintain message ordering and no message duplication

Question:

How can I maintain the order of messages and prevent message duplication in a Kafka topic partition?

Example use case:

If your application needs to maintain ordering of messages with no duplication, you can enable your Apache Kafka producer for idempotency. An idempotent producer has a unique producer ID and uses sequence IDs for each message, which allows the broker to ensure it is committing ordered messages with no duplication, on a per partition basis.

Short Answer

Set the ProducerConfig configuration parameters relevant to the idempotent producer:

enable.idempotence=true
acks=all
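For reference, here is a minimal sketch of how these two settings might look when constructing a Java producer (the bootstrap address, class name, and serializers below are assumptions for the local setup used in this tutorial; the full application is built in a later step):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // assumed local broker
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");           // the key setting
        props.put(ProducerConfig.ACKS_CONFIG, "all");                          // required with idempotence
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send records here; the broker deduplicates retried sends per partition
            // using the producer ID and per-message sequence numbers
        }
    }
}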
1. Initialize the project

 

mkdir message-ordering && cd message-ordering

Make the following directories to set up its structure:

mkdir src test
2. Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (this tutorial uses just ZooKeeper and the Kafka broker):

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

And launch it by running:

docker-compose up -d
3. Create the Kafka topic

Create the Kafka topic myTopic with 2 partitions:

docker-compose exec broker kafka-topics --bootstrap-server broker:9092 --topic myTopic --create --replication-factor 1 --partitions 2
4. Describe the topic

Describe the properties of the topic that you just created.

docker-compose exec broker kafka-topics --bootstrap-server broker:9092 --topic myTopic --describe

The output should be the following. Notice that myTopic has two partitions numbered 0 and 1.

Topic: myTopic	PartitionCount: 2	ReplicationFactor: 1	Configs:
	Topic: myTopic	Partition: 0	Leader: 101	Replicas: 101	Isr: 101
	Topic: myTopic	Partition: 1	Leader: 101	Replicas: 101	Isr: 101
5. Configure the project application

Create the following Gradle build file for the project, named build.gradle:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.7.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaProducerApplication"
    )
  }
}

shadowJar {
    archiveBaseName = "message-ordering-standalone"
    archiveClassifier = ''
}

Run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration
6. Set the application properties

Create a development properties file at configuration/dev.properties:

bootstrap.servers=localhost:29092

# Properties below this line are specific to code in this application
output.topic.name=myTopic
7. Create the Kafka Producer application

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create your full application code, let’s highlight some of the most important ProducerConfig configuration parameters relevant to the idempotent producer:

enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
retries=2147483647

The following parameter is required to be explicitly configured:

  • enable.idempotence: when set to true, it enables an idempotent producer which ensures that exactly one copy of each message is written to the brokers, and in order. The default value is enable.idempotence=false, so you must explicitly set this to enable.idempotence=true.

The other parameters may not be required to be explicitly set, but there are some noteworthy caveats:

  • acks: the KafkaProducer uses the acks configuration to tell the leader broker how many acknowledgments to wait for before considering a produce request complete. This value must be acks=all for the idempotent producer to work, otherwise the producer cannot guarantee idempotence. The default value is acks=1, so you have two choices: (a) do not explicitly set it in the configuration and allow the producer to override it automatically, or (b) explicitly set it to acks=all. The producer will fail to start if enable.idempotence=true and your application configures acks to anything but acks=all.
  • max.in.flight.requests.per.connection: the maximum number of unacknowledged requests the producer sends on a single connection before blocking. The idempotent producer still maintains message order even with pipelining (i.e., max.in.flight.requests.per.connection can be greater than 1), and the maximum value supported with idempotency is 5. The default value is already max.in.flight.requests.per.connection=5, so no change is required for the idempotent producer.
  • retries: setting a value greater than zero will cause the producer to resend any record whose send fails with a potentially transient error. The only requirement for idempotency is that this is greater than zero. The default value is already retries=2147483647, so no change is required for the idempotent producer.

This is only a small subset of producer configuration parameters, focused on idempotent producer semantics. For further reading, please see KIP-98. This KIP also discusses other elements of exactly-once semantics (EOS), including transactional guarantees, which enable applications to produce to multiple partitions atomically, i.e., all writes across multiple partitions succeed or fail as a unit.
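As an aside, here is a minimal sketch of the transactional API mentioned above (the transactional.id value and class name are illustrative; the application in this tutorial uses only the idempotent producer, not transactions):

package io.confluent.developer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // assumed local broker
        // Setting a transactional.id implies enable.idempotence=true
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "message-ordering-tx"); // illustrative id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // registers the transactional.id and fences any older instance
            try {
                producer.beginTransaction();
                // these sends commit or abort together, even if they land in different partitions
                producer.send(new ProducerRecord<>("myTopic", "a", "1"));
                producer.send(new ProducerRecord<>("myTopic", "d", "4"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // abortable error: none of the records above become visible to read_committed consumers
                producer.abortTransaction();
                throw e;
            }
        }
    }
}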

Now let’s create the application source code at src/main/java/io/confluent/developer/KafkaProducerApplication.java.

package io.confluent.developer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static Properties loadProperties(String fileName) throws IOException {
        final Properties envProps = new Properties();
        final FileInputStream input = new FileInputStream(fileName);
        envProps.load(input);
        input.close();
        return envProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "This program takes two arguments: the path to an environment configuration file and" +
                            "the path to the file with records to send");
        }

        final Properties props = KafkaProducerApplication.loadProperties(args[0]);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        final String topic = props.getProperty("output.topic.name");
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = args[1];
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream()
                          .filter(l -> !l.trim().isEmpty())
                          .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}
8. Create data to produce to Kafka

Create the following file input.txt in the base directory of the tutorial. The part of each line before the - will be the record key, and the part after will be the value.

a-1
b-2
c-3
d-4
a-5
b-6
c-7
d-8
a-9
b-10
c-11
d-12
9. Compile and run the Kafka Producer application

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the KafkaProducerApplication, you can launch it locally.

java -jar build/libs/message-ordering-standalone-0.0.1.jar configuration/dev.properties input.txt

After you run the previous command, the application will process the file and you should see some logs like this on the console:

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=myApp] Instantiated an idempotent producer.
....
[kafka-producer-network-thread | myApp] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=myApp] ProducerId set to 0 with epoch 0
The first log line shows that the producer is configured for idempotency. The second shows that this run of the app has been assigned ProducerId=0 (if you were to run the app again, the producer ID would increase to 1).

And then you should see the output from the Producer application, which uses a Callback lambda expression to confirm the topic, partition, and offset to which each record was written:

Offsets and timestamps committed in batch from input.txt
key/value a/1	written to topic[partition] myTopic[0] at offset 0
key/value b/2	written to topic[partition] myTopic[0] at offset 1
key/value c/3	written to topic[partition] myTopic[0] at offset 2
key/value a/5	written to topic[partition] myTopic[0] at offset 3
key/value b/6	written to topic[partition] myTopic[0] at offset 4
key/value c/7	written to topic[partition] myTopic[0] at offset 5
key/value a/9	written to topic[partition] myTopic[0] at offset 6
key/value b/10	written to topic[partition] myTopic[0] at offset 7
key/value c/11	written to topic[partition] myTopic[0] at offset 8
key/value d/4	written to topic[partition] myTopic[1] at offset 0
key/value d/8	written to topic[partition] myTopic[1] at offset 1
key/value d/12	written to topic[partition] myTopic[1] at offset 2
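Notice that records keyed a, b, and c all landed in partition 0, while records keyed d landed in partition 1. With the default partitioner, a keyed record's partition is derived from a murmur2 hash of the serialized key modulo the number of partitions. The following is a rough, hypothetical sketch (the class name is made up; it reuses Kafka's public Utils hash helpers) of how you could check which partition a given key maps to:

package io.confluent.developer;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

// Hypothetical helper: prints the partition the default partitioner would pick for each key
public class PartitionForKey {
    public static void main(String[] args) {
        final int numPartitions = 2; // myTopic was created with 2 partitions
        for (String key : new String[]{"a", "b", "c", "d"}) {
            // StringSerializer produces UTF-8 bytes, which is what the partitioner hashes
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key " + key + " -> partition " + partition);
        }
    }
}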

Test it

1. View all records in the topic

Run a console consumer to read all the messages from myTopic to confirm the producer published the expected records.

docker-compose exec broker kafka-console-consumer --topic myTopic \
  --bootstrap-server broker:9092 \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" : "

The output from the consumer should look something like below. Notice that the messages are not in order; this is expected! It illustrates that, from the consumer's point of view, message order is not maintained across topic partitions; it is maintained only within each partition (as we will see in the next few steps).

a : 1
b : 2
c : 3
a : 5
b : 6
c : 7
a : 9
b : 10
c : 11
d : 4
d : 8
d : 12

Close the consumer with a CTRL+C.

2. Consume the data in partition 0

Consume data from the Kafka topic, specifying only to read from partition 0.

docker-compose exec broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic myTopic \
  --property print.key=true \
  --property key.separator=, \
  --partition 0 \
  --from-beginning

You should see only some of the records in this partition.

a,1
b,2
c,3
a,5
b,6
c,7
a,9
b,10
c,11
Processed a total of 9 messages

Close the consumer by entering CTRL+C.
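As an aside, if you wanted to read a single partition programmatically instead of with the console consumer, a minimal Java sketch might look like the following (the class name is hypothetical, and it assumes the broker is reachable from the host at localhost:29092):

package io.confluent.developer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PartitionZeroConsumer {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // assumed local broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() pins the consumer to partition 0 only; no consumer-group rebalancing is involved
            final TopicPartition partitionZero = new TopicPartition("myTopic", 0);
            consumer.assign(Collections.singletonList(partitionZero));
            consumer.seekToBeginning(Collections.singletonList(partitionZero)); // like --from-beginning

            // a single poll is enough for this small data set; a real consumer would poll in a loop
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + "," + record.value() + " (offset " + record.offset() + ")");
            }
        }
    }
}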

3. View the broker log segment file for partition 0

Now let’s look at the Kafka broker’s log segment files using the kafka-dump-log administrative tool. First, examine partition 0, indicated by the 0 in myTopic-0.

docker-compose exec broker kafka-dump-log \
  --print-data-log \
  --files '/var/lib/kafka/data/myTopic-0/00000000000000000000.log' \
  --deep-iteration

You should see:

Dumping /var/lib/kafka/data/myTopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 8 count: 9 baseSequence: 0 lastSequence: 8 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1602295941754 size: 144 magic: 2 compresscodec: NONE crc: 1009740801 isvalid: true
| offset: 0 CreateTime: 1602295941743 keysize: 1 valuesize: 1 sequence: 0 headerKeys: [] key: a payload: 1
| offset: 1 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 1 headerKeys: [] key: b payload: 2
| offset: 2 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 2 headerKeys: [] key: c payload: 3
| offset: 3 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 3 headerKeys: [] key: a payload: 5
| offset: 4 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 4 headerKeys: [] key: b payload: 6
| offset: 5 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 5 headerKeys: [] key: c payload: 7
| offset: 6 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 6 headerKeys: [] key: a payload: 9
| offset: 7 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 7 headerKeys: [] key: b payload: 10
| offset: 8 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 8 headerKeys: [] key: c payload: 11

Note the familiar producerId: 0, which corresponds to the earlier log output from the producer application run. (If the producer were not configured to be idempotent, this would show producerId: -1.)

Also observe that each message has a unique sequence number, from sequence: 0 through sequence: 8; these are not duplicated and are all in order. The broker checks the sequence number to ensure idempotency per partition, so that if the producer experiences a retriable exception and resends a message, sequence numbers are never duplicated or committed out of order. (If the producer were not configured to be idempotent, the messages would show sequence: -1.)

4. Consume the data in partition 1

Consume data from the Kafka topic, specifying only to read from partition 1.

docker-compose exec broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic myTopic \
  --property print.key=true \
  --property key.separator=, \
  --partition 1 \
  --from-beginning

You should see only some of the records in this partition.

d,4
d,8
d,12
Processed a total of 3 messages

Close the consumer by entering CTRL+C.

5. View the broker log segment file for partition 1

Use the kafka-dump-log administrative tool again to examine partition 1, indicated by the 1 in myTopic-1.

docker-compose exec broker kafka-dump-log \
  --print-data-log \
  --files '/var/lib/kafka/data/myTopic-1/00000000000000000000.log' \
  --deep-iteration

You should see:

Dumping /var/lib/kafka/data/myTopic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: 0 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1602295941754 size: 89 magic: 2 compresscodec: NONE crc: 308733939 isvalid: true
| offset: 0 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 0 headerKeys: [] key: d payload: 4
| offset: 1 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 1 headerKeys: [] key: d payload: 8
| offset: 2 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 2 headerKeys: [] key: d payload: 12

The producerId is the same as shown in the log output from the previous partition, because it is the same producer application with the same producer ID. The sequence numbers in this partition are unique and unrelated to the other partition, so these records have sequence: 0 through sequence: 2.
