How To Make a Custom NiFi Processor


I read a couple of forum posts (link1, link2) about converting CSV data to JSON with Apache NiFi. Both threads already propose solutions for going from CSV to JSON, one of which is writing your own custom Processor. Since I already have code to convert data from CSV to JSON (see my post), I decided to write a NiFi Processor that accomplishes the same thing. This blog entry shows how that was done.

NiFi has a developer guide covering several topics, including the Processor API. The NiFi team also has a Confluence page that documents the archetype command necessary to make a template processor project. The Maven archetype command for creating your processor template is:

mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.1.0

Once you run the command, you'll be prompted for input. Guidance for what to put in each field can be found in the developer guide or the Confluence page linked above. If you make a mistake, you can always refactor the names later. Once the command finishes, you'll have a project structure that looks similar to this:
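For reference, the interactive prompts look roughly like the session below. The exact wording varies slightly by archetype version, and the values shown (the `com.example.nifi` group and the `convertcsvtojson` base name) are placeholders of my own, not required names:

```text
Define value for property 'groupId': com.example.nifi
Define value for property 'artifactId': nifi-convertcsvtojson-bundle
Define value for property 'version' 1.0-SNAPSHOT: : 1.0-SNAPSHOT
Define value for property 'artifactBaseName': convertcsvtojson
Define value for property 'package' com.example.nifi.processors.convertcsvtojson: :
```

The `artifactBaseName` ends up in the names of the generated processor and nar modules, so pick something meaningful.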

NiFi Processor Project Structure. Picture retrieved from this blog post: http://bryanbende.com/development/2015/02/04/custom-processors-for-apache-nifi

Before moving forward, I want to note the importance of the 'org.apache.nifi.processor.Processor' file located in 'src/main/resources/META-INF/services'. This file tells NiFi where to find the processor you'll be writing. If you rename the Java file for your processor, make sure you also update the class name listed in this file. With the project structure now in place, you can begin to develop your first processor. Going forward, I will be using code snippets from the processor I developed to discuss properties, relationships, the onTrigger method, and testing the processor. The full code can be found here.
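As an illustration, if your processor class were named ConvertCSVToJSON (the package name below is a placeholder, not the one from my project), the services file would contain the single fully-qualified class name:

```text
# src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
com.example.nifi.processors.ConvertCSVToJSON
```

This is a standard Java ServiceLoader file, so NiFi discovers the processor by reading this entry at startup; a mismatch here is a common reason a renamed processor silently fails to appear.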

The developer guide is helpful for understanding the concepts used in NiFi processors. However, the best way to learn how to do anything is by looking at the way other people have done it. For this reason, I decided to look at the NiFi source to see how its developers write processors. One of the first things I noticed was that each processor has properties and relationships. When making my ConvertCSVToJSON processor, I knew I needed two properties:

  • header: whether the incoming CSV contains a header
  • field names: the field names to use for the incoming CSV when converting it to flat JSON; if the incoming file already has a header, this can be left empty

Below is the code I wrote for defining those properties.

public static final PropertyDescriptor HEADER = new PropertyDescriptor
        .Builder().name("header")
        .displayName("header")
        .description("Whether or not a header exists in the incoming CSV file. (default true)")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("true")
        .build();

public static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor
        .Builder().name("Field Names")
        .displayName("Field Names")
        .description("Names of the fields in the CSV if no header exists. Field names must be in order.")
        .required(false)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();

The properties for my processor are now defined. Next, I need to add relationships. NiFi defines Relationships as:

Relationship: Each Processor has zero or more Relationships defined for it. These Relationships are named to indicate the result of processing a FlowFile. After a Processor has finished processing a FlowFile, it will route (or “transfer”) the FlowFile to one of the Relationships. A DFM is then able to connect each of these Relationships to other components in order to specify where the FlowFile should go next under each potential processing result.

— NiFi User Guide

In general you'll have two relationships: a SUCCESS relationship for all data that is successfully processed by your processor, and a FAILURE relationship for all data that fails processing. Relationships aren't a requirement, though; a processor can have anywhere from zero to N of them. Below is a code snippet where I defined my relationships.

public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Successfully converted incoming CSV file to JSON")
        .build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Failed to convert incoming CSV file to JSON")
        .build();

Once you define your properties and relationships, you'll need to add them to your 'descriptors' and 'relationships' collections. Your initial MyProcessor.java class contains template code to accomplish that; update it with any additional relationships or properties you add. This is all done in the 'init' method. See the example below:

@Override
protected void init(final ProcessorInitializationContext context) {
    final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
    descriptors.add(HEADER);
    descriptors.add(FIELD_NAMES);
    this.descriptors = Collections.unmodifiableList(descriptors);

    final Set<Relationship> relationships = new HashSet<Relationship>();
    relationships.add(REL_SUCCESS);
    relationships.add(REL_FAILURE);
    this.relationships = Collections.unmodifiableSet(relationships);

    csvMapper = new CsvMapper();
}

@Override
public Set<Relationship> getRelationships() {
    return this.relationships;
}

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return descriptors;
}
I decided to use the 'onScheduled' method to initialize my CsvSchema object. The CsvSchema only needs to be created once, when the processor is configured, so the 'onScheduled' lifecycle method made sense in this case. You can read more about the component lifecycle in the developer guide. Below is a code snippet of my 'onScheduled' method:

@OnScheduled
public void onScheduled(final ProcessContext context) throws ConfigurationException {
    // Retrieve properties from context
    Boolean header = context.getProperty(HEADER).asBoolean();
    String fieldNames = context.getProperty(FIELD_NAMES).getValue();
    /*
     * Create the schema based on the properties supplied by the user.
     */
    if (!header && fieldNames != null) {
        Builder build = CsvSchema.builder();
        for (String field : fieldNames.split(",")) {
            build.addColumn(field, CsvSchema.ColumnType.NUMBER_OR_STRING);
        }
        schema = build.build();
    } else if (header && fieldNames != null && !fieldNames.equals("")) {
        schema = this.buildCsvSchema(fieldNames, header);
    } else if (!header && fieldNames == null) {
        throw new ConfigurationException("File must either contain headers or you must provide them.");
    } else {
        schema = CsvSchema.emptySchema().withHeader();
    }
}

With all the setup out of the way, I can get down to adding the code that actually does the conversion. The work of a processor is all done in the ‘onTrigger’ method. You can see my ‘onTrigger’ method below:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    try {
        // Read in the incoming CSV data
        InputStream stream = session.read(flowFile);
        String csv = IOUtils.toString(stream, "UTF-8");
        stream.close();
        // Convert CSV data to a list of flat objects
        List<Map<?, ?>> objects = this.readObjectsFromCsv(csv);
        // Serialize those objects as a JSON string
        final String json = this.writeAsJson(objects);
        // Write the JSON to the outgoing FlowFile
        FlowFile output = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                IOUtils.write(json, outputStream, "UTF-8");
            }
        });
        output = session.putAttribute(output, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
        //TODO: May want to have a better default name....
        output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString() + ".json");
        session.transfer(output, REL_SUCCESS);
    } catch (IOException e) {
        getLogger().error("Unable to convert CSV to JSON for file " + flowFile.getAttribute(CoreAttributes.FILENAME.key()), e);
        session.transfer(flowFile, REL_FAILURE);
    }
}
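The helper methods 'readObjectsFromCsv' and 'writeAsJson' aren't shown here; they wrap the CsvMapper created in 'init' (and, presumably, a Jackson ObjectMapper) and are in the full code linked above. To illustrate the transformation itself, here is a dependency-free sketch of the same header-based CSV-to-JSON conversion. The class name and the naive quoting are mine, and it ignores quoted fields, escaping, and ragged rows that Jackson handles for real:

```java
import java.util.*;
import java.util.stream.*;

// Minimal sketch of the CSV-to-JSON transformation the processor performs,
// using only the JDK. Not production-ready: no quoted-field or escape handling.
public class MiniCsvToJson {

    // Convert CSV text (first line = header) into a JSON array of flat objects.
    public static String convert(String csv) {
        String[] lines = csv.trim().split("\\R");
        String[] fields = lines[0].split(",");
        return Arrays.stream(lines, 1, lines.length)
                .map(line -> {
                    String[] values = line.split(",");
                    // Pair each header field with the value in the same column
                    return IntStream.range(0, fields.length)
                            .mapToObj(i -> "\"" + fields[i] + "\":\"" + values[i] + "\"")
                            .collect(Collectors.joining(",", "{", "}"));
                })
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        System.out.println(convert("name,age\nalice,30\nbob,25"));
        // [{"name":"alice","age":"30"},{"name":"bob","age":"25"}]
    }
}
```

The real processor gets the same result from 'CsvMapper.readerFor(Map.class).with(schema)', which is why the CsvSchema built in 'onScheduled' matters: it supplies the field names when the file has no header row.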

The next step is to test the code. NiFi provides a TestRunner interface that is already set up for running unit tests against a processor. Just pass your processor class to the TestRunner. Below is an example:

@Before
public void init() {
    testRunner = TestRunners.newTestRunner(ConvertCSVToJSON.class);
}

Then you can go ahead and write your first unit test for your processor:

@Test
public void testWithHeader() throws FileNotFoundException, UnsupportedEncodingException {
    // Set Headers
    testRunner.setProperty(ConvertCSVToJSON.HEADER, "true");
    testRunner.enqueue(new FileInputStream(new File("src/test/resources/WithHeader.csv")));
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ConvertCSVToJSON.REL_SUCCESS, 1);
    List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(ConvertCSVToJSON.REL_SUCCESS);
    for (MockFlowFile mockFile : successFiles) {
        System.out.println(new String(mockFile.toByteArray(), "UTF-8"));
    }
}

You can find more examples of unit tests for processors here. With the code now written and tested, the only thing left is to deploy it to your NiFi instance so you can use it. To deploy, copy the 'nar' file produced by your build into $NIFI_HOME/lib. Once you start/restart your NiFi instance, you'll be able to access your processor:
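The deploy step amounts to something like the following; the nar file name is illustrative (it is derived from your artifactBaseName and version), so check your nar module's target directory for the actual name:

```shell
# Copy the built nar into NiFi's lib directory, then restart NiFi
cp nifi-convertcsvtojson-nar/target/nifi-convertcsvtojson-nar-1.0.nar "$NIFI_HOME/lib/"
"$NIFI_HOME/bin/nifi.sh" restart
```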

Insert Custom Processor Screenshot
Processor in Workspace
Configure Processor properties

Helpful Links:
