ElasticSearch: Low Level Rest Client Connection

This entry is part 1 of 3 in the series ElasticSearch Low Level Rest Client

In this tutorial I will show you how to use the ElasticSearch low level rest client.

First you will need to add the low-level REST client dependency to the POM.

  <properties>
    <elasticSearch.version>6.2.4</elasticSearch.version>
  </properties>

  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elasticSearch.version}</version>
  </dependency>

Next you will need to specify the imports.

  import org.apache.http.HttpHost;
  import org.elasticsearch.client.Response;
  import org.elasticsearch.client.RestClient;
  import org.elasticsearch.client.RestClientBuilder;

Now you can connect to ElasticSearch.

  final RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
  final RestClient restClient = restClientBuilder.build();

Now you can do whatever you need to!
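
For example, a quick request against the node and a clean shutdown might look like this (a minimal sketch; EntityUtils comes from the Apache HTTP libraries the client already pulls in):

  import org.apache.http.util.EntityUtils;

  //Perform a simple GET against the cluster root endpoint.
  final Response response = restClient.performRequest("GET", "/");
  System.out.println(EntityUtils.toString(response.getEntity()));

  //Close the client when you are finished with it.
  restClient.close();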

NiFi: Rest API

NiFi has a number of REST APIs that you can use. They are located here.

They are very comprehensive. The only thing I would say is missing is how to get the root process group of NiFi; the documentation does not list that API call. Note that all API calls must be authenticated as well.

The API call to get the root process group (called “NiFi Flow”, which is the main process group) is:

  https://localhost/nifi-api/process-groups/root
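
As a rough illustration of calling it from Java (only a sketch; it assumes NiFi is listening on HTTPS at localhost, that the JVM already trusts NiFi's certificate, and that you have already obtained a token, here a hypothetical myAccessToken):

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.net.HttpURLConnection;
  import java.net.URL;

  final URL url = new URL("https://localhost/nifi-api/process-groups/root");
  final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
  connection.setRequestMethod("GET");
  //myAccessToken is a placeholder for whatever credential your NiFi instance expects.
  connection.setRequestProperty("Authorization", "Bearer " + myAccessToken);

  try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
      reader.lines().forEach(System.out::println);
  }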

ElasticSearch Installation

Installing ElasticSearch is really straightforward. I will be using Ubuntu 16.04 for this installation.

Java 8

  java -version
  #if not installed run the following
  sudo apt-get install openjdk-8-jdk

Download

  wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.3.rpm

Directories

It is recommended to change the log and data directories from their defaults.

  #create log and data directory
  sudo mkdir /my/dir/log/elasticsearch
  sudo mkdir /my/dir/elasticsearch

  # Change owner
  sudo chown -R elasticsearch /my/dir/log/elasticsearch
  sudo chown -R elasticsearch /my/dir/elasticsearch

Install

  sudo rpm -ivh elasticsearch-6.2.3.rpm

Change Settings

  sudo vi /etc/elasticsearch/elasticsearch.yml

  #Change the following settings
  #----------SETTINGS-----------------
  cluster.name: logsearch
  node.name: ##THE_HOST_NAME##
  node.master: true #The node is master eligible
  node.data: true #Hold data and perform data related operations
  path.data: /my/dir/elasticsearch
  path.logs: /my/dir/log/elasticsearch
  network.host: ##THE_HOST_NAME##
  http.port: 9200
  discovery.zen.ping.unicast.hosts: ["##THE_HOST_NAME##"]
  #----------SETTINGS-----------------

Start/Stop/Status ElasticSearch

  sudo service elasticsearch start
  sudo service elasticsearch stop
  sudo service elasticsearch status

Rest API

http://localhost:9200

 

Avro & Java: Record Parsing

This tutorial will guide you through how to convert JSON to Avro and then back to JSON. I suggest you first read through the documentation on Avro to familiarize yourself with it. This tutorial assumes you have a Maven project already set up and a resources folder.

POM:

Add Avro Dependency
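
Something along these lines pulls in Avro (the version shown is an assumption; use whichever release you are building against).

  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
  </dependency>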

Add Jackson Dependency
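
And for Jackson, something like the following (again, the version is an assumption).

  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
  </dependency>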

Avro Schema File:

Next you need to create the avro schema file in your resources folder. Name the file “schema.avsc”. The extension avsc is the Avro schema extension.

  {
    "namespace": "test.avro",
    "type": "record",
    "name": "MY_NAME",
    "fields": [
      {"name": "name_1", "type": "int"},
      {"name": "name_2", "type": {"type": "array", "items": "float"}},
      {"name": "name_3", "type": "float"}
    ]
  }

Json Record to Validate:

Next you need to create a JSON file that conforms to the schema you just made. Name the file “record.json” and put it in your resources folder. The contents can be whatever you want as long as they conform to the schema above.

  { "name_1": 234, "name_2": [23.34,654.98], "name_3": 234.7}

It’s Avro Time:

Imports:

  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.File;
  import java.io.IOException;
  import java.io.InputStream;

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericDatumReader;
  import org.apache.avro.generic.GenericDatumWriter;
  import org.apache.avro.io.DatumReader;
  import org.apache.avro.io.Decoder;
  import org.apache.avro.io.DecoderFactory;
  import org.apache.avro.io.Encoder;
  import org.apache.avro.io.EncoderFactory;

  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;

Conversion to Avro and Back:

  private void run() throws IOException {
      //Get the schema and json record from resources
      final ClassLoader loader = getClass().getClassLoader();
      final File schemaFile = new File(loader.getResource("schema.avsc").getFile());
      final InputStream record = loader.getResourceAsStream("record.json");
      //Create avro schema
      final Schema schema = new Schema.Parser().parse(schemaFile);

      //Encode to avro
      final byte[] avro = encodeToAvro(schema, record);

      //Decode back to json
      final JsonNode node = decodeToJson(schema, avro);

      System.out.println(node);
      System.out.println("done");
  }

  /**
   * Encode json to avro
   *
   * @param schema the schema the avro pertains to
   * @param record the data to convert to avro
   * @return the avro bytes
   * @throws IOException if decoding fails
   */
  private byte[] encodeToAvro(Schema schema, InputStream record) throws IOException {
      final DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
      final DataInputStream din = new DataInputStream(record);
      final Decoder decoder = new DecoderFactory().jsonDecoder(schema, din);
      final Object datum = reader.read(null, decoder);
      final GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
      final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
      final Encoder encoder = new EncoderFactory().binaryEncoder(outputStream, null);
      writer.write(datum, encoder);
      encoder.flush();

      return outputStream.toByteArray();
  }

  /**
   * Decode avro back to json.
   *
   * @param schema the schema the avro pertains to
   * @param avro the avro bytes
   * @return the json
   * @throws IOException if jackson fails
   */
  private JsonNode decodeToJson(Schema schema, byte[] avro) throws IOException {
      final ObjectMapper mapper = new ObjectMapper();
      final DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
      final Decoder decoder = new DecoderFactory().binaryDecoder(avro, null);
      final JsonNode node = mapper.readTree(reader.read(null, decoder).toString());

      return node;
  }

HBASE & Java: Scan Filters

This tutorial will guide you through how to use filtering when scanning an HBASE table using Java 8. Make sure you first follow this tutorial on connecting to HBASE and this tutorial on scanning HBASE.

Row Key Filter (PrefixFilter):

  final PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(myRowKey));
  scan.setFilter(prefixFilter);

Column Value Filter:

  final SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(myColumnFamily, myColumnName, CompareOp.EQUAL, Bytes.toBytes(myValue));
  scan.setFilter(columnValueFilter);

Regex Filter:

  final RegexStringComparator regexStringComparator = new RegexStringComparator(".*");
  final SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(myColumnFamily, myColumnName, CompareOp.EQUAL, regexStringComparator);
  scan.setFilter(singleColumnValueFilter);
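
Note that setFilter replaces whatever filter was previously set on the scan, so if you want to apply several of the filters above at once, wrap them in a FilterList. A minimal sketch using the filters defined above:

  import org.apache.hadoop.hbase.filter.FilterList;

  final FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  filterList.addFilter(prefixFilter);
  filterList.addFilter(columnValueFilter);
  scan.setFilter(filterList);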

 

HBASE & Java: Delete a Table

This tutorial will guide you through how to delete an HBASE table using Java 8. Make sure you first follow this tutorial on connecting to HBASE.

Import:

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;

Delete:

  //You must first disable the table
  conn.getAdmin().disableTable(TableName.valueOf("myTable"));

  //Now you can delete the table
  conn.getAdmin().deleteTable(TableName.valueOf("myTable"));
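
If you are not sure the table exists (or whether it is already disabled), you can guard the calls; here is a small sketch using the same connection object:

  final Admin admin = conn.getAdmin();
  final TableName tableName = TableName.valueOf("myTable");

  if (admin.tableExists(tableName)) {
      //disableTable fails if the table is already disabled, so check first.
      if (admin.isTableEnabled(tableName)) {
          admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
  }
  admin.close();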

NiFi: Custom Processor

The following tutorial shows you how to create a custom NiFi processor.

Create Project:

  1. Install Maven
  2. Create a folder called “nifi”
  3. Navigate into the “nifi” folder and run
    1. mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
  4. Put in your “groupId” when it asks.
    1. I used “com.test”
  5. Put in your “artifactId” when it asks.
    1. I used “processor”
  6. You can accept the default “version”.
  7. Put in your “artifactBaseName” when it asks.
    1. I used “MyProcessor”
  8. Once it completes you can import the maven project into Eclipse.
  9. You will get two projects
    1. nar
    2. processor
  10. You should then have the two files shown below.

MyProcessor.java:

  package com.test.processors;

  import org.apache.nifi.components.PropertyDescriptor;
  import org.apache.nifi.flowfile.FlowFile;
  import org.apache.nifi.processor.*;
  import org.apache.nifi.annotation.behavior.ReadsAttribute;
  import org.apache.nifi.annotation.behavior.ReadsAttributes;
  import org.apache.nifi.annotation.behavior.WritesAttribute;
  import org.apache.nifi.annotation.behavior.WritesAttributes;
  import org.apache.nifi.annotation.lifecycle.OnScheduled;
  import org.apache.nifi.annotation.documentation.CapabilityDescription;
  import org.apache.nifi.annotation.documentation.SeeAlso;
  import org.apache.nifi.annotation.documentation.Tags;
  import org.apache.nifi.processor.exception.ProcessException;
  import org.apache.nifi.processor.util.StandardValidators;

  import java.util.*;

  @Tags({"example"})
  @CapabilityDescription("Provide a description")
  @SeeAlso({})
  @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
  @WritesAttributes({@WritesAttribute(attribute="", description="")})
  public class MyProcessor extends AbstractProcessor {

      public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
              .Builder().name("My Property")
              .description("Example Property")
              .required(true)
              .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
              .build();

      public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
              .name("my_relationship")
              .description("Example relationship")
              .build();

      private List<PropertyDescriptor> descriptors;

      private Set<Relationship> relationships;

      @Override
      protected void init(final ProcessorInitializationContext context) {
          final List<PropertyDescriptor> descriptors = new ArrayList<>();
          descriptors.add(MY_PROPERTY);
          this.descriptors = Collections.unmodifiableList(descriptors);

          final Set<Relationship> relationships = new HashSet<>();
          relationships.add(MY_RELATIONSHIP);
          this.relationships = Collections.unmodifiableSet(relationships);
      }

      @Override
      public Set<Relationship> getRelationships() {
          return this.relationships;
      }

      @Override
      public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
          return descriptors;
      }

      @OnScheduled
      public void onScheduled(final ProcessContext context) {

      }

      @Override
      public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          FlowFile flowFile = session.get();
          if (flowFile == null) {
              return;
          }
          // TODO implement
          session.transfer(flowFile, MY_RELATIONSHIP);
      }
  }

MyProcessorTest.java:

This is the unit test for the NiFi processor.

  package com.test.processors;

  import static org.junit.Assert.*;

  import java.io.ByteArrayInputStream;
  import java.io.InputStream;
  import java.util.List;

  import org.apache.nifi.util.MockFlowFile;
  import org.apache.nifi.util.TestRunner;
  import org.apache.nifi.util.TestRunners;
  import org.junit.Before;
  import org.junit.Test;

  public class MyProcessorTest {
      private TestRunner testRunner;

      @Before
      public void init() {
          testRunner = TestRunners.newTestRunner(MyProcessor.class);
      }

      @Test
      public void testProcessor() {
          final InputStream content = new ByteArrayInputStream(new byte[0]);
          testRunner.setProperty("My Property", "test");
          testRunner.enqueue(content);
          testRunner.run(1);
          testRunner.assertQueueEmpty();
          final List<MockFlowFile> results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
          assertTrue("1 match", results.size() == 1);
      }
  }

Optional:

Nar Directory:

You can create a custom NAR directory to deploy your custom NiFi processors to. You can either use the nifi/lib directory or specify your own. To specify your own, edit the “nifi.properties” file.

  cd /nifi/conf/
  nano nifi.properties

Look for “nifi.nar.library.directory.”.
Add the following: nifi.nar.library.directory.anyname=/your/directory/

 

HBASE Phoenix & Java: Unsecure Connection

In this tutorial I will show you how to do a basic connection to a remote unsecure HBase Phoenix Query Server using Java. Phoenix allows you to run SQL commands on top of HBASE. You can find the commands listed here.

POM.xml:

  <dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-server-client</artifactId>
    <version>4.7.0-HBase-1.1</version>
  </dependency>

Imports:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.SQLException;

Connect:

  Class.forName("org.apache.phoenix.queryserver.client.Driver");
  Connection conn = DriverManager.getConnection("jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF");
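
Once connected it is plain JDBC, so running one of those SQL commands is the usual statement/result-set pattern. A quick sketch, assuming a hypothetical table MY_TABLE with a numeric ID column:

  import java.sql.ResultSet;
  import java.sql.Statement;

  try (final Statement stmt = conn.createStatement();
       final ResultSet rs = stmt.executeQuery("SELECT ID FROM MY_TABLE LIMIT 10")) {
      while (rs.next()) {
          System.out.println(rs.getLong("ID"));
      }
  }
  conn.close();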

Hadoop & Java: Connect Remote Unsecured HDFS

In this tutorial I will show you how to connect to a remote unsecured HDFS cluster using Java. If you haven't installed HDFS yet, follow that tutorial first.

POM.xml:

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.1</version>
  </dependency>

Imports:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import java.net.URI;

Connect:

  //Setup the configuration object.
  final Configuration config = new Configuration();

  //If you want you can add any properties you want here.

  //Setup the hdfs file system object.
  //Use the NameNode RPC port from fs.defaultFS (commonly 8020 or 9000), not the 50070 web UI port.
  final FileSystem fs = FileSystem.get(new URI("hdfs://localhost:8020"), config);

  //Do whatever you need to.
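
For example, once you have the FileSystem handle you can list a directory or write a file. A small sketch (the /tmp paths are just placeholders):

  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;

  //List whatever is under /tmp.
  for (final FileStatus status : fs.listStatus(new Path("/tmp"))) {
      System.out.println(status.getPath() + " " + status.getLen());
  }

  //Write a small file, then release the handle.
  try (final FSDataOutputStream out = fs.create(new Path("/tmp/hello.txt"))) {
      out.writeUTF("hello hdfs");
  }
  fs.close();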

HBASE & Java: Search for Data

This tutorial will give you a quick overview of how to search for data using HBASE. If you have not done so yet, follow these two tutorials first: HBASE: Connecting and HBASE: Create a Table.

Search for Data:

Basically we have to scan the table for data, so we must first set up a Scan object and then search for the data.

  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.util.Bytes;

  //Lets setup our scan object.
  final Scan scan = new Scan();
  //Search a particular column
  scan.addColumn(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnName"));
  //Check the row key prefix
  scan.setRowPrefixFilter(Bytes.toBytes("rowkey"));

  final TableName tableName = TableName.valueOf(yourTableName);

  //Get the table you want to work with, using the connection from the tutorial above.
  final Table table = conn.getTable(tableName);
  //Create our scanner based on the scan object above.
  final ResultScanner scanner = table.getScanner(scan);

  //Now we will loop through our results
  for (Result result = scanner.next(); result != null; result = scanner.next()) {
      //Lets get our row key
      final String rowIdentifier = Bytes.toString(result.getRow());

      //Now based on each record found we will loop through the available cells for that record.
      for (final Cell cell : result.listCells()) {
          //Now we can do whatever we need to with the data.
          log.info("column {} value {}", Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
      }
  }

HBASE & Java: Create a Table

This tutorial will guide you through how to create an HBASE table using Java 8. Make sure you first follow this tutorial on connecting to HBASE.

Table Exists:

This checks if the table already exists in HBASE.

  import org.apache.hadoop.hbase.TableName;

  final TableName table = TableName.valueOf(yourTableName);

  //Use the connection object to getAdmin from the connection tutorial above.
  conn.getAdmin().tableExists(table);

Create Table:

In the most basic example of creating a HBASE table you need to know the name and the column families. A column family is a group of columns whose data is related in some way and stored together on disk. Notice how we don't define columns in the table design; columns are added as we put data, which I will show in the example below.

  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;

  final TableName table = TableName.valueOf(yourTableName);

  final HTableDescriptor hTableBuilder = new HTableDescriptor(table);
  final HColumnDescriptor column = new HColumnDescriptor(family);
  hTableBuilder.addFamily(column);

  //Use the connection object to getAdmin from the connection tutorial above.
  conn.getAdmin().createTable(hTableBuilder);

Get a Table:

This will retrieve a table from HBASE so you can use it to put data, etc.

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Table;

  final TableName tableName = TableName.valueOf(yourTableName);

  //Use the connection object from the connection tutorial above.
  final Table table = conn.getTable(tableName);

Put Data:

Now we will put data into the table we have a reference to above. Notice how the columns are referenced.

  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  final byte[] rowKey = Bytes.toBytes("some row identifier");
  final byte[] columnFamily = Bytes.toBytes("myFamily");
  final byte[] columnName = Bytes.toBytes("columnName");
  final byte[] data = Bytes.toBytes(myData);

  final Put put = new Put(rowKey);
  put.addColumn(columnFamily, columnName, data);

  //Insert the data.
  table.put(put);
  //Close the table.
  table.close();

HBASE: Connecting Unsecure

In this tutorial I will show you how to connect to an unsecure HBASE cluster using Java. It's rather straightforward. This tutorial assumes no security. There are many different options you can set; we will just use the bare minimum needed to connect.

POM:

  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.4.1</version>
    <type>pom</type>
  </dependency>

Imports:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

Config:

We will use the basic configuration here. You should secure the cluster and use appropriate settings for that.

  final Configuration config = HBaseConfiguration.create();
  config.set("hbase.zookeeper.quorum", "myurl.com"); //Can be comma separated if you have more than 1
  config.set("hbase.zookeeper.property.clientPort", "2181");
  config.set("zookeeper.znode.parent", "/hbase-unsecure");

Connect:

Now we create the connection.

  Connection conn = ConnectionFactory.createConnection(config);

  //Later when we are done we will want to close the connection.
  conn.close();

Hbase Admin:

If you need it, you can retrieve an Admin implementation to administer the HBase cluster.

  Admin admin = conn.getAdmin();
  //Later when we are done we will want to close the admin as well.
  admin.close();

NiFi Installation (Basic)

In this tutorial I will guide you through installing NiFi on Ubuntu 16.04 and setting it up to run as a service. We will assume you have a user called “hduser”.

Install Java 8

  sudo apt-get install openjdk-8-jdk

Install NiFi

  wget http://mirror.dsrg.utoronto.ca/apache/nifi/1.8.0/nifi-1.8.0-bin.tar.gz
  tar -xzf nifi-1.8.0-bin.tar.gz
  sudo mv nifi-1.8.0/ /usr/local/nifi

Set Ownership:

  sudo chown -R hduser:hduser /usr/local/nifi

Setup .bashrc:

  sudo nano ~/.bashrc

Add the following to the end of the file.

#NIFI VARIABLES START
export NIFI_HOME=/usr/local/nifi
export NIFI_CONF_DIR=/usr/local/nifi/conf
export PATH=$PATH:$NIFI_HOME/bin
#NIFI VARIABLES STOP

  source ~/.bashrc

Install NiFi As Service

  cd /usr/local/nifi/bin
  sudo ./nifi.sh install
  reboot

Start/Stop/Status Service

  sudo service nifi start
  sudo service nifi stop
  sudo service nifi status

Your site is now available at http://localhost:8080/nifi.

Uninstall

  sudo rm /etc/rc2.d/S65nifi
  sudo rm /etc/init.d/nifi
  sudo rm /etc/rc2.d/K65nifi

  sudo rm -R /usr/local/nifi/

Kafka & Java: Consumer List Topics

In this tutorial I will show you how to list all topics in Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. You should also go through this tutorial to set up the consumer.

Imports

  import java.util.Map;
  import java.util.List;
  import org.apache.kafka.common.PartitionInfo;

Consumer List Topics

  Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for (String topic : listTopics.keySet()) {
      System.out.println("topic name :" + topic);
  }
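
Since listTopics also returns the partition metadata, you can go one level deeper if you need it; a small sketch using the same map:

  for (final Map.Entry<String, List<PartitionInfo>> entry : listTopics.entrySet()) {
      for (final PartitionInfo partitionInfo : entry.getValue()) {
          //partition() is the partition id and leader() is the broker currently leading it.
          System.out.println(entry.getKey() + " partition " + partitionInfo.partition() + " leader " + partitionInfo.leader());
      }
  }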

Kafka & Java: Unsecure Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. If you haven't installed Kafka yet please do so.

POM.xml

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
  </dependency>

Imports

  import org.apache.kafka.clients.consumer.*;
  import java.util.Properties;
  import java.io.InputStream;
  import java.util.Arrays;

Consumer Props File

You can go here to view all the options for consumer properties.

  # The url to kafka
  bootstrap.servers=localhost:9092

  #identify consumer group
  group.id=test

  #offset will be periodically committed in the background
  enable.auto.commit=true

  # The deserializer for the key
  key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  # The deserializer for the value
  value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  # heartbeat to detect worker failures
  session.timeout.ms=10000

  #Automatically reset offset to earliest offset
  auto.offset.reset=earliest

Consumer Connection/Read

The record we will read will just be a string for both key and value.

  Consumer<String, String> consumer = null;

  try {
      ClassLoader classLoader = getClass().getClassLoader();

      try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
          Properties properties = new Properties();
          properties.load(props);
          consumer = new KafkaConsumer<>(properties);
      }
      System.out.println("Consumer Created");

      // Subscribe to the topic.
      consumer.subscribe(Arrays.asList("testTopic"));

      while (true) {
          final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
          if (consumerRecords.count() == 0) {
              //Stop once there are no more records to read
              break;
          }

          consumerRecords.forEach(record -> {
              System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
          });

          //Commit offsets returned on the last poll() for all the subscribed topics and partitions
          consumer.commitAsync();
      }
  } finally {
      if (consumer != null) {
          consumer.close();
      }
  }
  System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

Kafka: Installation (Basic)

Installing Kafka is really straightforward. There is a quick start guide you can follow. The only thing I found was that it doesn't call out Java 8. I will be using Ubuntu 16.04 for this installation.

Install Java 8

  sudo apt-get install openjdk-8-jdk

Install Kafka

  wget http://apache.forsale.plus/kafka/1.1.0/kafka_2.11-1.1.0.tgz
  tar -xzf kafka_2.11-1.1.0.tgz
  sudo mv kafka_2.11-1.1.0/ /usr/local/kafka
  cd /usr/local/kafka/

Setup .bashrc:

  sudo nano ~/.bashrc

Add the following to the end of the file.

#KAFKA VARIABLES START
export KAFKA_HOME=/usr/local/kafka
export KAFKA_CONF_DIR=/usr/local/kafka/conf
export PATH=$PATH:$KAFKA_HOME/bin
#KAFKA VARIABLES STOP

  source ~/.bashrc

ZooKeeper

ZooKeeper comes bundled with Kafka, but you can run your own. For the purposes of this tutorial we just use the built-in ZooKeeper.

  bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka Server

Now we can run the Kafka server and start receiving messages on topics.

  bin/kafka-server-start.sh config/server.properties

List Topics

  /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper hadoop:2181

Create Topic

  /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test

Auto Start

So if you want Kafka to run at startup then do the following.

  touch kafka_start.sh
  sudo chmod +x kafka_start.sh
  touch kafka_stop.sh
  sudo chmod +x kafka_stop.sh
  crontab -e

Add the following and save.

  @reboot /home/kafka/kafka_start.sh

kafka_start.sh

  #!/bin/bash

  /usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
  sleep 2
  /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

kafka_stop.sh

  #!/bin/bash

  /usr/local/kafka/bin/zookeeper-server-stop.sh
  sleep 2
  /usr/local/kafka/bin/kafka-server-stop.sh

Avro & Python: How to Schema, Write, Read

I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using 1.8.2.

  pip install avro-python3

Schema

There are so many different ways to work with the schema definition. There are primitive and complex types. You can find way more documentation on the schema definition here.

  import json
  import avro.schema

  my_schema = avro.schema.Parse(json.dumps(
  {
      'namespace': 'test.avro',
      'type': 'record',
      'name': 'MY_NAME',
      'fields': [
          {'name': 'name_1', 'type': 'int'},
          {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
          {'name': 'name_3', 'type': 'float'},
      ]
  }))

Method 1

Write

  from avro.datafile import DataFileWriter
  from avro.io import DatumWriter
  import io

  #write binary
  file = open(filename, 'wb')

  datum_writer = DatumWriter()
  fwriter = DataFileWriter(file, datum_writer, my_schema)
  fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})
  fwriter.close()

Write Deflate

  from avro.datafile import DataFileWriter
  from avro.io import DatumWriter

  #write binary
  file = open(filename, 'wb')

  datum_writer = DatumWriter()
  fwriter = DataFileWriter(file, datum_writer, my_schema, codec = 'deflate')
  fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})
  fwriter.close()

Append

  from avro.datafile import DataFileWriter
  from avro.io import DatumWriter
  import io

  #append binary
  file = open(filename, 'a+b')

  datum_writer = DatumWriter()
  #Notice that the schema is not added to the DataFileWriter. This is because you are appending to an existing avro file
  fwriter = DataFileWriter(file, datum_writer)
  fwriter.append({'name_1': 645675, 'name_2': [5.6,34.9], 'name_3': 649.5645})
  fwriter.close()

Read Schema

  from avro.datafile import DataFileReader
  from avro.io import DatumReader

  file = open(filename, 'rb')
  datum_reader = DatumReader()
  file_reader = DataFileReader(file, datum_reader)

  print(file_reader.meta)

Read

  from avro.datafile import DataFileReader
  from avro.io import DatumReader

  #read binary
  fd = open(filename, 'rb')
  datum_reader = DatumReader()
  file_reader = DataFileReader(fd, datum_reader)

  for datum in file_reader:
      print(datum['name_1'])
      print(datum['name_2'])
      print(datum['name_3'])
  file_reader.close()

Method 2

Write/Append BinaryEncoder

  import io
  from avro.io import DatumWriter, BinaryEncoder

  #write binary (or use the append mode below instead)
  file = open(filename, 'wb')
  #append binary
  #file = open(filename, 'a+b')
  bytes_writer = io.BytesIO()
  encoder = BinaryEncoder(bytes_writer)
  writer_binary = DatumWriter(my_schema)
  writer_binary.write({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645}, encoder)
  file.write(bytes_writer.getvalue())
  file.close()

Read BinaryDecoder

  import io
  from avro.io import DatumReader, BinaryDecoder

  file = open(filename, 'rb')
  bytes_reader = io.BytesIO(file.read())
  decoder = BinaryDecoder(bytes_reader)
  reader = DatumReader(my_schema)

  while True:
      try:
          rec = reader.read(decoder)
          print(rec['name_1'])
          print(rec['name_2'])
          print(rec['name_3'])
      except:
          break

HortonWorks: Kerberize Ambari Server

This entry is part 7 of 7 in the series HortonWorks

You may want to integrate Kerberos authentication into your Ambari Server implementation. If you do, follow the next few steps. It's that easy.

Step 1: Stop Ambari Server

  sudo ambari-server stop

Step 2: Create keytab file

  ktutil
  addent -password -p ##USER##@##DOMAIN##.COM -k 1 -e RC4-HMAC
  # Enter password
  wkt ##USER##.keytab
  q
  sudo mkdir /etc/security/keytabs
  sudo mv ##USER##.keytab /etc/security/keytabs

Step 3: Test the keytab. You should see the ticket when you run klist.

  kinit -kt /etc/security/keytabs/##USER##.keytab ##USER##@##DOMAIN##.COM
  klist

Step 4: Run Ambari Server Kerberos Setup

  sudo ambari-server setup-kerberos

Follow the prompts. Say true to enabling Kerberos. The keytab file will be /etc/security/keytabs/##USER##.keytab. You should be able to leave the rest as defaults. Save the settings and you are done.

Step 5: Remove the kinit ticket you created so that you can make sure your Kerberos authentication is working correctly.

  kdestroy

Step 6: Start Ambari Server

  sudo ambari-server start

Step 7: Validate Kerberos. You should see your ticket get created and you should now be able to log in with no issues.

  klist

HortonWorks: Install YARN/MR

This entry is part 6 of 7 in the series HortonWorks

This tutorial guides you through installing YARN/MapReduce on Hortonworks using a multi node cluster setup with Ubuntu OS.

Step 1: Go to “Stack and Version”. Then click “Add Service” on YARN. You will notice that “MapReduce2” comes with it.

Step 2: Assign Masters. I usually put the ResourceManager, History Server and App Timeline Server all on the secondary namenode, but it is totally up to you how you set up your environment.

Step 3: Assign Slaves and Clients. I put NodeManagers on all the datanodes and Clients on all servers. It's up to you though; this is what worked for me and my requirements.

Step 4: During Customize Services you may get a warning that the Ambari Metrics “hbase_master_heapsize” needs to be increased. I recommend making this change, but it's up to you and what makes sense in your environment.

Step 5: Follow the remaining steps and the installation should complete with no issues. Should an issue arise, review the error; if it was just a connection error while services were coming up, you may not have a real problem and may only need to stop and start all services again. Please note that Ambari Metrics may report errors, but they should clear in around 15 minutes.