NiFi: Kerberized Kafka Consumer Processor

In this tutorial I will guide you through adding a Kafka consumer to NiFi that reads from a Kerberized Kafka cluster.

For this tutorial you will need an Avro schema called “person”; its contents are as follows.

  {
    "type": "record",
    "namespace": "com.example",
    "name": "FullName",
    "fields": [
      { "name": "first_name", "type": "string" },
      { "name": "last_name", "type": "string" }
    ]
  }

When ready you can publish this record to Kafka using the Kafka Producer.

  { "first_name": "John", "last_name": "Smith" }
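
If you want to publish it from Java, a minimal sketch follows. The topic name "persons", the class name, and a producer.props file holding your bootstrap servers, serializers and SASL_SSL/Kerberos settings are assumptions for illustration, not part of the NiFi setup itself.

  import org.apache.kafka.clients.producer.*;
  import java.io.InputStream;
  import java.util.Properties;

  public class PersonProducer {
      public static void main(String[] args) throws Exception {
          // Load producer settings (bootstrap.servers, serializers, security settings) - assumed file
          Properties properties = new Properties();
          try (InputStream props = PersonProducer.class.getClassLoader().getResourceAsStream("producer.props")) {
              properties.load(props);
          }

          // The value is the JSON record matching the "person" schema above
          String json = "{ \"first_name\": \"John\", \"last_name\": \"Smith\" }";

          try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
              producer.send(new ProducerRecord<>("persons", json)).get();
          }
      }
  }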

First we need to drag the processor onto the grid.

Next we need to select the Kafka Consumer processor.

Next we configure the processor. At a minimum you will need to point it at your Kafka brokers and topic, set the security protocol to SASL_SSL, and assign the record reader, record writer, Kerberos and SSL controller services that we create below.


We will need to create 5 controller services.
First is the Kerberos Service

Next is the SSL Service

Next is the Json Record Reader

Next is the Avro Registry

Next is the Json Record Writer

Now that you have finished configuring the services, double-check the final Kafka Consumer configuration and you are ready.

Next we need to enable all of the controller services.

Then we start the processor to begin receiving data.

Now publish the record I gave you earlier to Kafka and you will see the data start flowing into the processor's output queue.

You can now view the queue to see the data.

We are done now and you can start using the consumer.

Kafka & Java: Secured Consumer Read Record

In this tutorial I will show you how to read a record from a Kerberized Kafka cluster. Before you begin you will need Maven/Eclipse set up and a project ready to go. If you haven’t Kerberized Kafka yet please do so first.

Import SSL Cert to Java:

Follow this tutorial on “Installing unlimited strength encryption Java libraries”.

If on Windows do the following

  #Import it
  "C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -import -file hadoop.csr -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts" -alias "hadoop"

  #Check it
  "C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -list -v -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

  #If you want to delete it
  "C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -delete -alias hadoop -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

POM.xml

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
  </dependency>

Imports

  import org.apache.kafka.clients.consumer.*;
  import java.util.Properties;
  import java.io.InputStream;
  import java.util.Arrays;

Consumer JAAS Conf (client_jaas.conf)

  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    refreshKrb5Config=true
    debug=true
    useKeyTab=true
    storeKey=true
    keyTab="c:\\data\\kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
  };

Consumer Props File

You can go here to view all the options for consumer properties.

  bootstrap.servers=hadoop:9094
  group.id=test

  security.protocol=SASL_SSL
  sasl.kerberos.service.name=kafka

  #Offsets will be periodically committed in the background
  enable.auto.commit=true

  # The deserializer for the key
  key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  # The deserializer for the value
  value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  # Heartbeat to detect worker failures
  session.timeout.ms=10000

  #Automatically reset offset to earliest offset
  auto.offset.reset=earliest

Initiate Kerberos Authentication

  System.setProperty("java.security.auth.login.config", "C:\\data\\kafkaconnect\\kafka\\src\\main\\resources\\client_jaas.conf");
  System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
  System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
  System.setProperty("java.security.krb5.realm", "REALM.CA");
  System.setProperty("java.security.krb5.kdc", "REALM.CA");
  System.setProperty("sun.security.krb5.debug", "false");
  System.setProperty("javax.net.debug", "false");
  System.setProperty("javax.net.ssl.keyStorePassword", "changeit");
  System.setProperty("javax.net.ssl.keyStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
  System.setProperty("javax.net.ssl.trustStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
  System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
  System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");

Consumer Connection/Read

The record we will read will just be a string for both key and value.

  Consumer<String, String> consumer = null;

  try {
      ClassLoader classLoader = getClass().getClassLoader();

      try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
          Properties properties = new Properties();
          properties.load(props);
          consumer = new KafkaConsumer<>(properties);
      }
      System.out.println("Consumer Created");

      // Subscribe to the topic.
      consumer.subscribe(Arrays.asList("testTopic"));

      while (true) {
          final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
          if (consumerRecords.count() == 0) {
              //Stop once there are no more records
              break;
          }

          consumerRecords.forEach(record -> {
              System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
          });

          //Commit offsets returned on the last poll() for all the subscribed topics and partitions
          consumer.commitAsync();
      }
  } finally {
      if (consumer != null) {
          consumer.close();
      }
  }
  System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

Kafka: Kerberize/SSL

In this tutorial I will show you how to use Kerberos/SSL with Kafka. I will use self-signed certs for this example. Before you begin ensure you have installed the Kerberos Server and Kafka.

If you don’t want to use the built-in ZooKeeper you can set up your own. To do that, follow this tutorial.

This assumes your hostname is “hadoop”

Create Kerberos Principals

  cd /etc/security/keytabs/

  sudo kadmin.local

  #You can list principals
  listprincs

  #Create the following principals
  addprinc -randkey kafka/hadoop@REALM.CA
  addprinc -randkey zookeeper/hadoop@REALM.CA

  #Create the keytab files.
  #You will need these for Kafka and ZooKeeper to be able to login
  xst -k kafka.service.keytab kafka/hadoop@REALM.CA
  xst -k zookeeper.service.keytab zookeeper/hadoop@REALM.CA

Set Keytab Permissions/Ownership

  sudo chown root:hadoopuser /etc/security/keytabs/*
  sudo chmod 750 /etc/security/keytabs/*

Hosts Update

  sudo nano /etc/hosts

  #Remove the 127.0.1.1 line

  #Change 127.0.0.1 to the following
  127.0.0.1 realm.ca hadoop localhost

Ubuntu Firewall

  sudo ufw disable

SSL

Setup SSL Directories if you have not previously done so.

  sudo mkdir -p /etc/security/serverKeys
  sudo chown -R root:hadoopuser /etc/security/serverKeys/
  sudo chmod 755 /etc/security/serverKeys/

  cd /etc/security/serverKeys

Setup Keystore

  sudo keytool -genkey -alias NAMENODE -keyalg RSA -keysize 1024 -dname "CN=NAMENODE,OU=ORGANIZATION_UNIT,C=canada" -keypass PASSWORD -keystore /etc/security/serverKeys/keystore.jks -storepass PASSWORD
  sudo keytool -export -alias NAMENODE -keystore /etc/security/serverKeys/keystore.jks -rfc -file /etc/security/serverKeys/NAMENODE.csr -storepass PASSWORD

Setup Truststore

  sudo keytool -import -noprompt -alias NAMENODE -file /etc/security/serverKeys/NAMENODE.csr -keystore /etc/security/serverKeys/truststore.jks -storepass PASSWORD

Generate Self-Signed Certificate

  sudo openssl genrsa -out /etc/security/serverKeys/NAMENODE.key 2048

  sudo openssl req -x509 -new -key /etc/security/serverKeys/NAMENODE.key -days 300 -out /etc/security/serverKeys/NAMENODE.pem

  sudo keytool -keystore /etc/security/serverKeys/keystore.jks -alias NAMENODE -certreq -file /etc/security/serverKeys/NAMENODE.cert -storepass PASSWORD -keypass PASSWORD

  sudo openssl x509 -req -CA /etc/security/serverKeys/NAMENODE.pem -CAkey /etc/security/serverKeys/NAMENODE.key -in /etc/security/serverKeys/NAMENODE.cert -out /etc/security/serverKeys/NAMENODE.signed -days 300 -CAcreateserial

Setup File Permissions

  sudo chmod 440 /etc/security/serverKeys/*
  sudo chown root:hadoopuser /etc/security/serverKeys/*

Edit server.properties Config

  cd /usr/local/kafka/config

  sudo nano server.properties

  #Edit or Add the following properties.
  ssl.endpoint.identification.algorithm=HTTPS
  ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
  ssl.key.password=PASSWORD
  ssl.keystore.location=/etc/security/serverKeys/keystore.jks
  ssl.keystore.password=PASSWORD
  ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  ssl.truststore.password=PASSWORD
  listeners=SASL_SSL://:9094
  security.inter.broker.protocol=SASL_SSL
  ssl.client.auth=required
  authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  ssl.keystore.type=JKS
  ssl.truststore.type=JKS
  sasl.kerberos.service.name=kafka
  zookeeper.connect=hadoop:2181
  sasl.mechanism.inter.broker.protocol=GSSAPI
  sasl.enabled.mechanisms=GSSAPI

Edit zookeeper.properties Config

  sudo nano zookeeper.properties

  #Edit or Add the following properties.

  server.1=hadoop:2888:3888
  clientPort=2181
  authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  requireClientAuthScheme=SASL
  jaasLoginRenew=3600000

Edit producer.properties Config

  sudo nano producer.properties

  bootstrap.servers=hadoop:9094
  security.protocol=SASL_SSL
  sasl.kerberos.service.name=kafka
  ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  ssl.truststore.password=PASSWORD
  ssl.keystore.location=/etc/security/serverKeys/keystore.jks
  ssl.keystore.password=PASSWORD
  ssl.key.password=PASSWORD
  sasl.mechanism=GSSAPI

Edit consumer.properties Config

  sudo nano consumer.properties

  zookeeper.connect=hadoop:2181
  bootstrap.servers=hadoop:9094
  group.id=securing-kafka-group
  security.protocol=SASL_SSL
  sasl.kerberos.service.name=kafka
  ssl.truststore.location=/etc/security/serverKeys/truststore.jks
  ssl.truststore.password=PASSWORD
  sasl.mechanism=GSSAPI

Add zookeeper_jaas.conf Config

  sudo nano zookeeper_jaas.conf

  Server {
    com.sun.security.auth.module.Krb5LoginModule required
    debug=true
    useKeyTab=true
    keyTab="/etc/security/keytabs/zookeeper.service.keytab"
    storeKey=true
    useTicketCache=true
    refreshKrb5Config=true
    principal="zookeeper/hadoop@REALM.CA";
  };

Add kafkaserver_jaas.conf Config

  sudo nano kafkaserver_jaas.conf

  KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    debug=true
    useKeyTab=true
    storeKey=true
    refreshKrb5Config=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
  };

  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    refreshKrb5Config=true
    debug=true
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
  };

Edit kafka-server-start.sh

  cd /usr/local/kafka/bin/

  sudo nano kafka-server-start.sh

  jaas="$base_dir/../config/kafkaserver_jaas.conf"

  export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"

Edit zookeeper-server-start.sh

  sudo nano zookeeper-server-start.sh

  jaas="$base_dir/../config/zookeeper_jaas.conf"

  export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"

Kafka-ACL

  cd /usr/local/kafka/bin/

  #Grant topic access and cluster access
  ./kafka-acls.sh --operation All --allow-principal User:kafka --authorizer-properties zookeeper.connect=hadoop:2181 --add --cluster
  ./kafka-acls.sh --operation All --allow-principal User:kafka --authorizer-properties zookeeper.connect=hadoop:2181 --add --topic TOPIC

  #Grant all groups for a specific topic
  ./kafka-acls.sh --operation All --allow-principal User:kafka --authorizer-properties zookeeper.connect=hadoop:2181 --add --topic TOPIC --group '*'

  #If you want to remove cluster access
  ./kafka-acls.sh --authorizer-properties zookeeper.connect=hadoop:2181 --remove --cluster

  #If you want to remove topic access
  ./kafka-acls.sh --authorizer-properties zookeeper.connect=hadoop:2181 --remove --topic TOPIC

  #List access for cluster
  ./kafka-acls.sh --list --authorizer-properties zookeeper.connect=hadoop:2181 --cluster

  #List access for topic
  ./kafka-acls.sh --list --authorizer-properties zookeeper.connect=hadoop:2181 --topic TOPIC

kafka-console-producer.sh

If you want to test using the console producer you need to make these changes.

  cd /usr/local/kafka/bin/
  nano kafka-console-producer.sh

  #Add the below before the last line

  base_dir=$(dirname $0)
  jaas="$base_dir/../config/kafkaserver_jaas.conf"
  export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"

  #Now you can run the console producer
  ./kafka-console-producer.sh --broker-list hadoop:9094 --topic TOPIC --producer.config ../config/producer.properties

kafka-console-consumer.sh

If you want to test using the console consumer you need to make these changes.

  cd /usr/local/kafka/bin/
  nano kafka-console-consumer.sh

  #Add the below before the last line

  base_dir=$(dirname $0)
  jaas="$base_dir/../config/kafkaserver_jaas.conf"
  export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"

  #Now you can run the console consumer
  ./kafka-console-consumer.sh --bootstrap-server hadoop:9094 --topic TOPIC --consumer.config ../config/consumer.properties --from-beginning

References

https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
https://github.com/confluentinc/securing-kafka-blog/blob/master/manifests/default.pp

Kafka & Java: Consumer Seek To Beginning

This is a quick tutorial on how to seek to the beginning using a Kafka consumer. If you haven’t set up the consumer yet, follow this tutorial.

Once you have set up the consumer, this is all that is required. It moves the Kafka offset for every partition assigned to the consumer back to the beginning, so when you start reading you will get all records.

  consumer.seekToBeginning(consumer.assignment());
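
A slightly fuller sketch: the consumer only has partitions assigned after it has joined the group, so poll once before seeking. The topic name "testTopic" is an assumption.

  consumer.subscribe(Arrays.asList("testTopic"));

  // poll() once so the group coordinator assigns partitions to this consumer
  consumer.poll(0);

  // Rewind every assigned partition to the earliest available offset
  consumer.seekToBeginning(consumer.assignment());

  // The next poll() starts reading from the beginning of each partition
  ConsumerRecords<String, String> records = consumer.poll(1000);
  System.out.println("Records read from the beginning: " + records.count());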

Kafka & Java: Consumer List Topics

In this tutorial I will show you how to list all topics in Kafka. Before you begin you will need Maven/Eclipse set up and a project ready to go. You should also go through this tutorial to set up the consumer.

Imports

  import java.util.Map;
  import java.util.List;
  import org.apache.kafka.common.PartitionInfo;

Consumer List Topics

  Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for (String topic : listTopics.keySet()) {
      System.out.println("topic name :" + topic);
  }
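
listTopics() also returns the partition metadata for each topic, so if you want more than the names, a sketch like the following prints the partitions and their current leaders:

  for (Map.Entry<String, List<PartitionInfo>> entry : listTopics.entrySet()) {
      for (PartitionInfo partition : entry.getValue()) {
          // partition() is the partition id and leader() is the broker currently leading it
          System.out.println(entry.getKey() + " partition " + partition.partition() + " leader " + partition.leader());
      }
  }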

Kafka & Java: Unsecure Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse set up and a project ready to go. If you haven’t installed Kafka yet please do so.

POM.xml

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
  </dependency>

Imports

  import org.apache.kafka.clients.consumer.*;
  import java.util.Properties;
  import java.io.InputStream;
  import java.util.Arrays;

Consumer Props File

You can go here to view all the options for consumer properties.

  # The url to kafka
  bootstrap.servers=localhost:9092

  #Identify the consumer group
  group.id=test

  #Offsets will be periodically committed in the background
  enable.auto.commit=true

  # The deserializer for the key
  key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  # The deserializer for the value
  value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  # Heartbeat to detect worker failures
  session.timeout.ms=10000

  #Automatically reset offset to earliest offset
  auto.offset.reset=earliest

Consumer Connection/Read

The record we will read will just be a string for both key and value.

  Consumer<String, String> consumer = null;

  try {
      ClassLoader classLoader = getClass().getClassLoader();

      try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
          Properties properties = new Properties();
          properties.load(props);
          consumer = new KafkaConsumer<>(properties);
      }
      System.out.println("Consumer Created");

      // Subscribe to the topic.
      consumer.subscribe(Arrays.asList("testTopic"));

      while (true) {
          final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
          if (consumerRecords.count() == 0) {
              //Stop once there are no more records
              break;
          }

          consumerRecords.forEach(record -> {
              System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
          });

          //Commit offsets returned on the last poll() for all the subscribed topics and partitions
          consumer.commitAsync();
      }
  } finally {
      if (consumer != null) {
          consumer.close();
      }
  }
  System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

Kafka: Installation (Basic)

Installing Kafka is really straightforward. There is a quick start guide you can follow. The only thing I found was that it didn’t call out Java 8. I will be using Ubuntu 16.04 for this installation.

Install Java 8

  sudo apt-get install openjdk-8-jdk

Install Kafka

  wget http://apache.forsale.plus/kafka/1.1.0/kafka_2.11-1.1.0.tgz
  tar -xzf kafka_2.11-1.1.0.tgz
  sudo mv kafka_2.11-1.1.0/ /usr/local/kafka
  cd /usr/local/kafka/

Setup .bashrc:

  sudo nano ~/.bashrc

Add the following to the end of the file.

  #KAFKA VARIABLES START
  export KAFKA_HOME=/usr/local/kafka
  export KAFKA_CONF_DIR=/usr/local/kafka/config
  export PATH=$PATH:$KAFKA_HOME/bin
  #KAFKA VARIABLES STOP

  source ~/.bashrc

ZooKeeper

ZooKeeper comes bundled with Kafka, but you can run your own. For the purposes of this tutorial we just use the built-in ZooKeeper.

  bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka Server

Now we can run the Kafka server and start receiving messages on topics.

  bin/kafka-server-start.sh config/server.properties

List Topics

  /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper hadoop:2181

Create Topic

  /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test
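
If you would rather manage topics from Java, the kafka-clients dependency used in the other tutorials also ships an AdminClient. A minimal sketch, assuming the broker is reachable on hadoop:9092 (the class name and address are illustrative assumptions):

  import org.apache.kafka.clients.admin.*;
  import java.util.Collections;
  import java.util.Properties;

  public class TopicAdmin {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop:9092"); // assumed broker address

          try (AdminClient admin = AdminClient.create(props)) {
              // Create the "test" topic with 1 partition and a replication factor of 1
              admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1))).all().get();

              // List all topics
              System.out.println(admin.listTopics().names().get());
          }
      }
  }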

Auto Start

So if you want Kafka to run at startup then do the following.

  touch kafka_start.sh
  sudo chmod +x kafka_start.sh
  touch kafka_stop.sh
  sudo chmod +x kafka_stop.sh
  crontab -e

Add the following and save.

  @reboot /home/kafka/kafka_start.sh

kafka_start.sh

  #!/bin/bash

  /usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
  sleep 2
  /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

kafka_stop.sh

  #!/bin/bash

  /usr/local/kafka/bin/zookeeper-server-stop.sh
  sleep 2
  /usr/local/kafka/bin/kafka-server-stop.sh