创建词频统计归并器类
一个类继承Reducer,变成一个Reducer组件类
Reducer组件会接收Mapper组件的输出结果
第一个泛型对应的是Mapper输出key类型
第二个泛型对应的是Mapper输出value类型
第三个泛型和第四个泛型是Reducer的输出key类型和输出value类型
Reducer组件不能单独存在,但是Mapper组件可以单独存在
当引入Reducer组件后,输出结果文件内容就是Reducer的输出key和输出value
在net.hw.mr包里创建WordCountReducer
package net.hw.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class WordCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 定义整数数组列表 List integers = new ArrayList<>(); // 遍历输入值迭代器 for (IntWritable value : values) { // 将每个值添加到数组列表 integers.add(value.get()); // 利用get()方法将hadoop数据类型转换成java数据类型 } // 输出新的键值对,注意要将java字符串转换成hadoop的text类型 context.write(key, new Text(integers.toString())); } }
修改词频统计驱动器类
设置词频统计的Reducer类及其输出键类型和输出值类型(Text,Text)
运行词频统计驱动器类,查看结果
运行WordCountDriver类
修改词频统计归并器类
输出键值类型改为IntWritable,遍历值迭代器,累加得到单词出现次数
package net.hw.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 定义键出现次数 int count = 0; // 遍历输入值迭代器 for (IntWritable value : values) { count += value.get(); // 其实针对此案例,可用count++来处理 } // 输出新的键值对,注意要将java的int类型转换成hadoop的IntWritable类型 context.write(key, new IntWritable(count)); } }
修改词频统计驱动器类
修改归并任务的输出值类型(IntWritable类型)
采用多个Reduce做合并
MR默认采用哈希分区HashPartitioner
Mapper输出key.hashcode & Integer.MAX_ VALUE % Reduce任务数量
修改词频统计驱动器类,设置分区数量
运行结果
在Hadoop WebUI界面上查看
打包上传到虚拟机上运行
利用Maven打包
打开Maven管理窗口,找到项目的LifeCycle下的package命令
双击package命令,报错,maven插件版本不对
修改pom.xml文件,添加maven插件,记得要刷新maven
再次打包,即可生成MRWordCount-1.0-SNAPSHOT.jar
将jar包上传到虚拟机
将MRWordCount-1.0-SNAPSHOT.jar上传到master虚拟机/home目录
查看上传的jar包
运行jar包,查看结果
运行报错,Java编译版本不一致导致错误,本地打包用的是JDK11,虚拟机上安装的JDK8
降低项目JDK版本,重新打包
修改项目JDK
修改pom.xml文件
重新利用maven打包
重新上传jar包到虚拟机
删除master虚拟机上的jar包
重新上传jar包
运行jar包,查看结果
执行命令:hadoop jar MRWordCount-1.0-SNAPSHOT.jar net.hw.mr.WordCountDriver
创建新词频统计驱动器类
由用户指定输入路径和输出路径,如果用户不指定,那么由程序来设置
在net.hw.mr包里创建WordCountDriverNew类
package net.hw.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; public class WordCountDriverNew { public static void main(String[] args) throws Exception { // 创建配置对象 Configuration conf = new Configuration(); // 设置数据节点主机名属性 conf.set("dfs.client.use.datanode.hostname", "true"); // 获取作业实例 Job job = Job.getInstance(conf); // 设置作业启动类 job.setJarByClass(WordCountDriverNew.class); // 设置Mapper类 job.setMapperClass(WordCountMapper.class); // 设置map任务输出键类型 job.setMapOutputKeyClass(Text.class); // 设置map任务输出值类型 job.setMapOutputValueClass(IntWritable.class); // 设置Reducer类 job.setReducerClass(WordCountReducer.class); // 设置reduce任务输出键类型 job.setOutputKeyClass(Text.class); // 设置reduce任务输出值类型 job.setOutputValueClass(IntWritable.class); // 设置分区数量(reduce任务的数量,结果文件的数量) job.setNumReduceTasks(3); // 定义uri字符串 String uri = "hdfs://master:9000"; // 声明输入目录 Path inputPath = null; // 声明输出目录 Path outputPath = null; // 判断输入参数个数 if (args.length == 0) { // 创建输入目录 inputPath = new Path(uri + "/wordcount/input"); // 创建输出目录 outputPath = new Path(uri + "/wordcount/output"); } else if (args.length == 2) { // 创建输入目录 inputPath = new Path(uri + args[0]); // 创建输出目录 outputPath = new Path(uri + args[1]); } else { // 提示用户参数个数不符合要求 System.out.println("参数个数不符合要求,要么是0个,要么是2个!"); // 结束应用程序 return; } // 获取文件系统 FileSystem fs = FileSystem.get(new URI(uri), conf); // 删除输出目录(第二个参数设置是否递归) fs.delete(outputPath, true); // 给作业添加输入目录(允许多个) FileInputFormat.addInputPath(job, inputPath); // 给作业设置输出目录(只能一个) FileOutputFormat.setOutputPath(job, outputPath); // 等待作业完成 job.waitForCompletion(true); // 输出统计结果 System.out.println("======统计结果======"); FileStatus[] fileStatuses = fs.listStatus(outputPath); for (int i = 1; i < fileStatuses.length; i++) { // 输出结果文件路径 System.out.println(fileStatuses[i].getPath()); // 获取文件系统数据字节输入流 FSDataInputStream in = fs.open(fileStatuses[i].getPath()); // 将结果文件显示在控制台 IOUtils.copyBytes(in, System.out, 4096, false); } } }
重新打包上传虚拟机并执行
重新打包
删除先前的jar包
上传新的单词文件
上传新的jar包
执行命令:hadoop jar MRWordCount-1.0-SNAPSHOT.jar net.hw.mr.WordCountDriverNew
不指定输入路径和输出路径参数
执行命令:hadoop jar MRWordCount-1.0-SNAPSHOT.jar net.hw.mr.WordCountDriverNew /winter/input /winter/output
指定输入路径和输出路径参数
执行命令:hadoop jar MRWordCount-1.0-SNAPSHOT.jar net.hw.mr.WordCountDriverNew /winter/input
指定输入路径参数,不指定输出路径参数
将三个类合并成一个类完成词频统计
在net.hw.mr包里创建WordCount类
package net.hw.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.net.URI; public class WordCount extends Configured implements Tool { public static class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取行内容 String line = value.toString(); // 清洗所有英文标点符号(p——属性[property],P——标点符号[Punctuation]) line = line.replaceAll("[\pP]", ""); // 按空格拆分得到单词数组 String[] words = line.split(" "); // 遍历单词数组,生成输出键值对 for (int i = 0; i < words.length; i++) { context.write(new Text(words[i]), new IntWritable(1)); } } } public static class WordCountReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 定义输出键出现次数 int count = 0; // 历输出值迭代对象,统计其出现次数 for (IntWritable value : values) { count = count + value.get(); } // 生成键值对输出 context.write(key, new IntWritable(count)); } } @Override public int run(String[] strings) throws Exception { // 创建配置对象 Configuration conf = new Configuration(); // 设置数据节点主机名属性 conf.set("dfs.client.use.datanode.hostname", "true"); // 获取作业实例 Job job = Job.getInstance(conf); // 设置作业启动类 job.setJarByClass(WordCountDriver.class); // 设置Mapper类 job.setMapperClass(WordCountMapper.class); // 设置map任务输出键类型 job.setMapOutputKeyClass(Text.class); // 设置map任务输出值类型 job.setMapOutputValueClass(IntWritable.class); // 设置Reducer类 job.setReducerClass(WordCountReducer.class); // 设置reduce任务输出键类型 job.setOutputKeyClass(Text.class); // 设置reduce任务输出值类型 job.setOutputValueClass(IntWritable.class); // 设置分区数量(reduce任务的数量,结果文件的数量) job.setNumReduceTasks(3); // 定义uri字符串 String uri = "hdfs://master:9000"; // 创建输入目录 Path inputPath = new Path(uri + "/wordcount2/input"); // 创建输出目录 Path outputPath = new Path(uri + "/wordcount2/output"); // 获取文件系统 FileSystem fs = FileSystem.get(new URI(uri), conf); // 删除输出目录(第二个参数设置是否递归) fs.delete(outputPath, true); // 给作业添加输入目录(允许多个) FileInputFormat.addInputPath(job, inputPath); // 给作业设置输出目录(只能一个) FileOutputFormat.setOutputPath(job, outputPath); // 等待作业完成 boolean res = job.waitForCompletion(true); // 输出统计结果 System.out.println("======统计结果======"); FileStatus[] fileStatuses = fs.listStatus(outputPath); for (int i = 1; i < fileStatuses.length; i++) { // 输出结果文件路径 System.out.println(fileStatuses[i].getPath()); // 获取文件系统数据字节输入流 FSDataInputStream in = fs.open(fileStatuses[i].getPath()); // 将结果文件显示在控制台 IOUtils.copyBytes(in, System.out, 4096, false); } if (res) { return 0; } else { return -1; } } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new WordCount(), args); System.exit(res); } }
上传一个有标点符号的单词文件
运行代码,查看结果
将JDK版本降低到8
修改编译器配置文件
运行程序,查看结果
合并分区导致的多个结果文件
利用hadoop的-getmerge命令来完成:hdfs dfs -getmerge /wordcount/result part-r-final
统计不同单词数
利用cat -nu命令,带行号显示文件内容
利用wc -l命令,统计文件行数,即不同单词数