In this tutorial I will show you how to read a record from Kafka. Before you begin, you will need Maven and Eclipse set up and a project ready to go. If you haven’t installed Kafka yet, please do so.
POM.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
Imports
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;
Consumer Props File
The Kafka documentation lists all the available consumer configuration options.
# The URL of the Kafka broker
bootstrap.servers=localhost:9092
# Identifies the consumer group
group.id=test
# Offsets will be committed periodically in the background
enable.auto.commit=true
# The deserializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# The deserializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Heartbeat timeout used to detect consumer failures
session.timeout.ms=10000
# Reset to the earliest offset when no committed offset exists
auto.offset.reset=earliest
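Before wiring the props file into a consumer, it can help to check that it parses the way you expect. The following is a minimal sketch using only java.util.Properties, with no Kafka classes involved. The class name ConsumerPropsCheck, the SAMPLE text, and the REQUIRED key list are assumptions made up for this illustration; adjust them to match your own file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConsumerPropsCheck {
    // Keys this sketch treats as required (an assumption for illustration).
    static final String[] REQUIRED = {
        "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer"
    };

    // Sample text mirroring part of the props file shown above.
    static final String SAMPLE =
        "# The URL of the Kafka broker\n"
        + "bootstrap.servers=localhost:9092\n"
        + "group.id=test\n"
        + "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n"
        + "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n";

    // Parse props-file text; lines starting with '#' are skipped as comments.
    static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    // Return the required keys that are absent from the parsed properties.
    static List<String> missingKeys(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (properties.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties properties = load(SAMPLE);
        System.out.println("Missing keys: " + missingKeys(properties));
    }
}
```

In a real project you would pass the contents of consumer.props instead of SAMPLE. A non-empty list from missingKeys usually points to a typo in a property name.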
Consumer Connection/Read
The record we will read will just be a string for both key and value.
Consumer<String, String> consumer = null;

try {
    // Load the consumer properties from the classpath.
    ClassLoader classLoader = getClass().getClassLoader();

    try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
        Properties properties = new Properties();
        properties.load(props);
        consumer = new KafkaConsumer<>(properties);
    }

    System.out.println("Consumer Created");

    // Subscribe to the topic.
    consumer.subscribe(Arrays.asList("testTopic"));

    while (true) {
        // Wait up to 1000 ms for records.
        final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);

        if (consumerRecords.count() == 0) {
            // Keep reading until a poll returns no records.
            break;
        }

        consumerRecords.forEach(record -> {
            System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
                    record.key(), record.value(), record.partition(), record.offset());
        });

        // Commit offsets returned on the last poll() for all subscribed topics and partitions.
        consumer.commitAsync();
    }
} finally {
    // The consumer is still null if creating it failed, so guard the close.
    if (consumer != null) {
        consumer.close();
    }
}

System.out.println("Consumer Closed");
References
I used kafka-sample-programs as a guide for setting up props.