In this article, we will go through the simplest example of Apache Kafka, along with basic installation steps for the Windows operating system. We will use Java for the code examples.
Example in this article
- Install Kafka on a Windows machine.
- Create a producer which will mimic a customer depositing a bank check.
- The deposited check amount will be published to a Kafka topic.
- A bank check processor consumer will pick amounts from the Kafka topic & process them.
We will use this example & execute it in different ways to understand Kafka features.
Installation
For installation steps, refer to our earlier article: Kafka Setup on Windows OS | Basic Installation, Setup, Verification, Cluster Setup, Storage
Kafka client Java API
To connect to Kafka & publish or subscribe to messages, we need to use the kafka-clients Java library. Below are references, followed by a sample Maven dependency entry.
- Maven Dependency – https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
- Javadoc – https://kafka.apache.org/24/javadoc/index.html
- Producer configurations – https://kafka.apache.org/documentation/#producerconfigs
- Consumer configurations – https://kafka.apache.org/documentation/#consumerconfigs
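For reference, the dependency entry in pom.xml looks roughly like the snippet below (the version shown is only an example; pick the latest from the Maven Repository link above).

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>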
It is not required to create the topic from the command line. By default, the "auto.create.topics.enable" broker property is true, due to which a topic is automatically created when a producer or consumer tries to access it through code.
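That said, if you prefer to create the topic explicitly, the kafka-topics tool from the installation can be used. A minimal example, assuming the default local ZooKeeper port from the installation article:

bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bankcheck-deposit-topic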
Kafka Producer
package com.itsallbinary.tutorial.kafka.messaging;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CheckTransactionProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        System.out.println(LocalTime.now().format(DateTimeFormatter.ISO_TIME) + " | Starting PRODUCER.");

        /*
         * Producer properties
         */
        Properties props = new Properties();

        // Kafka server host & port
        props.put("bootstrap.servers", "localhost:9092");

        // Wait till published record is replicated to all replicas & ack is received.
        // Strongest guarantee of not losing record.
        props.put("acks", "all");

        // Serializer class to use to serialize key of the published record.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Serializer class to use to serialize value of the published record.
        props.put("value.serializer", "org.apache.kafka.common.serialization.DoubleSerializer");

        // Example check amounts to deposit
        double[] checksToDeposit = { 1000, 5000, 500, 2000, 500, 1500, 3000, 2500, 500, 4500 };

        // Create kafka producer
        Producer<String, Double> producer = new KafkaProducer<>(props);

        // Iterate through example amounts & mimic check being deposited by users.
        for (int i = 0; i < checksToDeposit.length; i++) {

            // Create a producer record by providing topic name, key & value.
            ProducerRecord<String, Double> record = new ProducerRecord<String, Double>("bankcheck-deposit-topic",
                    "transaction-key-" + i, checksToDeposit[i]);

            // Publish producer record.
            Future<RecordMetadata> recordMetadata = producer.send(record);

            System.out.printf(
                    LocalTime.now().format(DateTimeFormatter.ISO_TIME)
                            + " | Check Deposited. | offset = %d, key = %s, Amount = %s%n",
                    recordMetadata.get().offset(), "transaction-key-" + i, checksToDeposit[i]);
        }

        // Close the producer as all records are published.
        producer.close();
    }
}
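Note that recordMetadata.get() above blocks until the broker acknowledgement arrives, which keeps the example simple but serializes the sends. The producer also offers an asynchronous send(record, callback) variant; below is a minimal sketch of how the loop body could look with it (an alternative illustration, not part of the example above).

// Publish asynchronously; the callback runs once the broker acks or an error occurs.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Publishing failed; log or retry as appropriate.
        exception.printStackTrace();
    } else {
        System.out.printf("Check Deposited. | partition = %d, offset = %d%n",
                metadata.partition(), metadata.offset());
    }
});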
Kafka Consumer
package com.itsallbinary.tutorial.kafka.messaging;

import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CheckProcessingConsumer {

    public static void main(String[] args) {

        String CONSUMER_GROUP = "check-deposit-consumer-1111";

        System.out.println(LocalTime.now().format(DateTimeFormatter.ISO_TIME) + " | Starting CONSUMER from group "
                + CONSUMER_GROUP);

        /*
         * Consumer properties
         */
        Properties consumerProps = new Properties();

        // Kafka server host & port
        consumerProps.setProperty("bootstrap.servers", "localhost:9092");

        // Group id. Multiple consumers can be grouped under single group & they will
        // act as single consumer.
        consumerProps.setProperty("group.id", CONSUMER_GROUP);

        // Automatically commit offset to Kafka so next time messages are consumed beyond
        // current offset.
        consumerProps.setProperty("enable.auto.commit", "true");

        // Time interval after which offset should be auto committed.
        consumerProps.setProperty("auto.commit.interval.ms", "1000");

        // Deserializer class to use to deserialize key of the consumed record.
        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Deserializer class to use to deserialize value of the consumed record.
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer");

        // Property indicating to consume messages from the beginning.
        // consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create consumer & subscribe to the topic
        KafkaConsumer<String, Double> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList("bankcheck-deposit-topic"));

        // Loop infinitely & keep polling the topic.
        while (true) {

            // Poll topic to get records starting from last committed offset. Wait for 100
            // millis or else timeout.
            ConsumerRecords<String, Double> checksToDeposit = consumer.poll(Duration.ofMillis(100));

            // Iterate through consumed records & process checks.
            for (ConsumerRecord<String, Double> checkRecord : checksToDeposit) {
                System.out.printf(
                        LocalTime.now().format(DateTimeFormatter.ISO_TIME)
                                + " | Check Processed. | offset = %d, key = %s, Amount = %s%n",
                        checkRecord.offset(), checkRecord.key(), checkRecord.value());
            }
        }
    }
}
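The poll loop above runs forever, so the consumer JVM has to be killed to stop it. In a real application you would usually break out of the loop cleanly, typically by calling consumer.wakeup() from a JVM shutdown hook. A minimal sketch of that pattern is shown below (only the loop portion; an assumption about how you might structure shutdown, not part of the example above).

final Thread mainThread = Thread.currentThread();

// wakeup() makes a blocked poll() throw WakeupException so the loop can exit.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, Double> records = consumer.poll(Duration.ofMillis(100));
        // ... process records as in the example above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown; nothing to do.
} finally {
    // Close cleanly so the group rebalances promptly.
    consumer.close();
}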
Execute the programs & observe Kafka features
We will try different scenarios to observe Kafka's behavior in different cases.
New Consumer connects before Producer publishes
- Scenario
- Run the consumer first, which will keep polling the Kafka topic.
- Then run the producer & publish messages to the Kafka topic.
This is the log of the producer, which is started after the consumer. As per the code, the producer sends 10 records & then closes, so the producer Java program exits after that.
14:14:20.2306207 | Starting PRODUCER.
14:14:21.7006233 | Check Deposited. | offset = 0, key = transaction-key-0, Amount = 1000.0
14:14:21.7856236 | Check Deposited. | offset = 1, key = transaction-key-1, Amount = 5000.0
14:14:21.7906262 | Check Deposited. | offset = 2, key = transaction-key-2, Amount = 500.0
14:14:21.7966236 | Check Deposited. | offset = 3, key = transaction-key-3, Amount = 2000.0
14:14:21.8016241 | Check Deposited. | offset = 4, key = transaction-key-4, Amount = 500.0
14:14:21.8076269 | Check Deposited. | offset = 5, key = transaction-key-5, Amount = 1500.0
14:14:21.8126256 | Check Deposited. | offset = 6, key = transaction-key-6, Amount = 3000.0
14:14:21.8756228 | Check Deposited. | offset = 7, key = transaction-key-7, Amount = 2500.0
14:14:21.879627 | Check Deposited. | offset = 8, key = transaction-key-8, Amount = 500.0
14:14:21.884625 | Check Deposited. | offset = 9, key = transaction-key-9, Amount = 4500.0
Below is the consumer log. After the producer publishes records, the consumer receives & processes them. The consumer JVM keeps running since it polls in an infinite loop.
14:12:28.8747644 | Starting CONSUMER from group check-deposit-consumer
14:14:21.7756277 | Check Processed. | offset = 0, key = transaction-key-0, Amount = 1000.0
14:14:21.7886239 | Check Processed. | offset = 1, key = transaction-key-1, Amount = 5000.0
14:14:21.7936241 | Check Processed. | offset = 2, key = transaction-key-2, Amount = 500.0
14:14:21.8006234 | Check Processed. | offset = 3, key = transaction-key-3, Amount = 2000.0
14:14:21.8066271 | Check Processed. | offset = 4, key = transaction-key-4, Amount = 500.0
14:14:21.8106249 | Check Processed. | offset = 5, key = transaction-key-5, Amount = 1500.0
14:14:21.8726234 | Check Processed. | offset = 6, key = transaction-key-6, Amount = 3000.0
14:14:21.8776224 | Check Processed. | offset = 7, key = transaction-key-7, Amount = 2500.0
14:14:21.8826282 | Check Processed. | offset = 8, key = transaction-key-8, Amount = 500.0
14:14:21.8896247 | Check Processed. | offset = 9, key = transaction-key-9, Amount = 4500.0
Notice the offset value of the last record, which is '9'. This is committed to Kafka automatically due to the property we set: consumerProps.setProperty("enable.auto.commit", "true");. Kafka internally stores offsets in an internal topic called "__consumer_offsets". You can verify the current offsets using the below CLI command in a command prompt.
bin\windows>kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --all-groups --all-topics

GROUP                   TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                              HOST           CLIENT-ID
check-deposit-consumer  bankcheck-deposit-topic  0          10              10              0    consumer-check-deposit-consumer-1-320b5d12-5125-4faf-92fe-a0f5daa0bb2a  /123.123.0.19  consumer-check-deposit-consumer-1
We will see how this offset is used in the next example.
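As a side note, if you want to control exactly when the offset is committed (for example, only after a check is fully processed), auto commit can be switched off & the offset committed manually with commitSync(). A minimal sketch of that variant, showing only the parts that change from the consumer above:

// Disable auto commit & commit manually after processing.
consumerProps.setProperty("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, Double> checksToDeposit = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, Double> checkRecord : checksToDeposit) {
        // ... process the check ...
    }
    // Synchronously commit the offsets of the records returned by the last poll().
    if (!checksToDeposit.isEmpty()) {
        consumer.commitSync();
    }
}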
Existing Consumer re-connects AFTER producer published
- Scenario
- Stop the existing running consumer from the earlier section.
- Start the producer & publish all the messages while the consumer is not running.
- After the producer finishes publishing, start the consumer.
By default, the Kafka broker configuration (server.properties) comes with the below property. It means that a published message will remain in Kafka for 168 hours, regardless of whether a consumer reads it or not.
.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
.
Now here is the producer log, which publishes messages and finishes. Notice that the offsets continue from where the earlier test left off, i.e. "+1" from the last offset of the earlier test.
14:22:05.2302999 | Starting PRODUCER.
14:22:06.7623008 | Check Deposited. | offset = 10, key = transaction-key-0, Amount = 1000.0
14:22:06.7893 | Check Deposited. | offset = 11, key = transaction-key-1, Amount = 5000.0
14:22:06.7932999 | Check Deposited. | offset = 12, key = transaction-key-2, Amount = 500.0
14:22:06.7973032 | Check Deposited. | offset = 13, key = transaction-key-3, Amount = 2000.0
14:22:06.8022996 | Check Deposited. | offset = 14, key = transaction-key-4, Amount = 500.0
14:22:06.8053016 | Check Deposited. | offset = 15, key = transaction-key-5, Amount = 1500.0
14:22:06.8093027 | Check Deposited. | offset = 16, key = transaction-key-6, Amount = 3000.0
14:22:06.8123008 | Check Deposited. | offset = 17, key = transaction-key-7, Amount = 2500.0
14:22:06.8163014 | Check Deposited. | offset = 18, key = transaction-key-8, Amount = 500.0
14:22:06.8222997 | Check Deposited. | offset = 19, key = transaction-key-9, Amount = 4500.0
In this test, no consumer was connected when the producer published the messages. With traditional pub/sub messaging in brokers like ActiveMQ or RabbitMQ, a message published to a topic with no (durable) subscriber at that moment is lost. In Kafka, due to the above retention configuration, a consumer can connect later (within 168 hours in our case) & still consume the messages.
Below is the consumer log; the consumer is started a few minutes later. In the earlier example the offset was stored as '9', so now the consumer starts from offset 10 onwards & reads all messages.
14:25:59.6097974 | Starting CONSUMER from group check-deposit-consumer
14:26:01.2488028 | Check Processed. | offset = 10, key = transaction-key-0, Amount = 1000.0
14:26:01.2508032 | Check Processed. | offset = 11, key = transaction-key-1, Amount = 5000.0
14:26:01.2518011 | Check Processed. | offset = 12, key = transaction-key-2, Amount = 500.0
14:26:01.2528014 | Check Processed. | offset = 13, key = transaction-key-3, Amount = 2000.0
14:26:01.2528014 | Check Processed. | offset = 14, key = transaction-key-4, Amount = 500.0
14:26:01.2538019 | Check Processed. | offset = 15, key = transaction-key-5, Amount = 1500.0
14:26:01.2548003 | Check Processed. | offset = 16, key = transaction-key-6, Amount = 3000.0
14:26:01.2558047 | Check Processed. | offset = 17, key = transaction-key-7, Amount = 2500.0
14:26:01.2568013 | Check Processed. | offset = 18, key = transaction-key-8, Amount = 500.0
14:26:01.2578014 | Check Processed. | offset = 19, key = transaction-key-9, Amount = 4500.0
Note that this is true for a consumer which had connected to the topic before, committed its offset to Kafka & is reconnecting again. If you connect a new consumer with a different consumer group, then by default it won't read past messages because it never committed an offset to Kafka. For this, check the next section.
New Consumer Group connects & consumes all messages from the beginning
- Scenario
- Connect a new consumer group to an existing topic which already has published messages.
- Consume all past messages from the beginning.
If a new consumer group wants to read past messages, then use the below-shown property in the consumer code.
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class CheckProcessingConsumer {

    public static void main(String[] args) {

        String CONSUMER_GROUP = "check-deposit-consumer-NEW";
        .
        .
        // Property indicating to consume messages from the beginning.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        .
        .
    }
}
You can see that this new consumer consumed all messages starting from offset 0 till 19, i.e. all messages from the earlier tests.
17:23:36.5271387 | Starting CONSUMER from group check-deposit-consumer-NEW
17:23:38.2781422 | Check Processed. | offset = 0, key = transaction-key-0, Amount = 1000.0
17:23:38.2791417 | Check Processed. | offset = 1, key = transaction-key-1, Amount = 5000.0
17:23:38.2791417 | Check Processed. | offset = 2, key = transaction-key-2, Amount = 500.0
17:23:38.2801402 | Check Processed. | offset = 3, key = transaction-key-3, Amount = 2000.0
17:23:38.2801402 | Check Processed. | offset = 4, key = transaction-key-4, Amount = 500.0
.
.
.
17:23:38.2871411 | Check Processed. | offset = 18, key = transaction-key-8, Amount = 500.0
17:23:38.2871411 | Check Processed. | offset = 19, key = transaction-key-9, Amount = 4500.0
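As an aside, instead of switching to a new consumer group, an existing consumer can also be rewound from code using seekToBeginning(). A minimal sketch is below; the consumer needs a partition assignment before seeking, which is why there is an initial poll(). This is an alternative illustration, not something the example above does.

consumer.subscribe(Arrays.asList("bankcheck-deposit-topic"));

// poll() once so the group coordinator assigns partitions to this consumer.
consumer.poll(Duration.ofMillis(1000));

// Rewind all assigned partitions to the earliest available offset.
consumer.seekToBeginning(consumer.assignment());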
Existing Consumer re-connects AFTER retention period is over.
- Scenario
- Run the producer first & publish all messages.
- Wait for the retention period to be over.
- Connect the consumer after the retention period & try to consume messages.
- Run the producer again to publish new messages & see if those messages get consumed.
To make this test easier, let's modify the broker properties so that the retention period is smaller & we can test quickly.
.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=1
.
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=1000
.
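Editing server.properties changes the retention for every topic & requires a broker restart. Alternatively, retention can be overridden for just one topic with the kafka-configs tool. A hedged example is below; the exact flags vary slightly across Kafka versions (newer versions use --bootstrap-server instead of --zookeeper), and retention.ms=3600000 corresponds to 1 hour.

bin\windows>kafka-configs.bat --zookeeper localhost:2181 --alter --entity-type topics --entity-name bankcheck-deposit-topic --add-config retention.ms=3600000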
Here is the producer log. Notice that the offsets start from where we left off in the earlier test.
14:49:30.3673549 | Starting PRODUCER.
14:49:31.9843576 | Check Deposited. | offset = 20, key = transaction-key-0, Amount = 1000.0
14:49:32.0213583 | Check Deposited. | offset = 21, key = transaction-key-1, Amount = 5000.0
14:49:32.0253577 | Check Deposited. | offset = 22, key = transaction-key-2, Amount = 500.0
14:49:32.0293583 | Check Deposited. | offset = 23, key = transaction-key-3, Amount = 2000.0
14:49:32.0363564 | Check Deposited. | offset = 24, key = transaction-key-4, Amount = 500.0
14:49:32.0403579 | Check Deposited. | offset = 25, key = transaction-key-5, Amount = 1500.0
14:49:32.0443596 | Check Deposited. | offset = 26, key = transaction-key-6, Amount = 3000.0
14:49:32.0513588 | Check Deposited. | offset = 27, key = transaction-key-7, Amount = 2500.0
14:49:32.0553597 | Check Deposited. | offset = 28, key = transaction-key-8, Amount = 500.0
14:49:32.0603588 | Check Deposited. | offset = 29, key = transaction-key-9, Amount = 4500.0
Now wait for 1 hour ………..
After more than an hour, start the consumer from the earlier tests. Here is the consumer log.
16:08:22.2311635 | Starting CONSUMER from group check-deposit-consumer |
No messages are consumed even though the offset was committed, because all messages have now been deleted since the retention period is over. Let's try running the producer again & produce more new messages. Keep the consumer running.
16:09:29.027444 | Starting PRODUCER.
16:09:30.4954449 | Check Deposited. | offset = 30, key = transaction-key-0, Amount = 1000.0
16:09:30.5204583 | Check Deposited. | offset = 31, key = transaction-key-1, Amount = 5000.0
16:09:30.5234502 | Check Deposited. | offset = 32, key = transaction-key-2, Amount = 500.0
16:09:30.5274473 | Check Deposited. | offset = 33, key = transaction-key-3, Amount = 2000.0
16:09:30.5324457 | Check Deposited. | offset = 34, key = transaction-key-4, Amount = 500.0
16:09:30.5374484 | Check Deposited. | offset = 35, key = transaction-key-5, Amount = 1500.0
16:09:30.540446 | Check Deposited. | offset = 36, key = transaction-key-6, Amount = 3000.0
16:09:30.5464508 | Check Deposited. | offset = 37, key = transaction-key-7, Amount = 2500.0
16:09:30.5564492 | Check Deposited. | offset = 38, key = transaction-key-8, Amount = 500.0
16:09:30.5624487 | Check Deposited. | offset = 39, key = transaction-key-9, Amount = 4500.0
Here is the consumer log now. It starts consuming the newer messages from the newer offset, i.e. 30.
16:08:22.2311635 | Starting CONSUMER from group check-deposit-consumer
16:09:30.5334472 | Check Processed. | offset = 30, key = transaction-key-0, Amount = 1000.0
16:09:30.53845 | Check Processed. | offset = 31, key = transaction-key-1, Amount = 5000.0
16:09:30.5394475 | Check Processed. | offset = 32, key = transaction-key-2, Amount = 500.0
16:09:30.5394475 | Check Processed. | offset = 33, key = transaction-key-3, Amount = 2000.0
16:09:30.5394475 | Check Processed. | offset = 34, key = transaction-key-4, Amount = 500.0
16:09:30.542448 | Check Processed. | offset = 35, key = transaction-key-5, Amount = 1500.0
16:09:30.5464508 | Check Processed. | offset = 36, key = transaction-key-6, Amount = 3000.0
16:09:30.5544532 | Check Processed. | offset = 37, key = transaction-key-7, Amount = 2500.0
16:09:30.559446 | Check Processed. | offset = 38, key = transaction-key-8, Amount = 500.0
16:09:30.5654522 | Check Processed. | offset = 39, key = transaction-key-9, Amount = 4500.0
Partitioned topic & multiple consumers with same consumer group.
- Scenario
- Run 2 Kafka servers & form a Kafka cluster.
- Create a partitioned & replicated topic.
- Run the producer & publish messages to the partitioned & replicated topic.
- Run 2 consumer instances & consume from all partitions.
Refer to the earlier article to set up a simple Kafka cluster with 2 Kafka brokers. Then create a partitioned & replicated topic & verify that the topic is created as expected.
bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic bankcheck-deposit-topic-cluster
Created topic bankcheck-deposit-topic-cluster.

bin\windows>kafka-topics.bat --describe --topic bankcheck-deposit-topic-cluster --zookeeper localhost:2181
Topic: bankcheck-deposit-topic-cluster  PartitionCount: 2  ReplicationFactor: 2  Configs:
        Topic: bankcheck-deposit-topic-cluster  Partition: 0  Leader: 0  Replicas: 1,0  Isr: 0,1
        Topic: bankcheck-deposit-topic-cluster  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
As shown in the above command output, Kafka created 2 partitions of the topic & spread them across the Kafka servers to make it scalable. Kafka also created a replica of each partition on the other Kafka server to make it highly available.
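If you prefer to create the topic from Java instead of the CLI, the AdminClient that ships with kafka-clients can do the same. Below is a minimal sketch; the second broker port 9093 is an assumption based on the cluster setup article, so adjust it to your setup.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class ClusterTopicCreator {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        // Broker ports are assumptions for a local 2-broker cluster.
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Topic with 2 partitions & replication factor 2, same as the CLI example above.
            NewTopic topic = new NewTopic("bankcheck-deposit-topic-cluster", 2, (short) 2);
            adminClient.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}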
Now modify our code as shown below. Change the topic name to the newly created topic & add logging for the partition.
public class CheckProcessingConsumer {

    public static void main(String[] args) {
        .
        KafkaConsumer<String, Double> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList("bankcheck-deposit-topic-cluster"));
        .
        .
        System.out.printf(
                LocalTime.now().format(DateTimeFormatter.ISO_TIME)
                        + " | Check Processed. | partition = %d, offset = %d, key = %s, Amount = %s%n",
                checkRecord.partition(), checkRecord.offset(), checkRecord.key(), checkRecord.value());
    }
}
public class CheckTransactionProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        .
        .
        // Create a producer record by providing topic name, key & value.
        ProducerRecord<String, Double> record = new ProducerRecord<String, Double>(
                "bankcheck-deposit-topic-cluster", "transaction-key-" + i, checksToDeposit[i]);
        .
        .
        System.out.printf(
                LocalTime.now().format(DateTimeFormatter.ISO_TIME)
                        + " | Check Deposited. | partition = %d, offset = %d, key = %s, Amount = %s%n",
                recordMetadata.get().partition(), recordMetadata.get().offset(), "transaction-key-" + i,
                checksToDeposit[i]);
        .
    }
}
Now start 2 instances of the consumer first: simply run the consumer Java program twice. Then run the producer code once. Below are the logs.
18:03:52.9334636 | Starting PRODUCER.
18:03:54.9554664 | Check Deposited. | partition = 1, offset = 0, key = transaction-key-0, Amount = 1000.0
18:03:55.0814653 | Check Deposited. | partition = 1, offset = 1, key = transaction-key-1, Amount = 5000.0
18:03:55.0894678 | Check Deposited. | partition = 1, offset = 2, key = transaction-key-2, Amount = 500.0
18:03:55.1034647 | Check Deposited. | partition = 0, offset = 0, key = transaction-key-3, Amount = 2000.0
18:03:55.2274653 | Check Deposited. | partition = 0, offset = 1, key = transaction-key-4, Amount = 500.0
18:03:55.2434678 | Check Deposited. | partition = 0, offset = 2, key = transaction-key-5, Amount = 1500.0
18:03:55.2614648 | Check Deposited. | partition = 0, offset = 3, key = transaction-key-6, Amount = 3000.0
18:03:55.2714683 | Check Deposited. | partition = 0, offset = 4, key = transaction-key-7, Amount = 2500.0
18:03:55.2814652 | Check Deposited. | partition = 1, offset = 3, key = transaction-key-8, Amount = 500.0
18:03:55.2934652 | Check Deposited. | partition = 1, offset = 4, key = transaction-key-9, Amount = 4500.0
18:03:55.3064656 | Check Deposited. | partition = 0, offset = 5, key = transaction-key-10, Amount = 1000.0
You can see that the Kafka client distributed the published messages across both partitions.
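For context, this distribution comes from the producer's default partitioner, which hashes the record key to pick a partition, so records with the same key always land on the same partition. If needed, a record can also be pinned to a specific partition explicitly via the ProducerRecord constructor that takes a partition number. A small sketch (partition 0 is chosen purely for illustration):

// Explicitly send this check to partition 0 of the clustered topic.
ProducerRecord<String, Double> record = new ProducerRecord<>(
        "bankcheck-deposit-topic-cluster", 0, "transaction-key-" + i, checksToDeposit[i]);
producer.send(record);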
Within a consumer group, Kafka assigns each partition to at most one consumer. So, as you can see in the consumer logs below, each consumer from the same group consumed messages from a single partition.
18:03:41.1814623 | Starting CONSUMER from group check-deposit-consumer
18:03:55.0934661 | Check Processed. | partition = 1, offset = 0, key = transaction-key-0, Amount = 1000.0
18:03:55.0994653 | Check Processed. | partition = 1, offset = 1, key = transaction-key-1, Amount = 5000.0
18:03:55.100465 | Check Processed. | partition = 1, offset = 2, key = transaction-key-2, Amount = 500.0
18:03:55.2884672 | Check Processed. | partition = 1, offset = 3, key = transaction-key-8, Amount = 500.0
18:03:55.304466 | Check Processed. | partition = 1, offset = 4, key = transaction-key-9, Amount = 4500.0
18:03:46.0994643 | Starting CONSUMER from group check-deposit-consumer
18:03:55.2434678 | Check Processed. | partition = 0, offset = 0, key = transaction-key-3, Amount = 2000.0
18:03:55.2514654 | Check Processed. | partition = 0, offset = 1, key = transaction-key-4, Amount = 500.0
18:03:55.2584699 | Check Processed. | partition = 0, offset = 2, key = transaction-key-5, Amount = 1500.0
18:03:55.2684671 | Check Processed. | partition = 0, offset = 3, key = transaction-key-6, Amount = 3000.0
18:03:55.2774675 | Check Processed. | partition = 0, offset = 4, key = transaction-key-7, Amount = 2500.0
18:03:55.348465 | Check Processed. | partition = 0, offset = 5, key = transaction-key-10, Amount = 1000.0
Replicated topic & Consumer consuming after broker instance failure
- Scenario
- Stop all consumers from the past examples.
- Run the producer first & publish all messages to the Kafka cluster.
- Stop/kill one of the Kafka servers/brokers to imitate a broker failure.
- Run the consumer & observe messages being consumed.
18:49:04.8486955 | Starting PRODUCER.
18:49:06.4196138 | Check Deposited. | partition = 1, offset = 5, key = transaction-key-0, Amount = 1000.0
18:49:06.4496105 | Check Deposited. | partition = 1, offset = 6, key = transaction-key-1, Amount = 5000.0
18:49:06.4546094 | Check Deposited. | partition = 1, offset = 7, key = transaction-key-2, Amount = 500.0
18:49:06.4596106 | Check Deposited. | partition = 0, offset = 6, key = transaction-key-3, Amount = 2000.0
18:49:06.4736113 | Check Deposited. | partition = 0, offset = 7, key = transaction-key-4, Amount = 500.0
18:49:06.4796096 | Check Deposited. | partition = 0, offset = 8, key = transaction-key-5, Amount = 1500.0
18:49:06.4856115 | Check Deposited. | partition = 0, offset = 9, key = transaction-key-6, Amount = 3000.0
18:49:06.4926134 | Check Deposited. | partition = 0, offset = 10, key = transaction-key-7, Amount = 2500.0
18:49:06.5016128 | Check Deposited. | partition = 1, offset = 8, key = transaction-key-8, Amount = 500.0
18:49:06.5096111 | Check Deposited. | partition = 1, offset = 9, key = transaction-key-9, Amount = 4500.0
18:49:06.517611 | Check Deposited. | partition = 0, offset = 11, key = transaction-key-10, Amount = 1000.0
In the producer log above, you can see that the messages are published to partitions on both brokers. Now stop one of the Kafka broker servers, then run the consumer. You can see that the consumer consumes all messages from both partitions because the partitions are replicated.
18:51:38.6558947 | Starting CONSUMER from group check-deposit-consumer
18:51:40.2909033 | Check Processed. | partition = 0, offset = 6, key = transaction-key-3, Amount = 2000.0
18:51:40.2918986 | Check Processed. | partition = 0, offset = 7, key = transaction-key-4, Amount = 500.0
18:51:40.2928988 | Check Processed. | partition = 0, offset = 8, key = transaction-key-5, Amount = 1500.0
18:51:40.2938997 | Check Processed. | partition = 0, offset = 9, key = transaction-key-6, Amount = 3000.0
18:51:40.2948985 | Check Processed. | partition = 0, offset = 10, key = transaction-key-7, Amount = 2500.0
18:51:40.2958988 | Check Processed. | partition = 0, offset = 11, key = transaction-key-10, Amount = 1000.0
18:51:40.2958988 | Check Processed. | partition = 1, offset = 5, key = transaction-key-0, Amount = 1000.0
18:51:40.296898 | Check Processed. | partition = 1, offset = 6, key = transaction-key-1, Amount = 5000.0
18:51:40.2978992 | Check Processed. | partition = 1, offset = 7, key = transaction-key-2, Amount = 500.0
18:51:40.2988979 | Check Processed. | partition = 1, offset = 8, key = transaction-key-8, Amount = 500.0
18:51:40.2998978 | Check Processed. | partition = 1, offset = 9, key = transaction-key-9, Amount = 4500.0