【大数据学习-实验-5】MapReduce应用

1.编程实现WordCount实例。

实验内容

现有某电商网站用户对商品的收藏数据,记录了用户收藏的商品id以及收藏日期,名为buyer_favorite1。
buyer_favorite1包含:买家id,商品id,收藏日期这三个字段,数据以“\t”分割,样本数据及格式如下:

买家id   商品id    收藏日期
10181   1000481   2010-04-04 16:54:31
20001   1001597   2010-04-07 15:07:52
20001   1001560   2010-04-07 15:08:27
20042   1001368   2010-04-08 08:20:30
20067   1002061   2010-04-08 16:45:33
20056   1003289   2010-04-12 10:50:55
20056   1003290   2010-04-12 11:57:35
20056   1003292   2010-04-12 12:05:29
20054   1002420   2010-04-14 15:24:12
20055   1001679   2010-04-14 19:46:04
20054   1010675   2010-04-14 15:23:53
20054   1002429   2010-04-14 17:52:45
20076   1002427   2010-04-14 19:35:39
20054   1003326   2010-04-20 12:54:44
20056   1002420   2010-04-15 11:24:49
20064   1002422   2010-04-15 11:35:54
20056   1003066   2010-04-15 11:43:01
20056   1003055   2010-04-15 11:43:06
20056   1010183   2010-04-15 11:45:24
20056   1002422   2010-04-15 11:45:49
20056   1003100   2010-04-15 11:45:54
20056   1003094   2010-04-15 11:45:57
20056   1003064   2010-04-15 11:46:04
20056   1010178   2010-04-15 16:15:20
20076   1003101   2010-04-15 16:37:27
20076   1003103   2010-04-15 16:37:05
20076   1003100   2010-04-15 16:37:18
20076   1003066   2010-04-15 16:37:31
20054   1003103   2010-04-15 16:40:14
20054   1003100   2010-04-15 16:40:16

要求编写MapReduce程序,统计每个买家收藏商品数量。
统计结果数据如下:

买家id 商品数量
10181	1
20001	2
20042	1
20054	6
20055	1
20056	12
20064	1
20067	1
20076	5

实验思路

大致思路是将hdfs上的文本作为输入,MapReduce通过InputFormat会将文本进行切片处理,并将每行的首字母相对于文本文件的首地址的偏移量作为输入键值对的key,文本内容作为输入键值对的value,经过在map函数处理,输出中间结果<word,1>的形式,并在reduce函数中完成对每个单词的词频统计。整个程序代码主要包括两部分:Mapper部分和Reducer部分。

在map函数里有三个参数,前面两个Object key,Text value就是输入的key和value,第三个参数Context context是可以记录输入的key和value。例如context.write(word,one);此外context还会记录map运算的状态。map阶段采用Hadoop的默认的作业输入方式,把输入的value用StringTokenizer()方法截取出的买家id字段设置为key,设置value为1,然后直接输出<key,value>。

map输出的<key,value>先要经过shuffle过程把相同key值的所有value聚集起来形成<key,values>后交给reduce端。reduce端接收到<key,values>之后,将输入的key直接复制给输出的key,用for循环遍历values并求和,求和结果就是key值代表的单词出现的总次,将其设置为value,直接输出<key,value>。

package mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();//设置环境参数
        job.setJobName("WordCount");//设置环境参数名
        job.setJarByClass(WordCount.class);//设置整个程序类名
        job.setMapperClass(doMapper.class);//添加doMapper类
        job.setReducerClass(doReducer.class);//添加doReducer类
        job.setOutputKeyClass(Text.class);//设置输出类型
        job.setOutputValueClass(IntWritable.class);//设置输出类型
        Path in = new Path("hdfs://localhost:9000/mymapreduce1/in/buyer_favorite1");
        Path out = new Path("hdfs://localhost:9000/mymapreduce1/out");
        FileInputFormat.addInputPath(job, in);//设置输入文件
        FileOutputFormat.setOutputPath(job, out);//设置输出文件
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
   public static class doMapper extends Mapper<Object, Text, Text, IntWritable>{
	//第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示输出键的类型;第四个IntWritable表示输出值的类型
		public static final IntWritable one = new IntWritable(1);
        public static Text word = new Text();
        @Override
        protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException{
                       //抛出异常
            StringTokenizer tokenizer = new StringTokenizer(value.toString(),"\t");
          //StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分
                word.set(tokenizer.nextToken());
                 //返回当前位置到下一个分隔符之间的字符串
                context.write(word, one);
                 //将word存到容器中,记一个数
        }
    }
    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		//参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
		private IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
			    int sum = 0;
			    for (IntWritable value : values) {
				    sum += value.get();
			    }
			    //for循环遍历,将得到的values值累加
			    result.set(sum);
			    context.write(key, result);
			}
	}
}

2.编写程序实现简单的求平均统计。

实验内容

现有某电商关于商品点击情况的数据文件,表名为goods_click,包含两个字段(商品分类,商品点击次数),分隔符“\t”,由于数据很大,所以为了方便统计我们只截取它的一部分数据,内容如下:

商品分类 商品点击次数
52127	5
52120	93
52092	93
52132	38
52006	462
52109	28
52109	43
52132	0
52132	34
52132	9
52132	30
52132	45
52132	24
52009	2615
52132	25
52090	13
52132	6
52136	0
52090	10
52024	347

要求使用mapreduce统计出每类商品的平均点击次数。结果数据如下:

商品分类 商品平均点击次数
52006	462
52009	2615
52024	347
52090	11
52092	93
52109	35
52120	93
52127	5
52132	23
52136	0

实现原理

求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,在数据输入到Reduce之前先经过shuffle,将map函数输出的key值相同的所有的value值形成一个集合value-list,然后将输入到Reduce端,Reduce端汇总并且统计记录数,然后作商即可。具体原理如下图所示:

map端在采用Hadoop的默认输入方式之后,将输入的value值通过split()方法截取出来,我们把截取的商品点击次数字段转化为IntWritable类型并将其设置为value,把商品分类字段设置为key,然后直接输出key/value的值。

map的输出<key,value>经过shuffle过程集成<key,values>键值对,然后将<key,values>键值对交给reduce。reduce端接收到values之后,将输入的key直接复制给输出的key,将values通过for循环把里面的每个元素求和num并统计元素的次数count,然后用num除以count 得到平均值avg,将avg设置为value,最后直接输出<key,value>就可以了。

package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MyAverage{
	public static class Map extends Mapper<Object , Text , Text , IntWritable>{
	    private static Text newKey=new Text();
	    //实现map函数
	    public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
		    // 将输入的纯文本文件的数据转化成String
		    String line=value.toString();
		    System.out.println(line);
		    String arr[]=line.split("\t");
		    newKey.set(arr[0]);
		    int click=Integer.parseInt(arr[1]);
		    context.write(newKey, new IntWritable(click));
	    }
    }

  	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
		//实现reduce函数
		public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
		    int num=0;
		    int count=0;
		    for(IntWritable val:values){
			    num+=val.get(); //每个元素求和num
			    count++;        //统计元素的次数count
		    }
		    int avg=num/count;  //计算平均数
		    context.write(key,new IntWritable(avg));
		}
    }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
	        Configuration conf=new Configuration();
	        System.out.println("start");
	        Job job =new Job(conf,"MyAverage");
	        job.setJarByClass(MyAverage.class);
	        job.setMapperClass(Map.class);
	        job.setReducerClass(Reduce.class);
	        job.setOutputKeyClass(Text.class);
	        job.setOutputValueClass(IntWritable.class);
	        job.setInputFormatClass(TextInputFormat.class);
	        job.setOutputFormatClass(TextOutputFormat.class);
	        Path in=new Path("hdfs://localhost:9000/mymapreduce4/in/goods_click");
	        Path out=new Path("hdfs://localhost:9000/mymapreduce4/out");
	        FileInputFormat.addInputPath(job,in);
	        FileOutputFormat.setOutputPath(job,out);
	        System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

3.编写程序实现去重操作。

实验内容

现有一个某电商网站的数据文件,名为buyer_favorite1,记录了用户收藏的商品以及收藏的日期,文件buyer_favorite1中包含(用户id,商品id,收藏日期)三个字段,数据内容以“\t”分割,由于数据很大,所以为了方便统计我们只截取它的一部分数据,内容如下:

用户id   商品id    收藏日期
10181   1000481   2010-04-04 16:54:31
20001   1001597   2010-04-07 15:07:52
20001   1001560   2010-04-07 15:08:27
20042   1001368   2010-04-08 08:20:30
20067   1002061   2010-04-08 16:45:33
20056   1003289   2010-04-12 10:50:55
20056   1003290   2010-04-12 11:57:35
20056   1003292   2010-04-12 12:05:29
20054   1002420   2010-04-14 15:24:12
20055   1001679   2010-04-14 19:46:04
20054   1010675   2010-04-14 15:23:53
20054   1002429   2010-04-14 17:52:45
20076   1002427   2010-04-14 19:35:39
20054   1003326   2010-04-20 12:54:44
20056   1002420   2010-04-15 11:24:49
20064   1002422   2010-04-15 11:35:54
20056   1003066   2010-04-15 11:43:01
20056   1003055   2010-04-15 11:43:06
20056   1010183   2010-04-15 11:45:24
20056   1002422   2010-04-15 11:45:49
20056   1003100   2010-04-15 11:45:54
20056   1003094   2010-04-15 11:45:57
20056   1003064   2010-04-15 11:46:04
20056   1010178   2010-04-15 16:15:20
20076   1003101   2010-04-15 16:37:27
20076   1003103   2010-04-15 16:37:05
20076   1003100   2010-04-15 16:37:18
20076   1003066   2010-04-15 16:37:31
20054   1003103   2010-04-15 16:40:14
20054   1003100   2010-04-15 16:40:16

要求用Java编写MapReduce程序,根据商品id进行去重,统计用户收藏商品中都有哪些商品被收藏。结果数据如下:

商品id
1000481
1001368
1001560
1001597
1001679
1002061
1002420
1002422
1002427
1002429
1003055
1003064
1003066
1003094
1003100
1003101
1003103
1003289
1003290
1003292
1003326
1010178
1010183
1010675

实现原理

“数据去重”主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后交给reduce。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求(可以设置为空)。当reduce接收到一个<key,value-list>时就直接将输入的key复制到输出的key中,并将value设置成空值,然后输出<key,value>。

数据去重的目的是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然想到将相同key值的所有value记录交到一台reduce机器,让其无论这个数据出现多少次,最终结果只输出一次。具体就是reduce的输出应该以数据作为key,而对value-list没有要求,当reduce接收到一个时,就直接将key复制到输出的key中,将value设置为空。

map阶段采用Hadoop的默认的作业输入方式,把输入的value用split()方法截取,截取出的商品id字段设置为key,设置value为空,然后直接输出<key,value>。

map阶段采用Hadoop的默认的作业输入方式,把输入的value用split()方法截取,截取出的商品id字段设置为key,设置value为空,然后直接输出<key,value>。

package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Filter{
	public static class Map extends Mapper<Object , Text , Text , NullWritable>
    //map将输入中的value复制到输出数据的key上,并直接输出
    {
	    private static Text newKey=new Text();		//从输入中得到的每行的数据的类型
	    public void map(Object key,Text value,Context context) throws IOException, InterruptedException
	    //实现map函数
	    {			  //获取并输出每一次的处理过程
		    String line=value.toString();
		    System.out.println(line);
		    String arr[]=line.split("\t");
		    newKey.set(arr[1]);
		    context.write(newKey, NullWritable.get());
		    System.out.println(newKey);
	    }
    }


	public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{
			public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException
		    //实现reduce函数
		    {
			    context.write(key,NullWritable.get());   //获取并输出每一次的处理过程
		    }
	}
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
	        Configuration conf=new Configuration();
	        System.out.println("start");
	        Job job =new Job(conf,"filter");
	        job.setJarByClass(Filter.class);
	        job.setMapperClass(Map.class);
	        job.setReducerClass(Reduce.class);
	        job.setOutputKeyClass(Text.class);
	        job.setOutputValueClass(NullWritable.class);
	        job.setInputFormatClass(TextInputFormat.class);
	        job.setOutputFormatClass(TextOutputFormat.class);
	        Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");
	        Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");
	        FileInputFormat.addInputPath(job,in);
	        FileOutputFormat.setOutputPath(job,out);
	        System.exit(job.waitForCompletion(true) ? 0 : 1);
	  }
}

4.编写程序实现二次排序。

实验内容

在电商网站中,用户进入页面浏览商品时会产生访问日志,记录用户对商品的访问情况,现有goods_visit2表,包含(goods_id,click_num)两个字段,数据内容如下:

goods_id click_num
1010037	100
1010102	100
1010152	97
1010178	96
1010280	104
1010320	103
1010510	104
1010603	96
1010637	97

编写MapReduce代码,功能为根据商品的点击次数(click_num)进行降序排序,再根据goods_id升序排序,并输出所有商品。
输出结果如下:

点击次数 商品id
------------------------------------------------
104	1010280
104	1010510
------------------------------------------------
103	1010320
------------------------------------------------
100	1010037
100	1010102
------------------------------------------------
97	1010152
97	1010637
------------------------------------------------
96	1010178
96	1010603

实现原理

在Map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本实验中使用的是TextInputFormat,他提供的RecordReder会将文本的字节偏移量作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>键值对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。 如果没有通过job.setSortComparatorClass设置key比较函数类,则可以使用key实现的compareTo方法进行排序。 在本实验中,就使用了IntPair实现的compareTo方法。

在Reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

二次排序:在mapreduce中,所有的key是需要被比较和排序的,并且是二次,先根据partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后在第一字段相同时按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。Java代码主要分为四部分:自定义key,自定义分区函数类,map部分,reduce部分。

package mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SecondarySort
{
	//自定义key的代码:
	/**所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的,并重载方法。该类中包含以下几种方法:1.反序列化,从流中的二进制转换成IntPair 方法为public void readFields(DataInput in) throws IOException 2.序列化,将IntPair转化成使用流传送的二进制 方法为public void write(DataOutput out)3. key的比较 public int compareTo(IntPair o) 另外新定义的类应该重写的两个方法 public int hashCode() 和public boolean equals(Object right) **/
    public static class IntPair implements WritableComparable<IntPair> {
	    int first;//第一个成员变量 
	    int second;//第二个成员变量 

	    public void set(int left, int right){
		    first = left;
		    second = right;
	    }
	    public int getFirst(){
	   		return first;
	    }
	    public int getSecond(){
	    	return second;
	    }
	    @Override
	    public void readFields(DataInput in) throws IOException {//反序列化,从流中的二进制转换成IntPair 
		    // TODO Auto-generated method stub
		    first = in.readInt();
		    second = in.readInt();
	    }
	    @Override
	    public void write(DataOutput out) throws IOException{ //序列化,将IntPair转化成使用流传送的二进制 
		    // TODO Auto-generated method stub
		    out.writeInt(first);
		    out.writeInt(second);
	    }
	    @Override
	    public int compareTo(IntPair o) {  //key的比较 
		    // TODO Auto-generated method stub
		    if (first != o.first) {
		    	return first < o.first ? 1 : -1;
		    }
		    else if (second != o.second){
		    	return second < o.second ? -1 : 1;
		    }
		    else{
		    	return 0;
		    }
	    }
	    @Override
	    public int hashCode() {
	    	return first * 157 + second;
	    }
	    @Override
	    public boolean equals(Object right){
		    if (right == null)
		    	return false;
		    if (this == right)
		    	return true;
		    if (right instanceof IntPair){
			    IntPair r = (IntPair) right;
			    return r.first == first && r.second == second;
		    }
		    else{
		    	return false;
		    }
		}
	}
	/**分区函数类代码 对key进行分区,根据自定义key中first乘以127取绝对值在对numPartions取余来进行分区。这主要是为实现第一次排序。 **/
	public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
	    @Override
	    public int getPartition(IntPair key, IntWritable value,int numPartitions){
	    	return Math.abs(key.getFirst() * 127) % numPartitions;
	    }
	}
	/**分组函数类代码 分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。 **/
	public static class GroupingComparator extends WritableComparator{
	    protected GroupingComparator(){
	    	super(IntPair.class, true);
	    }
	    @Override
	    //Compare two WritableComparables.
	    public int compare(WritableComparable w1, WritableComparable w2){
		    IntPair ip1 = (IntPair) w1;
		    IntPair ip2 = (IntPair) w2;
		    int l = ip1.getFirst();
		    int r = ip2.getFirst();
		    return l == r ? 0 : (l < r ? -1 : 1);
	    }
	}
	/**map代码:在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>键值对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key实现compareTo方法。在本例子中,使用了IntPair实现compareTo方法。 **/
	
	public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
	    private final IntPair intkey = new IntPair();
	    private final IntWritable intvalue = new IntWritable();
	    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
		    String line = value.toString();
		    StringTokenizer tokenizer = new StringTokenizer(line);
		    int left = 0;
		    int right = 0;
		    if (tokenizer.hasMoreTokens()){
		    	left = Integer.parseInt(tokenizer.nextToken());
		    	if (tokenizer.hasMoreTokens())
		    		right = Integer.parseInt(tokenizer.nextToken());
			    intkey.set(right, left);
			    intvalue.set(left);
			    context.write(intkey, intvalue);
		    }
	    }
	}
	/**Reduce代码:在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的key和它的value迭代器。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。**/
	public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>{
	    private final Text left = new Text();
	    private static final Text SEPARATOR = new Text("------------------------------------------------");
		public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
		    context.write(SEPARATOR, null);
		    left.set(Integer.toString(key.getFirst()));
		    System.out.println(left);
		    for (IntWritable val : values){
			    context.write(left, val);
			    //System.out.println(val);
		    }
		}
	}
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
	
	    Configuration conf = new Configuration();
	    Job job = new Job(conf, "secondarysort");
	    job.setJarByClass(SecondarySort.class);
	    job.setMapperClass(Map.class);
	    job.setReducerClass(Reduce.class);
	    job.setPartitionerClass(FirstPartitioner.class);
	
	    job.setGroupingComparatorClass(GroupingComparator.class);
	    job.setMapOutputKeyClass(IntPair.class);
	
	    job.setMapOutputValueClass(IntWritable.class);
	
	    job.setOutputKeyClass(Text.class);
	
	    job.setOutputValueClass(IntWritable.class);
	
	    job.setInputFormatClass(TextInputFormat.class);
	
	    job.setOutputFormatClass(TextOutputFormat.class);
	    String[] otherArgs=new String[2];
	    otherArgs[0]="hdfs://localhost:9000/mymapreduce8/in/goods_visit2";
	    otherArgs[1]="hdfs://localhost:9000/mymapreduce8/out";
	
	    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
	
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

5.编写程序实现单表join。

实验内容

现有某电商的用户好友数据文件,名为 buyer1,buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。buyer1(buyer_id,friends_id)

10001	10002
10002	10005
10003	10002
10004	10006
10005	10007
10006	10022
10007	10032
10009	10006
10010	10005
10011	10013

统计结果数据如下:

好友id  用户id
10005	10001
10005	10003
10007	10010
10007	10002
10022	10004
10022	10009
10032	10005

实验原理

实验原理
以本实验的buyer1(buyer_id,friends_id)表为例来阐述单表连接的实验原理。单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,在map阶段将读入数据分割成buyer_id和friends_id之后,会将buyer_id设置成key,friends_id设置成value,直接输出并将其作为左表;再将同一对buyer_id和friends_id中的friends_id设置成key,buyer_id设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"buyer_idfriends_id–friends_idbuyer_id"关系。取出每个key的value-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

package mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DanJoin {
	public static class Map extends Mapper<Object,Text,Text,Text>{
  		 //实现map函数
  		 /**Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key/value>对提供给map函数使用。map函数中用split("\t")方法把每行数据进行截取,并把数据存入到数组arr[],把arr[0]赋值给mapkey,arr[1]赋值给mapvalue。用两个context的write()方法把数据输出两份,再通过标识符relationtype为1或2对两份输出数据的value打标记。**/
  		 
		public void map(Object key,Text value,Context context)
				throws IOException,InterruptedException{
				String line = value.toString();
				String[] arr = line.split("\t");   //按行截取
				String mapkey=arr[0];
				String mapvalue=arr[1];
				String relationtype=new String();  //左右表标识
				relationtype="1";  //输出左表
				context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));
				//System.out.println(relationtype+"+"+mapvalue);
				relationtype="2";  //输出右表
				context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));
				//System.out.println(relationtype+"+"+mapvalue);

		}
    }

	public static class Reduce extends Reducer<Text, Text, Text, Text>{
 		//实现reduce函数
 		/**reduce端在接收map端传来的数据时已经把相同key的所有value都放到一个Iterator容器中values。reduce函数中,首先新建两数组buyer[]和friends[]用来存放map端的两份输出数据。然后Iterator迭代中hasNext()和Next()方法加while循环遍历输出values的值并赋值给record,用charAt(0)方法获取record第一个字符赋值给relationtype,用if判断如果relationtype为1则把用substring(2)方法从下标为2开始截取record将其存放到buyer[]中,如果relationtype为2时将截取的数据放到frindes[]数组中。然后用三个for循环嵌套遍历输出<key,value>,其中key=buyer[m],value=friends[n]。**/
		public void reduce(Text key,Iterable<Text> values,Context context)
    		throws IOException,InterruptedException{
		    int buyernum=0;
		    String[] buyer=new String[20];
		    int friendsnum=0;
		    String[] friends=new String[20];
		    Iterator ite=values.iterator();
		    while(ite.hasNext()){
			    String record=ite.next().toString();
			    int len=record.length();
			    int i=2;
			    if(0==len){
			    	continue;
	    		}
			    //取得左右表标识
			    char relationtype=record.charAt(0);
			    //取出record,放入buyer
			    if('1'==relationtype){
				    buyer [buyernum]=record.substring(i);
				    buyernum++;
			    }
			    //取出record,放入friends
			    if('2'==relationtype){
				    friends[friendsnum]=record.substring(i);
				    friendsnum++;
			    }
  		  	}
		    //buyernum和friendsnum数组求笛卡尔积
		    if(0!=buyernum&&0!=friendsnum){
			    for(int m=0;m<buyernum;m++){
				    for(int n=0;n<friendsnum;n++){
					    if(buyer[m]!=friends[n]){
						    //输出结果
						    context.write(new Text(buyer[m]),new Text(friends[n]));
						}
				    }
				}
			}
		}
	}

    	public static void main(String[] args) throws Exception{

		    Configuration conf=new Configuration();
		    String[] otherArgs=new String[2];
		    otherArgs[0]="hdfs://localhost:9000/mymapreduce7/in/buyer1";
		    otherArgs[1]="hdfs://localhost:9000/mymapreduce7/out";
		    Job job=new Job(conf," Table join");
		    job.setJarByClass(DanJoin.class);
		    job.setMapperClass(Map.class);
		    job.setReducerClass(Reduce.class);
		    job.setOutputKeyClass(Text.class);
		    job.setOutputValueClass(Text.class);
		    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		    System.exit(job.waitForCompletion(true)?0:1);
	
	    }
    }

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务