NiFi: Kerberized Kafka Consumer Processor

In this tutorial I will show you how to add a Kerberized Kafka consumer to NiFi.

For this tutorial you will need an Avro schema called “person”. Its contents are as follows.

{
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "first_name", "type": "string" },
       { "name": "last_name", "type": "string" }
     ]
}

When ready you can publish this record to Kafka using the Kafka Producer.

{ "first_name": "John", "last_name": "Smith" }

First we need to drag the processor onto the grid.

Next we need to select the Kafka Consumer.

Next we configure the processor.

We will need to create 5 controller services.
First is the Kerberos Service

Next is the SSL Service

Next is the Json Record Reader

Next is the Avro Registry

Next is the Json Record Writer

Now you have finished configuring the services. Ensure your final Kafka Consumer configuration looks like this and you are ready.

Next we need to enable all the controller services

We need to start the processor to start receiving data

Now you can publish the record I gave you earlier to the topic. As you can see, the data starts flowing in.

You can now view the queue to see the data.

We are done now and you can start using the consumer.

Kafka: Kerberize/SSL

In this tutorial I will show you how to use Kerberos/SSL with Kafka. I will use self-signed certs for this example. Before you begin, ensure you have installed Kerberos Server and Kafka.

If you don’t want to use the built-in ZooKeeper you can set up your own. To do that, follow this tutorial.

This assumes your hostname is “hadoop”.

Create Kerberos Principals

cd /etc/security/keytabs/

sudo kadmin.local

#You can list principals
listprincs

#Create the following principals
addprinc -randkey kafka/hadoop@REALM.CA
addprinc -randkey zookeeper/hadoop@REALM.CA

#Create the keytab files.
#You will need these for Kafka and ZooKeeper to be able to log in
xst -k kafka.service.keytab kafka/hadoop@REALM.CA
xst -k zookeeper.service.keytab zookeeper/hadoop@REALM.CA

Set Keytab Permissions/Ownership

sudo chown root:hadoopuser /etc/security/keytabs/*
sudo chmod 750 /etc/security/keytabs/*

Hosts Update

sudo nano /etc/hosts

#Remove 127.0.1.1 line

#Change 127.0.0.1 to the following
127.0.0.1 realm.ca hadoop localhost

Ubuntu Firewall

sudo ufw disable

SSL

Setup SSL Directories if you have not previously done so.

sudo mkdir -p /etc/security/serverKeys
sudo chown -R root:hadoopuser /etc/security/serverKeys/
sudo chmod 755 /etc/security/serverKeys/

cd /etc/security/serverKeys

Setup Keystore

sudo keytool -genkey -alias NAMENODE -keyalg RSA -keysize 2048 -dname "CN=NAMENODE,OU=ORGANIZATION_UNIT,C=CA" -keypass PASSWORD -keystore /etc/security/serverKeys/keystore.jks -storepass PASSWORD
sudo keytool -export -alias NAMENODE -keystore /etc/security/serverKeys/keystore.jks -rfc -file /etc/security/serverKeys/NAMENODE.csr -storepass PASSWORD

Setup Truststore

sudo keytool -import -noprompt -alias NAMENODE -file /etc/security/serverKeys/NAMENODE.csr -keystore /etc/security/serverKeys/truststore.jks -storepass PASSWORD
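
Before pointing Kafka at these files, it can help to confirm that a store opens with the password you expect. The following is only a minimal sketch using the JDK's `java.security.KeyStore` API. The class name `KeystoreCheck` is hypothetical, and it assumes the store is in JKS format (as configured later via ssl.keystore.type=JKS) and that the user running it can read the file.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: opens a JKS store and lists the aliases it contains.
class KeystoreCheck {

	static List<String> aliases(Path store, char[] password) throws Exception {
		// Assumption: the store was created in JKS format.
		KeyStore ks = KeyStore.getInstance("JKS");
		try (InputStream in = Files.newInputStream(store)) {
			// load() throws an IOException if the password is wrong
			// or the file is not a valid keystore.
			ks.load(in, password);
		}
		return Collections.list(ks.aliases());
	}

	public static void main(String[] args) throws Exception {
		// Path and placeholder password taken from this tutorial.
		Path store = Paths.get("/etc/security/serverKeys/truststore.jks");
		for (String alias : aliases(store, "PASSWORD".toCharArray())) {
			System.out.println("alias: " + alias);
		}
	}
}
```

The same method can be pointed at keystore.jks to check the key entry created in the previous step.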

Generate Self-Signed Certificate

sudo openssl genrsa -out /etc/security/serverKeys/NAMENODE.key 2048

sudo openssl req -x509 -new -key /etc/security/serverKeys/NAMENODE.key -days 300 -out /etc/security/serverKeys/NAMENODE.pem

sudo keytool -keystore /etc/security/serverKeys/keystore.jks -alias NAMENODE -certreq -file /etc/security/serverKeys/NAMENODE.cert -storepass PASSWORD -keypass PASSWORD

sudo openssl x509 -req -CA /etc/security/serverKeys/NAMENODE.pem -CAkey /etc/security/serverKeys/NAMENODE.key -in /etc/security/serverKeys/NAMENODE.cert -out /etc/security/serverKeys/NAMENODE.signed -days 300 -CAcreateserial

Setup File Permissions

sudo chmod 440 /etc/security/serverKeys/*
sudo chown root:hadoopuser /etc/security/serverKeys/*

Edit server.properties Config

cd /usr/local/kafka/config

sudo nano server.properties

#Edit or Add the following properties.
ssl.endpoint.identification.algorithm=HTTPS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.key.password=PASSWORD
ssl.keystore.location=/etc/security/serverKeys/keystore.jks
ssl.keystore.password=PASSWORD
ssl.truststore.location=/etc/security/serverKeys/truststore.jks
ssl.truststore.password=PASSWORD
listeners=SASL_SSL://:9094
security.inter.broker.protocol=SASL_SSL
ssl.client.auth=required
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
ssl.keystore.type=JKS
ssl.truststore.type=JKS
sasl.kerberos.service.name=kafka
zookeeper.connect=hadoop:2181
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI

Edit zookeeper.properties Config

sudo nano zookeeper.properties

#Edit or Add the following properties.

server.1=hadoop:2888:3888
clientPort=2181
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=SASL
jaasLoginRenew=3600000

Edit producer.properties Config

sudo nano producer.properties

bootstrap.servers=hadoop:9094
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/serverKeys/truststore.jks
ssl.truststore.password=PASSWORD
ssl.keystore.location=/etc/security/serverKeys/keystore.jks
ssl.keystore.password=PASSWORD
ssl.key.password=PASSWORD
sasl.mechanism=GSSAPI

Edit consumer.properties Config

sudo nano consumer.properties

zookeeper.connect=hadoop:2181
bootstrap.servers=hadoop:9094
group.id=securing-kafka-group
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/serverKeys/truststore.jks
ssl.truststore.password=PASSWORD
sasl.mechanism=GSSAPI
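
As a quick sanity check before starting a client, the settings above can be validated from Java. This is a minimal sketch rather than part of the original setup: the class name `SaslSslPropsCheck` is hypothetical, and the list of keys is simply the set of properties shown above, not an official list of required Kafka settings. It uses only `java.util.Properties`, so it does not need the Kafka client library.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): reports which of the client
// settings used in this tutorial are missing or inconsistent.
class SaslSslPropsCheck {

	// Assumption: these are the keys from consumer.properties above.
	static final List<String> REQUIRED = Arrays.asList(
		"bootstrap.servers",
		"security.protocol",
		"sasl.mechanism",
		"sasl.kerberos.service.name",
		"ssl.truststore.location",
		"ssl.truststore.password");

	static List<String> problems(Properties props) {
		List<String> problems = new ArrayList<>();
		for (String key : REQUIRED) {
			String value = props.getProperty(key);
			if (value == null || value.trim().isEmpty()) {
				problems.add("missing: " + key);
			}
		}
		// The broker listener in this tutorial is SASL_SSL, so the client must match.
		String protocol = props.getProperty("security.protocol");
		if (protocol != null && !"SASL_SSL".equals(protocol.trim())) {
			problems.add("security.protocol should be SASL_SSL but is " + protocol.trim());
		}
		return problems;
	}

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		// Path used in this tutorial; adjust if your install differs.
		try (InputStream in = Files.newInputStream(
				Paths.get("/usr/local/kafka/config/consumer.properties"))) {
			props.load(in);
		}
		problems(props).forEach(System.out::println);
	}
}
```

If nothing is printed, every key in the list is present and the protocol matches the broker listener.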

Add zookeeper_jaas.conf Config

sudo nano zookeeper_jaas.conf

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true
  useKeyTab=true
  keyTab="/etc/security/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=true
  refreshKrb5Config=true
  principal="zookeeper/hadoop@REALM.CA";
};

Add kafkaserver_jaas.conf Config

sudo nano kafkaserver_jaas.conf

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    debug=true
    useKeyTab=true
    storeKey=true
    refreshKrb5Config=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    refreshKrb5Config=true
    debug=true
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
};
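
Section names in a JAAS file are case-sensitive. By default the Kafka broker looks up a section named KafkaServer and Kafka clients look up KafkaClient, so a misspelled name only shows up as a login failure at runtime. As a hedged sketch, the file can be parsed with the JDK's own JAAS parser to confirm the sections exist. The class name `JaasSectionCheck` is hypothetical, and the file path assumes the layout used in this tutorial.

```java
import java.net.URI;
import java.nio.file.Paths;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper: parses a JAAS file with the JDK parser and
// reports whether a named section is defined in it.
class JaasSectionCheck {

	static boolean hasSection(URI jaasFile, String section) throws Exception {
		// "JavaLoginConfig" is the JDK's file-based JAAS configuration type.
		Configuration config =
			Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile));
		// Returns null when the section is not defined in the file.
		AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
		return entries != null && entries.length > 0;
	}

	public static void main(String[] args) throws Exception {
		// Assumed location, matching the config directory used in this tutorial.
		URI file = Paths.get("/usr/local/kafka/config/kafkaserver_jaas.conf").toUri();
		for (String section : new String[] {"KafkaServer", "KafkaClient"}) {
			System.out.println(section + ": " + (hasSection(file, section) ? "found" : "NOT FOUND"));
		}
	}
}
```

This only checks that the file parses and the sections are present. It does not attempt a Kerberos login, so a bad keytab or principal would still need to be caught by starting the broker.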

Edit kafka-server-start.sh

cd /usr/local/kafka/bin/

sudo nano kafka-server-start.sh

jaas="$base_dir/../config/kafkaserver_jaas.conf"

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"

Edit zookeeper-server-start.sh

sudo nano zookeeper-server-start.sh

jaas="$base_dir/../config/zookeeper_jaas.conf"

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"

Kafka-ACL

cd /usr/local/kafka/bin/

#Grant topic access and cluster access
./kafka-acls.sh  --operation All --allow-principal User:kafka --authorizer-properties zookeeper.connect=hadoop:2181 --add --cluster
./kafka-acls.sh  --operation All --allow-principal User:kafka --authorizer-properties zookeeper.connect=hadoop:2181 --add --topic TOPIC

#Grant all groups for a specific topic
#Quote the * so the shell does not expand it to file names
./kafka-acls.sh --operation All --allow-principal User:kafka --authorizer-properties zookeeper.connect=hadoop:2181 --add --topic TOPIC --group '*'

#If you want to remove cluster access
./kafka-acls.sh --authorizer-properties zookeeper.connect=hadoop:2181 --remove --cluster

#If you want to remove topic access
./kafka-acls.sh --authorizer-properties zookeeper.connect=hadoop:2181 --remove --topic TOPIC

#List access for cluster
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=hadoop:2181 --cluster

#List access for topic
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=hadoop:2181 --topic TOPIC

kafka-console-producer.sh

If you want to test using the console producer you need to make these changes.

cd /usr/local/kafka/bin/
nano kafka-console-producer.sh

#Add the below before the last line

base_dir=$(dirname $0)
jaas="$base_dir/../config/kafkaserver_jaas.conf"
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"


#Now you can run the console producer
./kafka-console-producer.sh --broker-list hadoop:9094 --topic TOPIC --producer.config ../config/producer.properties

kafka-console-consumer.sh

If you want to test using the console consumer you need to make these changes.

cd /usr/local/kafka/bin/
nano kafka-console-consumer.sh

#Add the below before the last line

base_dir=$(dirname $0)
jaas="$base_dir/../config/kafkaserver_jaas.conf"
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=$jaas"


#Now you can run the console consumer
./kafka-console-consumer.sh --bootstrap-server hadoop:9094 --topic TOPIC --consumer.config ../config/consumer.properties --from-beginning

References

https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
https://github.com/confluentinc/securing-kafka-blog/blob/master/manifests/default.pp

Kafka & Java: Consumer Seek To Beginning

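This is a quick tutorial on how to seek to the beginning of a topic using a Kafka consumer. If you haven’t set up the consumer yet, follow this tutorial.

This is all that is required once you have set up the consumer. It moves the consumer’s offset to the beginning of every partition currently assigned to it, so once you start reading you will get all records. Note that after subscribe() the assignment is empty until the first poll() completes, so call this once partitions have been assigned.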

consumer.seekToBeginning(consumer.assignment());

Kafka & Java: Consumer List Topics

In this tutorial I will show you how to list all topics in Kafka. Before you begin you will need Maven/Eclipse all setup and a project ready to go. Also you should go through this tutorial to setup the consumer.

Imports

import java.util.Map;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;

Consumer List Topics

Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
System.out.println("list of topic size :" + listTopics.size());

for (String topic : listTopics.keySet()) {
	System.out.println("topic name :" + topic);
}

Kafka & Java: Unsecure Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. If you haven’t installed Kafka yet please do so.

POM.xml

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.0</version>
</dependency>

Imports

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;

Consumer Props File

You can go here to view all the options for consumer properties.

# The url to kafka
bootstrap.servers=localhost:9092

#identify consumer group
group.id=test

#offset will be periodically committed in the background
enable.auto.commit=true

# The deserializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# The deserializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# heartbeat to detect worker failures
session.timeout.ms=10000

#Automatically reset offset to earliest offset
auto.offset.reset=earliest

Consumer Connection/Read

The record we will read will just be a string for both key and value.

Consumer<String, String> consumer = null;

try {
	ClassLoader classLoader = getClass().getClassLoader();

	try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
		Properties properties = new Properties();
		properties.load(props);
		consumer = new KafkaConsumer<>(properties);
	}
	
	System.out.println("Consumer Created");

	// Subscribe to the topic.
	consumer.subscribe(Arrays.asList("testTopic"));

	while (true) {
		final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
		
		if (consumerRecords.count() == 0) {
			//Keep reading till no records
			break;
		}

		consumerRecords.forEach(record -> {
			System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
		});

		//Commit offsets returned on the last poll() for all the subscribed list of topics and partition
		consumer.commitAsync();
	}
} finally {
	// consumer is still null if the properties failed to load
	if (consumer != null) {
		consumer.close();
	}
}
System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

Kafka: Installation (Basic)

Installing Kafka is really straightforward. There is a quick start guide you can follow. The only thing I found was that it didn’t call out Java 8 as a requirement. I will be using Ubuntu 16.04 for this installation.

Install Java 8

sudo apt-get install openjdk-8-jdk

Install Kafka

wget http://apache.forsale.plus/kafka/1.1.0/kafka_2.11-1.1.0.tgz 
tar -xzf kafka_2.11-1.1.0.tgz
sudo mv kafka_2.11-1.1.0/ /usr/local/kafka
cd /usr/local/kafka/

Setup .bashrc:

nano ~/.bashrc

Add the following to the end of the file.

#KAFKA VARIABLES START
export KAFKA_HOME=/usr/local/kafka
export KAFKA_CONF_DIR=/usr/local/kafka/config
export PATH=$PATH:$KAFKA_HOME/bin
#KAFKA VARIABLES STOP

source ~/.bashrc

ZooKeeper

ZooKeeper comes bundled with Kafka, but you can run your own. For the purposes of this tutorial we will just use the built-in ZooKeeper.

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka Server

Now we can run the kafka server and start receiving messages on topics.

bin/kafka-server-start.sh config/server.properties

List Topics

/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper hadoop:2181

Create Topic

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test

Auto Start

So if you want Kafka to run at startup then do the following.

touch kafka_start.sh
sudo chmod +x kafka_start.sh
touch kafka_stop.sh
sudo chmod +x kafka_stop.sh
crontab -e

Add the following and save.

@reboot /home/kafka/kafka_start.sh

kafka_start.sh

#!/bin/bash

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
sleep 2
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

kafka_stop.sh

#!/bin/bash

#Stop Kafka first so it can shut down cleanly while ZooKeeper is still running
/usr/local/kafka/bin/kafka-server-stop.sh
sleep 2
/usr/local/kafka/bin/zookeeper-server-stop.sh