In this tutorial I will show you how to read a record to Kafka. Before you begin you will need Maven/Eclipse all setup and a project ready to go. If you haven’t installed Kafka yet please do so.
POM.xml
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>1.1.0</version>
- </dependency>
Imports
- import org.apache.kafka.clients.consumer.*;
- import java.util.Properties;
- import java.io.InputStream;
- import java.util.Arrays;
Consumer Props File
You can go here to view all the options for consumer properties.
- # The url to kafka
- bootstrap.servers=localhost:9092
- #identify consumer group
- group.id=test
- #offset will be periodically committed in the background
- enable.auto.commit=true
- # The serializer for the key
- key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- # The serializer for the value
- value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- # heartbeat to detect worker failures
- session.timeout.ms=10000
- #Automatically reset offset to earliest offset
- auto.offset.reset=earliest
Consumer Connection/Send
The record we will read will just be a string for both key and value.
- Consumer<String, String> consumer = null;
- try {
- ClassLoader classLoader = getClass().getClassLoader();
- try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
- Properties properties = new Properties();
- properties.load(props);
- consumer = new KafkaConsumer<>(properties);
- }
- System.out.println("Consumer Created");
- // Subscribe to the topic.
- consumer.subscribe(Arrays.asList("testTopic"));
- while (true) {
- final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
- if (consumerRecords.count() == 0) {
- //Keep reading till no records
- break;
- }
- consumerRecords.forEach(record -> {
- System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
- });
- //Commit offsets returned on the last poll() for all the subscribed list of topics and partition
- consumer.commitAsync();
- }
- } finally {
- consumer.close();
- }
- System.out.println("Consumer Closed");
References
I used kafka-sample-programs as a guide for setting up props.
2 thoughts on “Kafka & Java: Unsecure Consumer Read Record”
Comments are closed.