Day03_03_MapReduce技术细节

MapReduce技术细节

一、概述

1、MapReduce是Hadoop提供的一套用于进行分布式的计算框架

2、MapReduce程序将整个计算过程拆分成两个阶段:Map阶段(映射)和Reduce阶段(规约)

二、MapReduce程序的执行流程

1741834195647

三、Winodws系统的环境配置

1、Hadoop对Winodws的兼容性不强,所以如果不配置则运行MR程序就会报错

2、如果双击winutils.exe报错,将msvcr120.dll复制到C:\Windows\System32目录下

3、配置环境变量

1741835715649

1741835819869

1741835965028

四、MapReduce程序相关案例
①字符统计案例

1、CharCountMapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.itcast.charcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// 当前类是MapReduce程序的Mapper阶段的逻辑

// 统计每一个字符出现的次数
// 在MapReduce程序中要求数据能够被序列化
// KEYIN --- 输入的键的类型,如果不指定则默认就是行的字节偏移量
// VALUEIN --- 输入的值的类型,如果不指定则默认就是读取的一行数据
// KEYOUT --- 输出的键的类型,当前案例中输出的键是字符
// VALUEOUT --- 输出的值的类型,当前案例中输出的值是次数
public class CharCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

// key:键,实际上就是行的字节偏移量
// value:值。实际上就是读取的一行数据
// context:利用这个参数将数据传输到Reduce阶段
@Override // 子类重写父类的方法
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// hello
// 先将这一行中的字符拆分出来
char[] cs = value.toString().toCharArray();
// {h,e,l,l,o}
// h:1 e:1 l:2 o:1
// h:1 e:1 l:1 l:1 o:1
// 无论字符是否重复,那么将每一个字符的出现次数设置为1
for (char c : cs){
// 每遍历一次就得到了一个字符,不管重复与否都设置为1
context.write(new Text(c + ""),new IntWritable(1));
}
}
}

2、CharCountReducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.itcast.charcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// 当前类是MapReduce程序的Reducer阶段的逻辑

// KEYIN, VALUEIN -- 输入的键值类型,Reducer的数据来自于Mapper
// 那么意味着Map的输出就是Reducer阶段的输入
// KEYOUT, VALUEOUT --- 输出的键值类型
public class CharCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

// key:输入的键 -- 字符
// values:输入的值,这个值已经进行过分组,将相同的键对应的值放在一组中产生一个迭代器
// context:利用这个参数将结果写到HDFS
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// key:'a'
// values:{1,1,1,1,1,1,1,1.....}
// 求和变量
int sum = 0;
// 遍历迭代器来求和
for (IntWritable val : values){
sum += val.get();
}
context.write(key,new IntWritable(sum));
}
}

3、CharCountDriver类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.itcast.charcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// 当前类是MapReduce程序的入口类(驱动类)
public class CharCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 环境参数
Configuration conf = new Configuration();
// 向Yarn来申请Job任务
Job job = Job.getInstance(conf);

// 设置入口类
job.setJarByClass(CharCountDriver.class);
// 设置Mapper类
job.setMapperClass(CharCountMapper.class);
// 设置Reducer类
job.setReducerClass(CharCountReducer.class);

// 设置Mapper阶段的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Reducer阶段的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入路径 --- 要处理原始文件的路径
// 写Active状态的NameNode路径
FileInputFormat.addInputPath(job,
new Path("hdfs://192.168.232.128:9000/characters.txt"));
// 设置输出路径 --- 处理之后结果文件的路径
FileOutputFormat.setOutputPath(job,
new Path("hdfs://192.168.232.128:9000/result/charcount"));

// 提交Job任务
job.waitForCompletion(true);
}
}
②单词统计案例

1、WordCountMapper类

1

2、WordCountReducer类

1

3、WordCountDriver类

1