
【Spark 3.0-JavaAPI-pom】Experiencing the Changes in JavaRDD Function Wrappers


1. pom

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.0</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
   
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
 
 
    <build>
        <pluginManagement>
            <plugins>
                <!-- plugin for compiling Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- plugin for compiling Java -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2. Spark 3.0 Java API Program

Use Spark to read a text file from HDFS, count the words, and write the result back to HDFS.

2.1 Java Anonymous Inner Classes

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// 1. Create the configuration
SparkConf conf = new SparkConf().setAppName("JavaWordCount");

// 2. JavaSparkContext wraps a SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3. Create a Java RDD from jsc
JavaRDD<String> lines = jsc.textFile(args[0]);

// 4. Split and flatten
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" ")).iterator();
    }
});

// 5. Pair each word with 1
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return Tuple2.apply(word, 1);
    }
});

// 6. Aggregate by key
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

// 7. Swap key and value
JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
        return tp.swap();
    }
});

// 8. Sort by key, descending
JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);

// 9. Swap key and value back
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
        return tp.swap();
    }
});

// 10. Trigger the action and save to HDFS
result.saveAsTextFile(args[1]);

// 11. Release resources
jsc.stop();
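Note the return type in step 4: since Spark 2.x, the Java FlatMapFunction's call method must return an Iterator rather than an Iterable, which is why .iterator() is called on the list. This is one of the wrapper changes this post sets out to demonstrate.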

2.2 Lambda Expression Implementation

// 1. Create the configuration
SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount");

// 2. JavaSparkContext wraps a SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3. Create a Java RDD from jsc
JavaRDD<String> lines = jsc.textFile(args[0]);

// 4. Split and flatten
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());

// 5. Pair each word with 1
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));

// 6. Aggregate by key
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);

// 7. Sort: swap k/v, sort by key descending, swap back
JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false)
        .mapToPair(tp -> tp.swap());

// 8. Save to HDFS
sorted.saveAsTextFile(args[1]);

// 9. Release resources
jsc.stop();
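Because all of these function wrappers are single-method interfaces, method references work as well: step 6 could equally be written reduceByKey(Integer::sum), and the two swaps in step 7 as mapToPair(Tuple2::swap).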

2.3 Packaging the Program

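With the pom above this is a standard Maven build; the shade plugin produces the fat jar during the package phase. Run from the project root:

mvn clean package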

2.4 Upload to the Linux Server

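One way to copy the jar up is scp; the host and target directory below are assumptions chosen to match the spark-submit command in 2.6:

scp target/spark-in-active-1.0-SNAPSHOT.jar root@hadoop1:/root/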

2.5 Start HDFS

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
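If passwordless SSH between the nodes is configured, the same daemons can also be started in one go with Hadoop's bundled script:

start-dfs.sh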

2.6 Submit the JAR with spark-submit

spark-3.0.0-bin-hadoop3.2/bin/spark-submit \
  --master spark://hadoop1:7077,hadoop2:7077,hadoop3:7077 \
  --executor-memory 1g \
  --total-executor-cores 5 \
  --class com.wang.spark.LambdaJavaWordCount \
  /root/spark-in-active-1.0-SNAPSHOT.jar \
  hdfs://hadoop1:9000/wc hdfs://hadoop1:9000/out

2.7 View the Results

hdfs dfs -cat /out/*

3. Running Locally

For a local test, no connection to a cluster is established; the job runs inside a single local process.

// 1. Create the configuration (local test: use local[*] as the master)
SparkConf conf = new SparkConf().setAppName("LambdaJavaWordCount").setMaster("local[*]");

// 2. JavaSparkContext wraps a SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3. Create a Java RDD from jsc
JavaRDD<String> lines = jsc.textFile(args[0]);

// 4. Split and flatten
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());

// 5. Pair each word with 1
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));

// 6. Aggregate by key
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((i, j) -> i + j);

// 7. Sort: swap k/v, sort by key descending, swap back
JavaPairRDD<String, Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false)
        .mapToPair(tp -> tp.swap());

// 8. Save to HDFS (or a local path)
sorted.saveAsTextFile(args[1]);

// 9. Release resources
jsc.stop();
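When launching from the IDE, the two program arguments can simply be local paths, for example (hypothetical) data/words.txt and out. Note that saveAsTextFile refuses to write into a directory that already exists, so the output path must be new on each run.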

At run time, pass in either local data paths or HDFS paths as the arguments.
If this error appears, un-comment the Scala ** ** in the pom, or read all files from the local machine instead.