scala-spark实现重分区和打印各个分区的data
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import java.util._
import scala.collection.Iterator
import org.apache.spark.TaskContext
object my_Object {def main(args: Array[String]): Unit = {
// 创建spark session
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val data = java.util.Arrays.asList("Hadoop", "Spark", "Flink", "Hive", "Impala", "Hbase", "Kafka", "ClickHouse", "KUDU", "zookeeper")
import spark.implicits._
val dataset = spark.createDataset(data)
println("当前rdd的分区数为:" + dataset.rdd.partitions.length) // 当前rdd的分区数为10
val reparation9 = dataset.repartition(9)
println("重分区之后的分区为:" + reparation9.rdd.partitions.length) // 重分区之后的分区为:9
reparation9.rdd.foreachPartition(i => {
println("总共有 " + reparation9.rdd.partitions.length + " 个分区 " + "当前为第 " + TaskContext.getPartitionId() + " 分区的data:" + i.toBuffer)
})
val reparation8 = dataset.repartition(8)
println("重分区之后的分区为:" + reparation8.rdd.partitions.length) // 重分区之后的分区为:8
reparation8.rdd.foreachPartition(i => {
println("总共有 " + reparation8.rdd.partitions.length + " 个分区 " + "当前为第 " + TaskContext.getPartitionId() + " 分区的data:" + i.toBuffer)
})
val repartition6 = dataset.repartition(6)
println("重分区之后的分区为:" + repartition6.rdd.partitions.length)
repartition6.rdd.foreachPartition(i => {
println("总共有 " + repartition6.rdd.partitions.length + " 个分区 " + "当前为第 " + TaskContext.getPartitionId() + " 分区的data:" + i.toBuffer)
}
)
}
}
当分区为9的输出结果:
总共有 9 个分区 当前为第 4 分区的data:ArrayBuffer()
总共有 9 个分区 当前为第 3 分区的data:ArrayBuffer()
总共有 9 个分区 当前为第 1 分区的data:ArrayBuffer()
总共有 9 个分区 当前为第 5 分区的data:ArrayBuffer(Flink)
总共有 9 个分区 当前为第 6 分区的data:ArrayBuffer(Hive)
总共有 9 个分区 当前为第 8 分区的data:ArrayBuffer(Kafka, ClickHouse)
总共有 9 个分区 当前为第 2 分区的data:ArrayBuffer(KUDU, zookeeper)
总共有 9 个分区 当前为第 0 分区的data:ArrayBuffer(Impala, Hbase)
总共有 9 个分区 当前为第 7 分区的data:ArrayBuffer(Hadoop, Spark)
当分区为8的输出结果:
总共有 8 个分区 当前为第 0 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 7 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 2 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 4 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 3 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 5 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 1 分区的data:ArrayBuffer()
总共有 8 个分区 当前为第 6 分区的data:ArrayBuffer(Hadoop, Spark, Flink, Hive, Impala, Hbase, Kafka, ClickHouse, KUDU, zookeeper)
当分区为6的输出结果:
总共有 6 个分区 当前为第 0 分区的data:ArrayBuffer(Hbase)
总共有 6 个分区 当前为第 4 分区的data:ArrayBuffer(Spark)
总共有 6 个分区 当前为第 1 分区的data:ArrayBuffer(Hadoop)
总共有 6 个分区 当前为第 2 分区的data:ArrayBuffer(Kafka, zookeeper)
总共有 6 个分区 当前为第 3 分区的data:ArrayBuffer(Hive, Impala)
总共有 6 个分区 当前为第 5 分区的data:ArrayBuffer(Flink, ClickHouse, KUDU)
总结:
本文利用了repartition对数据进行了重分区,总的来说分区越多,每个分区处理的data越小,每个分区的Data越趋均匀, 但当重分区为8的时候估计是发生了数据倾斜,把所有的数据都存在了一个partition里面.
问题:
怎么按分区顺序输出data?
怎么自定义分区使得存在每个partition的数据均匀分布在每个partition上?