I will attempt to explain how to set up a Mapper, Reducer, Combiner, PathFilter, Partitioner, and OutputFormat using Java in Eclipse with Maven. If you need to know how to install Eclipse go here. Remember that these are not complete programs, just snippets to get you going.
A starting point I used was this tutorial; however, it was built against an older Hadoop API.
Mapper: Maps input key/value pairs to a set of intermediate key/value pairs.
Reducer: Reduces a set of intermediate values which share a key to a smaller set of values.
Partitioner: Controls which reducer each intermediate key/value pair is sent to. See http://www.tutorialspoint.com/map_reduce/map_reduce_partitioner.htm
Combiner: Performs local aggregation of map output to cut down the data sent to the reducers. See http://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm
First you will need to create a Maven project. You can follow any tutorial on how to do that if you don’t know how.
pom.xml:
<properties>
    <hadoop.version>2.7.2</hadoop.version>
</properties>

<!-- Note: the dependency entries below must sit inside a <dependencies> element in your pom. -->
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-api</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-server-nodemanager</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
Job Driver:
public class JobDriver extends Configured implements Tool {
    private Configuration conf;
    private static String hdfsURI = "hdfs://localhost:54310";

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new JobDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        BasicConfigurator.configure();
        conf = this.getConf();

        // The paths for the configuration
        final String HADOOP_HOME = System.getenv("HADOOP_HOME");
        conf.addResource(new Path(HADOOP_HOME, "etc/hadoop/core-site.xml"));
        conf.addResource(new Path(HADOOP_HOME, "etc/hadoop/hdfs-site.xml"));
        conf.addResource(new Path(HADOOP_HOME, "etc/hadoop/yarn-site.xml"));
        hdfsURI = conf.get("fs.defaultFS");

        Job job = Job.getInstance(conf, YOURJOBNAME);
        // You can set additional configuration information by doing the below.
        job.getConfiguration().set("NAME", "VALUE");
        job.setJarByClass(JobDriver.class);

        // If you are going to use a mapper class
        job.setMapperClass(MAPPERCLASS.class);
        // If you are going to use a combiner class
        job.setCombinerClass(COMBINERCLASS.class);
        // If you plan on splitting the output
        job.setPartitionerClass(PARTITIONERCLASS.class);
        job.setNumReduceTasks(NUMOFREDUCERS);
        // If you plan to use a reducer
        job.setReducerClass(REDUCERCLASS.class);

        // You need to set the output key and value types. We will just use Text for this example.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // If you want to use an input filter class
        FileInputFormat.setInputPathFilter(job, INPUTPATHFILTER.class);
        // You must set the input path for the files you want to parse. It takes either String or Path.
        FileInputFormat.setInputPaths(job, inputPaths);

        // Once you parse the data you must put it somewhere.
        job.setOutputFormatClass(OUTPUTFORMATCLASS.class);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
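The driver above leaves inputPaths and OUTPUTPATH as placeholders. A minimal sketch of one way to fill them in, assuming you pass the input and output paths as the first two command-line arguments through ToolRunner (that argument order is my assumption, not anything Hadoop requires):

// Hypothetical argument handling inside run(): args[0] = input, args[1] = output.
if (args.length < 2) {
    System.err.println("Usage: JobDriver <input path> <output path>");
    return 2;
}
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));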
INPUTPATHFILTER:
public class InputPathFilter extends Configured implements PathFilter {
    Configuration conf;
    FileSystem fs;
    Pattern includePattern = null;
    Pattern excludePattern = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        if (conf != null) {
            try {
                fs = FileSystem.get(conf);
                // If you want you can always pass in regex patterns from the job driver class and filter that way. Up to you!
                if (conf.get("file.includePattern") != null)
                    includePattern = conf.getPattern("file.includePattern", null);
                if (conf.get("file.excludePattern") != null)
                    excludePattern = conf.getPattern("file.excludePattern", null);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public boolean accept(Path path) {
        // Here you could filter based on your include or exclude regex, or on file size.
        // Remember if you have sub-directories you have to return true for those so they get traversed.
        try {
            if (fs.isDirectory(path)) {
                return true;
            }
            // You can also do this to get file size in case you want to do anything when files are a certain size, etc.
            FileStatus file = fs.getFileStatus(path);
            String size = FileUtils.byteCountToDisplaySize(file.getLen());
            // You can also move files in this section.
            boolean moveSuccess = fs.rename(path, new Path(NEWPATH + path.getName()));
            // Make sure you return a boolean for files too: true to accept, false to skip.
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
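Since the filter reads file.includePattern and file.excludePattern from the configuration, you can wire it up from the job driver. A minimal sketch (the regex values are just examples I made up):

// In the job driver, before submitting the job:
job.getConfiguration().set("file.includePattern", ".*\\.json$"); // example regex
job.getConfiguration().set("file.excludePattern", ".*_tmp$");    // example regex
FileInputFormat.setInputPathFilter(job, InputPathFilter.class);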
MAPPERCLASS:
// Remember at the beginning I said we will use key and value as Text. That is the second pair of type parameters in "extends Mapper".
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Do whatever setup you would like. Remember that anything you set in the configuration in the job driver is accessible here.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
    }

    // This is the main map method.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // This will get the file name you are currently processing if you want it. However, it is not necessary.
        String filename = ((FileSplit) context.getInputSplit()).getPath().toString();

        // Do whatever you want in the mapper. The context is what you write output to.
        // If you want to embed JavaScript go to http://www.gaudreault.ca/java-embed-javascript/
        // If you want to embed Python go to http://www.gaudreault.ca/java-embed-python/
    }
}
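To make that concrete, here is a minimal word-count-style map body, assuming the Text/Text output types we set in the driver (the tokenizing logic is just an illustration, not part of the original snippet):

// Hypothetical example body for map(): emit each whitespace-separated token with a count of "1".
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
        if (!token.isEmpty()) {
            context.write(new Text(token), new Text("1"));
        }
    }
}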
If you decide to embed Python or JavaScript, you will need these scripts as an example: map_python and map.
COMBINERCLASS:
public class MyCombiner extends Reducer<Text, Text, Text, Text> {
    // Do whatever setup you would like. Remember that anything you set in the configuration in the job driver is accessible here.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Do whatever you want in the combiner. The context is what you write output to.
        // If you want to embed JavaScript go to http://www.gaudreault.ca/java-embed-javascript/
        // If you want to embed Python go to http://www.gaudreault.ca/java-embed-python/
    }
}
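For example, a combiner that pre-sums the "1" counts emitted by the word-count-style mapper sketch above could look like this (again an illustration; it assumes the values are numeric strings):

// Hypothetical example: locally sum counts before they cross the network to the reducers.
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    long sum = 0;
    for (Text value : values) {
        sum += Long.parseLong(value.toString());
    }
    context.write(key, new Text(Long.toString(sum)));
}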
If you decide to embed Python or JavaScript, you will need these scripts as an example: combiner_python and combiner_js.
REDUCERCLASS:
public class MyReducer extends Reducer<Text, Text, Text, Text> {
    // Do whatever setup you would like. Remember that anything you set in the configuration in the job driver is accessible here.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Do whatever you want in the reducer. The context is what you write output to.
        // If you want to embed JavaScript go to http://www.gaudreault.ca/java-embed-javascript/
        // If you want to embed Python go to http://www.gaudreault.ca/java-embed-python/
    }
}
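Continuing the same hypothetical word-count sketch, the reducer produces the final total per key (whether a combiner ran or not, since it also just sums):

// Hypothetical example: produce the final count for each key.
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    long total = 0;
    for (Text value : values) {
        total += Long.parseLong(value.toString());
    }
    context.write(key, new Text(Long.toString(total)));
}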
If you decide to embed Python or JavaScript, you will need these scripts as an example: reduce_python and reduce_js.
PARTITIONERCLASS:
public class MyPartitioner extends Partitioner<Text, Text> implements Configurable {
    private Configuration conf;

    @Override
    public Configuration getConf() {
        return conf;
    }

    // Do whatever setup you would like. Remember that anything you set in the configuration in the job driver is accessible here.
    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        int partitionNum = 0;
        // Do whatever logic you would like to figure out the way you want to partition.
        // If you want to embed JavaScript go to http://www.gaudreault.ca/java-embed-javascript/
        // If you want to embed Python go to http://www.gaudreault.ca/java-embed-python/
        return partitionNum;
    }
}
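If you just want an even spread of keys across reducers, a minimal sketch of getPartition is the standard hash idiom (this mirrors what Hadoop's built-in HashPartitioner does; masking the sign bit keeps the result non-negative):

// Hypothetical example: hash-partition on the key.
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}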
If you decide to embed Python or JavaScript, you will need these scripts as an example: partitioner_python and partitioner_js.
OUTPUTFORMATCLASS:
public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {
    protected static int outputCount = 0;

    protected static class JsonRecordWriter<K, V> extends RecordWriter<K, V> {
        protected DataOutputStream out;

        public JsonRecordWriter(DataOutputStream out) throws IOException {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
            out.writeBytes(WRITE_WHATEVER_YOU_WANT);
            out.close();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Write the value.
            // You could also send it to a database here if you wanted. Up to you how you want to deal with it.
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext tac) throws IOException, InterruptedException {
        Configuration conf = tac.getConfiguration();
        // "mapred.reduce.tasks" is the deprecated name; in Hadoop 2 it is also exposed as "mapreduce.job.reduces".
        Integer numReducers = conf.getInt("mapred.reduce.tasks", 0);
        // You can set the output filename in the config from the job driver if you want.
        String outputFileName = conf.get("outputFileName");
        outputCount++;

        // If you used a partitioner you need to split out the content, so you should break the output filename into parts.
        if (numReducers > 1) {
            // Do whatever logic you need in order to get unique filenames per split.
        }

        Path file = FileOutputFormat.getOutputPath(tac);
        Path fullPath = new Path(file, outputFileName);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileout = fs.create(fullPath);
        return new JsonRecordWriter<K, V>(fileout);
    }
}
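As a sketch of what the empty write() might do, here is a simple tab-separated line per record (the format is my choice for illustration; swap in whatever serialization you need):

// Hypothetical example body for write(): emit "key<TAB>value" lines.
@Override
public void write(K key, V value) throws IOException, InterruptedException {
    if (key != null) {
        out.writeBytes(key.toString());
        out.writeBytes("\t");
    }
    if (value != null) {
        out.writeBytes(value.toString());
    }
    out.writeBytes("\n");
}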