高频面试题(三)[Scala/Spark]
一、Scala
1. java和Scala的区别?
可以说scala来源于java,但又高于java,我的理解是scala就是在java语言的基础上增加了一层编码的 “壳” 让程序人员可以通过函数式编程的方式来开发程序。由于scala最终被编译为.class文件运行在JVM虚拟机中,其实本质上还是java, 所以在scala和java可以互调双方的api;
区别:
(1)变量的声明
变量var 常量val scala支持自动类型推测
scala更多的是采用常量,而不是变量来解决问题,这样带来的好处是可以减少多线程并发安全问题,特别适合用于多并发分布式的场景
(2)函数的声明
关键字def, Scala函数没有返回值用Unit,相当于java的void
Scala支持函数式编程,可以使用高阶函数,函数是一等公民
偏函数、匿名函数、高阶函数、闭包、柯里化、控制抽象等
(3)基本类型
其实可以认为scala中并没有真正意义上的基本类型,他的类型都是类
(4)静态
java中静态static是违背java面向对象的编程思想和封装特性的,scala取消了静态的概念,使用了单例对象Object来实现
(5)对字符串的支持
Scala支持使用字符串插值的方式对字符串进行格式化,使用$开头进行取值
另外支持使用三引号将其中的内容直接包起来,其中可以包括任何字符而不需要转义
(6)类
Scala类中的字段自动带有getter和setter方法,另外可以使用@BeanProperty注解来生成java中的Get/Set方法
Scala中的每个类都有一个主构造器,这个构造器和类定义”交织在一起”,类的参数直接成为类的字段,主构造器执行类体中所有的语句
(7)Scala中不支持Break
import util.control.Breaks._
用breakable代码块包住循环体后再使用break中断:breakable{循环}
(8)访问范围问题
java中外部看不到内部,内部能看到外部
scala中外部看不到内部,内部看不到外部
(9)通配符
Java使用*进行通配,Scala使用 _ 进行通配
(10)默认导入的类
scala默认导入java.lang包、scala包、scala.Predef类。
java默认导入java.lang包
(11)特质 trait
可以类比java中的接口,但是又和接口非常不一样
java中称为类实现了接口,scala中称为混入了特质
和java中的接口不同 scala中的特质可以包含 带有方法体的方法
(12)简写
省略规则:
-
最后一行是返回值,return可省略 -
没有形参,()可省略 -
没有返回值,=可省略 -
只有一行代码,{}可省略 -
匿名函数:()->println("hello")
原文链接:https://www.jianshu.com/p/2df9ca9f49ab
2. scala的特质和java的接口区别?
Scala的特质trait具有java中的接口和类的部分特性,如:
-
Scala的特质能够提供具体方法的实现,而java的接口只有方法的定义,这一点很像java的抽象类
-
Scala同Java,都不能进行多继承,但是前者可以实现多特质,用with关键字。这一点和java的接口相同
-
Scala的特质能在对象生成时临时加入,java则没有这个特质
3. scala闭包和柯里化理解?
1)闭包
闭包是一个函数,它的返回值取决于此函数之外声明一个或多个变量的值
一个函数和与其相关的引用环境(变量/值)组合的一个整体(实体),闭包因为可以保留上次引用的某个值,所以我们传入一次就可以反复使用
def main(args: Array[String]): Unit = {
//使用
val f = makeSuffix(".jpg")
println(f("小猫.jpg")) // 小猫.jpg
println(f("小狗")) // 小狗.jpg
}
/*
1) 编写一个函数 makeSuffix(suffix: String) 可以接收一个文件后缀名(比如.jpg),并返回一个闭包
2) 调用闭包,可以传入一个文件名,如果该文件名没有指定的后缀(比如.jpg) ,则返回 文件名.jpg , 如果已经有.jpg后缀,则返回原文件名。
*/
def makeSuffix(suffix: String) = {
(fileName:String) => {
if (fileName.endsWith(suffix)) fileName else fileName + suffix
}
}
2)柯里化
接受多个参数的函数都可以转化为接受单个参数的函数
它把接受多个参数的方法变换成接受一个单一参数的函数,并且返回接受余下的参数的新函数
//传统
def mul(x: Int, y: Int) = x * y
println(mul(10, 10))
//闭包
def mulCurry(x: Int) = (y: Int) => x * y
println(mulCurry(10)(9))
//柯里化
def mulCurry2(x: Int)(y:Int) = x * y
println(mulCurry2(10)(8))
4. Scala里面的函数和方法有什么区别
原文链接:
https://www.runoob.com/w3cnote/scala-different-function-method.html
https://blog.csdn.net/tototuzuoquan/article/details/73823995
5. Scala有没有多继承?可以实现多继承么?
Scala同Java,都不能进行多继承,但是前者可以实现多特质,用with关键字
6. Scala尾递归(高端题)
原文链接:https://blog.csdn.net/u013007900/article/details/79111974
二、Spark
1.rdd的checkpoint、cache
缓存(cache/persist)
cache和persist其实是RDD的两个API,并且cache底层调用的就是persist,区别之一就在于cache不能显示指定缓存方式,只能缓存在内存中,但是persist可以通过指定缓存方式,比如显示指定缓存在内存中、内存和磁盘并且序列化等。通过RDD的缓存,后续可以对此RDD或者是基于此RDD衍生出的其他的RDD处理中重用这些缓存的数据集,不会切断血缘
容错(checkpoint)
checkpoint与cache/persist对比:
1)都是lazy操作,只有action算子触发后才会真正进行缓存或checkpoint操作(懒加载操作是Spark任务很重要的一个特性,不仅适用于SparkRDD还适用于Sparksql等组件)
2)cache只是缓存数据,但不改变lineage。通常存于内存,丢失数据可能性更大
3)checkpoint改变原有lineage,生成新的CheckpointRDD。通常存于hdfs,高可用且更可
4)Checkpoint会单独再启动一个job,负责写入数据到hdfs中
2.topn的具体步骤
val sortWord=rdd.flatMap(_.split("")).map(x=>(x,1)).reduceByKey(_+_).sortBy(x=>x._2,false).take(10).foreach(println)
3. Spark shuffle
参考 1.8章节
4. RDD特性
1)分区
逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。
2)只读
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD
3)依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖,包含宽依赖,窄依赖
4)缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用
5)CheckPoint
RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
5. 算子分为哪几类(RDD支持哪几种类型的操作)
转换(Transformation) 现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。
转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce等等。
动作(Action) 在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。
动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。collect 该方法把数据收集到driver端Array数组类型
所有的transformation只有遇到action才能被执行。
当触发执行action之后,数据类型不再是rdd了,数据就会存储到指定文件系统中,或者直接打印结 果或者收集起来。
6. 创建rdd的几种方式
1)集合并行化创建
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)
2)读取外部文件系统,如hdfs,或者读取本地文件(最常用的方式)
val rdd2 = sc.textFile("hdfs://hdp-01:9000/words.txt")
// 读取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
3)从父RDD转换成新的子RDD
调用Transformation类的方法,生成新的RDD
7. Spark的任务划分
RDD任务切分组件分为:Application、Job、Stage和Task
-
Application:初始化一个SparkContext即生成一个Application -
Job:一个Action算子就会生成一个Job -
Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。 -
Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。
8. Spark任务提交流程
参考 1.6~1.7章节
9. sortBy 和 sortByKey的区别
sortBy既可以作用于RDD[K] ,还可以作用于RDD[(k,v)]
sortByKey只能作用于 RDD[K,V] 类型上。
10. reduceByKey和groupBykey的区别
reduceByKey会传一个聚合函数, 相当于 groupByKey+ mapValues
reduceByKey 会有一个分区内聚合,而groupByKey没有
结论:reduceByKey有分区内聚合,更高效,优先选择使用reduceByKey。
11. RDD,DataFrame,DataSet的共性与区别?
共性:
1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。
3)三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
4)三者都有partition的概念
5)三者有许多共同的函数,如filter,排序等
6)在对DataFrame和Dataset进行操作许多操作都需要import spark.implicits._
7)DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型
区别:
RDD
1)RDD一般和spark mlib同时使用
2)RDD不支持sparksql操作
3)编译时类型安全
DF
1)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析,使用getAs或模式匹配获取字段值。
df.foreach{
line =>
val col1=line.getAs[String]("name")
val col2=line.getAs[Long]("age")
println(col1+":"+col2)
}
2)DataFrame与Dataset一般不与spark mlib同时使用
3)DataFrame与Dataset均支持sparksql的操作
4)DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头
//保存
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.woodie.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.woodie.spark.csv").load()
DS
1)Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
2)DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息
ds.map{
line=>
println(line.col1)
println(line.col2)
}
12. 如果对RDD进行cache操作后,数据在哪里?
数据在第一执行cache算子时会被加载到各个Executor进程的内存中,第二次就会直接从内存中读取而不会去磁盘。
13. Flink与SparkStreaming做实时的区别
SparkStreaming是基于微批处理的,所以他采用DirectDstream的方式根据计算出的每个partition要取数据的Offset范围,拉取一批数据形成Rdd进行批量处理,而且该Rdd和kafka的分区是一一对应的;
Flink是真正的流处理,他是基于事件触发机制进行处理,在KafkaConsumer拉取一批数据以后,Flink将其经过处理之后变成逐个Record发送的事件触发式的流处理;
Flink支持动态发现新增topic或者新增partition,而SparkStreaming和0.8版本的kafka结合是不支持的,后来跟0.10版本的kafka结合的时候支持了
14. SparkContext的作用
加载配置文件,然后创建SparkEnv,TaskScheduler,DAGScheduler
原文链接:https://blog.csdn.net/duguwanglong/article/details/104448719
15. Sparkstreaming读取kafka数据为什么选择直连方式
1.createDirectStream的方式从Kafka集群中读取数据,并且在SparkStreaming系统里面维护偏移量相关的信息,实现零数据丢失,保证不重复消费,比createStream更高效;
2.创建的DStream的rdd的partition做到了和Kafka中topic的partition一一对应。
但是采用直连(createDirectStream)的方式,就不再向zookeeper中更新offset信息。因此,在采用直连的方式消费kafka中的数据的时候,大体思路是首先获取保存在zookeeper中的偏移量信息,根据偏移量信息去创建stream,消费数据后再把当前的偏移量写入zookeeper中。
原文链接:https://blog.csdn.net/zryowen123/article/details/78360164
16. Spark为何比MapReduce更快?
-
1 spark的task是线程,启动更快;mr的task是进程 -
2 spark的很多操作是在内存进行,只有shuffle操作才会把数据落盘;mr的很多操作,包括shuffle,会把数据落盘 -
3 spark的shuffle阶段对中间结果文件建立有索引文件,读取更快;mr对中间文件没有建立索引文件; -
4 spark的shuffle阶段启用bypass时不会对中间结果文件进行排序;mr的shuffle阶段包含3次排序; -
5 spark可以对反复用到的数据进行缓存,避免多次加载花费时间;mr不能把多次用到的数据缓存起来 -
6 Spark对于DAG进行了高度的优化,具体在于Spark划分了不同的stage和使用了延迟计算技术 -
7 Spark对于反复用到的数据进行了缓存
17. Spark堆内内存和堆外内存
堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放
堆内内存大小:由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。
Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间
堆外内存
1)通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小
2)为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据
3)Executor的堆外内存主要用于程序的共享库、Perm Space、 线程Stack和一些Memory mapping等, 或者类C方式allocate object
参考 1.9章节
18. Spark优化
一、常规性能
1)最优资源配置(将任务分配的资源调节到可以使用的最大限度)
-
1.提高task并行度
-
2.增加内存量
2)程序优化
-
1.RDD复用:RDD持久化、副本机制、缓存
-
2.RDD数据裁剪
-
3.广播大变量
-
4.Kryo序列化
-
5.调节本地化等待时长
-
6.算子调优:mapPartition、foreachPartition、coalesce/reparation重新分区、reduceByKey本地聚合
3)Shuffle调优
-
1.Map端缓冲区大小
-
2.Reduce端拉取数据缓冲区大小
-
3.Reduce端拉取数据重试次数/等待间隔
-
4.SortShuffle排序操作阈值
4)JVM调优
-
1.静态内存管理机制若GC频繁,可提升Excution内存
-
2.shuffle output file cannot find,executor lost,task lost,out of memory时,调节Excution堆外内存
-
3.调节连接等待时长
5)数据倾斜
-
1.过滤导致倾斜的key
-
2.提高shuffle操作中的reduce并行度(spark.sql.shuffle.partitions/reduceByKey(500))
-
3.随机key实现双重聚合
-
4.mapjoin,广播变量+map
-
5.单独处理倾斜的key
参考