MapReduce 学习日志之我的MapReduce程序学习.docx
文本预览下载声明
MapReduce 学习日志之我的MapReduce程序学习 将一批电话通信清单,记录了用户A拨打用户B的记录,需要做一个倒排索引,记录拨打给用户B的所有用户A。如原有的txt 为:首先,我们应该把源文件传到Hdfs上,然后将原始数据进行分割,将被叫作为KEY,主叫作为Value,将拨打相同被叫的主叫号码汇总起来输出到HDFS。程序如下:package com.xxs;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;//此处为导入的包,一般为固定的。public class MapTest_2 extends Configured implements Tool{ enum Counter { LINESKIP, }//出错的行,出错计数器public static class Map extends MapperLongWritable,Text,Text,Text{public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{String line=value.toString();try{String [] lineSplit=line.split( );String anum=lineSplit[0];String bnum=lineSplit[1];context.write(new Text(bnum),new Text(anum));}catch(java.lang.ArrayIndexOutOfBoundsException e){context.getCounter(Counter.LINESKIP).increment(1);return;}}}public static class Reduce extends ReducerText,Text,Text,Text{public void reduce(Text key,IterableTextvalues,Context context)throws IOException,InterruptedException{String valueString;String out=;for(Text value:values){valueString=value.toString();out+=valueString+|;}context.write(key, new Text(out));}}public int run(String[] args)throws Exception{Configuration conf=getConf();Job job=new Job(conf,MapTest_2);//任务名job.setJarByClass(MapTest_2.class);//指定classFileInputFormat.addInputPath(job, new Path(args[0]));//输入路径FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出路径job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputFormatClass(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.waitForCompletion(
显示全部