vlambda博客
学习文章列表

scala-spark实现重分区和打印各个分区的data

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import java.util._
import scala.collection.Iterator
import org.apache.spark.TaskContext

object my_Object {

def main(args: Array[String]): Unit = {


// 创建spark session
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val data = java.util.Arrays.asList("Hadoop", "Spark", "Flink", "Hive", "Impala", "Hbase", "Kafka", "ClickHouse", "KUDU", "zookeeper")
import spark.implicits._
val dataset = spark.createDataset(data)
println("当前rdd的分区数为:" + dataset.rdd.partitions.length) // 当前rdd的分区数为10
val reparation9 = dataset.repartition(9)
println("重分区之后的分区为:" + reparation9.rdd.partitions.length) // 重分区之后的分区为:9
reparation9.rdd.foreachPartition(i => {

println("总共有 " + reparation9.rdd.partitions.length + " 个分区 " + "当前为第 " + TaskContext.getPartitionId() + " 分区的data:" + i.toBuffer)

})


val reparation8 = dataset.repartition(8)
println("重分区之后的分区为:" + reparation8.rdd.partitions.length) // 重分区之后的分区为:8
reparation8.rdd.foreachPartition(i => {

println("总共有 " + reparation8.rdd.partitions.length + " 个分区 " + "当前为第 " + TaskContext.getPartitionId() + " 分区的data:" + i.toBuffer)

})


val repartition6 = dataset.repartition(6)
println("重分区之后的分区为:" + repartition6.rdd.partitions.length)

repartition6.rdd.foreachPartition(i => {

println("总共有 " + repartition6.rdd.partitions.length + " 个分区 " + "当前为第 " + TaskContext.getPartitionId() + " 分区的data:" + i.toBuffer)

}

)

}

}




当分区为9的输出结果:

总共有 9 个分区 当前为第 4 分区的data:ArrayBuffer()

总共有 9 个分区 当前为第 3 分区的data:ArrayBuffer()

总共有 9 个分区 当前为第 1 分区的data:ArrayBuffer()

总共有 9 个分区 当前为第 5 分区的data:ArrayBuffer(Flink)

总共有 9 个分区 当前为第 6 分区的data:ArrayBuffer(Hive)

总共有 9 个分区 当前为第 8 分区的data:ArrayBuffer(Kafka, ClickHouse)

总共有 9 个分区 当前为第 2 分区的data:ArrayBuffer(KUDU, zookeeper)

总共有 9 个分区 当前为第 0 分区的data:ArrayBuffer(Impala, Hbase)

总共有 9 个分区 当前为第 7 分区的data:ArrayBuffer(Hadoop, Spark)



当分区为8的输出结果:

总共有 8 个分区 当前为第 0 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 7 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 2 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 4 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 3 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 5 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 1 分区的data:ArrayBuffer()

总共有 8 个分区 当前为第 6 分区的data:ArrayBuffer(Hadoop, Spark, Flink, Hive, Impala, Hbase, Kafka, ClickHouse, KUDU, zookeeper)


当分区为6的输出结果:

总共有 6 个分区 当前为第 0 分区的data:ArrayBuffer(Hbase)

总共有 6 个分区 当前为第 4 分区的data:ArrayBuffer(Spark)

总共有 6 个分区 当前为第 1 分区的data:ArrayBuffer(Hadoop)

总共有 6 个分区 当前为第 2 分区的data:ArrayBuffer(Kafka, zookeeper)

总共有 6 个分区 当前为第 3 分区的data:ArrayBuffer(Hive, Impala)

总共有 6 个分区 当前为第 5 分区的data:ArrayBuffer(Flink, ClickHouse, KUDU)



总结:

本文利用了repartition对数据进行了重分区,总的来说分区越多,每个分区处理的data越小,每个分区的Data越趋均匀, 但当重分区为8的时候估计是发生了数据倾斜,把所有的数据都存在了一个partition里面.


问题:

  1. 怎么按分区顺序输出data?

  2. 怎么自定义分区使得存在每个partition的数据均匀分布在每个partition上?