Apache Kafka®, Kafka Streams, and ksqlDB to demonstrate real use cases - 4 (Kafka-console-consumer-read-specific-offsets-partitions)

How to read from a specific offset and partition with the Kafka Console Consumer

Question:

How do I read from a specific offset and partition of a Kafka topic?

Example use case:

You are confirming record arrivals and you’d like to read from a specific offset in a topic partition. In this tutorial you’ll learn how to use the Kafka console consumer to quickly debug issues by reading from a specific offset as well as control the number of records you read.

Short Answer

Use the kafka-console-consumer command with the --partition and --offset flags to read from a specific partition and offset.

kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \ --property print.key=true \ --property key.separator="-" \ --partition 1 \ --offset 6
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir console-consumer-read-specific-offsets-partition && cd console-consumer-read-specific-offsets-partition
2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform.

---version: '2'services:  zookeeper:    image: confluentinc/cp-zookeeper:6.1.0    hostname: zookeeper    container_name: zookeeper    ports:      - "2181:2181"    environment:      ZOOKEEPER_CLIENT_PORT: 2181      ZOOKEEPER_TICK_TIME: 2000  broker:    image: confluentinc/cp-kafka:6.1.0    hostname: broker    container_name: broker    depends_on:      - zookeeper    ports:      - "29092:29092"    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

Now launch Confluent Platform by running:

docker-compose up -d
3
Create a topic with multiple partitions

Your first step is to create a topic to produce to and consume from. This time you’ll add more than one partition so you can see how the keys end up on different partitions.

Your first step is to open a shell on the broker container:

docker-compose exec broker bash

Then use the following command to create the topic:

kafka-topics --create --topic example-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 2

Keep the container shell you just started open, as you’ll use it in the next step.

4
Produce records with keys and values

To get started, lets produce some records to your new topic.

Since you’ve created a topic with more than one partition, you’ll send full key-value pairs so you’ll be able to see how different keys end up on different partitions. To send full key-value pairs you’ll specify the parse.key and key.separator options to the console producer command.

Let’s run the following command in the broker container shell from the previous step to start a new console producer:

kafka-console-producer --topic example-topic --bootstrap-server broker:9092 \  --property parse.key=true \  --property key.separator=":"

Then enter these records either one at time or copy-paste all of them into the terminal and hit enter:

key1:the lazykey2:fox jumpedkey3:over thekey4:brown cowkey1:Allkey2:streamskey3:leadkey4:tokey1:Kafkakey2:Go tokey3:Kafkakey4:summit

After you’ve sent the records, you can close the producer with a CTRL+C command, but keep the broker container shell open as you’ll still need it for the next few steps.

5
Start a console consumer to read from the first partition

Next let’s open up a console consumer to read records sent to the topic in the previous step, but you’ll only read from the first partition. Kafka partitions are zero based so your two partitions are numbered 0, and 1 respectively.

Using the broker container shell, lets start a console consumer to read only records from the first partition, 0

kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \ --from-beginning \ --property print.key=true \ --property key.separator="-" \ --partition 0

After a few seconds you should see something like this (your output will vary depending on the hashing algorithm):

key1-the lazykey1-Allkey1-Kafka

You’ll notice you sent 12 records, but only 3 went to the first partition. The reason for this is the way Kafka calculates the partition assignment for a given record. Kafka calculates the partition by taking the hash of the key modulo the number of partitions. So, even though you have 2 partitions, depending on what the key hash value is, you aren’t guaranteed an even distribution of records across partitions.

Go ahead and shut down the current consumer with a CTRL+C

6
Start a console consumer to read from the second partition

In the previous step, you consumed records from the first partition of your topic. In this step you’ll consume the rest of your records from the second partition 1.

If you haven’t done so already, close the previous console consumer with a CTRL+C.

Then start a new console consumer to read only records from the second partition:

kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \ --from-beginning \ --property print.key=true \ --property key.separator="-" \ --partition 1

After a few seconds you should see something like this

key2-fox jumpedkey3-over thekey4-brown cowkey2-streamskey3-leadkey4-tokey2-Go tokey3-Kafkakey4-summit

As you’d expect, the remaining 9 records are on the second partition.

Go ahead and shut down the current consumer with a CTRL+C

7
Read records starting from a specific offset

So far you’ve learned how to consume records from a specific partition. When you specify the partition, you can optionally specify the offset to start consuming from. Specifying a specific offset can be helpful when debugging an issue, in that you can skip consuming records that you know aren’t a potential problem.

If you haven’t done so already, close the previous console consumer with a CTRL+C.

From the previous step you know there are 9 records in the second partition. In this step you’ll only consume records starting from offset 6, so you should only see the last 3 records on the screen. The changes in this command include removing the --from-beginning property and adding an --offset flag

Here’s the command to read records from the second partition starting at offset 6:

kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \ --property print.key=true \ --property key.separator="-" \ --partition 1 \ --offset 6

After a few seconds you should see something like this

key2-Go tokey3-Kafkakey4-summit

As you can see, you’ve consumed records starting from offset 6 to the end of the log.

Go ahead and shut down the current consumer with a CTRL+C

8
Clean up

You’re all done now!

Go back to your open windows and stop any console consumers with a CTRL+C then close the container shells with a CTRL+D command.

Then you can shut down the docker container by running:

docker-compose down

 

Popular posts from this blog

Window function in PySpark with Joins example using 2 Dataframes (inner join)

Complex SQL: fetch the users who logged in consecutively 3 or more times (lead perfect example)

Credit Card Data Analysis using PySpark (how to use auto broadcast join after disabling it)