hadoop0.18.3 到 0.20.2

以前用的是0.18.3,现在改用0.20.2,结果发现mapreduce的接口变了好多,而《mapreduce 权威指南》这本书上还是0.18.3的接口,这里记录一下今天下午的探索:

 最大的变化是作业配置那部分,新的版本里面不再使用JobConf,而是使用了Job,这里的Job继承自JobContext,它集成了JobConf。

Job里面还是用了相同的设置inputPath, outputPath, inputFormat, outputFormat之类的,主要的不同我认为有以下几个:

1. 初始化不一样,

     前者: JobConf conf = new  JobConf(getConf(), WordCount.class ); 

     后者: Job job = new  Job(conf, "word count" );

2. 执行不同:

     前者:  JobClient.runJob(conf)

     后者: job.waitForCompletion(true )

3.  最隐含的变化:

     前者:setMapperClass(class<? extends MapReduceBase implements Mapper>) 和 setReducerClass(class<? extends MapReducerBase implements Reducer>)

     后者:setMapperClass(class<? extends Mapper>) 和 setReducerClass(class<? extends Reducer>)

   

    也就是说Map类和Reduce类也有所变化,并且在import的时候要注意,

   前者的mapper类和reduce类不仅要extends xxxbase父类,而且要implements mapper和reduce 接口,且

    import org.apache.hadoop.mapred.MapReduceBase,

    import org.apache.hadoop.mapred.Mapper;   import org.apache.hadoop.mapred.Reducer;

   后者的mapper类和reduce类只要extends Mapper、Reducer父类。

具体的比较程序如下:

前者出自《mapreduce 权威指南》,是旧版本的一个程序:

Mapper类:

import java.io.IOException;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapred.MapReduceBase;

importorg.apache.hadoop.mapred.Mapper;

importorg.apache.hadoop.mapred.OutputCollector;

importorg.apache.hadoop.mapred.Reporter;

publicclassMaxTemperatureMapperextendsMapReduceBase

implementsMapper<LongWritable,Text,Text,IntWritable>{

privatestaticfinalintMISSING=9999;

publicvoidmap(LongWritablekey,Textvalue,

OutputCollector<Text,IntWritable>output,Reporterreporter)

throwsIOException{

}

}

Stringline=value.toString();

Stringyear=line.substring(15,19);

intairTemperature;

if(line.charAt(87)=='+'){//parseIntdoesn'tlikeleadingplussigns

airTemperature=Integer.parseInt(line.substring(88,92));

}else{

airTemperature=Integer.parseInt(line.substring(87,92));

}

Stringquality=line.substring(92,93);

if(airTemperature!=MISSING&&quality.matches("[01459]")){

output.collect(newText(year),newIntWritable(airTemperature));

}

Reducer类:

import java.io.IOException;

importjava.util.Iterator;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapred.MapReduceBase;

importorg.apache.hadoop.mapred.OutputCollector;

importorg.apache.hadoop.mapred.Reducer;

importorg.apache.hadoop.mapred.Reporter;

publicclassMaxTemperatureReducerextendsMapReduceBase

implementsReducer<Text,IntWritable,Text,IntWritable>{

publicvoidreduce(Textkey,Iterator<IntWritable>values,

OutputCollector<Text,IntWritable>output,Reporterreporter)

throwsIOException{

}

}

intmaxValue=Integer.MIN_VALUE;

while(values.hasNext()){

maxValue=Math.max(maxValue,values.next().get());

}

output.collect(key,newIntWritable(maxValue));

}

}

主类

import java.io.IOException;

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapred.FileInputFormat;

importorg.apache.hadoop.mapred.FileOutputFormat;

importorg.apache.hadoop.mapred.JobClient;

importorg.apache.hadoop.mapred.JobConf;

publicclassMaxTemperature{

publicstaticvoidmain(String[]args)throwsIOException{

if(args.length!=2){

System.err.println("Usage:MaxTemperature<inputpath><outputpath>");

System.exit(-1);

}

JobConfconf=newJobConf(MaxTemperature.class);

conf.setJobName("Maxtemperature");

FileInputFormat.addInputPath(conf,newPath(args[0]));

FileOutputFormat.setOutputPath(conf,newPath(args[1]));

conf.setMapperClass(MaxTemperatureMapper.class);

conf.setReducerClass(MaxTemperatureReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

}

}

JobClient.runJob(conf);

我修改后的新版本程序:

import java.io.IOException;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Mapper;

importorg.apache.hadoop.mapred.OutputCollector;

importorg.apache.hadoop.mapred.Reporter;

publicclassMaxTemperatureMapperextendsMapper<LongWritable,Text,Text,IntWritable>{

privatestaticfinalintMISSING=9999;

publicvoidmap(LongWritablekey,Textvalue,

OutputCollector<Text,IntWritable>output,Reporterreporter)

throwsIOException{

//TODOAuto-generatedmethodstub

Stringline=value.toString();

Stringyear=line.substring(15,19);

intairTemperature;

if(line.charAt(87)=='+'){

airTemperature=Integer.parseInt(line.substring(88,92));

}else{

airTemperature=Integer.parseInt(line.substring(87,92));

}

Stringquality=line.substring(92,93);

if(airTemperature!=MISSING&&quality.matches("[01459]")){

output.collect(newText(year),newIntWritable(airTemperature));

}

}

}

import java.io.IOException;

importjava.util.Iterator;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Reducer;

importorg.apache.hadoop.mapred.OutputCollector;

importorg.apache.hadoop.mapred.Reporter;

publicclassMaxTemperatureReducerextendsReducer<Text,IntWritable,Text,IntWritable>{

publicvoidreduce(Textkey,Iterator<IntWritable>values,

OutputCollector<Text,IntWritable>output,Reporterreporter)throwsIOException{

intmaxValue=Integer.MIN_VALUE;

while(values.hasNext()){

maxValue=Math.max(maxValue,values.next().get());

}

output.collect(key,newIntWritable(maxValue));

}

}

import java.io.IOException;

importorg.apache.hadoop.conf.Configuration;

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclassMaxTemperature{

publicstaticvoidmain(String[]argc)throwsIOException,InterruptedException,ClassNotFoundException{

if(argc.length!=2){

System.out.println("Usage:MaxTemperature<input><output>");

System.exit(-1);

}

Configurationconf=newConfiguration();

Jobj=newJob(conf,"MaxTemperature");

j.setJarByClass(MaxTemperature.class);

j.setMapperClass(MaxTemperatureMapper.class);

j.setReducerClass(MaxTemperatureReducer.class);

j.setOutputKeyClass(Text.class);

j.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(j,newPath(argc[0]));

FileOutputFormat.setOutputPath(j,newPath(argc[1]));

System.exit(j.waitForCompletion(true)?0:1);

}

}

参考: http://blog.csdn.net/amuseme_lu/archive/2010/05/13/5588545.aspx

转自:http://blog.csdn.net/JiaoYanChen/archive/2010/08/16/5816573.aspx