Browsed by
Category: Java

An Example CSV to Mongo Dataflow

An Example CSV to Mongo Dataflow

Do you have a bunch of CSV files lying around that you’d like to be able to run queries against? This post discusses a way to do that with Apache Camel and a custom processor I wrote.

Prerequisites:

  • Ability to run a mvn camel:run
  • A MongoDB installation available

Technologies used:

I wrote a custom Camel processor that takes in a CSV file and turns it into JSON. Jackson was my tool of choice for converting the CSV to JSON. Below you can see the code for the custom processor:

package datadidit.helpful.hints.camel;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
/**
* Camel Processor that reads in a CSV file and produces a JSON Exchange for
* each line.
*
* Helpful Example:
* http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html
* http://stackoverflow.com/questions/19766266/convert-csv-file-directly-to-json-file-using-jackson-library
*/
public class CSVToJson implements Processor{
private Logger logger = Logger.getLogger(CSVToJson.class.getName());
ProducerTemplate producer;
private Boolean header;
private Boolean fieldNames;
private CsvSchema schema;
public CSVToJson(Boolean header, String fieldNames) throws Exception{
if(!header && fieldNames!=null){
Builder build = CsvSchema.builder();
for(String field : fieldNames.split(",")){
build.addColumn(field, CsvSchema.ColumnType.NUMBER_OR_STRING);
}
schema = build.build();
}else if(header && fieldNames!=null && !fieldNames.equals("")){
schema = this.buildCsvSchema(fieldNames, header);
}else if(!header && fieldNames==null){
throw new Exception("File must either contain headers or you must provide them..");
}else{
schema = CsvSchema.emptySchema().withHeader();
}
}
//TODO: Should probably do it this way at some point....
private CsvSchema buildCsvSchema(String fieldNames, Boolean withHeader){
Builder build = CsvSchema.builder();
for(String field : fieldNames.split(",")){
String[] fieldWithType = field.split("#");
if(fieldWithType.length==2){
logger.info("Field: "+fieldWithType[0]);
logger.info("Type: "+fieldWithType[1]);
build.addColumn(fieldWithType[0], CsvSchema.ColumnType.valueOf(fieldWithType[1]));
}else{
build.addColumn(field);
}
}
if(withHeader){
return build.build().withHeader();
}
return build.build();
}
public void process(Exchange arg0) throws Exception {
InputStream stream = arg0.getIn().getBody(InputStream.class);
List<Map<?, ?>> objects = readObjectsFromCsv(stream);
for(Map<?,?> map : objects){
//Create JSON
final String json = writeAsJson(map);
producer.send(new Processor(){
public void process(Exchange outExchange){
outExchange.getIn().setBody(json);
}
});
}
//TODO:If you don't close the stream this processor will continue to try and process the exchange...
stream.close();
}
public List<Map<?, ?>> readObjectsFromCsv(InputStream file) throws IOException {
CsvMapper csvMapper = new CsvMapper();
String csv = IOUtils.toString(file, "UTF-8");
MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(schema).readValues(csv);
return this.fixMap(mappingIterator.readAll());
}
public List<Map<?,?>> readObjectsFromCsv(String fileContent) throws JsonProcessingException, IOException{
CsvMapper csvMapper = new CsvMapper();
MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(schema).readValues(fileContent);
return this.fixMap(mappingIterator.readAll());
}
//TODO: This is a HACK, use library or submit bug
public List<Map<?,?>> fixMap(List<Map<?,?>> map){
List<Map<?,?>> newList = new ArrayList<>();
for(Map<?, ?> entry : map){
Map<String,Object> newMap = new HashMap<String,Object>();
for(Map.Entry<?, ?> mEntry : entry.entrySet()){
String value = mEntry.getValue().toString();
//Need to remove leading . for isNumeric to work with Doubles
if(value.startsWith(".") && StringUtils.isNumeric(value.substring(1))){
newMap.put(mEntry.getKey().toString(), Double.parseDouble(value));
}else if(StringUtils.isNumeric(mEntry.getValue().toString())){
newMap.put(mEntry.getKey().toString(), Integer.parseInt(value));
}else{
newMap.put(mEntry.getKey().toString(), mEntry.getValue().toString());
}
}
newList.add(newMap);
}
return newList;
}
public String writeAsJson(List<Map<?, ?>> data) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(data);
}
/**
* Taken from: https://itexpertsconsultant.wordpress.com/2016/08/03/how-to-readwrite-csv-file-to-map-in-java/
* @param listOfMap
* @param writer
* @throws IOException
*/
public void csvWriter(List<Map<?, ?>> listOfMap, Writer writer) throws IOException {
CsvSchema schema = null;
CsvSchema.Builder schemaBuilder = CsvSchema.builder();
if (listOfMap != null && !listOfMap.isEmpty()) {
for (Object col : listOfMap.get(0).keySet()) {
schemaBuilder.addColumn(col.toString());
}
schema = schemaBuilder.build().withLineSeparator("\r").withHeader();
}
CsvMapper mapper = new CsvMapper();
mapper.writer(schema).writeValues(writer).writeAll(listOfMap);
writer.flush();
}
public String writeAsJson(Map<?, ?> data) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(data);
}
public void setProducer(ProducerTemplate producer){
this.producer = producer;
}
public Boolean getHeader() {
return header;
}
public void setHeader(Boolean header) {
this.header = header;
}
public Boolean getFieldNames() {
return fieldNames;
}
public void setFieldNames(Boolean fieldNames) {
this.fieldNames = fieldNames;
}
}
view raw CSVToJson.java hosted with ❤ by GitHub

Once the custom processor was in place, I used Camel’s XML DSL, which allows users to use Camel without writing any code:

<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<!--
Spring application context: declares the Camel context with its two routes,
the Mongo client bean and the CSVToJson processor bean.
-->
<camelContext xmlns="http://camel.apache.org/schema/spring">
<!--
Producer template injected into the csvToJson bean below. Its default endpoint
is direct:handleJson, the entry point of the handleJson route.
-->
<template id="myTemplate" defaultEndpoint="direct:handleJson"/>
<!--
Route that picks up CSV files from the process-csv directory (consumer option
delete=true), runs them through the csvToJson processor and then writes the
exchange to the process-complete directory.
Adjust both directory URIs for your environment.
-->
<route id="csv-to-json">
<from uri="file:///Users/mkwyc_000/workspace/git/apache-camel/process-csv?delete=true" />
<process ref="csvToJson" />
<to uri="file:///Users/mkwyc_000/workspace/git/apache-camel/process-complete" />
</route>
<!--
Route that receives each JSON message on direct:handleJson, logs the body and
inserts it into collection testCollection of database example via mongoBean.
-->
<route id="handleJson">
<from uri="direct:handleJson" />
<log message="Got ${body}" />
<to uri="mongodb:mongoBean?database=example&amp;collection=testCollection&amp;operation=insert" />
</route>
</camelContext>
<!--
Mongo client bean referenced by the mongodb endpoint above; connects to
127.0.0.1 on port 27017.
-->
<bean id="mongoBean" class="com.mongodb.Mongo">
<constructor-arg name="host" value="127.0.0.1" />
<constructor-arg name="port" value="27017" />
</bean>
<!--
CSVToJson processor bean. Constructor arguments: header=true and an empty
fieldNames string. The producer property is wired to myTemplate.
-->
<bean id="csvToJson" class="datadidit.helpful.hints.camel.CSVToJson" >
<property name="producer" ref="myTemplate" />
<constructor-arg type="java.lang.Boolean" value="true"/>
<constructor-arg type="java.lang.String" value="" />
</bean>
</beans>
view raw camelContext.xml hosted with ❤ by GitHub

In the Camel context above there are two routes: csv-to-json and handleJson. The csv-to-json route first uses the file component to read in any data dropped into the directory defined in the ‘from’ URI. It then passes that data to my custom processor, which is defined by this bean:

<!-- CSVToJson processor: header=true, empty fieldNames, producer wired to myTemplate -->
<bean id="csvToJson" class="datadidit.helpful.hints.camel.CSVToJson" >
<property name="producer" ref="myTemplate" />
<constructor-arg type="java.lang.Boolean" value="true"/>
<constructor-arg type="java.lang.String" value="" />
</bean>
view raw camelContext.xml hosted with ❤ by GitHub

The custom processor reads in the Exchange (the CSV file) and uses Jackson to convert each line of the incoming CSV to JSON. The ProducerTemplate sends the produced JSON to the handleJson route. Once in the handleJson route, I use Camel’s MongoDB component to insert the data into MongoDB using this route:

<!-- Receives each JSON message, logs its body and inserts it into example.testCollection -->
<route id="handleJson">
<from uri="direct:handleJson" />
<log message="Got ${body}" />
<to uri="mongodb:mongoBean?database=example&amp;collection=testCollection&amp;operation=insert" />
</route>
view raw camelContext.xml hosted with ❤ by GitHub

The mongo bean is defined here:

<!-- Mongo client used by the mongodb endpoint; host 127.0.0.1, port 27017 -->
<bean id="mongoBean" class="com.mongodb.Mongo">
<constructor-arg name="host" value="127.0.0.1" />
<constructor-arg name="port" value="27017" />
</bean>
view raw camelContext.xml hosted with ❤ by GitHub

That’s all the code and xml it takes to get csv into MongoDB. Users can now put their CSV files into MongoDB easily by following these steps:

  1. Update the camel context variables to monitor the appropriate directories for your environment. You can also update the mongodb portion of the route to insert into the appropriate DB and Collection for your situation.
  2. Start MongoDB. There are multiple ways to get it up and running; I just used ‘mongod --dbpath d:\mongodb-data’.
  3. Run ‘mvn camel:run’ to get your Camel route started.
  4. Drop your data into the ‘process-csv’ directory you defined in your Camel context.
  5. Log into mongodb by typing this command at the console: ‘mongo’
  6. In the console, switch to the database named in the route (for example ‘use example’) and run ‘db.<collection name in route>.find()’ to see all your data.

 

JSON to CSV with Jackson

JSON to CSV with Jackson

Needed to convert some JSON output to CSV this week on one of my REST endpoints. Previously on this endpoint I was using Gson to do the conversion. Gson does not natively support JSON to CSV conversion. So though I love how easy it is for me to type:

[code language=”java”]
// Create a Gson instance with its default configuration.
Gson gson = new Gson();

// Serialize a Java value to JSON; the result of toJson is not captured in this snippet.
gson.toJson("Hello World");
[/code]

and easily retrieve my JSON, I switched over to using Jackson because it supports CSV conversion of my POJOs as well. When trying to figure this out, the following links came in very handy:

stackoverflow

cowtowncoder

jackson-csv

Since the post from cowtowncoder is a bit old I figured I’d write a new blogpost in case someone else runs into this problem.

[code language=”xml”]<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<!-- Maven build for the json-to-csv example module. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>datadidit.helpful.hints</groupId>
<artifactId>parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>json-to-csv</artifactId>
<name>json-to-csv</name>
<url>http://maven.apache.org</url>
<!--
No dependency below declares a version; presumably they are managed by the
parent POM (confirm against datadidit.helpful.hints:parent).
-->
<dependencies>
<!-- Jackson CSV data format: CsvMapper and CsvSchema -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
<!-- Jackson ObjectMapper -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Jackson annotations such as @JsonPropertyOrder -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<!-- JUnit 5 (Jupiter) engine, test scope only -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<!-- JUnit Vintage engine, test scope only -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile with Java 1.8 source and target levels -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Surefire with the JUnit Platform provider added as a plugin dependency -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<dependencies>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-surefire-provider</artifactId>
<version>1.0.0-M1</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
[/code]

Note: Also took the time to try out JUnit 5.

Made a simple POJO for converting to JSON and CSV:

[code language=”java”]

package datadidit.helpful.hints.csv;

import java.util.Date;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonPropertyOrder({"firstName", "lastName", "dob"})
/**
 * Simple bean with a first name, last name and date of birth, used to demonstrate JSON and CSV
 * serialization.
 *
 * <p>{@link Date} is mutable, so the date of birth is copied on the way in and on the way out;
 * callers can never change this object's state through a {@code Date} reference they hold.
 */
public class Simple {
    private String firstName;

    private String lastName;

    private Date dob;

    /** Creates an instance with all properties {@code null}. */
    public Simple() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param firstName first name, may be {@code null}
     * @param lastName last name, may be {@code null}
     * @param dob date of birth, may be {@code null}; a copy is stored
     */
    public Simple(String firstName, String lastName, Date dob) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.dob = copyOf(dob);
    }

    /** Returns the first name. */
    public String getFirstName() {
        return firstName;
    }

    /** Sets the first name. */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** Returns the last name. */
    public String getLastName() {
        return lastName;
    }

    /** Sets the last name. */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** Returns a copy of the date of birth, or {@code null} if none is set. */
    public Date getDob() {
        return copyOf(dob);
    }

    /** Stores a copy of the given date of birth; {@code null} clears it. */
    public void setDob(Date dob) {
        this.dob = copyOf(dob);
    }

    /** Returns an independent {@link Date} with the same instant, or {@code null} for {@code null}. */
    private static Date copyOf(Date source) {
        return source == null ? null : new Date(source.getTime());
    }
}

[/code]

Then I wrote a unit test to try the code out:

[code language=”java”]

package datadidit.helpful.hints.csv;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

/**
 * Demonstrates serializing the same list of {@link Simple} beans as pretty-printed JSON and as
 * CSV with a header line. The test only prints the results; it makes no assertions.
 */
public class JsonToCsvTest {
    private ObjectMapper jsonMapper;

    private CsvMapper csvFormatMapper;

    /** Creates fresh mappers before every test. */
    @BeforeEach
    public void setup() {
        jsonMapper = new ObjectMapper();
        csvFormatMapper = new CsvMapper();
    }

    /** Prints one sample bean as JSON, then as CSV. */
    @Test
    public void CreateJson() throws JsonProcessingException {
        List<Simple> people = new ArrayList<>();
        people.add(new Simple("hello", "world", new Date()));

        // JSON first, pretty-printed.
        System.out.println(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(people));

        // Then CSV, using a schema derived from the bean class plus a header row.
        System.out.println("Now as CSV: ");
        CsvSchema headerSchema = csvFormatMapper.schemaFor(Simple.class).withHeader();
        String csv = csvFormatMapper.writer(headerSchema).writeValueAsString(people);
        System.out.println(csv);
    }
}

[/code]

Output as JSON:

[code language=”text”]

[ {
"firstName" : "hello",
"lastName" : "world",
"dob" : 1468293742743
} ]

[/code]

Output as CSV:

[code language=”text”]

firstName,lastName,dob
hello,world,1468075423437

[/code]