Kafka & Java: Unsecure Consumer Read Record

(Last Updated On: )

In this tutorial I will show you how to read a record to Kafka. Before you begin you will need Maven/Eclipse all setup and a project ready to go. If you haven’t installed Kafka yet please do so.

POM.xml

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>1.1.0</version>
  5. </dependency>

Imports

  1. import org.apache.kafka.clients.consumer.*;
  2. import java.util.Properties;
  3. import java.io.InputStream;
  4. import java.util.Arrays;

Consumer Props File

You can go here to view all the options for consumer properties.

  1. # The url to kafka
  2. bootstrap.servers=localhost:9092
  3.  
  4. #identify consumer group
  5. group.id=test
  6.  
  7. #offset will be periodically committed in the background
  8. enable.auto.commit=true
  9.  
  10. # The serializer for the key
  11. key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  12.  
  13. # The serializer for the value
  14. value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  15.  
  16. # heartbeat to detect worker failures
  17. session.timeout.ms=10000
  18.  
  19. #Automatically reset offset to earliest offset
  20. auto.offset.reset=earliest

Consumer Connection/Send

The record we will read will just be a string for both key and value.

  1. Consumer<String, String> consumer = null;
  2.  
  3. try {
  4. ClassLoader classLoader = getClass().getClassLoader();
  5.  
  6. try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
  7. Properties properties = new Properties();
  8. properties.load(props);
  9. consumer = new KafkaConsumer<>(properties);
  10. }
  11. System.out.println("Consumer Created");
  12.  
  13. // Subscribe to the topic.
  14. consumer.subscribe(Arrays.asList("testTopic"));
  15.  
  16. while (true) {
  17. final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
  18. if (consumerRecords.count() == 0) {
  19. //Keep reading till no records
  20. break;
  21. }
  22.  
  23. consumerRecords.forEach(record -> {
  24. System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
  25. });
  26.  
  27. //Commit offsets returned on the last poll() for all the subscribed list of topics and partition
  28. consumer.commitAsync();
  29. }
  30. } finally {
  31. consumer.close();
  32. }
  33. System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

2 thoughts on “Kafka & Java: Unsecure Consumer Read Record”

Comments are closed.