package demo03;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;
import java.util.StringTokenizer;
/**
 * Chains two MapReduce jobs with JobControl: job1 counts words, and job2,
 * which depends on job1, re-sorts the result using a custom RawComparator.
 *
 * @author 张泰
 * @version v 1.0
 * @date 2020/6/8
 */
public class WordCount01 {

    // Program entry point
    public static void main(String[] args) throws Exception {
        // Build the job chain and run it
        depending();
    }

    // Sets up the dependent jobs and launches them
    public static void depending() throws Exception {
        // Acquire the shared configuration
        Configuration conf = new Configuration();
        // Job 2: sort job
        Job job2 = init2(conf, WCMapper2.class, WCReducer2.class);
        // Job 1: word count job
        Job job1 = init1(conf, WCMapper.class, WCReducer.class);

        JobControl jobControl = new JobControl("groupName");
        // List<ControlledJob> dependingJobs = new ArrayList<ControlledJob>();

        // Wrap job1 in a ControlledJob
        ControlledJob controlledJob1 = new ControlledJob(conf);
        controlledJob1.setJob(job1);
        // dependingJobs.add(controlledJob1);

        // Wrap job2 and declare that it depends on job1
        ControlledJob controlledJob2 = new ControlledJob(conf);
        controlledJob2.setJob(job2);
        controlledJob2.addDependingJob(controlledJob1);
        jobControl.addJob(controlledJob2);
        jobControl.addJob(controlledJob1);

        // Run JobControl in its own thread and poll until all jobs finish or one fails
        Thread jcThread = new Thread(jobControl);
        jcThread.start();
        while (true) {
            if (jobControl.allFinished()) {
                System.out.println(jobControl.getSuccessfulJobList());
                jobControl.stop();
                break;
            }
            if (jobControl.getFailedJobList().size() > 0) {
                System.out.println(jobControl.getFailedJobList());
                jobControl.stop();
                break;
            }
        }
        // Mark the (placeholder) path for deletion when the JVM exits
        FileSystem fs = FileSystem.get(conf);
        boolean ret = fs.deleteOnExit(new Path(""));
        System.out.println(ret);
    }

    // job1: word count
    public static Job init1(Configuration conf, Class o1, Class o2) throws IOException, ClassNotFoundException, InterruptedException {
        // Instantiate the job
        Job job = Job.getInstance(conf, "Mr1");
        // Point the job at this jar so it can run on the cluster
        job.setJarByClass(WordCount01.class);
        // Set the mapper, combiner and reducer classes
        job.setMapperClass(o1);
        job.setCombinerClass(o2);
        job.setReducerClass(o2);
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths (left empty in this demo)
        FileInputFormat.addInputPath(job, new Path(""));
        FileOutputFormat.setOutputPath(job, new Path(""));
        return job;
    }

    // job2: sort
    public static Job init2(Configuration conf, Class o1, Class o2) throws IOException, ClassNotFoundException, InterruptedException {
        // Instantiate the job
        Job job = Job.getInstance(conf, "Mr2");
        // Point the job at this jar so it can run on the cluster
        job.setJarByClass(WordCount01.class);
        // Set the mapper, combiner and reducer classes
        job.setMapperClass(o1);
        job.setCombinerClass(o2);
        job.setReducerClass(o2);
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the secondary-sort comparator used during the shuffle sort
        job.setSortComparatorClass(TextIntComparator.class);
        // Set the input and output paths (left empty in this demo)
        FileInputFormat.addInputPath(job, new Path(""));
        FileOutputFormat.setOutputPath(job, new Path(""));
        return job;
    }

    // Secondary-sort comparator
    public static class TextIntComparator implements RawComparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int n1 = WritableUtils.decodeVIntSize(b1[s1]);
            int n2 = WritableUtils.decodeVIntSize(b2[s2]);
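            // A serialized Text value starts with a variable-length (vint) byte count;
            // decodeVIntSize tells us how many bytes that length prefix occupies, so the
            // offsets below skip past it to the UTF-8 payload.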
            // This is how Text itself implements the comparison:
            // WritableComparator.compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
            // But it can just as well be done like this:
            byte[] _b1 = Arrays.copyOfRange(b1, s1 + n1, s1 + l1);
            byte[] _b2 = Arrays.copyOfRange(b2, s2 + n2, s2 + l2);
            String t1 = new String(_b1);
            String t2 = new String(_b2);
            return compare(new Text(t1), new Text(t2));
        }
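        // Note: the copy-based variant above allocates two byte arrays, two Strings and two
        // Text objects per comparison, while the commented-out compareBytes call compares the
        // serialized bytes in place; the copy is shown here only for readability.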
        @Override
        public int compare(Object o1, Object o2) {
            return o1.toString().compareTo(o2.toString());
        }
    }

    // Mapper for job1
    public static class WCMapper extends Mapper<Object, Text, Text, IntWritable> {
        // Immutable constant used as the placeholder count of 1
        private static final IntWritable one = new IntWritable(1);
        // Reusable Text object for the output key
        private Text word = new Text();
        /**
         * @param key     byte offset of the line within the split
         * @param value   the line contents
         * @param context task context used to emit output
         * @throws IOException          on I/O failure
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into tokens with the JDK StringTokenizer
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // Emit (word, 1) for every token
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
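    // Note: init1/init2 also register the reducer class as the combiner; that is safe here
    // because summing IntWritable counts is associative and commutative.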
    // Reducer for job1
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Reusable IntWritable holding the summed count
        private IntWritable results = new IntWritable();
        /**
         * @param key     a single word
         * @param values  the collection of 1s emitted for that word
         * @param context task context used to emit output
         * @throws IOException          on I/O failure
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Counter for this key
            int num = 0;
            // Sum all the 1s
            for (IntWritable val : values) {
                num += val.get();
            }
            // Store and emit the total
            results.set(num);
            context.write(key, results);
        }
    }

    // Mapper for job2
    public static class WCMapper2 extends Mapper<Object, Text, Text, IntWritable> {
        // Immutable constant used as the placeholder count of 1
        private static final IntWritable one = new IntWritable(1);
        // Reusable Text object for the output key
        private Text word = new Text();
        /**
         * @param key     byte offset of the line within the split
         * @param value   the line contents
         * @param context task context used to emit output
         * @throws IOException          on I/O failure
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // One plausible implementation (assumed, the body is not specified in the source):
            // re-emit job1's "word<TAB>count" records so the framework re-sorts the keys
            // with TextIntComparator.
            String[] fields = value.toString().split("\t");
            if (fields.length == 2) {
                word.set(fields[0]);
                context.write(word, new IntWritable(Integer.parseInt(fields[1])));
            }
        }
    }
    // Reducer for job2
    public static class WCReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Reusable IntWritable holding the result value
        private IntWritable results = new IntWritable();
        /**
         * @param key     a single word
         * @param values  the counts associated with that word
         * @param context task context used to emit output
         * @throws IOException          on I/O failure
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Plausible pass-through implementation (assumed, the body is not specified in
            // the source): emit the sorted records unchanged.
            for (IntWritable val : values) {
                results.set(val.get());
                context.write(key, results);
            }
        }
    }
}
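// Before running, the empty Path("") placeholders in depending(), init1() and init2() must be
// replaced with real HDFS paths; presumably job1's output directory becomes job2's input
// directory, so the dependent sort job reads the word counts produced by job1.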