使用Hadoop处理数据时,在Reduce阶段
有时候,我们使用Hadoop处理数据时,在Reduce阶段可能想把每一个输出的key单独输出到一个目录或文件中,这样方便数据分析,比如按某个时间段对日志文件进行归类等等。这时候我们就可以使用MultipleOutputs类来搞定这件事。
下面,先来看下散仙的测试数据:
中国;我们 美国;他们 中国;123 中国人;善良 美国;USA 美国;在北美洲
中国;我们 美国;他们 中国;123 中国人;善良 美国;USA 美国;在北美洲
预期输出结果是:
中国一组,美国一组,中国人一组
核心代码如下:
package com.partition.test;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
/***
* @author qindongliang
*
* 大数据技术交流群:324714439
* **/
public class TestMultiOutput {
/**
 * Map task: splits each input line on ';' and emits the first field as the
 * key and the second field as the value.
 *
 * <p>Records that do not yield at least two fields (blank lines, lines
 * without a ';', or lines with nothing after the ';') are skipped instead of
 * failing the task with an ArrayIndexOutOfBoundsException.
 */
public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
    /**
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of input text, expected in the form "name;content"
     * @param context used to emit the (name, content) pair
     */
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(";");
        // split() drops trailing empty strings, so "name;" also ends up with one field.
        if (fields.length < 2) {
            return;
        }
        context.write(new Text(fields[0]), new Text(fields[1]));
    }
}
/**
 * Reduce task that routes every value of a key group to the named output
 * associated with that key: "中国" goes to "china", "美国" to "USA" and
 * "中国人" to "cperson". Groups with any other key are not written anywhere.
 */
public static class PReduce extends Reducer<Text, Text, Text, Text>{
    /** Writer for the named outputs; created in setup and closed in cleanup. */
    private MultipleOutputs<Text, Text> mos;

    /** Creates the MultipleOutputs writer bound to this task's context. */
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    /**
     * Writes all values of one key group to the named output chosen for the key.
     *
     * @param key     group key
     * @param values  values that share the key
     * @param context task context (the writing goes through {@code mos} instead)
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The target depends only on the key, so resolve it once per group.
        String namedOutput = namedOutputFor(key.toString());
        if (namedOutput == null) {
            return; // no named output is mapped to this key
        }
        for (Text value : values) {
            mos.write(namedOutput, key, value);
        }
    }

    /** Returns the named output for a key, or null when the key has none. */
    private static String namedOutputFor(String key) {
        if (key.equals("中国")) {
            return "china";
        }
        if (key.equals("美国")) {
            return "USA";
        }
        if (key.equals("中国人")) {
            return "cperson";
        }
        return null;
    }

    /** Closes the MultipleOutputs writer to release its resources. */
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close();
    }
}
/**
 * Configures and submits the job: registers the three named outputs, removes
 * a pre-existing output directory, then waits for completion and exits with
 * status 0 on success or 1 on failure.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if job setup, the file system calls or the job itself fail
 */
public static void main(String[] args) throws Exception{
    // Seed the configuration from this driver class instead of an unrelated
    // class from another package.
    JobConf conf=new JobConf(TestMultiOutput.class);
    Job job=new Job(conf, "testpartion");
    job.setJarByClass(TestMultiOutput.class);
    // Prints the configured mapred.job.tracker value.
    System.out.println("模式: "+conf.get("mapred.job.tracker"));
    job.setMapperClass(PMapper.class);
    // Every named output must be registered before the job is submitted.
    // Names may only contain ASCII letters and digits, so Chinese names
    // are rejected.
    MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
    job.setReducerClass(PReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    Path p=new Path("hdfs://192.168.75.130:9000/root/outputdb");
    // Resolve the file system from the path itself so the hdfs:// URI is
    // honoured even when the default file system of conf is a different one.
    FileSystem fs=p.getFileSystem(conf);
    if(fs.exists(p)){
        fs.delete(p, true);
        System.out.println("输出路径存在,已删除!");
    }
    FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
    FileOutputFormat.setOutputPath(job, p);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
} package com.partition.test;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
/***
* @author qindongliang
*
* 大数据技术交流群:324714439
* **/
public class TestMultiOutput {
/**
 * Map task: splits each input line on ';' and emits the first field as the
 * key and the second field as the value.
 *
 * <p>Records that do not yield at least two fields (blank lines, lines
 * without a ';', or lines with nothing after the ';') are skipped instead of
 * failing the task with an ArrayIndexOutOfBoundsException.
 */
public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
    /**
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of input text, expected in the form "name;content"
     * @param context used to emit the (name, content) pair
     */
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(";");
        // split() drops trailing empty strings, so "name;" also ends up with one field.
        if (fields.length < 2) {
            return;
        }
        context.write(new Text(fields[0]), new Text(fields[1]));
    }
}
/**
 * Reduce task that routes every value of a key group to the named output
 * associated with that key: "中国" goes to "china", "美国" to "USA" and
 * "中国人" to "cperson". Groups with any other key are not written anywhere.
 */
public static class PReduce extends Reducer<Text, Text, Text, Text>{
    /** Writer for the named outputs; created in setup and closed in cleanup. */
    private MultipleOutputs<Text, Text> mos;

    /** Creates the MultipleOutputs writer bound to this task's context. */
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    /**
     * Writes all values of one key group to the named output chosen for the key.
     *
     * @param key     group key
     * @param values  values that share the key
     * @param context task context (the writing goes through {@code mos} instead)
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The target depends only on the key, so resolve it once per group.
        String namedOutput = namedOutputFor(key.toString());
        if (namedOutput == null) {
            return; // no named output is mapped to this key
        }
        for (Text value : values) {
            mos.write(namedOutput, key, value);
        }
    }

    /** Returns the named output for a key, or null when the key has none. */
    private static String namedOutputFor(String key) {
        if (key.equals("中国")) {
            return "china";
        }
        if (key.equals("美国")) {
            return "USA";
        }
        if (key.equals("中国人")) {
            return "cperson";
        }
        return null;
    }

    /** Closes the MultipleOutputs writer to release its resources. */
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close();
    }
}
/**
 * Configures and submits the job: registers the three named outputs, removes
 * a pre-existing output directory, then waits for completion and exits with
 * status 0 on success or 1 on failure.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if job setup, the file system calls or the job itself fail
 */
public static void main(String[] args) throws Exception{
    // Seed the configuration from this driver class instead of an unrelated
    // class from another package.
    JobConf conf=new JobConf(TestMultiOutput.class);
    Job job=new Job(conf, "testpartion");
    job.setJarByClass(TestMultiOutput.class);
    // Prints the configured mapred.job.tracker value.
    System.out.println("模式: "+conf.get("mapred.job.tracker"));
    job.setMapperClass(PMapper.class);
    // Every named output must be registered before the job is submitted.
    // Names may only contain ASCII letters and digits, so Chinese names
    // are rejected.
    MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
    job.setReducerClass(PReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    Path p=new Path("hdfs://192.168.75.130:9000/root/outputdb");
    // Resolve the file system from the path itself so the hdfs:// URI is
    // honoured even when the default file system of conf is a different one.
    FileSystem fs=p.getFileSystem(conf);
    if(fs.exists(p)){
        fs.delete(p, true);
        System.out.println("输出路径存在,已删除!");
    }
    FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
    FileOutputFormat.setOutputPath(job, p);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
如果命名输出(named output)的名称使用了中文,则会报如下的异常:
模式: local 输出路径存在,已删除! WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001 INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0 INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91 INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100 INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720 INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680 INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0 INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting INFO - LocalJobRunner$Job.statusUpdate(466) | INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done. INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0 INFO - LocalJobRunner$Job.run(348) | Map task executor complete. 
INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null INFO - LocalJobRunner$Job.statusUpdate(466) | INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes INFO - LocalJobRunner$Job.statusUpdate(466) | WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001 java.lang.IllegalArgumentException: Name cannot be have a '一' char at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348) at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74) at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398) INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001 INFO - Counters.log(585) | Counters: 17 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=91 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=177 INFO - Counters.log(589) | HDFS_BYTES_READ=91 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=71111 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=105 INFO - Counters.log(589) | Map input records=6 INFO - Counters.log(589) | Reduce shuffle bytes=0 INFO - Counters.log(589) | Spilled 
Records=6 INFO - Counters.log(589) | Map output bytes=87 INFO - Counters.log(589) | Total committed heap usage (bytes)=227737600 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=0 INFO - Counters.log(589) | Reduce input groups=0 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Reduce output records=0 INFO - Counters.log(589) | Map output records=6
模式: local 输出路径存在,已删除! WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001 INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0 INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91 INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100 INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720 INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680 INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0 INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting INFO - LocalJobRunner$Job.statusUpdate(466) | INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done. INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0 INFO - LocalJobRunner$Job.run(348) | Map task executor complete. 
INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null INFO - LocalJobRunner$Job.statusUpdate(466) | INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes INFO - LocalJobRunner$Job.statusUpdate(466) | WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001 java.lang.IllegalArgumentException: Name cannot be have a '一' char at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348) at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74) at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398) INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001 INFO - Counters.log(585) | Counters: 17 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=91 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=177 INFO - Counters.log(589) | HDFS_BYTES_READ=91 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=71111 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=105 INFO - Counters.log(589) | Map input records=6 INFO - Counters.log(589) | Reduce shuffle bytes=0 INFO - Counters.log(589) | Spilled 
Records=6 INFO - Counters.log(589) | Map output bytes=87 INFO - Counters.log(589) | Total committed heap usage (bytes)=227737600 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=0 INFO - Counters.log(589) | Reduce input groups=0 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Reduce output records=0 INFO - Counters.log(589) | Map output records=6
源码中关于名称的校验如下:
/**
* Checks that a named output name is a valid token: non-null, non-empty and
* made up only of the characters 'A'-'Z', 'a'-'z' and '0'-'9'.
*
* @param namedOutput named output name to validate
* @throws IllegalArgumentException if the name is null, empty, or contains
*         any character outside the ranges listed above
*/
private static void checkTokenName(String namedOutput) {
// A missing or zero-length name is rejected before any character is inspected.
if (namedOutput == null || namedOutput.length() == 0) {
throw new IllegalArgumentException(
"Name cannot be NULL or emtpy");
}
// Each character must fall into one of the three accepted ranges; the first
// one that does not is reported in the exception message.
for (char ch : namedOutput.toCharArray()) {
if ((ch >= 'A') && (ch <= 'Z')) {
continue;
}
if ((ch >= 'a') && (ch <= 'z')) {
continue;
}
if ((ch >= '0') && (ch <= '9')) {
continue;
}
throw new IllegalArgumentException(
"Name cannot be have a '" + ch + "' char");
}
} /**
* Checks if a named output name is valid token.
*
* @param namedOutput named output Name
* @throws IllegalArgumentException if the output name is not valid.
*/
private static void checkTokenName(String namedOutput) {
    // A missing or zero-length name is rejected before any character is inspected.
    if (namedOutput == null || namedOutput.length() == 0) {
        throw new IllegalArgumentException("Name cannot be NULL or emtpy");
    }
    // Walk the name by index; the first character outside 'A'-'Z', 'a'-'z'
    // and '0'-'9' is reported in the exception message.
    for (int i = 0; i < namedOutput.length(); i++) {
        final char c = namedOutput.charAt(i);
        final boolean isUpper = 'A' <= c && c <= 'Z';
        final boolean isLower = 'a' <= c && c <= 'z';
        final boolean isDigit = '0' <= c && c <= '9';
        if (!(isUpper || isLower || isDigit)) {
            throw new IllegalArgumentException("Name cannot be have a '" + c + "' char");
        }
    }
}
程序运行成功输出:
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9289 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=13645 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=0 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=105 INFO - Counters.log(589) | HDFS_BYTES_READ=203 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=113616 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=87 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=91 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=105 INFO - Counters.log(589) | Map input records=6 INFO - 
Counters.log(589) | Reduce shuffle bytes=105 INFO - Counters.log(589) | Spilled Records=12 INFO - Counters.log(589) | Map output bytes=87 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=1880 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=6 INFO - Counters.log(589) | Reduce input groups=3 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=278876160 INFO - Counters.log(589) | Reduce output records=0 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1460908032 INFO - Counters.log(589) | Map output records=6
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9289 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=13645 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=0 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=105 INFO - Counters.log(589) | HDFS_BYTES_READ=203 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=113616 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=87 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=91 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=105 INFO - Counters.log(589) | Map input records=6 INFO - 
Counters.log(589) | Reduce shuffle bytes=105 INFO - Counters.log(589) | Spilled Records=12 INFO - Counters.log(589) | Map output bytes=87 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=1880 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=6 INFO - Counters.log(589) | Reduce input groups=3 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=278876160 INFO - Counters.log(589) | Reduce output records=0 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1460908032 INFO - Counters.log(589) | Map output records=6
运行成功后,生成的文件如下所示:

china-r-00000里面的数据如下:
中国 我们 中国 123
中国 我们 中国 123
USA-r-00000里面的数据如下:
美国 他们 美国 USA 美国 在北美洲
美国 他们 美国 USA 美国 在北美洲
cperson-r-00000里面的数据如下:
中国人 善良
中国人 善良
在输出结果中,reduce默认的输出文件(part-r-00000)仍然会生成,但是里面没有任何数据。至此,我们已经在hadoop1.2.0上基于新的API测试多文件输出通过。
相关推荐
Kafka 2020-09-18
Wepe0 2020-10-30
windle 2020-10-29
mengzuchao 2020-10-22
Junzizhiai 2020-10-10
bxqybxqy 2020-09-30
风之沙城 2020-09-24
kingszelda 2020-09-22
大唐帝国前营 2020-08-18
yixu0 2020-08-17
TangCuYu 2020-08-15
xiaoboliu00 2020-08-15
songshijiazuaa 2020-08-15
xclxcl 2020-08-03
zmzmmf 2020-08-03
newfarhui 2020-08-03
likesyour 2020-08-01