NiFi: Custom Processor

(Last Updated On: )

The following tutorial shows you how to create a custom nifi processor.

Create Project:

  1. Install Maven
  2. Create a folder called “nifi”
  3. navigate into “nifi” folder and run
    1. mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
  4. Put in your “groupId” when it asks.
    1. I used “com.test”
  5. Put in your “artifactId” when it asks.
    1. I used “processor”
  6. You can accept the default “version”.
  7. Put in your “artifactBaseName” when it asks.
    1. I used “MyProcessor”
  8. Once it completes you can import the maven project into Eclipse.
  9. You will get two projects
    1. nar
    2. processor
  10. You should then have two files like below created.

MyProcessor.java:

  1. package com.test.processors;
  2.  
  3. import org.apache.nifi.components.PropertyDescriptor;
  4. import org.apache.nifi.flowfile.FlowFile;
  5. import org.apache.nifi.processor.*;
  6. import org.apache.nifi.annotation.behavior.ReadsAttribute;
  7. import org.apache.nifi.annotation.behavior.ReadsAttributes;
  8. import org.apache.nifi.annotation.behavior.WritesAttribute;
  9. import org.apache.nifi.annotation.behavior.WritesAttributes;
  10. import org.apache.nifi.annotation.lifecycle.OnScheduled;
  11. import org.apache.nifi.annotation.documentation.CapabilityDescription;
  12. import org.apache.nifi.annotation.documentation.SeeAlso;
  13. import org.apache.nifi.annotation.documentation.Tags;
  14. import org.apache.nifi.processor.exception.ProcessException;
  15. import org.apache.nifi.processor.util.StandardValidators;
  16.  
  17. import java.util.*;
  18.  
  19. @Tags({"example"})
  20. @CapabilityDescription("Provide a description")
  21. @SeeAlso({})
  22. @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
  23. @WritesAttributes({@WritesAttribute(attribute="", description="")})
  24. public class MyProcessor extends AbstractProcessor {
  25.  
  26. public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
  27. .Builder().name("My Property")
  28. .description("Example Property")
  29. .required(true)
  30. .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
  31. .build();
  32.  
  33. public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
  34. .name("my_relationship")
  35. .description("Example relationship")
  36. .build();
  37.  
  38. private List descriptors;
  39.  
  40. private Set relationships;
  41.  
  42. @Override
  43. protected void init(final ProcessorInitializationContext context) {
  44. final List descriptors = new ArrayList();
  45. descriptors.add(MY_PROPERTY);
  46. this.descriptors = Collections.unmodifiableList(descriptors);
  47.  
  48. final Set relationships = new HashSet();
  49. relationships.add(MY_RELATIONSHIP);
  50. this.relationships = Collections.unmodifiableSet(relationships);
  51. }
  52.  
  53. @Override
  54. public Set getRelationships() {
  55. return this.relationships;
  56. }
  57.  
  58. @Override
  59. public final List getSupportedPropertyDescriptors() {
  60. return descriptors;
  61. }
  62.  
  63. @OnScheduled
  64. public void onScheduled(final ProcessContext context) {
  65.  
  66. }
  67.  
  68. @Override
  69. public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
  70. FlowFile flowFile = session.get();
  71. if ( flowFile == null ) {
  72. return;
  73. }
  74. // TODO implement
  75. session.transfer(flowFile, MY_RELATIONSHIP);
  76. }
  77. }

MyProcessorTest.java:

This is the unit test for nifi.

  1. package com.test.processors;
  2.  
  3. import static org.junit.Assert.*;
  4.  
  5. import java.io.ByteArrayInputStream;
  6. import java.io.InputStream;
  7. import java.util.List;
  8.  
  9. import org.apache.nifi.util.MockFlowFile;
  10. import org.apache.nifi.util.TestRunner;
  11. import org.apache.nifi.util.TestRunners;
  12. import org.junit.Before;
  13. import org.junit.Test;
  14.  
  15. public class MyProcessorTest {
  16. private TestRunner testRunner;
  17.  
  18. @Before
  19. public void init() {
  20. testRunner = TestRunners.newTestRunner(MyProcessor.class);
  21. }
  22.  
  23. @Test
  24. public void testProcessor() {
  25. final InputStream content = new ByteArrayInputStream(new byte[0]);
  26. testRunner.setProperty("My Property", "test");
  27. testRunner.enqueue(content);
  28. testRunner.run(1);
  29. testRunner.assertQueueEmpty();
  30. final List results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
  31. assertTrue("1 match", results.size() == 1);
  32. }
  33. }

Optional:

Nar Directory:

You can create a custom nar directory to deploy your custom nifi processors to. You can either use the nifi/lib directory or specify your own. To specify your own edit the “nifi.properties” file.

  1. cd /nifi/conf/
  2. nano nifi.properties

Look for “nifi.nar.library.directory.”.
Add the following: nifi.nar.library.directory.anyname=/your/directory/