Kafka & Java: Secured Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin, you will need Maven/Eclipse set up and a project ready to go. If you haven’t installed Kafka with Kerberos yet, please do so first.

Import SSL Cert to Java:

Follow the tutorial “Installing unlimited strength encryption Java libraries”.

If you are on Windows, do the following:

    #Import it
    "C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -import -file hadoop.csr -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts" -alias "hadoop"

    #Check it
    "C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -list -v -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

    #If you want to delete it
    "C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -delete -alias hadoop -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"
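
If you would rather confirm the import from Java than from keytool, a minimal sketch using the standard java.security.KeyStore API could look like the following. The class name TrustStoreCheck and the method hasAlias are hypothetical, and the password "changeit" is assumed to be the unchanged default for cacerts.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public class TrustStoreCheck {

    // Returns true if the keystore read from the stream contains the alias.
    // Passing a null stream loads an empty keystore, which contains no aliases.
    static boolean hasAlias(InputStream in, char[] password, String alias) {
        try {
            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            ks.load(in, password);
            return ks.containsAlias(alias);
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Could not read keystore", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the certificate was imported into the cacerts of the running JVM.
        String path = args.length > 0
                ? args[0]
                : System.getProperty("java.home") + "/lib/security/cacerts";
        try (InputStream in = new FileInputStream(path)) {
            boolean present = hasAlias(in, "changeit".toCharArray(), "hadoop");
            System.out.println("hadoop alias present: " + present);
        }
    }
}
```

Run it with the path to a different keystore as the first argument if your certificate lives somewhere other than the JVM's own cacerts.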

POM.xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>

Imports

    import org.apache.kafka.clients.consumer.*;
    import java.util.Properties;
    import java.io.InputStream;
    import java.util.Arrays;

Consumer JAAS Conf (client_jaas.conf)

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useTicketCache=false
        refreshKrb5Config=true
        debug=true
        useKeyTab=true
        storeKey=true
        keyTab="c:\\data\\kafka.service.keytab"
        principal="kafka/hadoop@REALM.CA";
    };
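
As an alternative to a separate JAAS file, Kafka clients also accept the login module settings inline through the sasl.jaas.config client property. The sketch below only builds that property value from the keytab and principal used above; the class InlineJaas and the method krb5JaasConfig are hypothetical names, and doubling the backslashes mirrors what client_jaas.conf does, on the assumption that the quoted value is parsed with the same escape rules.

```java
public class InlineJaas {

    // Builds a value suitable for the sasl.jaas.config client property.
    // Unlike the file form, there is no surrounding "KafkaClient { ... };" section.
    static String krb5JaasConfig(String keyTab, String principal) {
        // Double each backslash so a Windows path survives quoted-string parsing.
        String escapedKeyTab = keyTab.replace("\\", "\\\\");
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " useTicketCache=false"
                + " useKeyTab=true"
                + " storeKey=true"
                + " keyTab=\"" + escapedKeyTab + "\""
                + " principal=\"" + principal + "\";";
    }

    public static void main(String[] args) {
        String value = krb5JaasConfig("c:\\data\\kafka.service.keytab", "kafka/hadoop@REALM.CA");
        System.out.println("sasl.jaas.config=" + value);
    }
}
```

If you go this route, the value is set on the consumer Properties under the key sasl.jaas.config before the consumer is created.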

Consumer Props File

The Kafka documentation lists all the options available for consumer properties.

    bootstrap.servers=hadoop:9094
    group.id=test

    security.protocol=SASL_SSL
    sasl.kerberos.service.name=kafka

    # Offsets will be committed periodically in the background
    enable.auto.commit=true

    # The deserializer for the key
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

    # The deserializer for the value
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

    # Timeout used with heartbeats to detect consumer failures
    session.timeout.ms=10000

    # Automatically reset the offset to the earliest offset
    auto.offset.reset=earliest
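
A typo in the props file usually only surfaces once the consumer is constructed, so it can help to check the file up front. The following is a minimal sketch using only java.util.Properties; the class ConsumerPropsCheck and the REQUIRED list are hypothetical and simply name the keys this tutorial relies on, not an official list of mandatory settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConsumerPropsCheck {

    // Keys this tutorial's consumer depends on (an assumption, not Kafka's own list).
    static final String[] REQUIRED = {
        "bootstrap.servers", "group.id", "security.protocol",
        "sasl.kerberos.service.name", "key.deserializer", "value.deserializer"
    };

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank, in REQUIRED order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse("bootstrap.servers=hadoop:9094\ngroup.id=test\n");
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

In the real application you would pass the Properties loaded from consumer.props to missingKeys before creating the consumer.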

Initiate Kerberos Authentication

    System.setProperty("java.security.auth.login.config", "C:\\data\\kafkaconnect\\kafka\\src\\main\\resources\\client_jaas.conf");
    System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
    System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
    System.setProperty("java.security.krb5.realm", "REALM.CA");
    System.setProperty("java.security.krb5.kdc", "REALM.CA");
    System.setProperty("sun.security.krb5.debug", "false");
    System.setProperty("javax.net.debug", "false");
    System.setProperty("javax.net.ssl.keyStorePassword", "changeit");
    System.setProperty("javax.net.ssl.keyStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
    System.setProperty("javax.net.ssl.trustStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
    System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
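
Several of these properties point at files, and a wrong path tends to show up later as a hard-to-read login or handshake error. As a hedged sketch, the check below reports which path-valued properties are unset or not readable; the class KerberosPathCheck and its PATH_PROPERTIES list are hypothetical and cover only the properties set above.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class KerberosPathCheck {

    // System properties from this tutorial whose values are file paths.
    static final String[] PATH_PROPERTIES = {
        "java.security.auth.login.config",
        "java.security.krb5.conf",
        "javax.net.ssl.trustStore"
    };

    // Returns "key=value" for every property that is unset or not readable.
    static List<String> unreadable() {
        List<String> problems = new ArrayList<>();
        for (String key : PATH_PROPERTIES) {
            String value = System.getProperty(key);
            if (value == null || !Files.isReadable(Paths.get(value))) {
                problems.add(key + "=" + value);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = unreadable();
        if (problems.isEmpty()) {
            System.out.println("All configured paths are readable");
        } else {
            System.out.println("Check these settings: " + problems);
        }
    }
}
```

Calling unreadable() right after the System.setProperty lines, and before creating the consumer, keeps a path mistake separate from a genuine Kerberos or SSL failure.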

Consumer Connection/Read

The records we read use a string for both key and value.

    Consumer<String, String> consumer = null;

    try {
        ClassLoader classLoader = getClass().getClassLoader();

        // Load the consumer settings from consumer.props on the classpath.
        try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
            Properties properties = new Properties();
            properties.load(props);
            consumer = new KafkaConsumer<>(properties);
        }
        System.out.println("Consumer Created");

        // Subscribe to the topic.
        consumer.subscribe(Arrays.asList("testTopic"));

        while (true) {
            // Wait up to 1000 ms for records.
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                // Stop reading once a poll returns no records.
                break;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
            });

            // Commit the offsets returned by the last poll() for all subscribed topics and partitions.
            consumer.commitAsync();
        }
    } finally {
        // The consumer is still null if loading the props or constructing it failed.
        if (consumer != null) {
            consumer.close();
        }
    }
    System.out.println("Consumer Closed");
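
The loop above stops at the first empty poll, which can happen before any record arrives, for example while the consumer is still joining its group. One possible refinement is to stop only after several consecutive empty polls. The sketch below shows just that loop logic, with a Supplier standing in for consumer.poll(1000) so it runs without a broker; the class EmptyPollTolerance, the method drain, and the maxEmptyPolls parameter are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class EmptyPollTolerance {

    // Polls until maxEmptyPolls consecutive empty batches are seen,
    // and returns every record read along the way.
    static <T> List<T> drain(Supplier<List<T>> poller, int maxEmptyPolls) {
        List<T> all = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<T> batch = poller.get();
            if (batch.isEmpty()) {
                emptyPolls++;      // one more empty poll in a row
            } else {
                emptyPolls = 0;    // records arrived, so reset the counter
                all.addAll(batch);
            }
        }
        return all;
    }

    public static void main(String[] args) {
        // Fake poll results: an empty first poll, then data, a gap, then more data.
        Iterator<List<String>> batches = Arrays.asList(
                Collections.<String>emptyList(),
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(),
                Arrays.asList("c")).iterator();
        Supplier<List<String>> fakePoll =
                () -> batches.hasNext() ? batches.next() : Collections.<String>emptyList();

        System.out.println(drain(fakePoll, 2)); // prints [a, b, c]
    }
}
```

With a limit of 1 this behaves like the original loop and would return nothing for the fake data, because the very first poll is empty.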

References

I used kafka-sample-programs as a guide for setting up props.