
Spark Source Code Analysis | Storage System Overview and the Shuffle Service

Spark技术日报 2017-12-01

This article is reposted from 博客园 (cnblogs.com); the author's ID is "ChouYarn".

http://www.cnblogs.com/ChouYarn/p/7169472.html


I. Overview


Following the book 《深入理解Spark:核心思想与源码分析》 (Understanding Spark In Depth: Core Ideas and Source Code Analysis), this article reads the latest Spark source code on the master branch and adds my own understanding of the newer code. If you spot any mistakes, please point them out.


1. Implementation of the block manager BlockManager


The block manager is the core component of Spark's storage system; both the driver application and every executor create a BlockManager. The source code lives under core/org.apache.spark.storage; part of it is shown below.


private[spark] val externalShuffleServiceEnabled =

    conf.getBoolean("spark.shuffle.service.enabled", false)


  val diskBlockManager = {

    // Only perform cleanup if an external service is not serving our shuffle files.

    val deleteFilesOnStop =

      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER

    new DiskBlockManager(conf, deleteFilesOnStop)

  }


  // Visible for testing

  private[storage] val blockInfoManager = new BlockInfoManager


  private val futureExecutionContext = ExecutionContext.fromExecutorService(

    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))


  // Actual storage of where blocks are kept

  private[spark] val memoryStore =

    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)

  private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)

  memoryManager.setMemoryStore(memoryStore)


  // Note: depending on the memory manager, `maxMemory` may actually vary over time.

  // However, since we use this only for reporting and logging, what we actually want here is

  // the absolute maximum value that `maxMemory` can ever possibly reach. We may need

  // to revisit whether reporting this value as the "max" is intuitive to the user.

  private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory

  private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory


  // Port used by the external shuffle service. In Yarn mode, this may be already be

  // set through the Hadoop configuration as the server is launched in the Yarn NM.

  private val externalShuffleServicePort = {

    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt

    if (tmpPort == 0) {

      // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds

      // an open port.  But we still need to tell our spark apps the right port to use.  So

      // only if the yarn config has the port set to 0, we prefer the value in the spark config

      conf.get("spark.shuffle.service.port").toInt

    } else {

      tmpPort

    }

  }


  var blockManagerId: BlockManagerId = _


  // Address of the server that serves this executor's shuffle files. This is either an external

  // service, or just our own Executor's BlockManager.

  private[spark] var shuffleServerId: BlockManagerId = _


  // Client to read other executors' shuffle files. This is either an external service, or just the

  // standard BlockTransferService to directly connect to other Executors.

  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {

    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)

    new ExternalShuffleClient(transConf, securityManager,

      securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))

  } else {

    blockTransferService

  }


  // Max number of failures before this block manager refreshes the block locations from the driver

  private val maxFailuresBeforeLocationRefresh =

    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)


  private val slaveEndpoint = rpcEnv.setupEndpoint(

    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,

    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))


  // Pending re-registration action being executed asynchronously or null if none is pending.

  // Accesses should synchronize on asyncReregisterLock.

  private var asyncReregisterTask: Future[Unit] = null

  private val asyncReregisterLock = new Object


  // Field related to peer block managers that are necessary for block replication

  @volatile private var cachedPeers: Seq[BlockManagerId] = _

  private val peerFetchLock = new Object

  private var lastPeerFetchTime = 0L


  private var blockReplicationPolicy: BlockReplicationPolicy = _


The BlockInfoManager declared in the code above manages the BlockIds cached by the BlockManager and their corresponding BlockInfo, and provides a series of policies for synchronizing reads and writes. A BlockManager is made up of the following parts:


  • the shuffle client shuffleClient;

  • the BlockManagerMaster, which centrally manages the BlockManagers living on all executors;

  • the disk block manager DiskBlockManager;

  • the memory store MemoryStore;

  • the disk store DiskStore.


A BlockManager must be initialized before it can take effect; the initialization code is as follows:


def initialize(appId: String): Unit = {

    blockTransferService.init(this)

    shuffleClient.init(appId)


    blockReplicationPolicy = {

      val priorityClass = conf.get(

        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)

      val clazz = Utils.classForName(priorityClass)

      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]

      logInfo(s"Using $priorityClass for block replication policy")

      ret

    }


    val id =

      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)


    val idFromMaster = master.registerBlockManager(

      id,

      maxOnHeapMemory,

      maxOffHeapMemory,

      slaveEndpoint)


    blockManagerId = if (idFromMaster != null) idFromMaster else id


    shuffleServerId = if (externalShuffleServiceEnabled) {

      logInfo(s"external shuffle service port = $externalShuffleServicePort")

      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)

    } else {

      blockManagerId

    }


    // Register Executors' configuration with the local shuffle service, if one should exist.

    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {

      registerWithExternalShuffleServer()

    }


    logInfo(s"Initialized BlockManager: $blockManagerId")

  }


  • Initialize the BlockTransferService and the shuffle client shuffleClient. The shuffle client defaults to the BlockTransferService; when an external shuffle service is enabled, the ExternalShuffleClient is used instead (a sketch of where initialize is invoked from follows this list).

  • Create the local BlockManagerId id and register it with the BlockManagerMaster, receiving idFromMaster in return. If idFromMaster is null, the final blockManagerId is the id just created; otherwise the idFromMaster returned by the BlockManagerMaster is used.

  • Create the shuffleServerId. When an external shuffle service is enabled, a new BlockManagerId pointing at the external shuffle service port is created and used as the shuffleServerId; otherwise the blockManagerId itself is reused.

  • When an external shuffle service is enabled and the current BlockManagerId does not belong to the driver, the executor registers itself with the external shuffle server via registerWithExternalShuffleServer.
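
For context on when initialize runs: the BlockManager is constructed as part of SparkEnv, but it only becomes usable after initialize(appId) is called. The fragment below is a simplified paraphrase of where this happens on the driver and on executors in this era of the code, not a verbatim copy; env, applicationId, conf and isLocal are assumed to be in scope.

// Driver side (simplified from SparkContext startup): the application id is known
// only after the scheduler backend starts, so initialization is deferred until then.
env.blockManager.initialize(applicationId)

// Executor side (simplified from the Executor constructor):
if (!isLocal) {
  env.blockManager.initialize(conf.getAppId)
}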


2. Spark storage system architecture



The interactions in the storage architecture are as follows (the numbers refer to the arrows in the original architecture diagram):

  • Arrow 1: the executor's BlockManager exchanges messages with the driver's BlockManager, for example to register a BlockManager, update block information, look up which BlockManagers hold a block, or remove an executor.

  • Arrow 2: read operations on the BlockManager such as get and doGetLocal, and write operations such as doPut and putSingle.

  • Arrow 3: when the MemoryStore does not have enough memory, data is written to the DiskStore, which in turn relies on the DiskBlockManager (a small caching example follows this list).

  • Arrow 4: blocks are downloaded from or uploaded to a remote executor through the RPC service exposed by the TransportServer in that executor's BlockManager.

  • Arrow 5: conversely, a remote executor's BlockManager downloads or uploads blocks through the RPC service exposed by the TransportServer in the local executor's BlockManager.
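
As a small illustration of arrows 2 and 3, persisting an RDD with a memory-and-disk storage level makes each executor's BlockManager write the cached partitions through the MemoryStore first and fall back to the DiskStore when memory is tight. The sketch below uses only public Spark APIs; the storage level, data sizes and names are illustrative assumptions, not part of the original article.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object BlockManagerCachingDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("block-manager-demo").setMaster("local[2]"))

    // Each cached partition becomes an "rdd_<id>_<partition>" block owned by the executor's BlockManager.
    val cached = sc.parallelize(1 to 1000000, numSlices = 8)
      .map(i => (i, i.toString * 10))
      .persist(StorageLevel.MEMORY_AND_DISK) // MemoryStore first, DiskStore when memory is insufficient

    cached.count() // materializes and stores the blocks (write path)
    cached.count() // served from the BlockManager instead of being recomputed (read path)

    sc.stop()
  }
}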


II. The shuffle service and client


1. The RPC service for blocks


When a map task and a reduce task run on different nodes, the reduce task has to download the map task's intermediate output from the remote node, so NettyBlockRpcServer provides an "open blocks" RPC, i.e. the ability to download block files. In some cases, for fault tolerance, a block's data also needs to be replicated to other nodes, so NettyBlockRpcServer additionally provides an RPC for uploading block files. The implementation is shown in the code below; source location: core/org.apache.spark.network.netty.


class NettyBlockRpcServer(

    appId: String,

    serializer: Serializer,

    blockManager: BlockDataManager)

  extends RpcHandler with Logging {


  private val streamManager = new OneForOneStreamManager()


  override def receive(

      client: TransportClient,

      rpcMessage: ByteBuffer,

      responseContext: RpcResponseCallback): Unit = {

    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)

    logTrace(s"Received request: $message")


    message match {

      case openBlocks: OpenBlocks =>

        val blocksNum = openBlocks.blockIds.length

        val blocks = for (i <- (0 until blocksNum).view)

          yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))

        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)

        logTrace(s"Registered streamId $streamId with $blocksNum buffers")

        responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)


      case uploadBlock: UploadBlock =>

        // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.

        val (level: StorageLevel, classTag: ClassTag[_]) = {

          serializer

            .newInstance()

            .deserialize(ByteBuffer.wrap(uploadBlock.metadata))

            .asInstanceOf[(StorageLevel, ClassTag[_])]

        }

        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))

        val blockId = BlockId(uploadBlock.blockId)

        blockManager.putBlockData(blockId, data, level, classTag)

        responseContext.onSuccess(ByteBuffer.allocate(0))

    }

  }


  override def getStreamManager(): StreamManager = streamManager

}


2. Constructing the transport context TransportContext


Source location: common/network-common/org.apache.spark.network.


  public TransportContext(

      TransportConf conf,

      RpcHandler rpcHandler,

      boolean closeIdleConnections) {

    this.conf = conf;

    this.rpcHandler = rpcHandler;

    this.closeIdleConnections = closeIdleConnections;

  }


A TransportContext can create both the Netty server and the Netty client side; it consists of the following parts (a hedged assembly sketch follows the list):


  • TransportConf: mainly controls the shuffle I/O provided through the Netty framework, such as the numbers of client and server threads;

  • RpcHandler: once the shuffle I/O server receives an RPC request from a client, it handles opening blocks or uploading blocks; the implementation here is NettyBlockRpcServer;

  • closeIdleConnections: whether idle connections should be closed.
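
The following is a hedged sketch, in the spirit of NettyBlockTransferService.init, of how these parts come together; the variable names and surrounding setup are assumptions for illustration rather than a verbatim copy of Spark's code (the actual createClientFactory and createServer signatures are shown in the next subsections).

import org.apache.spark.network.TransportContext
import org.apache.spark.network.client.TransportClientBootstrap
import org.apache.spark.network.netty.{NettyBlockRpcServer, SparkTransportConf}
import org.apache.spark.network.server.TransportServerBootstrap

// Assumed to be in scope, as they are inside NettyBlockTransferService:
//   conf: SparkConf, serializer: Serializer, blockDataManager: BlockDataManager, hostName: String
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 1)
val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)

// One TransportContext builds both sides of the shuffle I/O.
val transportContext = new TransportContext(transportConf, rpcHandler, false /* closeIdleConnections */)
val clientFactory = transportContext.createClientFactory(new java.util.ArrayList[TransportClientBootstrap]())
val server = transportContext.createServer(hostName, 0 /* bind an ephemeral port */, new java.util.ArrayList[TransportServerBootstrap]())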


3. The RPC client factory TransportClientFactory


 public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {

    return new TransportClientFactory(this, bootstraps);

  }


The TransportClientFactory code is analyzed below.


public TransportClientFactory(

      TransportContext context,

      List<TransportClientBootstrap> clientBootstraps) {

    this.context = Preconditions.checkNotNull(context);

    this.conf = context.getConf();

    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));

    this.connectionPool = new ConcurrentHashMap<>();

    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();

    this.rand = new Random();


    IOMode ioMode = IOMode.valueOf(conf.ioMode());

    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);

    this.workerGroup = NettyUtils.createEventLoop(

        ioMode,

        conf.clientThreads(),

        conf.getModuleName() + "-client");

    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(

      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());

  }


  • clientBootstraps: the cached list of client bootstraps, applied to each connection as it is created;

  • connectionPool: caches client connections, keyed by remote peer;

  • numConnectionsPerPeer: the number of connections used to fetch data between a pair of nodes; configurable via the property spark.shuffle.io.numConnectionsPerPeer, default 1 (see the configuration sketch after this list);

  • socketChannelClass: the class used when the client channel is created; selected via the property spark.shuffle.io.mode;

  • workerGroup: by Netty's convention a client only has a worker group, so only the workerGroup is created here; in NIO mode it is a NioEventLoopGroup;

  • pooledAllocator: a pooled ByteBuf allocator with the thread-local cache disabled.
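
As a hedged example of how these knobs are surfaced to users, both properties mentioned above are ordinary Spark configuration entries; the values below are purely illustrative.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Number of connections per remote peer used for shuffle fetches (default 1).
  .set("spark.shuffle.io.numConnectionsPerPeer", "2")
  // I/O mode for the shuffle transport: "nio" (default) or "epoll" on Linux.
  .set("spark.shuffle.io.mode", "nio")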


4. The Netty server TransportServer


TransportServer is the Netty-based server side that provides the RPC services, such as uploading and downloading blocks; the code is as follows:


 public TransportServer createServer(

      String host, int port, List<TransportServerBootstrap> bootstraps) {

    return new TransportServer(this, host, port, rpcHandler, bootstraps);

  }


The TransportServer constructor is as follows:


public TransportServer(

      TransportContext context,

      String hostToBind,

      int portToBind,

      RpcHandler appRpcHandler,

      List<TransportServerBootstrap> bootstraps) {

    this.context = context;

    this.conf = context.getConf();

    this.appRpcHandler = appRpcHandler;

    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));


    try {

      init(hostToBind, portToBind);

    } catch (RuntimeException e) {

      JavaUtils.closeQuietly(this);

      throw e;

    }

  }


The init method initializes the TransportServer; it uses Netty APIs such as EventLoopGroup and ServerBootstrap to create the server side of the shuffle I/O. The main code is listed below.


  private void init(String hostToBind, int portToBind) {


    IOMode ioMode = IOMode.valueOf(conf.ioMode());

    EventLoopGroup bossGroup =

      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");

    EventLoopGroup workerGroup = bossGroup;


    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(

      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());


    bootstrap = new ServerBootstrap()

      .group(bossGroup, workerGroup)

      .channel(NettyUtils.getServerChannelClass(ioMode))

      .option(ChannelOption.ALLOCATOR, allocator)

      .childOption(ChannelOption.ALLOCATOR, allocator);


    if (conf.backLog() > 0) {

      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());

    }


    if (conf.receiveBuf() > 0) {

      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());

    }


    if (conf.sendBuf() > 0) {

      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());

    }


    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

      @Override

      protected void initChannel(SocketChannel ch) throws Exception {

        RpcHandler rpcHandler = appRpcHandler;

        for (TransportServerBootstrap bootstrap : bootstraps) {

          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);

        }

        context.initializePipeline(ch, rpcHandler);

      }

    });


    InetSocketAddress address = hostToBind == null ?

        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);

    channelFuture = bootstrap.bind(address);

    channelFuture.syncUninterruptibly();


    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();

    logger.debug("Shuffle server started on port: {}", port);

  }


  • ioMode: NIO or EPOLL.

  • ChannelOption.ALLOCATOR: Netty 4 introduced a new pooled ByteBuf allocator, a pure-Java take on jemalloc (which Facebook also uses). With it, Netty no longer wastes memory bandwidth zero-filling buffers. Because the pool is not managed by the GC, however, developers have to watch for memory leaks: if a handler forgets to release a buffer, memory usage grows without bound. Netty does not use the pool by default; it has to be specified when the client or server is created. Once the pool is in use, every acquisition must be matched by a release, i.e. retain() and release() calls must come in pairs, otherwise memory leaks (a retain()/release() sketch follows this list).

  • RpcHandler: handles the logic for the data the server receives.
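
A minimal sketch of the retain()/release() pairing the allocator bullet warns about, written against Netty's public ByteBuf API; it is a generic illustration, not code taken from Spark.

import io.netty.buffer.PooledByteBufAllocator

val allocator = PooledByteBufAllocator.DEFAULT
val buf = allocator.buffer(1024)            // reference count starts at 1
try {
  buf.writeBytes("shuffle payload".getBytes("UTF-8"))
  buf.retain()                              // +1 before handing the buffer to another consumer
  // ... the other consumer processes the buffer ...
  buf.release()                             // -1 once that consumer is done
} finally {
  buf.release()                             // -1 back to 0: the buffer returns to the pool
}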


5. Fetching remote shuffle files


The fetchBlocks method of NettyBlockTransferService fetches shuffle files from remote nodes; under the hood it uses the Netty service created inside NettyBlockTransferService.


override def fetchBlocks(

      host: String,

      port: Int,

      execId: String,

      blockIds: Array[String],

      listener: BlockFetchingListener,

      tempShuffleFileManager: TempShuffleFileManager): Unit = {

    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")

    try {

      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {

        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {

          val client = clientFactory.createClient(host, port)

          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,

            transportConf, tempShuffleFileManager).start()

        }

      }


      val maxRetries = transportConf.maxIORetries()

      if (maxRetries > 0) {

        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's

        // a bug in this code. We should remove the if statement once we're sure of the stability.

        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()

      } else {

        blockFetchStarter.createAndStart(blockIds, listener)

      }

    } catch {

      case e: Exception =>

        logError("Exception while beginning fetchBlocks", e)

        blockIds.foreach(listener.onBlockFetchFailure(_, e))

    }

  }
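
The retry behaviour in the code above is governed by TransportConf: for the shuffle module, maxIORetries() resolves to the spark.shuffle.io.maxRetries property, and the pause between attempts to spark.shuffle.io.retryWait. A small, hedged configuration sketch with illustrative values:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Up to 6 retries per failed block fetch before the failure is reported (default 3).
  .set("spark.shuffle.io.maxRetries", "6")
  // Wait 10 seconds between consecutive retries (default 5s).
  .set("spark.shuffle.io.retryWait", "10s")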


6. Uploading shuffle files


The uploadBlock method of NettyBlockTransferService uploads a shuffle file (block) to a remote executor, again using the Netty service created inside NettyBlockTransferService. The steps are described after the code.


override def uploadBlock(

      hostname: String,

      port: Int,

      execId: String,

      blockId: BlockId,

      blockData: ManagedBuffer,

      level: StorageLevel,

      classTag: ClassTag[_]): Future[Unit] = {

    val result = Promise[Unit]()

    val client = clientFactory.createClient(hostname, port)


    // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.

    // Everything else is encoded using our binary protocol.

    val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))


    // Convert or copy nio buffer into array in order to serialize it.

    val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())


    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,

      new RpcResponseCallback {

        override def onSuccess(response: ByteBuffer): Unit = {

          logTrace(s"Successfully uploaded block $blockId")

          result.success((): Unit)

        }

        override def onFailure(e: Throwable): Unit = {

          logError(s"Error while uploading block $blockId", e)

          result.failure(e)

        }

      })


    result.future

  }


  • Create a client for the Netty service; the hostname and port the client connects to are exactly those of the target BlockManager;

  • Serialize the block's storage level StorageLevel and its class tag;

  • Convert the block's NIO ByteBuffer into a byte array so that it can be serialized;

  • Wrap the appId, execId, blockId, metadata and the array-converted block data into an UploadBlock message and serialize it into a byte buffer;

  • Finally call the Netty client's sendRpc method to upload the bytes; the callback RpcResponseCallback completes the upload result according to the RPC outcome (a caller-side sketch follows this list).
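
For completeness, a hedged sketch of the caller's side: uploadBlock returns a Future[Unit], so a caller such as the block-replication path can block on it (BlockTransferService also exposes a blocking wrapper in this era of the code). The names peer, buffer, level and classTag are assumed to be in scope; this shows the generic pattern, not a verbatim copy of Spark's replication code.

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Assumed in scope: blockTransferService, peer: BlockManagerId, blockId: BlockId,
// buffer: ManagedBuffer, level: StorageLevel, classTag: ClassTag[_]
val uploadFuture = blockTransferService.uploadBlock(
  peer.host, peer.port, peer.executorId, blockId, buffer, level, classTag)

// Replication is synchronous from the caller's point of view, so wait for the RPC to complete.
Await.result(uploadFuture, Duration.Inf)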
