
How To Make a Custom NiFi Processor

I read a couple of forum posts (link1, link2) about converting CSV data to JSON with Apache NiFi. Both links already propose solutions, one of which is writing your own custom Processor. Since I already have code to convert data from CSV to JSON (see my post), I decided to write a NiFi Processor to accomplish the same thing. This blog entry will show how that was done.

NiFi has a guide for developers that reviews several topics, including the Processor API. The NiFi team also has a Confluence page documenting the archetype needed to create a template processor project. The Maven archetype command for creating your processor template is:

mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.1.0

Once you run the command, you’ll be prompted for input. Guidance for what to put in each field can be found in the developer guide or the Confluence page linked above; if you make a mistake, you can always refactor the names later. Once the command finishes, you’ll have a project structure that looks similar to this:

NiFi Processor Project Structure. Picture retrieved from this blog post: http://bryanbende.com/development/2015/02/04/custom-processors-for-apache-nifi
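A quick aside: if you’d rather skip the interactive prompts, the same archetype accepts its properties on the command line in Maven’s batch mode. The property names below are the ones the archetype asks for; the values are placeholders for illustration:

mvn archetype:generate -B -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.1.0 -DgroupId=com.example -DartifactId=nifi-example-bundle -Dversion=1.0.0-SNAPSHOT -DartifactBaseName=example -Dpackage=com.example.processors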

 

Before moving forward, I want to note the importance of the ‘org.apache.nifi.processor.Processor’ file located in ‘src/main/resources/META-INF/services’. This file defines the location of the processor you’ll be writing; if you rename the Java file for your processor, make sure you also update the class name listed in this file. With the project structure now in place, you can begin to develop your first processor. Going forward, I will be using code snippets from the processor I developed to discuss properties, relationships, the onTrigger method, and testing the processor. The full code can be found here.
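For example, if your processor class ends up at datadidit.helpful.hints.processors.ConvertCSVToJSON (a hypothetical package, just for illustration), the services file would contain that single fully qualified class name:

datadidit.helpful.hints.processors.ConvertCSVToJSON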

The developer guide is helpful for understanding the concepts used in NiFi processors. However, the best way to learn how to do anything is by looking at the way other people have done it, so I looked at the NiFi source to see how its developers write processors. One of the first things I noticed was that each processor has properties and relationships. When making my ConvertCSVToJSON processor, I knew I needed two properties:

  • header: whether the incoming CSV contains a header row
  • field names: the field names, in order, to use when converting the incoming CSV to flat JSON; if the incoming file already has headers, this can be left empty

Below is the code I wrote for defining those properties.

public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
        .name("header")
        .displayName("header")
        .description("Whether or not a header exists in the incoming CSV file. (default true)")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("true")
        .build();

public static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder()
        .name("Field Names")
        .displayName("Field Names")
        .description("Names of the fields in the CSV if no header exists. Field names must be in order.")
        .required(false)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();

The properties for my processor are now defined. Next, I need to add relationships. NiFi defines Relationships as:

Relationship: Each Processor has zero or more Relationships defined for it. These Relationships are named to indicate the result of processing a FlowFile. After a Processor has finished processing a FlowFile, it will route (or “transfer”) the FlowFile to one of the Relationships. A DFM is then able to connect each of these Relationships to other components in order to specify where the FlowFile should go next under each potential processing result.

User Guide

In general you’ll have two relationships: a SUCCESS relationship for all data that is successfully processed by your processor, and a FAILURE relationship for all data that isn’t. Relationships aren’t a requirement, though; a processor can have 0–N of them. Below is a code snippet where I defined my relationships.

public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Successfully converted incoming CSV file to JSON")
        .build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Failed to convert incoming CSV file to JSON")
        .build();

Once you define your properties and relationships, you’ll need to add them to the ‘descriptors’ and ‘relationships’ collections. The generated MyProcessor.java already contains template code to accomplish that in its ‘init’ method; update it with any additional relationships or properties you add. See the example below:

@Override
protected void init(final ProcessorInitializationContext context) {
    final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
    descriptors.add(HEADER);
    descriptors.add(FIELD_NAMES);
    this.descriptors = Collections.unmodifiableList(descriptors);

    final Set<Relationship> relationships = new HashSet<Relationship>();
    relationships.add(REL_SUCCESS);
    relationships.add(REL_FAILURE);
    this.relationships = Collections.unmodifiableSet(relationships);

    csvMapper = new CsvMapper();
}
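The archetype-generated MyProcessor.java also overrides the getters that hand these collections to the framework; they look roughly like this:

@Override
public Set<Relationship> getRelationships() {
    return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return descriptors;
}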
I decided to use the ‘onScheduled’ method to initialize my CsvSchema object. The CsvSchema only needs to be created once the processor is configured, so it made sense to use ‘onScheduled’ in this case. You can read more about the component lifecycle in the developer guide. Below is a code snippet of my ‘onScheduled’ method:

@OnScheduled
public void onScheduled(final ProcessContext context) throws ConfigurationException {
    //Retrieve properties from the context
    Boolean header = context.getProperty(HEADER).asBoolean();
    String fieldNames = context.getProperty(FIELD_NAMES).getValue();

    /*
     * Create the schema based on the properties from the user.
     */
    if (!header && fieldNames != null) {
        Builder build = CsvSchema.builder();
        for (String field : fieldNames.split(",")) {
            build.addColumn(field, CsvSchema.ColumnType.NUMBER_OR_STRING);
        }
        schema = build.build();
    } else if (header && fieldNames != null && !fieldNames.equals("")) {
        schema = this.buildCsvSchema(fieldNames, header);
    } else if (!header && fieldNames == null) {
        throw new ConfigurationException("File must either contain headers or you must provide them.");
    } else {
        schema = CsvSchema.emptySchema().withHeader();
    }
}
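The buildCsvSchema helper referenced above isn’t shown in this snippet, but the Camel processor later on this page includes the same method. As a sketch: it splits the comma-separated field names and optionally picks up a ‘#’-delimited column type for each field:

private CsvSchema buildCsvSchema(String fieldNames, Boolean withHeader) {
    Builder build = CsvSchema.builder();
    for (String field : fieldNames.split(",")) {
        //Each entry may be "name" or "name#TYPE"
        String[] fieldWithType = field.split("#");
        if (fieldWithType.length == 2) {
            build.addColumn(fieldWithType[0], CsvSchema.ColumnType.valueOf(fieldWithType[1]));
        } else {
            build.addColumn(field);
        }
    }
    if (withHeader) {
        return build.build().withHeader();
    }
    return build.build();
}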

With all the setup out of the way, I can get down to adding the code that actually does the conversion. The work of a processor is all done in the ‘onTrigger’ method. You can see my ‘onTrigger’ method below:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    try {
        //Read in the data
        InputStream stream = session.read(flowFile);
        String csv = IOUtils.toString(stream, "UTF-8");
        stream.close();

        //Convert the CSV data to a list of maps
        List<Map<?, ?>> objects = this.readObjectsFromCsv(csv);

        //Convert to a JSON string
        final String json = this.writeAsJson(objects);

        //Output the FlowFile
        FlowFile output = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                IOUtils.write(json, outputStream, "UTF-8");
            }
        });
        output = session.putAttribute(output, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
        //TODO: May want to have a better default name....
        output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString() + ".json");

        session.transfer(output, REL_SUCCESS);
    } catch (IOException e) {
        getLogger().error("Unable to convert CSV to JSON for this file: "
                + flowFile.getAttributes().get(CoreAttributes.FILENAME.key()), e);
        session.transfer(flowFile, REL_FAILURE);
    }
}
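The readObjectsFromCsv and writeAsJson helpers do the actual Jackson work and mirror the versions in the Camel processor further down this page. A minimal sketch, assuming the csvMapper created in ‘init’ and the schema built in ‘onScheduled’ (MappingIterator and ObjectMapper come from jackson-databind):

public List<Map<?, ?>> readObjectsFromCsv(String fileContent) throws IOException {
    //Parse each CSV record into a Map keyed by column name
    MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(schema).readValues(fileContent);
    return mappingIterator.readAll();
}

public String writeAsJson(List<Map<?, ?>> data) throws IOException {
    //Serialize the list of maps as a JSON array
    return new ObjectMapper().writeValueAsString(data);
}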

The next step is to test the code. NiFi has a TestRunner interface that’s already set up for running processor unit tests; just give TestRunners the processor class to use. Below is an example:

@Before
public void init() {
    testRunner = TestRunners.newTestRunner(ConvertCSVToJSON.class);
}

Then you can go ahead and write your first unit test for your processor:

@Test
public void testWithHeader() throws FileNotFoundException, UnsupportedEncodingException {
    //Set Headers
    testRunner.setProperty(ConvertCSVToJSON.HEADER, "true");
    testRunner.enqueue(new FileInputStream(new File("src/test/resources/WithHeader.csv")));
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ConvertCSVToJSON.REL_SUCCESS, 1);
    List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(ConvertCSVToJSON.REL_SUCCESS);
    for (MockFlowFile mockFile : successFiles) {
        System.out.println(new String(mockFile.toByteArray(), "UTF-8"));
    }
}

You can find more examples of unit tests for processors here. With the code written and tested, the only thing left is to deploy it to your NiFi instance so you can use it. To deploy, copy the ‘nar’ file produced by your build into $NIFI_HOME/lib; once you start/restart your NiFi instance, you’ll be able to access your processor, as the screenshots below show:
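A minimal sketch of that build-and-copy step, assuming a default project layout (the exact nar file name depends on the artifactBaseName and version you chose, so treat these paths as placeholders):

mvn clean install
cp nifi-<artifactBaseName>-nar/target/nifi-<artifactBaseName>-nar-<version>.nar $NIFI_HOME/lib/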

Insert Custom Processor Screenshot
Processor in Workspace
Configure Processor properties


Jackson based JAX-RS Providers(JSON, XML, CSV) Example

This blog post discusses returning multiple data formats from your RESTful endpoints using Jackson JAX-RS providers. Jackson supplies a number of providers, which you can see here. The providers allow you to return a POJO (plain old Java object) from your REST-annotated methods and give back the appropriate media type. Jackson also gives you the ability to make your own entity providers; documentation for how to do that is here. Below I’ll show a generic way to provide CSVs along with XML and JSON.

Pom

Sample Model Objects

REST Resource

Custom Entity Provider

Configuration

Instructions

Screen Shots

Pom

[code language=”xml”]
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>datadidit.helpful.hints</groupId>
    <artifactId>parent</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>
  <artifactId>jetty-cxf</artifactId>
  <name>jetty-cxf</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
    </dependency>
    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-maven-plugin</artifactId>
        <version>${jetty.version}</version>
      </plugin>
    </plugins>
  </build>
</project>
[/code]

Model

Simple model object.

[code language=”java”]
package datadidit.helpful.hints.csv.test.model;

import java.util.Date;

import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name="SimpleSample")
public class SimpleSample {
    private String firstName;

    private String lastName;

    private Date dob;

    public SimpleSample(){}

    public SimpleSample(String firstName, String lastName, Date dob){
        this.dob = dob;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public Date getDob() {
        return dob;
    }

    public void setDob(Date dob) {
        this.dob = dob;
    }
}
[/code]

POJO with a data structure (Map) embedded in it. It implements the CSVTransformer interface so that the CSV entity provider below knows how to flatten the POJO.

[code language=”java”]
package datadidit.helpful.hints.csv.test.model;

import java.util.HashMap;
import java.util.Map;

import javax.xml.bind.annotation.XmlRootElement;

import datadidit.helpful.hints.csv.provider.CSVTransformer;

@XmlRootElement(name="ComplexSample")
public class ComplexSample implements CSVTransformer {
    private String studentId;

    private Map<String, String> grades;

    public ComplexSample(){}

    public ComplexSample(String studentId, Map<String, String> grades){
        this.studentId = studentId;
        this.grades = grades;
    }

    @Override
    public Map<?, ?> flatten() {
        Map<String, Object> myMap = new HashMap<>();
        myMap.put("studentId", studentId);
        myMap.putAll(grades);

        return myMap;
    }

    public String getStudentId() {
        return studentId;
    }

    public void setStudentId(String studentId) {
        this.studentId = studentId;
    }

    public Map<String, String> getGrades() {
        return grades;
    }

    public void setGrades(Map<String, String> grades) {
        this.grades = grades;
    }
}
[/code]

REST

REST Endpoint defining URLs for the Web Service.

[code language=”java”]
package datadidit.helpful.hints.rest;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import datadidit.helpful.hints.csv.provider.CSVTransformer;
import datadidit.helpful.hints.csv.test.model.ComplexSample;
import datadidit.helpful.hints.csv.test.model.SimpleSample;

@Path("CustomProvider")
public class CXFSampleResource {
    @GET
    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, "application/csv"})
    @Path("test/{caseToUse}")
    public List<?> doSomething(@PathParam("caseToUse") @DefaultValue("simple") String caseToUse){
        List<Object> test = new ArrayList<>();
        if(caseToUse.equalsIgnoreCase("simple")){
            for(SimpleSample samp : this.generateSimpleSample())
                test.add(samp);
        }else{
            for(ComplexSample samp : this.generateComplexSample())
                test.add(samp);
        }

        System.out.println("Hello: "+test);
        return test;
    }

    public List<SimpleSample> generateSimpleSample(){
        List<SimpleSample> samples = new ArrayList<>();
        samples.add(new SimpleSample("hello", "world", new Date()));
        samples.add(new SimpleSample("hello", "chad", new Date()));
        samples.add(new SimpleSample("hello", "marcus", new Date()));
        samples.add(new SimpleSample("hello", "joy", new Date()));
        samples.add(new SimpleSample("hello", "mom", new Date()));

        return samples;
    }

    public List<ComplexSample> generateComplexSample(){
        Map<String, String> grades = new HashMap<>();
        grades.put("Class1", "A");
        grades.put("Class2", "B");
        grades.put("Class3", "C");
        grades.put("Class4", "D");

        List<ComplexSample> samples = new ArrayList<>();
        samples.add(new ComplexSample(UUID.randomUUID().toString(), grades));
        samples.add(new ComplexSample(UUID.randomUUID().toString(), grades));
        samples.add(new ComplexSample(UUID.randomUUID().toString(), grades));
        samples.add(new ComplexSample(UUID.randomUUID().toString(), grades));

        return samples;
    }
}
[/code]

Custom Entity Provider

Generic interface that provides a method for flattening a POJO so that it can be converted to a CSV.

[code language=”java”]
package datadidit.helpful.hints.csv.provider;

import java.util.Map;

public interface CSVTransformer {
    /**
     * Utility method to flatten a POJO so that it can be converted into a CSV
     * @return
     */
    Map<?,?> flatten();
}
[/code]

Generic entity provider that generates a CSV file from a List of POJOs, using Jackson’s CSV data format.

[code language=”java”]
package datadidit.helpful.hints.csv.provider;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;

import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;

@Produces("application/csv")
public class CSVBodyWriter implements MessageBodyWriter<Object> {
    Logger logger = Logger.getLogger(CSVBodyWriter.class.getName());

    public long getSize(Object myCollectionOfObjects, Class type, Type genericType, Annotation[] annotations,
            MediaType arg4) {
        return 0;
    }

    public boolean isWriteable(Class type, Type genericType, Annotation[] annotations,
            MediaType arg3) {
        return true;
    }

    public void writeTo(Object myCollectionOfObjects, Class type, Type genericType, Annotation[] annotations,
            MediaType mediaType, MultivaluedMap httpHeaders, OutputStream entityHeaders)
            throws IOException, WebApplicationException {
        //Whatever makes it in here should be a list
        List<?> myList = new ArrayList<>();
        if (myCollectionOfObjects instanceof List && ((myList = (List<?>) myCollectionOfObjects).size() > 0)) {
            CsvMapper csvMapper = new CsvMapper();
            CsvSchema schema = null;

            /*
             * If it's not a flat POJO it must implement
             * CSVTransformer
             */
            if (implementsCSVTransformer(myList.get(0).getClass())) {
                Class[] params = {};
                try {
                    Method meth = CSVTransformer.class.getDeclaredMethod("flatten", params);

                    /*
                     * Create a new list using the flatten() method
                     */
                    List<Map<String, ?>> listOfMaps = new ArrayList<>();
                    Set<String> headers = null;
                    for (Object obj : myList) {
                        Map<String, ?> keyVals = (Map<String, ?>) meth.invoke(obj, params);

                        if (schema == null) {
                            schema = this.buildSchemaFromKeySet(keyVals.keySet());
                            headers = keyVals.keySet();
                        }

                        //Validate that the latest headers are the same as the original ones
                        if (headers.equals(keyVals.keySet()))
                            listOfMaps.add(keyVals);
                        else
                            logger.warning("Headers should be the same for each object in the list, excluding this object " + keyVals);
                    }

                    csvMapper.writer(schema).writeValue(entityHeaders, listOfMaps);
                } catch (Exception e) {
                    throw new IOException("Unable to retrieve flatten() " + e.getMessage());
                }
            } else {
                schema = csvMapper.schemaFor(myList.get(0).getClass()).withHeader();
                csvMapper.writer(schema).writeValue(entityHeaders, myList);
            }
        } else if (myList.isEmpty()) {
            logger.warning("Nothing in list to convert to CSV...");
            entityHeaders.write(myList.toString().getBytes(Charset.forName("UTF-8")));
        } else {
            throw new IOException("Not in proper format, must pass a java.util.List to use this MessageBodyWriter...");
        }
    }

    public CsvSchema buildSchemaFromKeySet(Set<String> keySet) {
        Builder build = CsvSchema.builder();
        for (String field : keySet) {
            build.addColumn(field);
        }
        CsvSchema schema = build.build().withHeader();
        return schema;
    }

    public Boolean implementsCSVTransformer(Class arg1) {
        Class[] interfaces = arg1.getInterfaces();
        for (Class aClass : interfaces) {
            if (aClass.getName().equals(CSVTransformer.class.getName()))
                return true;
        }

        return false;
    }
}
[/code]

Configuration

This XML file configures the CXF servlet, the extension mappings, and the providers for your web service to use. Some good docs on this configuration file can be found here.

[code language=”xml”]
<?xml version="1.0" encoding="UTF-8"?>
<web-app id="WebApp_ID" version="2.4"
    xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
    http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">
  <display-name>CSV Provider Test</display-name>
  <servlet>
    <servlet-name>MyApplication</servlet-name>
    <servlet-class>org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet</servlet-class>
    <!-- Name of the resource -->
    <init-param>
      <param-name>jaxrs.serviceClasses</param-name>
      <param-value>
        datadidit.helpful.hints.rest.CXFSampleResource,
      </param-value>
    </init-param>
    <!-- Name of the providers -->
    <init-param>
      <param-name>jaxrs.providers</param-name>
      <param-value>
        com.fasterxml.jackson.jaxrs.xml.JacksonJaxbXMLProvider,
        com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider,
        datadidit.helpful.hints.csv.provider.CSVBodyWriter
      </param-value>
    </init-param>
    <!-- Name of the extensions -->
    <init-param>
      <param-name>jaxrs.extensions</param-name>
      <param-value>
        csv=application/csv
        json=application/json
        xml=application/xml
      </param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>

  <servlet-mapping>
    <servlet-name>MyApplication</servlet-name>
    <url-pattern>/*</url-pattern>
  </servlet-mapping>
</web-app>
[/code]

Instructions

  1. From the root of the project run ‘mvn jetty:run’
  2. For the simple example run:
    • xml: http://localhost:8080/CustomProvider/test/simple
    • json: http://localhost:8080/CustomProvider/test/simple.json
    • csv: http://localhost:8080/CustomProvider/test/simple.csv
  3. For the complex example run:
    • xml: http://localhost:8080/CustomProvider/test/complex
    • json: http://localhost:8080/CustomProvider/test/complex.json
    • csv: http://localhost:8080/CustomProvider/test/complex.csv

Screen Shots

When you hit the ‘.csv’ extension, depending on your browser, you may notice that the CSV is simply downloaded as a file rather than rendered in the page.
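If you’d rather pull the CSV straight into a terminal instead of downloading it, a quick curl against the same endpoints should work (this assumes the default Jetty port used in the URLs above; the second form uses an Accept header instead of the extension mapping):

[code language=”text”]
curl http://localhost:8080/CustomProvider/test/simple.csv
curl -H "Accept: application/csv" http://localhost:8080/CustomProvider/test/simple
[/code]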

simple_xml

simple_json

complex_xml

complex_json

An Example CSV to Mongo Dataflow

Do you have a bunch of CSV files lying around that you’d like to be able to run queries against? This post is going to discuss a way to do that with Apache Camel and a processor I wrote.

Prerequisites:

  • Ability to run mvn camel:run
  • A MongoDB installation available

Technologies used: Apache Camel, Jackson, and MongoDB.

I wrote a custom Camel processor to take in a CSV file and turn it into JSON; Jackson was my tool of choice for converting the CSV to JSON. Below you can see the code for the custom processor:

package datadidit.helpful.hints.camel;

import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;

/**
 * Camel Processor that reads in a CSV file and produces a JSON Exchange for
 * each line.
 *
 * Helpful examples:
 * http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html
 * http://stackoverflow.com/questions/19766266/convert-csv-file-directly-to-json-file-using-jackson-library
 */
public class CSVToJson implements Processor {
    private Logger logger = Logger.getLogger(CSVToJson.class.getName());

    ProducerTemplate producer;

    private Boolean header;

    private String fieldNames;

    private CsvSchema schema;

    public CSVToJson(Boolean header, String fieldNames) throws Exception {
        if (!header && fieldNames != null) {
            Builder build = CsvSchema.builder();
            for (String field : fieldNames.split(",")) {
                build.addColumn(field, CsvSchema.ColumnType.NUMBER_OR_STRING);
            }
            schema = build.build();
        } else if (header && fieldNames != null && !fieldNames.equals("")) {
            schema = this.buildCsvSchema(fieldNames, header);
        } else if (!header && fieldNames == null) {
            throw new Exception("File must either contain headers or you must provide them.");
        } else {
            schema = CsvSchema.emptySchema().withHeader();
        }
    }

    //TODO: Should probably do it this way at some point....
    private CsvSchema buildCsvSchema(String fieldNames, Boolean withHeader) {
        Builder build = CsvSchema.builder();
        for (String field : fieldNames.split(",")) {
            String[] fieldWithType = field.split("#");
            if (fieldWithType.length == 2) {
                logger.info("Field: " + fieldWithType[0]);
                logger.info("Type: " + fieldWithType[1]);
                build.addColumn(fieldWithType[0], CsvSchema.ColumnType.valueOf(fieldWithType[1]));
            } else {
                build.addColumn(field);
            }
        }
        if (withHeader) {
            return build.build().withHeader();
        }
        return build.build();
    }

    public void process(Exchange arg0) throws Exception {
        InputStream stream = arg0.getIn().getBody(InputStream.class);
        List<Map<?, ?>> objects = readObjectsFromCsv(stream);

        for (Map<?, ?> map : objects) {
            //Create JSON for each line of the CSV
            final String json = writeAsJson(map);
            producer.send(new Processor() {
                public void process(Exchange outExchange) {
                    outExchange.getIn().setBody(json);
                }
            });
        }

        //TODO: If you don't close the stream this processor will continue to try and process the exchange...
        stream.close();
    }

    public List<Map<?, ?>> readObjectsFromCsv(InputStream file) throws IOException {
        CsvMapper csvMapper = new CsvMapper();
        String csv = IOUtils.toString(file, "UTF-8");
        MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(schema).readValues(csv);
        return this.fixMap(mappingIterator.readAll());
    }

    public List<Map<?, ?>> readObjectsFromCsv(String fileContent) throws JsonProcessingException, IOException {
        CsvMapper csvMapper = new CsvMapper();
        MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(schema).readValues(fileContent);
        return this.fixMap(mappingIterator.readAll());
    }

    //TODO: This is a HACK, use library or submit bug
    public List<Map<?, ?>> fixMap(List<Map<?, ?>> map) {
        List<Map<?, ?>> newList = new ArrayList<>();
        for (Map<?, ?> entry : map) {
            Map<String, Object> newMap = new HashMap<String, Object>();
            for (Map.Entry<?, ?> mEntry : entry.entrySet()) {
                String value = mEntry.getValue().toString();
                //Need to remove the leading '.' for isNumeric to work with Doubles
                if (value.startsWith(".") && StringUtils.isNumeric(value.substring(1))) {
                    newMap.put(mEntry.getKey().toString(), Double.parseDouble(value));
                } else if (StringUtils.isNumeric(mEntry.getValue().toString())) {
                    newMap.put(mEntry.getKey().toString(), Integer.parseInt(value));
                } else {
                    newMap.put(mEntry.getKey().toString(), mEntry.getValue().toString());
                }
            }
            newList.add(newMap);
        }
        return newList;
    }

    public String writeAsJson(List<Map<?, ?>> data) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(data);
    }

    /**
     * Taken from: https://itexpertsconsultant.wordpress.com/2016/08/03/how-to-readwrite-csv-file-to-map-in-java/
     * @param listOfMap
     * @param writer
     * @throws IOException
     */
    public void csvWriter(List<Map<?, ?>> listOfMap, Writer writer) throws IOException {
        CsvSchema schema = null;
        CsvSchema.Builder schemaBuilder = CsvSchema.builder();
        if (listOfMap != null && !listOfMap.isEmpty()) {
            for (Object col : listOfMap.get(0).keySet()) {
                schemaBuilder.addColumn(col.toString());
            }
            schema = schemaBuilder.build().withLineSeparator("\r").withHeader();
        }
        CsvMapper mapper = new CsvMapper();
        mapper.writer(schema).writeValues(writer).writeAll(listOfMap);
        writer.flush();
    }

    public String writeAsJson(Map<?, ?> data) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(data);
    }

    public void setProducer(ProducerTemplate producer) {
        this.producer = producer;
    }

    public Boolean getHeader() {
        return header;
    }

    public void setHeader(Boolean header) {
        this.header = header;
    }

    public String getFieldNames() {
        return fieldNames;
    }

    public void setFieldNames(String fieldNames) {
        this.fieldNames = fieldNames;
    }
}

Once the custom processor was in place, I used Camel’s XML DSL, which lets you use Camel without writing any code:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <template id="myTemplate" defaultEndpoint="direct:handleJson"/>

    <!-- Route that takes in a CSV file and outputs JSON -->
    <route id="csv-to-json">
      <from uri="file:///Users/mkwyc_000/workspace/git/apache-camel/process-csv?delete=true" />
      <process ref="csvToJson" />
      <to uri="file:///Users/mkwyc_000/workspace/git/apache-camel/process-complete" />
    </route>

    <!-- Route to turn CSV into JSON and insert into Mongo -->
    <route id="handleJson">
      <from uri="direct:handleJson" />
      <log message="Got ${body}" />
      <to uri="mongodb:mongoBean?database=example&amp;collection=testCollection&amp;operation=insert" />
    </route>
  </camelContext>

  <!-- Bean instantiation for the Mongo client -->
  <bean id="mongoBean" class="com.mongodb.Mongo">
    <constructor-arg name="host" value="127.0.0.1" />
    <constructor-arg name="port" value="27017" />
  </bean>

  <!-- Bean instantiation of CSVToJson -->
  <bean id="csvToJson" class="datadidit.helpful.hints.camel.CSVToJson">
    <property name="producer" ref="myTemplate" />
    <constructor-arg type="java.lang.Boolean" value="true"/>
    <constructor-arg type="java.lang.String" value="" />
  </bean>
</beans>

In the Camel context above there are two routes, csv-to-json and handleJson. The csv-to-json route first uses the file component to read in any data dropped into the directory defined in the ‘from’ URI, then passes that data to my custom processor, which is defined by this bean:

<bean id="csvToJson" class="datadidit.helpful.hints.camel.CSVToJson">
  <property name="producer" ref="myTemplate" />
  <constructor-arg type="java.lang.Boolean" value="true"/>
  <constructor-arg type="java.lang.String" value="" />
</bean>

The custom processor reads in the Exchange (the CSV file) and uses Jackson to convert each line of the incoming CSV to JSON. The ProducerTemplate then sends the produced JSON to the handleJson route. Once in the handleJson route, I use Camel’s MongoDB component to insert the data into MongoDB with this line:

<route id="handleJson">
  <from uri="direct:handleJson" />
  <log message="Got ${body}" />
  <to uri="mongodb:mongoBean?database=example&amp;collection=testCollection&amp;operation=insert" />
</route>

The mongo bean is defined here:

<bean id="mongoBean" class="com.mongodb.Mongo">
  <constructor-arg name="host" value="127.0.0.1" />
  <constructor-arg name="port" value="27017" />
</bean>

That’s all the code and XML it takes to get CSV data into MongoDB. You can now load your CSV files into MongoDB by following these steps:

  1. Update the Camel context to monitor the appropriate directories for your environment. You can also update the MongoDB portion of the route to insert into the appropriate database and collection for your situation.
  2. Start MongoDB; there are multiple ways to get it up and running, I just used ‘mongod --dbpath d:\mongodb-data’.
  3. Run ‘mvn camel:run’ to get your Camel route started.
  4. Drop your data into the ‘process-csv’ directory you defined in your Camel context.
  5. Log into MongoDB by typing ‘mongo’ at the console.
  6. In the console, run ‘db.<collection name in route>.find()’ to see all your data, as in the example session below.
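For the route above, which inserts into the example database’s testCollection, that last check might look like:

mongo
> use example
> db.testCollection.find()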

 

JSON to CSV with Jackson

I needed to convert some JSON output to CSV this week on one of my REST endpoints, where I had previously been using Gson to do the conversion. Gson does not natively support JSON to CSV conversion, so though I love how easy it is for me to type:

[code language=”java”]
Gson gson = new Gson();

gson.toJson("Hello World");
[/code]

and easily retrieve my JSON, I switched over to Jackson because it also supports CSV conversion of my POJOs. When trying to figure this out, the following links came in very handy:

stackoverflow

cowtowncoder

jackson-csv

Since the post from cowtowncoder is a bit old, I figured I’d write a new blog post in case someone else runs into this problem.

[code language=”xml”]
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>datadidit.helpful.hints</groupId>
    <artifactId>parent</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>
  <artifactId>json-to-csv</artifactId>
  <name>json-to-csv</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-csv</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.vintage</groupId>
      <artifactId>junit-vintage-engine</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.19</version>
        <dependencies>
          <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-surefire-provider</artifactId>
            <version>1.0.0-M1</version>
          </dependency>
        </dependencies>
      </plugin>
    </plugins>
  </build>
</project>
[/code]

Note: I also took the time to try out JUnit 5.

I made a simple POJO for converting to JSON and CSV:

[code language=”java”]
package datadidit.helpful.hints.csv;

import java.util.Date;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonPropertyOrder({"firstName", "lastName", "dob"})
public class Simple {
    private String firstName;

    private String lastName;

    private Date dob;

    public Simple(){
    }

    public Simple(String firstName, String lastName, Date dob) {
        super();
        this.firstName = firstName;
        this.lastName = lastName;
        this.dob = dob;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public Date getDob() {
        return dob;
    }

    public void setDob(Date dob) {
        this.dob = dob;
    }
}
[/code]

Then I wrote a unit test to try the code out:

[code language=”java”]
package datadidit.helpful.hints.csv;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class JsonToCsvTest {
    private ObjectMapper mapper;

    private CsvMapper csvMapper;

    @BeforeEach
    public void setup(){
        csvMapper = new CsvMapper();
        mapper = new ObjectMapper();
    }

    @Test
    public void CreateJson() throws JsonProcessingException{
        List<Simple> simpleList = new ArrayList<>();
        simpleList.add(new Simple("hello", "world", new Date()));

        String json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(simpleList);

        System.out.println(json);

        System.out.println("Now as CSV: ");
        CsvSchema schema = csvMapper.schemaFor(Simple.class).withHeader();
        System.out.println(csvMapper.writer(schema).writeValueAsString(simpleList));
    }
}
[/code]

Output as JSON:

[code language=”text”]

[ {
  "firstName" : "hello",
  "lastName" : "world",
  "dob" : 1468293742743
} ]

[/code]

Output as CSV:

[code language=”text”]

firstName,lastName,dob
hello,world,1468075423437

[/code]