The following tutorial shows you how to create a custom nifi processor.
Create Project:
- Install Maven
- Create a folder called “nifi”
- navigate into “nifi” folder and run
- mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
- Put in your “groupId” when it asks.
- I used “com.test”
- Put in your “artifactId” when it asks.
- I used “processor”
- You can accept the default “version”.
- Put in your “artifactBaseName” when it asks.
- I used “MyProcessor”
- Once it completes you can import the maven project into Eclipse.
- You will get two projects
- nar
- processor
- You should then have two files like below created.
MyProcessor.java:
- package com.test.processors;
- import org.apache.nifi.components.PropertyDescriptor;
- import org.apache.nifi.flowfile.FlowFile;
- import org.apache.nifi.processor.*;
- import org.apache.nifi.annotation.behavior.ReadsAttribute;
- import org.apache.nifi.annotation.behavior.ReadsAttributes;
- import org.apache.nifi.annotation.behavior.WritesAttribute;
- import org.apache.nifi.annotation.behavior.WritesAttributes;
- import org.apache.nifi.annotation.lifecycle.OnScheduled;
- import org.apache.nifi.annotation.documentation.CapabilityDescription;
- import org.apache.nifi.annotation.documentation.SeeAlso;
- import org.apache.nifi.annotation.documentation.Tags;
- import org.apache.nifi.processor.exception.ProcessException;
- import org.apache.nifi.processor.util.StandardValidators;
- import java.util.*;
- @Tags({"example"})
- @CapabilityDescription("Provide a description")
- @SeeAlso({})
- @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
- @WritesAttributes({@WritesAttribute(attribute="", description="")})
- public class MyProcessor extends AbstractProcessor {
- public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
- .Builder().name("My Property")
- .description("Example Property")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
- .name("my_relationship")
- .description("Example relationship")
- .build();
- private List descriptors;
- private Set relationships;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List descriptors = new ArrayList();
- descriptors.add(MY_PROPERTY);
- this.descriptors = Collections.unmodifiableList(descriptors);
- final Set relationships = new HashSet();
- relationships.add(MY_RELATIONSHIP);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
- @Override
- public Set getRelationships() {
- return this.relationships;
- }
- @Override
- public final List getSupportedPropertyDescriptors() {
- return descriptors;
- }
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- }
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
- if ( flowFile == null ) {
- return;
- }
- // TODO implement
- session.transfer(flowFile, MY_RELATIONSHIP);
- }
- }
MyProcessorTest.java:
This is the unit test for nifi.
- package com.test.processors;
- import static org.junit.Assert.*;
- import java.io.ByteArrayInputStream;
- import java.io.InputStream;
- import java.util.List;
- import org.apache.nifi.util.MockFlowFile;
- import org.apache.nifi.util.TestRunner;
- import org.apache.nifi.util.TestRunners;
- import org.junit.Before;
- import org.junit.Test;
- public class MyProcessorTest {
- private TestRunner testRunner;
- @Before
- public void init() {
- testRunner = TestRunners.newTestRunner(MyProcessor.class);
- }
- @Test
- public void testProcessor() {
- final InputStream content = new ByteArrayInputStream(new byte[0]);
- testRunner.setProperty("My Property", "test");
- testRunner.enqueue(content);
- testRunner.run(1);
- testRunner.assertQueueEmpty();
- final List results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
- assertTrue("1 match", results.size() == 1);
- }
- }
Optional:
Nar Directory:
You can create a custom nar directory to deploy your custom nifi processors to. You can either use the nifi/lib directory or specify your own. To specify your own edit the “nifi.properties” file.
- cd /nifi/conf/
- nano nifi.properties
Look for “nifi.nar.library.directory.”.
Add the following: nifi.nar.library.directory.anyname=/your/directory/