vlambda博客
学习文章列表

完成功能--MapReduce驱动两个job、二次排序、求Url的topN

“脑子里想法,总是想去代码实现



//用mapreduce来实现下面需求?
//现在有10个文件夹,每个文件夹都有1000000个url.现在让你找出top1000000url。
//方法一:
// 运用2个job,第一个job直接用filesystem读取10个文件夹作为map输入,url做key,
reduce计算url的sum,
// 下一个job map用url作key,运用sum作二次排序,reduce中取top10000000
// 1:首先进行wordcount计算
// 2:进行二次排序
// 如何启动两个job代码如下:




01



实现二次排序RawComparator接口



他与WritableComparator和Deserializer的关系图


02


开发源码(job2开发只提供了框架)


package demo03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;
import java.util.StringTokenizer;

/**
*
@author 张泰
*
@Version v 1.0
*
@date 2020/6/8
*/
public class WordCount01 {
//程序入口
public static void main(String[] args) throws Exception {
//调用方法
depending();
}
//设置job启动方法
public static void depending() throws Exception {
//申请资源 Configration
Configuration conf =new Configuration();
//排序
Job job2 = init2(conf, WCMapper2.class, WCReducer2.class);
//获取url
Job job1 = init1(conf, WCMapper.class, WCReducer.class);
JobControl jobControl = new JobControl("groupName");
// List<ControlledJob> dependingJobs = new ArrayList<ControlledJob>();
//进行排序
ControlledJob controlledJob1 = new ControlledJob(conf);
controlledJob1.setJob(job1);
// dependingJobs.add(controlledJob1);
//排序
ControlledJob controlledJob2 = new ControlledJob(conf);
controlledJob2.setJob(job2);
controlledJob2.addDependingJob(controlledJob1);
jobControl.addJob(controlledJob2);
jobControl.addJob(controlledJob1);
//使用线程管理
Thread jcThread = new Thread(jobControl);
jcThread.start();
while (true) {
if (jobControl.allFinished()) {
System.out.println(jobControl.getSuccessfulJobList());
jobControl.stop();
break;
}
if (jobControl.getFailedJobList().size() > 0) {
System.out.println(jobControl.getFailedJobList());
jobControl.stop();
break;
}
}

FileSystem fs = FileSystem.get(conf);
boolean ret = fs.deleteOnExit(new Path(""));
System.out.println(ret);
}
//job1
public static Job init1(Configuration conf,Class o1,Class o2)
throws IOException, ClassNotFoundException, InterruptedException {
//实例化job
Job job = Job.getInstance(conf, "Mr1");
//设置集群可用
job.setJarByClass(WordCount01.class);
//设置覆盖函数类型
job.setMapperClass(o1);
job.setCombinerClass(o2);
job.setReducerClass(o2);
//设置函数输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入输出文件路径
FileInputFormat.addInputPath(job,new Path(""));
FileOutputFormat.setOutputPath(job,new Path(""));
return job;
}
//job1
public static Job init2(Configuration conf,Class o1,Class o2)
 throws IOException, ClassNotFoundException, InterruptedException {
//实例化job
Job job = Job.getInstance(conf, "Mr2");
//设置集群可用
job.setJarByClass(WordCount01.class);
//设置覆盖函数类型
job.setMapperClass(o1);
job.setCombinerClass(o2);
job.setReducerClass(o2);
//设置函数输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置二次排序方式
job.setSortComparatorClass(TextIntComparator.class);
//设置输入输出文件路径
FileInputFormat.addInputPath(job,new Path(""));
FileOutputFormat.setOutputPath(job,new Path(""));
return job;
}
//二次排序
public static class TextIntComparator implements RawComparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int n1 = WritableUtils.decodeVIntSize(b1[s1]);
int n2 = WritableUtils.decodeVIntSize(b2[s2]);

// Text的比较是这么实现的
// WritableComparator.compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
// 其实完全可以这么干
byte[] _b1 = Arrays.copyOfRange(b1, s1 + n1, s1 + l1);
byte[] _b2 = Arrays.copyOfRange(b2, s2 + n2, s2 + l2);
String t1 = new String(_b1);
String t2 = new String(_b2);
return compare(new Text(t1), new Text(t2));
}

@Override
public int compare(Object o1, Object o2) {
return o1.toString().compareTo(o2.toString());
}
}
//mapper1任务
public static class WCMapper extends Mapper<Object, Text,Text, IntWritable>{
//定义一个不可变常量用于当做站位符
private static final IntWritable one =new IntWritable(1);
//实例化text对象
private Text word=new Text();

/**
*
*
@param key 行首偏移量
*
@param value 每行内容
*
@param context 实例资源
*
@throws IOException 抛出异常
*
@throws InterruptedException
*/
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//实例化java自带切割方法
StringTokenizer itr = new StringTokenizer(value.toString());
//循环
while (itr.hasMoreElements()){
//word保存值
word.set(itr.nextToken());
//输出
context.write(word,one);
}
}
}

//reduce1任务
public static class WCReducer
extends Reducer<Text, IntWritable,Text, IntWritable>{
//实例化变量用于保存结果值
private IntWritable results =new IntWritable();

/**
*
*
@param key 每个文本内容
*
@param values 1的集合
*
@param context 实例资源
*
@throws IOException 抛出异常
*
@throws InterruptedException 抛出异常
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//定义一个计数器
int num =0;
//遍历
for (IntWritable val : values) {
//累加
num+=val.get();
}
//保存结果值
results.set(num);
//输出
context.write(key,results);
}
}
//mapper2任务
public static class WCMapper2 extends Mapper<Object, Text,Text, IntWritable>{
//定义一个不可变常量用于当做站位符
private static final IntWritable one =new IntWritable(1);
//实例化text对象
private Text word=new Text();

/**
*
*
@param key 行首偏移量
*
@param value 每行内容
*
@param context 实例资源
*
@throws IOException 抛出异常
*
@throws InterruptedException
*/
@Override
protected void map(Object key, Text value, Context context) throws IOException,
 InterruptedException {

}
}

//reduce2任务
public static class WCReducer2 extends
Reducer<Text, IntWritable,Text, IntWritable>{
//实例化变量用于保存结果值
private IntWritable results =new IntWritable();

/**
*
*
@param key 每个文本内容
*
@param values 1的集合
*
@param context 实例资源
*
@throws IOException 抛出异常
*
@throws InterruptedException 抛出异常
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {

}
}
}

其他方式也是可以实现的

//方法二:
// 建hive表A,挂分区channel,每个文件夹是一个分区.
//select x.url,x.c from(select url,count(1) as c from
A where channel ='' group by url) x order by x.c desc limit 1000000;
//
// 还可以用treeMap, 到1000000了每来一个都加进去, 删掉最小的