NiFi: Custom Processor

This tutorial shows you how to create a custom NiFi processor.

Create Project:

  1. Install Maven
  2. Create a folder called “nifi”
  3. Navigate into the “nifi” folder and run:
    mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
  4. Put in your “groupId” when it asks.
    1. I used “com.test”
  5. Put in your “artifactId” when it asks.
    1. I used “processor”
  6. You can accept the default “version”.
  7. Put in your “artifactBaseName” when it asks.
    1. I used “MyProcessor”
  8. Once it completes, you can import the Maven project into Eclipse.
  9. You will get two projects
    1. nar
    2. processor
  10. You should then have two generated files like the ones below.

MyProcessor.java:

package com.test.processors;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.*;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("my_relationship")
            .description("Example relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        // TODO implement
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
}
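
The “// TODO implement” line in onTrigger is where your own logic goes. One common way to change a FlowFile's content is session.write(flowFile, callback), where the callback is a StreamCallback whose process(InputStream, OutputStream) method reads the old content and writes the new content. The sketch below shows only that stream-to-stream step, using plain Java so it can run without NiFi on the classpath. The class name “UppercaseContent” and the upper-casing itself are hypothetical choices for illustration and are not part of the generated project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not generated by the archetype.
class UppercaseContent {

    // Same parameter shape as StreamCallback.process(InputStream, OutputStream):
    // read the incoming content, write the transformed content.
    static void process(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            for (int i = 0; i < read; i++) {
                byte b = buffer[i];
                // Upper-case ASCII letters only; other bytes pass through unchanged,
                // so multi-byte UTF-8 sequences are not corrupted.
                if (b >= 'a' && b <= 'z') {
                    buffer[i] = (byte) (b - ('a' - 'A'));
                }
            }
            out.write(buffer, 0, read);
        }
    }

    // Convenience wrapper so the transform can be tried on a String.
    static String apply(String text) {
        ByteArrayInputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            process(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(apply("hello flowfile")); // prints "HELLO FLOWFILE"
    }
}
```

Assuming this helper sits next to MyProcessor, the TODO could then become something like flowFile = session.write(flowFile, UppercaseContent::process); before the existing session.transfer call. Treat that line as a sketch to adapt rather than the one correct implementation.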

MyProcessorTest.java:

This is the unit test for the processor.

package com.test.processors;

import static org.junit.Assert.*;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class MyProcessorTest {
    private TestRunner testRunner;

    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
    }

    @Test
    public void testProcessor() {
        final InputStream content = new ByteArrayInputStream(new byte[0]);
        testRunner.setProperty("My Property", "test");
        testRunner.enqueue(content);
        testRunner.run(1);
        testRunner.assertQueueEmpty();
        
        final List<MockFlowFile> results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
        assertTrue("1 match", results.size() == 1);
    }
}

Optional:

Nar Directory:

You can create a custom NAR directory to deploy your custom NiFi processors to. You can either use the nifi/lib directory or specify your own. To specify your own, edit the “nifi.properties” file.

cd /nifi/conf/
nano nifi.properties

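Look for the properties that start with “nifi.nar.library.directory”.
Add the following line, where “anyname” is any suffix you choose: nifi.nar.library.directory.anyname=/your/directory/
Restart NiFi for the change to take effect.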
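
To double-check which NAR directories a properties file declares, you can list every key that is “nifi.nar.library.directory” or starts with “nifi.nar.library.directory.”. The sketch below does that with plain java.util.Properties. The class name “NarLibraryDirectories” is hypothetical, and the sample text stands in for a real nifi.properties file. It only illustrates the naming pattern and is not how NiFi itself loads the file.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for inspecting a nifi.properties file.
class NarLibraryDirectories {

    static final String PREFIX = "nifi.nar.library.directory";

    // Returns every NAR library directory entry, sorted by property name.
    static Map<String, String> find(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> dirs = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            // Match the default entry and any suffixed entry such as ".anyname".
            if (key.equals(PREFIX) || key.startsWith(PREFIX + ".")) {
                dirs.put(key, props.getProperty(key));
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Sample content; replace with a FileReader on your own nifi.properties.
        String sample = "nifi.nar.library.directory=./lib\n"
                + "nifi.nar.library.directory.anyname=/your/directory/\n"
                + "nifi.nar.working.directory=./work/nar/\n";
        find(new StringReader(sample))
                .forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

With the sample text, the two library directory entries are printed and the working directory entry is skipped, because its name does not match the pattern.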