vlambda博客
学习文章列表

高频面试题(三)[Scala/Spark]

一、Scala

1. java和Scala的区别?

可以说scala来源于java,但又高于java,我的理解是scala就是在java语言的基础上增加了一层编码的 “壳” 让程序人员可以通过函数式编程的方式来开发程序。由于scala最终被编译为.class文件运行在JVM虚拟机中,其实本质上还是java, 所以在scala和java可以互调双方的api;

区别:

(1)变量的声明

变量var 常量val scala支持自动类型推测

scala更多的是采用常量,而不是变量来解决问题,这样带来的好处是可以减少多线程并发安全问题,特别适合用于多并发分布式的场景

(2)函数的声明

关键字def, Scala函数没有返回值用Unit,相当于java的void

Scala支持函数式编程,可以使用高阶函数,函数是一等公民

偏函数、匿名函数、高阶函数、闭包、柯里化、控制抽象等

(3)基本类型

其实可以认为scala中并没有真正意义上的基本类型,他的类型都是类

(4)静态

java中静态static是违背java面向对象的编程思想和封装特性的,scala取消了静态的概念,使用了单例对象Object来实现

(5)对字符串的支持

Scala支持使用字符串插值的方式对字符串进行格式化,使用$开头进行取值

另外支持使用三引号将其中的内容直接包起来,其中可以包括任何字符而不需要转义

(6)类

Scala类中的字段自动带有getter和setter方法,另外可以使用@BeanProperty注解来生成java中的Get/Set方法

Scala中的每个类都有一个主构造器,这个构造器和类定义”交织在一起”,类的参数直接成为类的字段,主构造器执行类体中所有的语句

(7)Scala中不支持Break

import util.control.Breaks._

用breakable代码块包住循环体后再使用break中断:breakable{循环}

(8)访问范围问题

java中外部看不到内部,内部能看到外部

scala中外部看不到内部,内部看不到外部

(9)通配符

Java使用*进行通配,Scala使用 _ 进行通配

(10)默认导入的类

scala默认导入java.lang包、scala包、scala.Predef类。

java默认导入java.lang包

(11)特质 trait

可以类比java中的接口,但是又和接口非常不一样

java中称为类实现了接口,scala中称为混入了特质

和java中的接口不同 scala中的特质可以包含 带有方法体的方法

(12)简写

省略规则:

  • 最后一行是返回值,return可省略
  • 没有形参,()可省略
  • 没有返回值,=可省略
  • 只有一行代码,{}可省略
  • 匿名函数:()->println("hello")

原文链接:https://www.jianshu.com/p/2df9ca9f49ab

2. scala的特质和java的接口区别?

Scala的特质trait具有java中的接口和类的部分特性,如:

  • Scala的特质能够提供具体方法的实现,而java的接口只有方法的定义,这一点很像java的抽象类

  • Scala同Java,都不能进行多继承,但是前者可以实现多特质,用with关键字。这一点和java的接口相同

  • Scala的特质能在对象生成时临时加入,java则没有这个特质

3. scala闭包和柯里化理解?

1)闭包

闭包是一个函数,它的返回值取决于此函数之外声明一个或多个变量的值

一个函数和与其相关的引用环境(变量/值)组合的一个整体(实体),闭包因为可以保留上次引用的某个值,所以我们传入一次就可以反复使用

 def main(args: Array[String]): Unit = {
    //使用
    val f = makeSuffix(".jpg")
    println(f("小猫.jpg")) // 小猫.jpg
    println(f("小狗")) // 小狗.jpg
    }
    /*
    1) 编写一个函数 makeSuffix(suffix: String)  可以接收一个文件后缀名(比如.jpg),并返回一个闭包
    2) 调用闭包,可以传入一个文件名,如果该文件名没有指定的后缀(比如.jpg) ,则返回 文件名.jpg , 如果已经有.jpg后缀,则返回原文件名。
     */

    def makeSuffix(suffix: String) = {
    (fileName:String) => {
      if (fileName.endsWith(suffix)) fileName else fileName + suffix
    }
  }

2)柯里化

接受多个参数的函数都可以转化为接受单个参数的函数

它把接受多个参数的方法变换成接受一个单一参数的函数,并且返回接受余下的参数的新函数

//传统
def mul(x: Int, y: Int) = x * y
println(mul(1010))
//闭包
def mulCurry(x: Int) = (y: Int) => x * y
println(mulCurry(10)(9))
//柯里化
def mulCurry2(x: Int)(y:Int) = x * y
println(mulCurry2(10)(8))

4. Scala里面的函数和方法有什么区别

原文链接:

https://www.runoob.com/w3cnote/scala-different-function-method.html

https://blog.csdn.net/tototuzuoquan/article/details/73823995

5. Scala有没有多继承?可以实现多继承么?

Scala同Java,都不能进行多继承,但是前者可以实现多特质,用with关键字

6. Scala尾递归(高端题)

原文链接:https://blog.csdn.net/u013007900/article/details/79111974

二、Spark

1.rdd的checkpoint、cache

缓存(cache/persist)

cache和persist其实是RDD的两个API,并且cache底层调用的就是persist,区别之一就在于cache不能显示指定缓存方式,只能缓存在内存中,但是persist可以通过指定缓存方式,比如显示指定缓存在内存中、内存和磁盘并且序列化等。通过RDD的缓存,后续可以对此RDD或者是基于此RDD衍生出的其他的RDD处理中重用这些缓存的数据集,不会切断血缘

容错(checkpoint)

checkpoint与cache/persist对比:

1)都是lazy操作,只有action算子触发后才会真正进行缓存或checkpoint操作(懒加载操作是Spark任务很重要的一个特性,不仅适用于SparkRDD还适用于Sparksql等组件)

2)cache只是缓存数据,但不改变lineage。通常存于内存,丢失数据可能性更大

3)checkpoint改变原有lineage,生成新的CheckpointRDD。通常存于hdfs,高可用且更可

4)Checkpoint会单独再启动一个job,负责写入数据到hdfs中

2.topn的具体步骤

val sortWord=rdd.flatMap(_.split("")).map(x=>(x,1)).reduceByKey(_+_).sortBy(x=>x._2,false).take(10).foreach(println)

3. Spark shuffle

参考     1.8章节

4. RDD特性

1)分区

逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。

2)只读

RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD

3)依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖,包含宽依赖,窄依赖

4)缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用

5)CheckPoint

RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。

5. 算子分为哪几类(RDD支持哪几种类型的操作)

转换(Transformation) 现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。

转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce等等。

动作(Action) 在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。

动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。collect 该方法把数据收集到driver端Array数组类型

所有的transformation只有遇到action才能被执行。

当触发执行action之后,数据类型不再是rdd了,数据就会存储到指定文件系统中,或者直接打印结 果或者收集起来。

6. 创建rdd的几种方式

1)集合并行化创建

val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)

2)读取外部文件系统,如hdfs,或者读取本地文件(最常用的方式)

val rdd2 = sc.textFile("hdfs://hdp-01:9000/words.txt")

// 读取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)

3)从父RDD转换成新的子RDD

调用Transformation类的方法,生成新的RDD

7. Spark的任务划分

RDD任务切分组件分为:Application、Job、Stage和Task

  • Application:初始化一个SparkContext即生成一个Application
  • Job:一个Action算子就会生成一个Job
  • Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
  • Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。

8. Spark任务提交流程

参考     1.6~1.7章节

9. sortBy 和 sortByKey的区别

sortBy既可以作用于RDD[K] ,还可以作用于RDD[(k,v)]

sortByKey只能作用于 RDD[K,V] 类型上。

10. reduceByKey和groupBykey的区别

reduceByKey会传一个聚合函数, 相当于 groupByKey+ mapValues

reduceByKey 会有一个分区内聚合,而groupByKey没有

结论:reduceByKey有分区内聚合,更高效,优先选择使用reduceByKey。

11. RDD,DataFrame,DataSet的共性与区别?

共性:

1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。

3)三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。

4)三者都有partition的概念

5)三者有许多共同的函数,如filter,排序等

6)在对DataFrame和Dataset进行操作许多操作都需要import spark.implicits._

7)DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

区别:

RDD

1)RDD一般和spark mlib同时使用

2)RDD不支持sparksql操作

3)编译时类型安全

DF

1)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析,使用getAs或模式匹配获取字段值。

df.foreach{

line =>

val col1=line.getAs[String]("name")

val col2=line.getAs[Long]("age")

println(col1+":"+col2)

}

2)DataFrame与Dataset一般不与spark mlib同时使用

3)DataFrame与Dataset均支持sparksql的操作

4)DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头

//保存
val saveoptions = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.woodie.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.woodie.spark.csv").load()

DS

1)Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。

2)DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

ds.map{
line=>
println(line.col1)
println(line.col2)
}

12. 如果对RDD进行cache操作后,数据在哪里?

数据在第一执行cache算子时会被加载到各个Executor进程的内存中,第二次就会直接从内存中读取而不会去磁盘。

13. Flink与SparkStreaming做实时的区别

SparkStreaming是基于微批处理的,所以他采用DirectDstream的方式根据计算出的每个partition要取数据的Offset范围,拉取一批数据形成Rdd进行批量处理,而且该Rdd和kafka的分区是一一对应的;

Flink是真正的流处理,他是基于事件触发机制进行处理,在KafkaConsumer拉取一批数据以后,Flink将其经过处理之后变成逐个Record发送的事件触发式的流处理;

Flink支持动态发现新增topic或者新增partition,而SparkStreaming和0.8版本的kafka结合是不支持的,后来跟0.10版本的kafka结合的时候支持了

14. SparkContext的作用

加载配置文件,然后创建SparkEnv,TaskScheduler,DAGScheduler

原文链接:https://blog.csdn.net/duguwanglong/article/details/104448719

15. Sparkstreaming读取kafka数据为什么选择直连方式

1.createDirectStream的方式从Kafka集群中读取数据,并且在SparkStreaming系统里面维护偏移量相关的信息,实现零数据丢失,保证不重复消费,比createStream更高效;

2.创建的DStream的rdd的partition做到了和Kafka中topic的partition一一对应。

但是采用直连(createDirectStream)的方式,就不再向zookeeper中更新offset信息。因此,在采用直连的方式消费kafka中的数据的时候,大体思路是首先获取保存在zookeeper中的偏移量信息,根据偏移量信息去创建stream,消费数据后再把当前的偏移量写入zookeeper中。

原文链接:https://blog.csdn.net/zryowen123/article/details/78360164

16. Spark为何比MapReduce更快?

  • 1 spark的task是线程,启动更快;mr的task是进程
  • 2 spark的很多操作是在内存进行,只有shuffle操作才会把数据落盘;mr的很多操作,包括shuffle,会把数据落盘
  • 3 spark的shuffle阶段对中间结果文件建立有索引文件,读取更快;mr对中间文件没有建立索引文件;
  • 4 spark的shuffle阶段启用bypass时不会对中间结果文件进行排序;mr的shuffle阶段包含3次排序;
  • 5 spark可以对反复用到的数据进行缓存,避免多次加载花费时间;mr不能把多次用到的数据缓存起来
  • 6 Spark对于DAG进行了高度的优化,具体在于Spark划分了不同的stage和使用了延迟计算技术
  • 7 Spark对于反复用到的数据进行了缓存

17. Spark堆内内存和堆外内存

堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放

堆内内存大小:由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。

Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间

堆外内存

1)通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小

2)为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据

3)Executor的堆外内存主要用于程序的共享库、Perm Space、 线程Stack和一些Memory mapping等, 或者类C方式allocate object

参考     1.9章节

18. Spark优化

一、常规性能

1)最优资源配置(将任务分配的资源调节到可以使用的最大限度)

  • 1.提高task并行度

  • 2.增加内存量

2)程序优化

  • 1.RDD复用:RDD持久化、副本机制、缓存

  • 2.RDD数据裁剪

  • 3.广播大变量

  • 4.Kryo序列化

  • 5.调节本地化等待时长

  • 6.算子调优:mapPartition、foreachPartition、coalesce/reparation重新分区、reduceByKey本地聚合

3)Shuffle调优

  • 1.Map端缓冲区大小

  • 2.Reduce端拉取数据缓冲区大小

  • 3.Reduce端拉取数据重试次数/等待间隔

  • 4.SortShuffle排序操作阈值

4)JVM调优

  • 1.静态内存管理机制若GC频繁,可提升Excution内存

  • 2.shuffle output file cannot find,executor lost,task lost,out of memory时,调节Excution堆外内存

  • 3.调节连接等待时长

5)数据倾斜

  • 1.过滤导致倾斜的key

  • 2.提高shuffle操作中的reduce并行度(spark.sql.shuffle.partitions/reduceByKey(500))

  • 3.随机key实现双重聚合

  • 4.mapjoin,广播变量+map

  • 5.单独处理倾斜的key

参考