vlambda博客
学习文章列表

Java纤程尝鲜|Loom项目实战

在Java时代的早期,Java语言抽象出来隐藏了各种操作系统线程差异性的统一线程接口,这曾经是它区别于其他编程语言的一大优势。但是在某些场景下,却也已经显现出了疲态。2018年,OpenJDK创建了Loom项目,用来实现纤程(Fiber)。

Java纤程尝鲜|Loom项目实战

基础概念

      已了解理论知识的同学们,可直接跳转到纤程实战部分,来尝鲜一下Java未来将提供的新并发模型编程方式。


线程的实现方式


实现线程主要有三种方式:

1.使用内核线程实现(1:1实现)

2.使用用户线程实现(1:N实现)

3.使用用户线程加轻量级进程混合实现(N:M实现)


内核线程实现

      使用内核线程实现的方式也被称为1:1实现。内核线程(Kernel-Level Thread,KLT)就是直接由操作系统内核(Kernel,下称内核)支持的线程,这种线程由内核来完成线程切换,内核通过操纵调度器(Scheduler)对线程进行调度,并负责将线程的任务映射到各个处理器上。每个内核线程可以视为内核的一个分身,这样操作系统就有能力同时处理多件事情,支持多线程的内核就称为多线程内核(Multi-Threads Kernel)。

      程序一般不会直接使用内核线程,而是使用内核线程的一种高级接口——轻量级进程(LightWeight Process,LWP),轻量级进程就是我们通常意义上所讲的线程,由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程。这种轻量级进程与内核线程之间1:1的关系称为一对一的线程模型,如下图所示

Java纤程尝鲜|Loom项目实战


用户线程实现

      使用用户线程实现的方式被称为1:N实现。广义上来讲,一个线程只要不是内核线程,都可以认为是用户线程(User Thread,UT)的一种,因此从这个定义上看,轻量级进程也属于用户线程,但轻量级进程的实现始终是建立在内核之上的,许多操作都要进行系统调用,因此效率会受到限制,并不具备通常意义上的用户线程的优点。

      而狭义上的用户线程指的是完全建立在用户空间的线程库上,系统内核不能感知到用户线程的存在及如何实现的。用户线程的建立、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。如果程序实现得当,这种线程不需要切换到内核态,因此操作可以是非常快速且低消耗的,也能够支持规模更大的线程数量,部分高性能数据库中的多线程就是由用户线程实现的。这种进程与用户线程之间1:N的关系称为一对多的线程模型,如下图所示。

Java纤程尝鲜|Loom项目实战


      用户线程的优势在于不需要系统内核支援,劣势也在于没有系统内核的支援,所有的线程操作都需要由用户程序自己去处理。线程的创建、销毁、切换和调度都是用户必须考虑的问题,而且由于操作系统只把处理器资源分配到进程,那诸如“阻塞如何处理”“多处理器系统中如何将线程映射到其他处理器上”这类问题解决起来将会异常困难,甚至有些是不可能实现的。Java、Ruby等语言都曾经使用过用户线程,最终又都放弃了使用它。但是近年来许多新的、以高并发为卖点的编程语言又普遍支持了用户线程,譬如Golang、Erlang等,使得用户线程的使用率有所回升。


混合实现

      线程除了依赖内核线程实现和完全由用户程序自己实现之外,还有一种将内核线程与用户线程一起使用的实现方式,被称为N:M实现。在这种混合实现下,既存在用户线程,也存在轻量级进程。用户线程还是完全建立在用户空间中,因此用户线程的创建、切换、析构等操作依然廉价,并且可以支持大规模的用户线程并发。而操作系统支持的轻量级进程则作为用户线程和内核线程之间的桥梁,这样可以使用内核提供的线程调度功能及处理器映射,并且用户线程的系统调用要通过轻量级进程来完成,这大大降低了整个进程被完全阻塞的风险。在这种混合模式中,用户线程与轻量级进程的数量比是不定的,是N:M的关系,如下图所示,这种就是多对多的线程模型。

Java纤程尝鲜|Loom项目实战

     许多UNIX系列的操作系统,如Solaris、HP-UX等都提供了M:N的线程模型实现。在这些操作系统上的应用也相对更容易应用M:N的线程模型。


Java纤程尝鲜|Loom项目实战

Java线程的实现


目前方式

      以HotSpot为例,目前是基于操作系统原生线程模型来实现,即采用1:1的线程模型。它的每一个Java线程都是直接映射到一个操作系统原生线程来实现的,而且中间没有额外的间接结构,所以HotSpot自己是不会去干涉线程调度的(可以设置线程优先级给操作系统提供调度建议),全权交给底下的操作系统去处理,所以何时冻结或唤醒线程、该给线程分配多少处理器执行时间、该把线程安排给哪个处理器核心去执行等,都是由操作系统完成的,也都是由操作系统全权决定的。


局限

      现今对Web应用的服务要求,不论是在请求数量上还是在复杂度上,与十多年前相比已不可同日而语,这一方面是源于业务量的增长,另一方面来自于为了应对业务复杂化而不断进行的服务细分。现代B/S系统中一次对外部业务请求的响应,往往需要分布在不同机器上的大量服务共同协作来实现,这种服务细分的架构在减少单个服务复杂度、增加复用性的同时,也不可避免地增加了服务的数量,缩短了留给每个服务的响应时间。这要求每一个服务都必须在极短的时间内完成计算,这样组合多个服务的总耗时才不会太长;也要求每一个服务提供者都要能同时处理数量更庞大的请求,这样才不会出现请求由于某个服务被阻塞而出现等待。

      Java目前的并发编程机制就与上述架构趋势产生了一些矛盾,1:1的内核线程模型是如今Java虚拟机线程实现的主流选择,但是这种映射到操作系统上的线程天然的缺陷是切换、调度成本高昂,系统能容纳的线程数量也很有限。以前处理一个请求可以允许花费很长时间在单体应用中,具有这种线程切换的成本也是无伤大雅的,但现在在每个请求本身的执行时间变得很短、数量变得很多的前提下,用户线程切换的开销甚至可能会接近用于计算本身的开销,这就会造成严重的浪费。

      传统的Java Web服务器的线程池的容量通常在几十个到两百之间,当程序员把数以百万计的请求往线程池里面灌时,系统即使能处理得过来,但其中的切换损耗也是相当可观的。


未来方向-这次的主角:纤程(Fiber)

Java纤程尝鲜|Loom项目实战

JVMLS 2018大会上Oracle对纤程的介绍


Loom项目

       Loom项目背后的意图不是为了取代当前基于操作系统的线程实现,而是会有两个并发编程模型在Java虚拟机中并存,可以在程序中同时使用。新模型有意地保持了与目前线程模型相似的API设计,它们甚至可以拥有一个共同的基类,这样现有的代码就不需要为了使用纤程而进行过多改动,甚至不需要知道背后采用了哪个并发编程模型。

       Loom团队在JVMLS 2018大会上公布了他们对Jetty基于纤程改造后的测试结果,同样5000QPS的压力下,以容量为400的线程池的传统模式和每个请求配以一个纤程的新并发处理模式进行对比,前者的请求响应延迟在10000至20000毫秒之间,而后者的延迟普遍在200毫秒以下,具体结果如下图所示。

Java纤程尝鲜|Loom项目实战


       在新并发模型下,一段使用纤程并发的代码会被分为两部分——执行过程(Continuation)和调度器(Scheduler)。执行过程主要用于维护执行现场,保护、恢复上下文状态,而调度器则负责编排所有要执行的代码的顺序。将调度程序与执行过程分离的好处是,用户可以选择自行控制其中的一个或者多个,而且Java中现有的调度器也可以被直接重用。事实上,Loom中默认的调度器就是原来已存在的用于任务分解的Fork/Join池(JDK 7中加入的ForkJoinPool)。

Java纤程尝鲜|Loom项目实战


纤程实战

1.下载early access版本的JDK

Java纤程尝鲜|Loom项目实战


2.编写代码

/**

* Java使用纤程Demo

*

* @author jiazhifeng

* @date 2022/1/6 07:23

*/

public class LoomTest {

   public static void main(String[] args) throws InterruptedException {

       //方式1

       Thread.startVirtualThread(() -> System.out.println("Fiber Thread" + Thread.currentThread()));

       //方式2

       Thread.Builder.OfVirtual virtualThreadBuilder = Thread.ofVirtual()

               .name("Fiber Thread-", 0L)// 纤程名称前缀 + 起始自增数字 => prefix + start

               .allowSetThreadLocals(false)// 是否启用ThreadLocal

               .inheritInheritableThreadLocals(false)// 是否启用InheritableThreadLocal

               .uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {// 设置未捕获异常处理器

                   @Override

                   public void uncaughtException(Thread t, Throwable e) {

                   }

               });

       // Create Fiber Thread 1

       virtualThreadBuilder.start(() -> System.out.println("Fiber Thread First" + Thread.currentThread()));

       // Create Fiber Thread 2

       virtualThreadBuilder.start(() -> System.out.println("Fiber Thread Second" + Thread.currentThread()));

       Thread.sleep(1000L);

   }

}



3.编译

目前IDEA不支持JDK19,所以编译,执行需要在终端进行。

终端执行命令
C:\jdk-19\bin\javac -source 19 --enable-preview loomTest.java


4.运行

终端执行命令
C:\jdk-19\bin\java --enable-preview loom.LoomTest


结果:

Fiber ThreadVirtualThread[#14]/runnable@ForkJoinPool-1-worker-1

Fiber Thread SecondVirtualThread[#18,Fiber Thread-1]/runnable@ForkJoinPool-1-worker-3

Fiber Thread FirstVirtualThread[#16,Fiber Thread-0]/runnable@ForkJoinPool-1-worker-2

原理分析

通过调用Thead类的静态方法startVirtualThread()获取Thead实例

public static Thread startVirtualThread(Runnable task) {

   //Loom项目引入的新的类,表示一个虚拟线程实例(纤程实例)

   var thread = new VirtualThread(null, null, 0, task);

   //启动虚拟线程

   thread.start();

   return thread;

}

VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {

   super(name, characteristics);

   Objects.requireNonNull(task);

   // choose scheduler if not specified

   if (scheduler == null) {

       Thread parent = Thread.currentThread();

       if (parent instanceof VirtualThread vparent) {

           //获取父线程的调度器

           scheduler = vparent.scheduler;

       } else {

           //默认调度器为:ForkJoinPool实例

           scheduler = DEFAULT_SCHEDULER;

       }

   }

   this.scheduler = scheduler;

   //task实例会被封装为RunContinuation,也就是上面所说的执行过程,主要用于维护执行现场,保护、恢复上下文状态,最终由调度器执行

   this.cont = new VThreadContinuation(this, task);

   this.runContinuation = this::runContinuation;

}




      接下来看下纤程如果遇传统的线程阻塞会怎样?没有了系统调用,会不会挂起整个线程呢?这个问题是最值得关注的,也是最重要的。

//这个是目前Thread中sleep方法的实现

public static native void sleep(long millis) throws InterruptedException;

//这个是loom版jdk中Thead类中sleep方法的实现

public static void sleep(long millis) throws InterruptedException {

   if (millis < 0) {

       throw new IllegalArgumentException("timeout value is negative");

   }

   if (ThreadSleepEvent.isTurnedOn()) {

       ThreadSleepEvent event = new ThreadSleepEvent();

       try {

           event.time = NANOSECONDS.convert(millis, MILLISECONDS);

           event.begin();

           sleepMillis(millis);

       } finally {

           event.commit();

       }

   } else {

       sleepMillis(millis);

   }

}


private static void sleepMillis(long millis) throws InterruptedException {

   Thread thread = currentThread();

   if (thread instanceof VirtualThread vthread) {

       long nanos = NANOSECONDS.convert(millis, MILLISECONDS);

       //可以看到,虚拟线程会调用这里。

       vthread.sleepNanos(nanos);

   } else {

   //普通线程将挂起整个线程

       sleep0(millis);

   }

}




void sleepNanos(long nanos) throws InterruptedException {

       assert Thread.currentThread() == this;

       if (nanos >= 0) {

           if (getAndClearInterrupt())

               throw new InterruptedException();

           if (nanos == 0) {

               tryYield();

           } else {

               // park for the sleep time

               try {

                   long remainingNanos = nanos;

                   long startNanos = System.nanoTime();

                   while (remainingNanos > 0) {

                      //虚拟线程挂起的核心方法,挂起当前纤程,协程进入Block状态

                       parkNanos(remainingNanos);

                       if (getAndClearInterrupt()) {

                           throw new InterruptedException();

                       }

                       remainingNanos = nanos - (System.nanoTime() - startNanos);

                   }

               } finally {

                   // may have been unparked while sleeping

                   setParkPermit(true);

               }

           }

       }

   }



void sleepNanos(long nanos) throws InterruptedException {

       assert Thread.currentThread() == this;

       if (nanos >= 0) {

           if (getAndClearInterrupt())

               throw new InterruptedException();

           if (nanos == 0) {

               tryYield();

           } else {

               // park for the sleep time

               try {

                   long remainingNanos = nanos;

                   long startNanos = System.nanoTime();

                   while (remainingNanos > 0) {

                      //虚拟线程挂起的核心方法,挂起当前纤程,协程进入Block状态

                       parkNanos(remainingNanos);

                       if (getAndClearInterrupt()) {

                           throw new InterruptedException();

                       }

                       remainingNanos = nanos - (System.nanoTime() - startNanos);

                   }

               } finally {

                   // may have been unparked while sleeping

                   setParkPermit(true);

               }

           }

       }

   }


可以得出:
      线程的挂起和恢复使用的时Unsafe类中的park方法,unpark方法。挂起的是整个线程。
      纤程的挂起和恢复使用的是VirtualThread类中的park方法,unpark方法。挂起的只是当前纤程。

      Loom项目目前仍然在开发当中,还没有明确的发布日期,上面内容日后都有被改动的可能。



「作者简介」


贾志峰


一个喜欢探索新技术的程序猿




Java纤程尝鲜|Loom项目实战

关注 分享 在看,就是所长最大的生产力!