Showing posts with label Hadoop. Show all posts
Showing posts with label Hadoop. Show all posts

1/21/2015

From JAVA to HDFS File operations : Read, write, copy, delete, create


From JAVA to HDFS File operations : Read, write, copy, delete, create

1. creating the directory in the HDFS
2. deleting the directory in the HDFS
3. copying file from local to HDFS
4. Read File From HDFS
5. Write File To HDFS

Change the paths in the configuration below so that they point to your own core-site.xml and hdfs-site.xml files, then use it as shown.


// Start from an empty Hadoop Configuration object.
Configuration conf = new Configuration();
// Register the two site files as configuration resources.
// Replace both paths with the location of your own installation's files.
conf.addResource(new Path("C:/hadoop-2.5.1/etc/hadoop/core-site.xml"));
conf.addResource(new Path("C:/hadoop-2.5.1/etc/hadoop/hdfs-site.xml"));

-----------------------------------------------------------------------


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
package com.my.cert.example;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Small tour of the HDFS {@link FileSystem} API: create and delete
 * directories, upload a local file, print a file line by line and overwrite a
 * file with generated text.
 *
 * <p>All helpers are static and take the {@link FileSystem} to operate on, so
 * they can be reused with any already-configured file system instance.
 */
public class HDFSFileOperations {

  /** Charset used for every text read from / written to HDFS by this example. */
  private static final String TEXT_ENCODING = "UTF-8";

  /**
   * Runs the demo against the cluster described by the two site files.
   *
   * @param args ignored
   * @throws IOException if any HDFS operation fails
   */
  public static void main(String args[]) throws IOException {
    Configuration conf = new Configuration();
    conf.addResource(new Path("C:/hadoop-2.5.1/etc/hadoop/core-site.xml"));
    conf.addResource(new Path("C:/hadoop-2.5.1/etc/hadoop/hdfs-site.xml"));
    FileSystem hdfs = FileSystem.get(conf);
    try {
      System.out.println("Home Dir: " + getHomedirectory(hdfs));
      System.out.println("Create Directory : "+ createDirectory(hdfs, "/testFolder"));
      System.out.println("copy File From Local: "+ copyFileFromLocal(hdfs, "testFolder","C:/hadoop/test/test.txt"));
      readDataFromHDFSFile(hdfs, "testFolder/test.txt");
      writingDataToHDFS(hdfs, "testFolder/test.txt");
    } finally {
      // Release the client connection even when one of the steps fails.
      hdfs.close();
    }
  }

  /**
   * Returns the current user's home directory on the given file system.
   *
   * @param hdfc file system to query
   * @return the home directory path
   * @throws IOException declared for callers; kept for interface compatibility
   */
  public static Path getHomedirectory(FileSystem hdfc) throws IOException {
    return hdfc.getHomeDirectory();
  }

  /**
   * Creates (or re-creates) a directory below the user's home directory.
   *
   * <p>WARNING: if the directory already exists it is deleted recursively
   * first, so the result is always an empty directory.
   *
   * @param hdfs file system to operate on
   * @param dirName directory name relative to the home directory, with or
   *     without a leading {@code /}
   * @return {@code true} if the directory was created
   * @throws IOException if the delete or mkdirs call fails
   */
  public static boolean createDirectory(FileSystem hdfs, String dirName)
      throws IOException {
    // Path.mergePaths simply concatenates the two path strings, so a name
    // without a leading slash would be glued onto the home directory name
    // ("/user/bob" + "data" -> "/user/bobdata"). Normalise it first.
    String suffix = dirName.startsWith("/") ? dirName : "/" + dirName;
    Path newFolderPath = Path.mergePaths(getHomedirectory(hdfs), new Path(suffix));
    if (hdfs.exists(newFolderPath)) {
      hdfs.delete(newFolderPath, true);
    }
    return hdfs.mkdirs(newFolderPath);
  }

  /**
   * Recursively deletes a directory if it exists.
   *
   * @param hdfs file system to operate on
   * @param dirName path of the directory to delete
   * @return {@code true} if the directory existed and was deleted,
   *     {@code false} if it did not exist or could not be deleted
   * @throws IOException if the file system call fails
   */
  public static boolean deleteDirectory(FileSystem hdfs, String dirName)
      throws IOException {
    Path deleteFolderName = new Path(dirName);
    if (!hdfs.exists(deleteFolderName)) {
      return false;
    }
    return hdfs.delete(deleteFolderName, true);
  }

  /**
   * Uploads a local file into an HDFS folder, keeping the local file name.
   * An existing destination file is overwritten.
   *
   * @param hdfs file system to operate on
   * @param hdfsFolderName destination folder in HDFS
   * @param localFileAbsPath absolute path of the local source file
   * @return always {@code true}; failures are reported as exceptions
   * @throws IOException if the copy fails
   */
  public static boolean copyFileFromLocal(FileSystem hdfs,
      String hdfsFolderName, String localFileAbsPath) throws IOException {
    Path localFilePath = new Path(localFileAbsPath);
    String localFileName = new File(localFileAbsPath).getName();
    Path hdfsFilePath = new Path(hdfsFolderName + "/" + localFileName);

    // copyFromLocalFile creates (and overwrites) the destination itself, so
    // no empty placeholder file has to be created beforehand.
    hdfs.copyFromLocalFile(localFilePath, hdfsFilePath);
    return true;
  }

  /**
   * Prints the content of an HDFS text file to standard output, line by line.
   *
   * @param hdfs file system to read from
   * @param filePath path of the file to print
   * @throws IllegalArgumentException if {@code filePath} is not a valid path
   * @throws IOException if the file cannot be opened or read
   */
  public static void readDataFromHDFSFile(FileSystem hdfs, String filePath)
      throws IllegalArgumentException, IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(
        hdfs.open(new Path(filePath)), TEXT_ENCODING));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      // Closing the reader also closes the underlying HDFS stream.
      reader.close();
    }
  }

  /**
   * Writes five sample lines to an HDFS file, replacing any existing content.
   *
   * @param hdfs file system to write to
   * @param filePath path of the file to create or overwrite
   * @throws IllegalArgumentException if {@code filePath} is not a valid path
   * @throws IOException if the file cannot be created or written
   */
  public static void writingDataToHDFS(FileSystem hdfs, String filePath)
      throws IllegalArgumentException, IOException {
    StringBuilder sb = new StringBuilder();
    for (int i = 1; i <= 5; i++) {
      sb.append("Test creating file").append(i).append("\n");
    }
    byte[] content = sb.toString().getBytes(TEXT_ENCODING);

    FSDataOutputStream fsOutStream = hdfs.create(new Path(filePath));
    try {
      fsOutStream.write(content);
    } finally {
      // Always release the lease on the file, even if the write failed.
      fsOutStream.close();
    }
  }

}

Hadoop big data: Converting Text files to Sequence File

Hadoop big data: Converting Text files to Sequence File.

package com.my.hadoop.example3;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;



/**
 * Converts the text file {@code word/test1.txt} into the sequence file
 * {@code outputSEQ.lz}. Each input line becomes one record whose key is the
 * zero-based line number and whose value is the line text.
 */
public class ConvertTextToSequenceFile {

    /**
     * Reads the input file line by line and appends every line to the
     * sequence file.
     *
     * @param args ignored
     * @throws IOException if reading the input or writing the output fails
     * @throws InstantiationException never thrown; kept for compatibility
     * @throws IllegalAccessException never thrown; kept for compatibility
     */
    @SuppressWarnings("deprecation") // SequenceFile.createWriter(fs, conf, ...) is deprecated
    public static void main(String[] args) throws IOException,
            InstantiationException, IllegalAccessException {

        Configuration conf = new Configuration();
        conf.addResource(new Path("C:/hadoop-2.5.1/etc/hadoop/core-site.xml"));
        conf.addResource(new Path("C:/hadoop-2.5.1/etc/hadoop/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);
        Path inputFile = new Path("word/test1.txt");
        Path outputFile = new Path("outputSEQ.lz");
        IntWritable key = new IntWritable();
        Text value = new Text();

        FSDataInputStream inputStream = null;
        SequenceFile.Writer writer = null;
        try {
            inputStream = fs.open(inputFile);
            // Decode the bytes as UTF-8 text. DataInputStream.readLine() maps
            // every byte to one char and therefore corrupts non-ASCII input.
            java.io.BufferedReader reader = new java.io.BufferedReader(
                    new java.io.InputStreamReader(inputStream, "UTF-8"));
            writer = SequenceFile.createWriter(fs, conf, outputFile,
                    key.getClass(), value.getClass());

            int count = 0;
            String line;
            // readLine() == null is the end-of-file signal; available() only
            // reports bytes readable without blocking and is not an EOF test.
            while ((line = reader.readLine()) != null) {
                key.set(count++);
                value.set(line);
                writer.append(key, value);
            }

            // Close explicitly so that a failing final flush is reported
            // instead of being swallowed by the cleanup below.
            writer.close();
            writer = null;
        } finally {
            // Streams must be closed BEFORE the file system they belong to.
            IOUtils.closeStream(writer);
            IOUtils.closeStream(inputStream);
            fs.close();
        }
        System.out.println("SEQUENCE FILE CREATED SUCCESSFULLY........");
    }
}

1/02/2015

Example of setting MapReduce properties from the command prompt using -D

Execute command
----------------------------

>hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.my.cert.example.WordCount -D mapred.reduce.tasks=2  word  file:///c:/hadoop/test1

If we need to set multiple properties, we pass one -D option per property, as shown below.

hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.my.cert.example.WordCount -D mapred.reduce.tasks=2 -D mapred.map.tasks=2 word file:///c:/hadoop/test1

The command sets the number of reduce tasks to 2 and writes the result to the local Windows directory file:///c:/hadoop/test1.

"word" is the input directory; it contains text files with data like the example below.

text.txt
-----------------------------------
A A A
B B B
C C C

---------------------------
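We can pass MapReduce configuration parameters (for example -D mapred.reduce.tasks=2) to the driver class on the command line. For the -D options to be parsed, the driver class must extend Configured, implement Tool, and be started through ToolRunner, as in the Java MapReduce program below.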

public class WordCount extends Configured implements Tool

-------
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.my.cert.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word-count driver that honours generic Hadoop options such as
 * {@code -D mapred.reduce.tasks=2}: it is launched through {@link ToolRunner},
 * which parses those options into the {@link Configuration} returned by
 * {@link #getConf()}.
 *
 * <p>Usage: {@code WordCount [generic options] <input path> <output dir>}
 */
public class WordCount extends Configured implements Tool {

  /**
   * Entry point; delegates to {@link ToolRunner} and exits with the job's
   * status code.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
  }

  /** Emits {@code (word, 1)} for every whitespace-separated token of a line. */
  public static class ProjectionMapper extends
      Mapper<LongWritable, Text, Text, LongWritable> {
    /** Shared constant value; avoids allocating one object per token. */
    private static final LongWritable ONE = new LongWritable(1);
    /** Reused output key. */
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        // A line with leading whitespace yields an empty first token,
        // which must not be counted as a word.
        if (token.isEmpty()) {
          continue;
        }
        word.set(token);
        context.write(word, ONE);
      }
    }
  }

  /** Sums all counts received for a word and emits {@code (word, total)}. */
  public static class LongSumReducer extends
      Reducer<Text, LongWritable, Text, LongWritable> {

    /** Reused output value. */
    private final LongWritable result = new LongWritable();

    @Override
    public void reduce(Text key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args remaining arguments after generic option parsing:
   *     {@code args[0]} = input path, {@code args[1]} = output directory
   * @return 0 on success, 1 if the job failed, 2 on wrong usage
   * @throws Exception if the job cannot be submitted or monitored
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: WordCount [generic options] <input path> <output dir>");
      ToolRunner.printGenericCommandUsage(System.err);
      return 2;
    }
    Path inputPath = new Path(args[0]);
    Path outputDir = new Path(args[1]);

    // Configuration already populated by ToolRunner with the -D options.
    Configuration conf = this.getConf();

    Job job = Job.getInstance(conf, "WordCount");
    job.setJarByClass(WordCount.class);

    // Setup MapReduce. The number of reduce tasks is deliberately NOT set
    // here: a hard-coded job.setNumReduceTasks(..) would silently override
    // the value passed with -D mapred.reduce.tasks=N on the command line.
    job.setMapperClass(WordCount.ProjectionMapper.class);
    job.setReducerClass(WordCount.LongSumReducer.class);

    // Map output types default to the job output types, which match here.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // Input
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);

    // Output
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputFormatClass(TextOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }
}

12/30/2014

Hadoop JAVA Map Reduce Sort by Value


Input  (test.txt)
------------
1,50
2,20
3,30
4,10
5,15
6,25
7,55
8,35
9,70

output
----------------------------
9 70
7 55
1 50
8 35
3 30
6 25
2 20
5 15

4 10



package com.my.cert.example;

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Sorts {@code key,value} integer pairs by value in descending order.
 *
 * <p>The mapper swaps key and value so that the framework sorts on the
 * original value, {@link IntComparator} reverses the natural integer order,
 * and the reducer swaps the pair back before writing it.
 */
public class ValueSortExp {

  /**
   * Configures and runs the job.
   *
   * @param args optional: {@code args[0]} = input path, {@code args[1]} =
   *     output directory; the built-in sample paths are used when omitted
   * @throws Exception if the job cannot be submitted or monitored
   */
  public static void main(String[] args) throws Exception {

    Path inputPath = new Path(args.length > 0 ? args[0] : "C:\\hadoop\\test\\test.txt");
    Path outputDir = new Path(args.length > 1 ? args[1] : "C:\\hadoop\\test\\test1");

    // Create configuration
    Configuration conf = new Configuration(true);

    // Create job
    Job job = Job.getInstance(conf, "Test HIVE commond");
    job.setJarByClass(ValueSortExp.class);

    // Setup MapReduce; a single reducer gives one globally sorted file.
    job.setMapperClass(ValueSortExp.MapTask.class);
    job.setReducerClass(ValueSortExp.ReduceTask.class);
    job.setNumReduceTasks(1);

    // Specify key / value
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    job.setSortComparatorClass(IntComparator.class);

    // Input
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);

    // Output
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Execute job
    int code = job.waitForCompletion(true) ? 0 : 1;
    System.exit(code);
  }

  /** Raw comparator ordering serialized {@link IntWritable} keys descending. */
  public static class IntComparator extends WritableComparator {

    public IntComparator() {
      super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
        byte[] b2, int s2, int l2) {
      // Decode the 4-byte big-endian ints without boxing or buffer objects;
      // this method runs once per comparison during the sort.
      int v1 = readInt(b1, s1);
      int v2 = readInt(b2, s2);
      // Reversed natural order: larger values sort first.
      return v1 < v2 ? 1 : (v1 == v2 ? 0 : -1);
    }
  }

  /** Parses {@code key,value} lines and emits {@code (value, key)}. */
  public static class MapTask extends
      Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private final IntWritable outKey = new IntWritable();
    private final IntWritable outValue = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws java.io.IOException, InterruptedException {
      String line = value.toString().trim();
      if (line.isEmpty()) {
        return; // blank lines (e.g. a trailing newline) carry no record
      }
      String[] tokens = line.split(","); // "," is the field delimiter
      if (tokens.length < 2) {
        // Not a key,value pair: make it visible in the job counters
        // instead of failing the task with an index error.
        context.getCounter("ValueSortExp", "MALFORMED_LINES").increment(1);
        return;
      }
      int keypart = Integer.parseInt(tokens[0].trim());
      int valuePart = Integer.parseInt(tokens[1].trim());
      // Swap: the framework sorts on the map output key.
      outKey.set(valuePart);
      outValue.set(keypart);
      context.write(outKey, outValue);
    }
  }

  /** Swaps each pair back to {@code (key, value)} and writes it. */
  public static class ReduceTask extends
      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> list, Context context)
        throws java.io.IOException, InterruptedException {
      for (IntWritable value : list) {
        context.write(value, key);
      }
    }
  }

}

Maven dependencies for Hadoop 2.5.1

Create a new Maven project in Eclipse and replace the default dependencies in the pom.xml file with the ones below. Then right-click the project -> Run As -> run Maven clean followed by Maven install. The dependency JARs are downloaded and the project is ready for writing mappers, reducers and drivers.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
<repositories>
  <repository>
   <!-- Central Repository. Must be HTTPS: Maven Central no longer serves
        plain-HTTP requests. Central hosts releases only, so snapshots
        are switched off. -->
   <id>central</id>
   <url>https://repo.maven.apache.org/maven2/</url>
   <releases>
    <enabled>true</enabled>
   </releases>
   <snapshots>
    <enabled>false</enabled>
   </snapshots>
  </repository>
  <repository>
   <!-- Cloudera Repository -->
   <id>cloudera</id>
   <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
   <releases>
    <enabled>true</enabled>
   </releases>
   <snapshots>
    <enabled>true</enabled>
   </snapshots>
  </repository>
 </repositories>
 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
  <!-- Hadoop 2.5.1 client libraries -->
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-hdfs</artifactId>
   <version>2.5.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-auth</artifactId>
   <version>2.5.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>2.5.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
   <version>2.5.1</version>
  </dependency>
  <!-- Test libraries. A single JUnit 4 artifact replaces the former
       junit-dep 4.8.2 + junit 3.8.1 pair, which put two different JUnit
       versions with overlapping classes on the test classpath. -->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.11</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.apache.mrunit</groupId>
   <artifactId>mrunit</artifactId>
   <version>1.0.0</version>
   <classifier>hadoop2</classifier>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-all</artifactId>
   <version>1.10.8</version>
  </dependency>

  <dependency>
   <groupId>org.apache.pig</groupId>
   <artifactId>pig</artifactId>
   <version>0.10.0</version>
  </dependency>
 </dependencies>

Creating Multiple output folder in Hadoop Using MultipleOutputs

Below is the sample input file. The first column is the year. The requirement is to create a folder dynamically for each year and to write every record into the folder of its year.


text.txt
----------------------------------------------
1950,50,10
1950,20,10   
1950,30,20
1960,30,20
1960,30,20
1960,30,20
1970,30,20
1970,30,20
1970,30,20
1970,30,20
1980,30,20
1980,30,20
1980,30,20
1980,30,20

output
---------------------------------------


below is the Map reduce program. 


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.my.cert.example;

import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Map-only job that writes every input record into a sub-folder named after
 * the record's first column (the year), using {@link MultipleOutputs}.
 *
 * <p>For a record starting with {@code 1950} the output ends up under
 * {@code <output dir>/1950/1950-m-NNNNN}.
 */
public class MultipulOutputExample extends Configured implements Tool {

  /** Routes each line, unchanged, to the output folder of its year. */
  public static class MultipleMapper extends
      Mapper<LongWritable, Text, NullWritable, Text> {
    /** Typed to what is actually written: no key, the whole line as value. */
    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws java.io.IOException,
        java.lang.InterruptedException {
      multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      String year = value.toString().split(",")[0].trim();
      if (year.isEmpty()) {
        // Blank line or missing year: it would produce the invalid
        // output path "/", so the record is skipped.
        return;
      }
      multipleOutputs.write(NullWritable.get(), value, generateFileName(year));
    }

    @Override
    protected void cleanup(Context context) throws java.io.IOException,
        InterruptedException {
      // Flushes and closes every record writer opened by MultipleOutputs.
      multipleOutputs.close();
    }

    /** Builds the base output path {@code <year>/<year>} for a record. */
    private static String generateFileName(String values) {
      return values + "/" + values;
    }

  }

  /**
   * Configures and runs the job.
   *
   * @param args optional: {@code args[0]} = input path, {@code args[1]} =
   *     output directory; the built-in sample paths are used when omitted
   * @return 0 if the job succeeded, 1 otherwise
   * @throws Exception if the job cannot be submitted or monitored
   */
  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(MultipulOutputExample.class);
    Path inputPath = new Path(args.length > 0 ? args[0] : "C:\\hadoop\\test\\test.txt");
    Path outputDir = new Path(args.length > 1 ? args[1] : "C:\\hadoop\\test\\test1");
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapperClass(MultipulOutputExample.MultipleMapper.class);
    // Must describe what the mapper really writes.
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    // LazyOutputFormat wraps TextOutputFormat and only creates the default
    // part-m-* files when something is written to them, so no empty files
    // appear next to the per-year folders.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    job.setNumReduceTasks(0); // map-only job
    return job.waitForCompletion(true) ? 0 : 1;
  }

  /** Entry point; exits with the status code returned by {@link #run}. */
  public static void main(String[] args) throws Exception,
      ClassNotFoundException, InterruptedException {
    System.exit(ToolRunner.run(new MultipulOutputExample(), args));
  }
}

Hadoop Partitioner Example

A Partitioner decides which reducer each map output record is sent to. If we would like the records to be split across multiple output files (one per reducer), we should use a custom Partitioner.

The example below partitions the data on the year: records belonging to the same year go to the same file.

job.setNumReduceTasks(5); is mandatory to split the data into multiple files; otherwise all data goes to a single reducer output file.

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); is added to avoid creating empty reducer output files.


Input (text.txt)
------------------
1950,50,10
1950,20,10
1950,30,20
1960,30,20
1960,30,20
1960,30,20
1970,30,20
1970,30,20
1970,30,20
1970,30,20
1980,30,20
1980,30,20
1980,30,20
1980,30,20
1990,30,20
2000,30,20

output
---------------------
file 1                                        
1950,50,10
1950,20,10
1950,30,20
file 2
1960,30,20
1960,30,20
1960,30,20
.--------
-------


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package com.my.cert.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Splits records into separate reducer output files by decade of the year
 * found in the first column, using a custom {@link Partitioner}.
 */
public class PartitionExample extends Configured implements Tool {

  /** Emits {@code (year, whole line)} for every non-blank input line. */
  public static class ExamleMapper extends
      Mapper<LongWritable, Text, Text, Text> {
    /** Reused output key. */
    private final Text year = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      String firstColumn = value.toString().split(",")[0].trim();
      if (firstColumn.isEmpty()) {
        // Blank line: there is no year the partitioner could parse.
        return;
      }
      year.set(firstColumn);
      context.write(year, value);
    }

  }

  /**
   * Maps a year to one of five buckets: before 1960, the 1960s, the 1970s,
   * the 1980s, and 1990 or later.
   */
  public static class YearPartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {

      int year = Integer.parseInt(key.toString().trim());

      int bucket;
      if (year < 1960) {
        bucket = 0;
      } else if (year < 1970) {
        bucket = 1;
      } else if (year < 1980) {
        bucket = 2;
      } else if (year < 1990) {
        bucket = 3;
      } else {
        bucket = 4;
      }
      // The result must be in [0, numPartitions). Folding the bucket keeps
      // the job working when it runs with fewer than five reducers instead
      // of failing with an "Illegal partition" error.
      return bucket % numPartitions;
    }

  }

  /** Writes every record of a year back out unchanged, without a key. */
  public static class ExamleReducer extends
      Reducer<Text, Text, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> value, Context context)
        throws IOException, InterruptedException {
      for (Text text : value) {
        context.write(text, NullWritable.get());
      }
    }

  }

  /**
   * Configures and runs the job.
   *
   * @param args optional: {@code args[0]} = input path, {@code args[1]} =
   *     output directory; the built-in sample paths are used when omitted
   * @return 0 if the job succeeded, 1 otherwise
   * @throws Exception if the job cannot be submitted or monitored
   */
  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(PartitionExample.class);
    Path inputPath = new Path(args.length > 0 ? args[0] : "C:\\hadoop\\test\\test.txt");
    Path outputDir = new Path(args.length > 1 ? args[1] : "C:\\hadoop\\test\\test1");
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setMapperClass(PartitionExample.ExamleMapper.class);
    job.setReducerClass(PartitionExample.ExamleReducer.class);
    job.setPartitionerClass(PartitionExample.YearPartitioner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    // LazyOutputFormat wraps TextOutputFormat so that reducers which receive
    // no records do not leave empty part files behind. The class is referenced
    // by its fully qualified name because this file's import list does not
    // include it.
    org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
        .setOutputFormatClass(job, TextOutputFormat.class);
    // One reducer (= one output file) per bucket of YearPartitioner.
    job.setNumReduceTasks(5);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  /** Entry point; exits with the status code returned by {@link #run}. */
  public static void main(String[] args) throws Exception,
      ClassNotFoundException, InterruptedException {
    System.exit(ToolRunner.run(new PartitionExample(), args));
  }
}