In this tutorial I will show you how to read a record to Kafka. Before you begin you will need Maven/Eclipse all setup and a project ready to go. If you haven’t installed Kafka Kerberos yet please do so.
Import SSL Cert to Java:
Follow this tutorial to “Installing unlimited strength encryption Java libraries”
If on Windows do the following
#Import it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -import -file hadoop.csr -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts" -alias "hadoop"
#Check it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -list -v -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"
#If you want to delete it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -delete -alias hadoop -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"
POM.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Imports
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;
Consumer JAAS Conf (client_jaas.conf)
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
refreshKrb5Config=true
debug=true
useKeyTab=true
storeKey=true
keyTab="c:\\data\\kafka.service.keytab"
principal="kafka/hadoop@REALM.CA";
};
Consumer Props File
You can go here to view all the options for consumer properties.
bootstrap.servers=hadoop:9094
group.id=test
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
#offset will be periodically committed in the background
enable.auto.commit=true
# The serializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# The serializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# heartbeat to detect worker failures
session.timeout.ms=10000
#Automatically reset offset to earliest offset
auto.offset.reset=earliest
Initiate Kerberos Authentication
System.setProperty("java.security.auth.login.config", "C:\\data\\kafkaconnect\\kafka\\src\\main\\resources\\client_jaas.conf");
System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
System.setProperty("java.security.krb5.realm", "REALM.CA");
System.setProperty("java.security.krb5.kdc", "REALM.CA");
System.setProperty("sun.security.krb5.debug", "false");
System.setProperty("javax.net.debug", "false");
System.setProperty("javax.net.ssl.keyStorePassword", "changeit");
System.setProperty("javax.net.ssl.keyStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
Consumer Connection/Send
The record we will read will just be a string for both key and value.
Consumer<String, String> consumer = null;
try {
ClassLoader classLoader = getClass().getClassLoader();
try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
Properties properties = new Properties();
properties.load(props);
consumer = new KafkaConsumer<>(properties);
}
System.out.println("Consumer Created");
// Subscribe to the topic.
consumer.subscribe(Arrays.asList("testTopic"));
while (true) {
final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
if (consumerRecords.count() == 0) {
//Keep reading till no records
break;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
});
//Commit offsets returned on the last poll() for all the subscribed list of topics and partition
consumer.commitAsync();
}
} finally {
consumer.close();
}
System.out.println("Consumer Closed");
References
I used kafka-sample-programs as a guide for setting up props.
You must be logged in to post a comment.