NiFi: ExecuteSQL Processor

In this tutorial I will guide you through how to add a processor for querying a SQL table to NiFi.

For this tutorial you will need an AVRO schema called “dttest” and it’s contents are as follows.

  1. {
  2. "type": "record",
  3. "namespace": "com.example",
  4. "name": "FullName",
  5. "fields": [
  6. { "name": "id", "type": "int" },
  7. { "name": "name", "type": "string" }
  8. ]
  9. }

First we need to drag the processor onto the grid.

Next we need select the processor ExecuteSQLRecord.

Next we must configure the processor.

 

 

 

 

 

 

 

 

Now we must create the JsonRecordWriter service.

Now we name the JsonRecordWriter

Configure the JsonWriter

Next we create the DB Connection Service

Next we name the DB Connection Service

Configure the DB Service

Now validate all the settings are as below

Now you are all done. It will now query your table.

NiFi: Custom Processor

The following tutorial shows you how to create a custom nifi processor.

Create Project:

  1. Install Maven
  2. Create a folder called “nifi”
  3. navigate into “nifi” folder and run
    1. mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
  4. Put in your “groupId” when it asks.
    1. I used “com.test”
  5. Put in your “artifactId” when it asks.
    1. I used “processor”
  6. You can accept the default “version”.
  7. Put in your “artifactBaseName” when it asks.
    1. I used “MyProcessor”
  8. Once it completes you can import the maven project into Eclipse.
  9. You will get two projects
    1. nar
    2. processor
  10. You should then have two files like below created.

MyProcessor.java:

  1. package com.test.processors;
  2.  
  3. import org.apache.nifi.components.PropertyDescriptor;
  4. import org.apache.nifi.flowfile.FlowFile;
  5. import org.apache.nifi.processor.*;
  6. import org.apache.nifi.annotation.behavior.ReadsAttribute;
  7. import org.apache.nifi.annotation.behavior.ReadsAttributes;
  8. import org.apache.nifi.annotation.behavior.WritesAttribute;
  9. import org.apache.nifi.annotation.behavior.WritesAttributes;
  10. import org.apache.nifi.annotation.lifecycle.OnScheduled;
  11. import org.apache.nifi.annotation.documentation.CapabilityDescription;
  12. import org.apache.nifi.annotation.documentation.SeeAlso;
  13. import org.apache.nifi.annotation.documentation.Tags;
  14. import org.apache.nifi.processor.exception.ProcessException;
  15. import org.apache.nifi.processor.util.StandardValidators;
  16.  
  17. import java.util.*;
  18.  
  19. @Tags({"example"})
  20. @CapabilityDescription("Provide a description")
  21. @SeeAlso({})
  22. @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
  23. @WritesAttributes({@WritesAttribute(attribute="", description="")})
  24. public class MyProcessor extends AbstractProcessor {
  25.  
  26. public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
  27. .Builder().name("My Property")
  28. .description("Example Property")
  29. .required(true)
  30. .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
  31. .build();
  32.  
  33. public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
  34. .name("my_relationship")
  35. .description("Example relationship")
  36. .build();
  37.  
  38. private List descriptors;
  39.  
  40. private Set relationships;
  41.  
  42. @Override
  43. protected void init(final ProcessorInitializationContext context) {
  44. final List descriptors = new ArrayList();
  45. descriptors.add(MY_PROPERTY);
  46. this.descriptors = Collections.unmodifiableList(descriptors);
  47.  
  48. final Set relationships = new HashSet();
  49. relationships.add(MY_RELATIONSHIP);
  50. this.relationships = Collections.unmodifiableSet(relationships);
  51. }
  52.  
  53. @Override
  54. public Set getRelationships() {
  55. return this.relationships;
  56. }
  57.  
  58. @Override
  59. public final List getSupportedPropertyDescriptors() {
  60. return descriptors;
  61. }
  62.  
  63. @OnScheduled
  64. public void onScheduled(final ProcessContext context) {
  65.  
  66. }
  67.  
  68. @Override
  69. public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  70. FlowFile flowFile = session.get();
  71. if ( flowFile == null ) {
  72. return;
  73. }
  74. // TODO implement
  75. session.transfer(flowFile, MY_RELATIONSHIP);
  76. }
  77. }

MyProcessorTest.java:

This is the unit test for nifi.

  1. package com.test.processors;
  2.  
  3. import static org.junit.Assert.*;
  4.  
  5. import java.io.ByteArrayInputStream;
  6. import java.io.InputStream;
  7. import java.util.List;
  8.  
  9. import org.apache.nifi.util.MockFlowFile;
  10. import org.apache.nifi.util.TestRunner;
  11. import org.apache.nifi.util.TestRunners;
  12. import org.junit.Before;
  13. import org.junit.Test;
  14.  
  15. public class MyProcessorTest {
  16. private TestRunner testRunner;
  17.  
  18. @Before
  19. public void init() {
  20. testRunner = TestRunners.newTestRunner(MyProcessor.class);
  21. }
  22.  
  23. @Test
  24. public void testProcessor() {
  25. final InputStream content = new ByteArrayInputStream(new byte[0]);
  26. testRunner.setProperty("My Property", "test");
  27. testRunner.enqueue(content);
  28. testRunner.run(1);
  29. testRunner.assertQueueEmpty();
  30. final List results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
  31. assertTrue("1 match", results.size() == 1);
  32. }
  33. }

Optional:

Nar Directory:

You can create a custom nar directory to deploy your custom nifi processors to. You can either use the nifi/lib directory or specify your own. To specify your own edit the “nifi.properties” file.

  1. cd /nifi/conf/
  2. nano nifi.properties

Look for “nifi.nar.library.directory.”.
Add the following: nifi.nar.library.directory.anyname=/your/directory/