12/30/2014

Creating Multiple output folder in Hadoop Using MultipleOutputs

Below is the sample  input file. first column is the Year. Requirement is need to create dynamic folder for each year and corresponding record should move to corresponding year folder.


text.txt
----------------------------------------------
1950,50,10
1950,20,10   
1950,30,20
1960,30,20
1960,30,20
1960,30,20
1970,30,20
1970,30,20
1970,30,20
1970,30,20
1980,30,20
1980,30,20
1980,30,20
1980,30,20

output
---------------------------------------


below is the Map reduce program. 


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.my.cert.example;

import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipulOutputExample extends Configured implements Tool {

 public static class MultipleMapper extends
   Mapper<LongWritable, Text, Text, LongWritable> {
  Pattern pattern = null;
  MultipleOutputs multipleOutputs;

  @Override()
  protected void setup(Context context) throws java.io.IOException,
    java.lang.InterruptedException {
   multipleOutputs = new MultipleOutputs(context);
  }

  protected void map(LongWritable key, Text value,
    Mapper<LongWritable, Text, Text, LongWritable>.Context context)
    throws java.io.IOException, java.lang.InterruptedException {
   String values[] = value.toString().split(",");
   String fileName = generateFileName(values[0]);
   multipleOutputs.write(NullWritable.get(), value, fileName);
  }

  @Override()
  protected void cleanup(Context context) throws java.io.IOException,
    InterruptedException {
   multipleOutputs.close();
  }

  private static String generateFileName(String values) {
   return values + "/" + values;
  }

 }

 @Override
 public int run(String[] args) throws Exception {
  Job job = new Job(getConf());
  job.setJarByClass(MultipulOutputExample.class);
  Path inputPath = new Path("C:\\hadoop\\test\\test.txt");
  Path outputDir = new Path("C:\\hadoop\\test\\test1");
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setMapperClass(MultipulOutputExample.MultipleMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  job.setNumReduceTasks(0);
  job.waitForCompletion(true);
  return 0;
 }

 public static void main(String[] args) throws Exception,
   ClassNotFoundException, InterruptedException {
  ToolRunner.run(new MultipulOutputExample(), args);
 }
}

No comments:

Post a Comment