12/30/2014

Hadoop Partitioner Example

If we want to split the job output into multiple files based on the contents of the records, we use a custom Partitioner. The Partitioner is applied to every map output record and decides which reducer it is sent to; since each reducer writes its own output file, it also decides which file the record ends up in.

The example below partitions the data on the year field: records for the same year go to the same reducer, and therefore to the same output file.

job.setNumReduceTasks(5); is required to split the data into multiple files; with the default single reducer, all records end up in one output file.

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); is added so that reducers which receive no records do not create empty output files.
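
For contrast, when no custom Partitioner is configured, Hadoop falls back to hashing the map output key, so records for different years can share a reducer file. Below is a minimal sketch of that default hash-style behavior (the same idea as Hadoop's built-in HashPartitioner); the YearPartitioner in the example replaces this hash with an explicit year-to-reducer mapping.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hash-style partitioning: the key's hash picks the reducer, so records
// for different years can end up mixed in the same output file.
public class HashStylePartitioner extends Partitioner<Text, Text> {

 @Override
 public int getPartition(Text key, Text value, int numPartitions) {
  // Mask the sign bit so the result stays within [0, numPartitions).
  return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
 }
}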


Input (test.txt)
------------------
1950,50,10
1950,20,10
1950,30,20
1960,30,20
1960,30,20
1960,30,20
1970,30,20
1970,30,20
1970,30,20
1970,30,20
1980,30,20
1980,30,20
1980,30,20
1980,30,20
1990,30,20
2000,30,20

Output
---------------------
file 1 (part-r-00000)
1950,50,10
1950,20,10
1950,30,20

file 2 (part-r-00001)
1960,30,20
1960,30,20
1960,30,20

file 3 (part-r-00002) gets the 1970 records, file 4 (part-r-00003) the 1980 records, and file 5 (part-r-00004) the 1990 and 2000 records.


package com.my.cert.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionExample extends Configured implements Tool {

 public static class ExampleMapper extends
   Mapper<LongWritable, Text, Text, Text> {

  protected void map(LongWritable key, Text value, Context context)
    throws java.io.IOException, java.lang.InterruptedException {
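   // Emit the year (the first comma-separated field) as the key so records for the same year reach the same reducer.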
   String values[] = value.toString().split(",");
   context.write(new Text(values[0]), value);
  }

 }

 public static class YearPartitioner extends Partitioner<Text, Text> {

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
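   // Send each decade to its own reducer; the value returned must be in the range [0, numPartitions).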

   int year = Integer.parseInt(key.toString());

   if (year < 1960) {
    return 0;
   }
   else if (year < 1970) {
    return 1;
   } 
   else if (year < 1980) {
    return 2;
   }
   else if (year < 1990) {
    return 3;
   }
    return 4;
  }

 }

 public static class ExampleReducer extends
   Reducer<Text, Text, Text, NullWritable> {

  protected void reduce(Text key, Iterable<Text> value, Context context)
    throws IOException, InterruptedException {

   for (Text text : value) {

    context.write(text, NullWritable.get());
   }
  }

 }

 @Override
 public int run(String[] args) throws Exception {
  Job job = new Job(getConf());
  job.setJarByClass(PartitionExample.class);
  Path inputPath = new Path("C:\\hadoop\\test\\test.txt");
  Path outputDir = new Path("C:\\hadoop\\test\\test1");
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setMapperClass(PartitionExample.ExampleMapper.class);
  job.setReducerClass(PartitionExample.ExampleReducer.class);
  job.setPartitionerClass(PartitionExample.YearPartitioner.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
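  // LazyOutputFormat wraps TextOutputFormat so that reducers which receive no records do not create empty part files.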
  LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  job.setNumReduceTasks(5);
  job.waitForCompletion(true);
  return 0;
 }

 public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new PartitionExample(), args));
 }
}
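
One way to sanity-check the result after a local run is to read the reducer outputs back and confirm that each file holds only one group of years. Below is a minimal sketch, assuming the job wrote the standard part-r-0000N files into the C:\hadoop\test\test1 directory used in the driver; the class name is only for illustration.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Prints every reducer output file with its records so it is easy to see
// which years each file contains.
public class PrintPartitionedOutput {

 public static void main(String[] args) throws IOException {
  // Output directory configured in the MapReduce driver above.
  Path outputDir = Paths.get("C:\\hadoop\\test\\test1");
  try (DirectoryStream<Path> parts = Files.newDirectoryStream(outputDir, "part-r-*")) {
   for (Path part : parts) {
    System.out.println("== " + part.getFileName() + " ==");
    for (String line : Files.readAllLines(part, StandardCharsets.UTF_8)) {
     System.out.println(line);
    }
   }
  }
 }
}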
