MapReduce技术细节
一、概述
1、MapReduce是Hadoop提供的一套用于进行分布式的计算框架
2、MapReduce程序将整个计算过程拆分成两个阶段:Map阶段(映射)和Reduce阶段(规约)
二、MapReduce程序的执行流程

三、Winodws系统的环境配置
1、Hadoop对Winodws的兼容性不强,所以如果不配置则运行MR程序就会报错
2、如果双击winutils.exe报错,将msvcr120.dll复制到C:\Windows\System32目录下
3、配置环境变量



四、MapReduce程序相关案例
①字符统计案例
1、CharCountMapper类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.itcast.charcount;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// 当前类是MapReduce程序的Mapper阶段的逻辑
// 统计每一个字符出现的次数 // 在MapReduce程序中要求数据能够被序列化 // KEYIN --- 输入的键的类型,如果不指定则默认就是行的字节偏移量 // VALUEIN --- 输入的值的类型,如果不指定则默认就是读取的一行数据 // KEYOUT --- 输出的键的类型,当前案例中输出的键是字符 // VALUEOUT --- 输出的值的类型,当前案例中输出的值是次数 public class CharCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
// key:键,实际上就是行的字节偏移量 // value:值。实际上就是读取的一行数据 // context:利用这个参数将数据传输到Reduce阶段 @Override // 子类重写父类的方法 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // hello // 先将这一行中的字符拆分出来 char[] cs = value.toString().toCharArray(); // {h,e,l,l,o} // h:1 e:1 l:2 o:1 // h:1 e:1 l:1 l:1 o:1 // 无论字符是否重复,那么将每一个字符的出现次数设置为1 for (char c : cs){ // 每遍历一次就得到了一个字符,不管重复与否都设置为1 context.write(new Text(c + ""),new IntWritable(1)); } } }
|
2、CharCountReducer类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.itcast.charcount;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// 当前类是MapReduce程序的Reducer阶段的逻辑
// KEYIN, VALUEIN -- 输入的键值类型,Reducer的数据来自于Mapper // 那么意味着Map的输出就是Reducer阶段的输入 // KEYOUT, VALUEOUT --- 输出的键值类型 public class CharCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// key:输入的键 -- 字符 // values:输入的值,这个值已经进行过分组,将相同的键对应的值放在一组中产生一个迭代器 // context:利用这个参数将结果写到HDFS @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // key:'a' // values:{1,1,1,1,1,1,1,1.....} // 求和变量 int sum = 0; // 遍历迭代器来求和 for (IntWritable val : values){ sum += val.get(); } context.write(key,new IntWritable(sum)); } }
|
3、CharCountDriver类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.itcast.charcount;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
// 当前类是MapReduce程序的入口类(驱动类) public class CharCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 环境参数 Configuration conf = new Configuration(); // 向Yarn来申请Job任务 Job job = Job.getInstance(conf);
// 设置入口类 job.setJarByClass(CharCountDriver.class); // 设置Mapper类 job.setMapperClass(CharCountMapper.class); // 设置Reducer类 job.setReducerClass(CharCountReducer.class);
// 设置Mapper阶段的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置Reducer阶段的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
// 设置输入路径 --- 要处理原始文件的路径 // 写Active状态的NameNode路径 FileInputFormat.addInputPath(job, new Path("hdfs://192.168.232.128:9000/characters.txt")); // 设置输出路径 --- 处理之后结果文件的路径 FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.128:9000/result/charcount"));
// 提交Job任务 job.waitForCompletion(true); } }
|
②单词统计案例
1、WordCountMapper类
2、WordCountReducer类
3、WordCountDriver类