vlambda博客
学习文章列表

JDK17 |java17学习 第 7 章 Java 标准和外部库

Chapter 8: Multithreading and Concurrent Processing

在本章中,我们将讨论通过使用同时处理数据的工作线程(线程)来提高 Java 应用程序性能的方法。我们将解释 Java 线程的概念并演示它们的用法。我们还将讨论并行和并发处理之间的区别,以及如何避免因并发修改共享资源而导致的不可预知的结果。

完成本章后,您将能够为多线程处理编写代码——创建和执行线程以及在并行和并发情况下使用线程池。

本章将涵盖以下主题:

  • 线程与进程
  • 用户线程与守护进程
  • 扩展 Thread
  • 实现 Runnable 接口
  • 扩展 Thread 与实现 Runnable
  • 使用线程池
  • 从线程中获取结果
  • 并行与并发处理
  • 同一资源的并发修改

Technical requirements

为了能够执行本章提供的代码示例,您将需要以下内容:

  • 装有 Microsoft Windows、Apple macOS 或 Linux 操作系统的计算机
  • Java 标准版 (SE) 17 或更高版本
  • 集成开发环境 (IDE) 或您首选的代码编辑器

第 1 章Java 17 入门。本章的代码示例文件可在 GitHub 上的 https://github 中找到。 com/PacktPublishing/Learn-Java-17-Programming.git 存储库,位于 examples/src/main/java/com/packt/learnjava/ch08_threads 文件夹中。

Thread versus process

Java 有两个执行单元——一个进程和一个线程。 进程通常代表整个Java虚拟机(JVM),虽然应用程序可以使用 java.lang.ProcessBuilder 创建 另一个进程。但由于多进程 的情况超出了本书的范围,我们将重点关注第二个执行单元——即 线程,类似于进程,但与其他线程的隔离度较低,执行所需的资源较少。

一个进程可以有许多线程在运行,并且至少有一个称为主线程的线程——启动应用程序的线程——我们在每个示例中使用。线程可以共享资源,包括内存和打开的文件,这样可以提高效率,但这是有代价的:意外相互干扰的风险更高,甚至执行阻塞。这就是需要编程技能和理解并发技术的地方。

User thread versus daemon

有一种特殊的 线程称为 守护进程。

笔记

daemon 一词起源于古希腊,意思是 神与人之间的自然神性或超自然存在 一种内在的或伴随的精神或鼓舞人心的力量。

在计算机科学中,术语 daemon 有更普通的用法,适用于 作为后台进程运行的计算机程序,而不是在直接控制交互式用户。这就是为什么Java中有以下两种类型的线程:

  • 由应用程序启动的用户线程(默认)(主线程就是这样一个例子)
  • 在后台工作以支持用户线程活动的守护线程

这就是为什么所有守护线程在最后一个用户线程退出后立即退出或在未处理的异常后被 JVM 终止的原因。

Extending the Thread class

创建线程 的一种方法是扩展 java.lang.Thread 类并覆盖其 run() 方法。这是一个例子:

class MyThread extends Thread {
    private String parameter;
    public MyThread(String parameter) {
        this.parameter = parameter;
    }
    public void run() {
        while(!"exit".equals(parameter)){
           System.out.println((isDaemon() ? "daemon" 
              : "  user") + " thread " + this.getName() + 
              "(id=" + this.getId() + ") parameter: " +
                                               parameter);
            pauseOneSecond();
        }
        System.out.println((isDaemon() ? "daemon" 
              : "  user") + " thread " + this.getName() + 
              "(id=" + this.getId() + ") parameter: " +
                                               parameter);
    }
    public void setParameter(String parameter) {
        this.parameter = parameter;
    }
}

如果 run() 方法没有被覆盖,线程什么也不做。在我们的示例中,只要参数不等于 "exit",线程每秒都会打印其名称和其他属性代码>字符串;否则,它会退出。

pauseOneSecond() 方法如下所示:

private static void pauseOneSecond(){
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

我们现在可以使用 MyThread 类来运行 两个线程——一个用户线程和一个守护线程,如下所示:

public static void main(String... args) {
    MyThread thr1 = new MyThread("One");
    thr1.start();
    MyThread thr2 = new MyThread("Two");
    thr2.setDaemon(true);
    thr2.start();
    pauseOneSecond();
    thr1.setParameter("exit");
    pauseOneSecond();
    System.out.println("Main thread exists");
}

可以看到,主线程创建了另外两个线程,暂停一秒,在用户线程上设置exit参数,再暂停一秒,最后退出(main() 方法完成执行)。

如果我们运行前面的代码,我们会看到这样的东西(id 线程在不同的操作系统中可能不同):

JDK17 |java17学习 第 7 章 Java 标准和外部库

前面的屏幕截图显示,一旦最后一个用户线程(在我们的示例中为主线程)退出,守护线程就会自动退出。

Implementing the Runnable interface

创建 线程的第二种方法是使用实​​现java.lang.Runnable 的类。这是一个与 MyThread 类具有几乎完全相同功能的类的示例:

class MyRunnable implements Runnable {
    private String parameter, name;
    public MyRunnable(String name) {
        this.name = name;
    }
    public void run() {
        while(!"exit".equals(parameter)){
            System.out.println("thread " + this.name + 
                              ", parameter: " + parameter);
            pauseOneSecond();
        }
        System.out.println("thread " + this.name +
                              ", parameter: " + parameter);
    }
    public void setParameter(String parameter) {
        this.parameter = parameter;
    }
}

不同之处在于没有 isDaemon()getId() 或任何其他开箱即用的方法. MyRunnable 类可以是任何实现了 Runnable 接口的类,因此我们无法打印线程是否为守护进程。我们添加了 name 属性,以便我们可以识别线程。

我们可以使用 MyRunnable 类来创建 线程,类似于我们使用 MyThread< /code> 类,如下:

public static void main(String... args) {
    MyRunnable myRunnable1 = new MyRunnable("One");
    MyRunnable myRunnable2 = new MyRunnable("Two");
    Thread thr1 = new Thread(myRunnable1);
    thr1.start();
    Thread thr2 = new Thread(myRunnable2);
    thr2.setDaemon(true);
    thr2.start();
    pauseOneSecond();
    myRunnable1.setParameter("exit");
    pauseOneSecond();
    System.out.println("Main thread exists");
}

以下截图证明 MyRunnable 类的行为与 MyThread 类的行为相似:

JDK17 |java17学习 第 7 章 Java 标准和外部库

守护线程(名为 Two)在最后一个用户线程退出后退出——这正是 MyThread 类的情况。

Extending Thread versus implementing Runnable

Runnable 的实现具有允许实现 扩展另一个类的优势(在某些情况下,这是唯一可能的选择)。当您想向现有类添加类似线程的行为时,它特别 很有帮助。实现 Runnable 可以让使用更加灵活,但除此之外,与 Thread 类的扩展相比,在功能上没有区别。

Thread 类有几个构造函数,允许设置线程名称和它所属的组。在许多线程并行运行的情况下,线程分组有助于管理它们。 Thread 类还有几个方法可以提供有关线程状态及其属性的信息,并允许我们控制其行为。

如您所见,线程的 identifier (ID) 是自动生成的。它不能更改,但可以在线程终止后重用。另一方面,可以使用相同的名称设置多个线程。

执行优先级也可以在 Thread.MIN_PRIORITYThread.MAX_PRIORITY 之间以编程方式设置。该值越小,线程被允许运行的时间越多,这意味着它具有更高的优先级。如果未设置,则优先级值默认为 Thread.NORM_PRIORITY

线程的状态可以具有以下值之一:

  • NEW:当线程尚未启动时
  • RUNNABLE:线程正在执行时
  • BLOCKED:当线程被阻塞并等待监视器锁时
  • WAITING:当一个线程无限期等待另一个线程执行特定操作时
  • TIMED_WAITING:当一个线程等待另一个线程执行一个动作达到指定的等待时间
  • TERMINATED:线程退出时

线程——以及任何对象——也可以使用wait()相互交流java.lang.Object 基类的 "literal">notify() 和 notifyAll() 方法,但是线程行为的这一方面超出了本书的范围。

Using a pool of threads

每个线程都需要资源——中央处理器(CPU)和内存 .这意味着必须控制 线程的数量,而 一种方法是创建 固定数量——一个池子。此外,创建对象会产生对某些应用程序来说可能很重要的开销。

在本节中,我们将研究 java.util.concurrent 包中提供的 Executor 接口及其实现。它们封装了线程管理,并最大限度地减少了应用程序开发人员编写与线程生命周期相关的代码的时间。

java.util.concurrent包中定义了三个Executor接口 , 如下:

  • 基本 Executor 接口:其中只有一个 void execute(Runnable r) 方法。
  • ExecutorService 接口:它扩展了 Executor 并添加了四组方法来管理工作线程和执行器本身的生命周期, 如下:
    • submit() 方法, Runnable or Callable 队列中的对象 用于执行(Callable 允许工作线程返回一个值)并返回 Future 接口的一个对象,可用于访问 Callable返回的值 对象并管理工作线程的状态
    • invokeAll() 方法,它放置一个集合的 Callable interface 在队列中等待执行,然后 返回一个 List 接口的 Future  所有工作线程完成时的对象(还有一个重载的 invokeAll() 方法超时)
    • invokeAny() 方法, 其中放置了 接口集合 Callable 对象 in队列 用于执行并返回一个 Future 任何已完成的工作线程的对象(有也是一个重载的 invokeAny() 方法超时)
  • 管理工作线程状态和服务本身的方法,如下:
    • shutdown():阻止新的工作线程提交给服务。
    • shutdownNow(): 中断每个未完成的工作线程。应该编写一个工作线程,以便它定期检查自己的状态 (例如,使用 Thread.currentThread().isInterrupted())并优雅地自行关闭;否则,它会在 shutdownNow() 被调用后继续运行。
    • isShutdown():检查executor的关闭是否启动。
    • awaitTermination(long timeout, TimeUnit timeUnit):等待关闭请求后所有工作线程都执行完毕,或者发生超时,或者当前线程被中断,以两者为准首先发生。
    • isTerminated():检查启动关闭后是否所有工作线程都已完成。它永远不会返回 true 除非 shutdown() 或 shutdownNow () 首先被调用。
  • ScheduledExecutorService 接口:它扩展了 ExecutorService 并添加了允许调度工作线程执行(一次性和周期性)的方法

ExecutorService 的基于池的实现可以使用 java.util.concurrent.ThreadPoolExecutorjava.util.concurrent.ScheduledThreadPoolExecutor 类。还有一个 java.util.concurrent.Executors 工厂类,涵盖了大多数实际情况。因此,在为工作线程的池创建编写自定义代码之前,我们强烈建议使用 java.util.concurrent 的以下工厂方法.Executors 类:

  • newCachedThreadPool():创建线程池,根据需要添加新线程,除非之前创建了空闲线程;从池中删除已空闲 60 秒的线程
  • newSingleThreadExecutor():创建一个按顺序执行工作线程的ExecutorService(池)实例
  • newSingleThreadScheduledExecutor():创建一个单线程执行器,可以安排在给定延迟后运行,或者定期执行
  • newFixedThreadPool(int nThreads):创建复用固定数量工作线程的线程池;如果在所有工作线程仍在执行时提交了新任务,则将其放入队列中,直到有工作线程可用
  • newScheduledThreadPool(int nThreads):创建一个固定大小的线程池,可以安排在给定延迟后运行,或者定期执行
  • newWorkStealingThreadPool(int nThreads): Creates a thread pool that uses the work-stealing algorithm used by ForkJoinPool, which is particularly useful in case the worker threads generate other threads, such as in a recursive algorithm; it also adapts to the specified number of CPUs, which you may set higher or lower than the actual CPU count on your computer

    工作窃取算法

    工作窃取算法允许已完成分配任务的线程 帮助其他仍在忙于分配的任务。例如,请参阅 Oracle Java 官方文档中对 fork/join 实现的描述 (https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html)。

这些方法中的每一个都有一个重载版本,允许传入用于在需要时创建新线程的ThreadFactory。让我们看看它是如何在代码示例中工作的。首先,我们运行另一个版本的 MyRunnable 类,如下:

class MyRunnable implements Runnable {
   private String name;
   public MyRunnable(String name) {
      this.name = name;
   }
   public void run() {
      try {
            while (true) {
           System.out.println(this.name + 
                                     " is working...");
           TimeUnit.SECONDS.sleep(1);
         }
      } catch (InterruptedException e) {
        System.out.println(this.name + 
                      " was interrupted\n" + this.name + 
            " Thread.currentThread().isInterrupted()=" +
                  Thread.currentThread().isInterrupted());
      }
   }
}

我们不能再使用 parameter 属性来告诉线程停止执行,因为线程生命周期 现在将由ExecutorService 接口,其实现方式是调用 interrupt() 线程方法。另外,请注意我们创建的线程有一个无限循环,因此它永远不会停止执行,直到被强制执行(通过调用 interrupt() 方法)。

让我们编写执行以下操作的代码:

  • 创建一个包含三个线程的池
  • 确保池不接受更多线程
  • 等待一个固定的时间让所有线程完成他们正在做的事情
  • 停止(中断)没有完成他们正在做的事情的线程
  • 出口

以下代码执行上述列表中描述的所有操作:

ExecutorService pool = Executors.newCachedThreadPool();
String[] names = {"One", "Two", "Three"};
for (int i = 0; i < names.length; i++) {
    pool.execute(new MyRunnable(names[i]));
}
System.out.println("Before shutdown: isShutdown()=" +
            pool.isShutdown() + ", isTerminated()=" + 
                                pool.isTerminated());
pool.shutdown(); 
           // New threads cannot be added to the pool
//pool.execute(new MyRunnable("Four"));    
                          //RejectedExecutionException
System.out.println("After shutdown: isShutdown()=" +
           pool.isShutdown() + ", isTerminated()=" + 
                               pool.isTerminated());
try {
  long timeout = 100;
  TimeUnit timeUnit = TimeUnit.MILLISECONDS;
  System.out.println("Waiting all threads completion for "
                      + timeout + " " + timeUnit + "...");
         // Blocks until timeout, or all threads complete
         // execution, or the current thread is
         // interrupted, whichever happens first.
  boolean isTerminated = 
                pool.awaitTermination(timeout, timeUnit);
  System.out.println("isTerminated()=" + isTerminated);
  if (!isTerminated) {
    System.out.println("Calling shutdownNow()...");
    List<Runnable> list = pool.shutdownNow();
    System.out.println(list.size() + " threads running");
    isTerminated = 
                pool.awaitTermination(timeout, timeUnit);
    if (!isTerminated) {
     System.out.println("Some threads are still running");
    }
    System.out.println("Exiting");
  }
} catch (InterruptedException ex) {
    ex.printStackTrace();
}

在调用 pool.shutdown() 之后尝试将另一个线程 添加到池中会生成 java.util.concurrent.RejectedExecutionException

执行上述代码会产生以下结果:

JDK17 |java17学习 第 7 章 Java 标准和外部库

请注意前面屏幕截图中的 Thread.currentThread().isInterrupted()=false 消息。线程被中断。我们知道这一点是因为线程收到了 InterruptedException 消息。那么,为什么 isInterrupted() 方法会返回 false?这是因为线程状态在收到中断消息后立即被清除。我们现在提到它是因为它是一些程序员错误的根源。例如,如果主线程监视 MyRunnable 线程并在其上调用 isInterrupted(),则返回值将是false,线程中断后可能会产生误导。

因此,如果另一个线程可能正在监视 MyRunnable 线程,则必须将 MyRunnable 的实现更改为此。请注意以下代码 片段如何在 catch< 中调用 interrupt() 方法/代码>块:

class MyRunnable implements Runnable {
   private String name;
   public MyRunnable(String name) {
      this.name = name;
   }
   public void run() {
      try {
         while (true) {
             System.out.println(this.name + " is working...");
             TimeUnit.SECONDS.sleep(1);
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         System.out.println(this.name + 
           " was interrupted\n" + this.name + 
           " Thread.currentThread().isInterrupted()=" + 
                Thread.currentThread().isInterrupted());
      }
   }
}

现在,如果我们再次使用相同的 ExecutorService 池运行 这个线程,结果如下:

JDK17 |java17学习 第 7 章 Java 标准和外部库

可以看到,isInterrupted()方法返回的值现在是true,对应 发生了什么事。公平地说,在许多应用程序中,一旦线程中断,就不会再次检查其状态。但是设置正确的状态是一种很好的做法,尤其是在您不是创建特定线程的高级代码的作者的情况下。

在我们的示例中,我们使用了一个缓存线程池,它根据需要创建一个新线程,或者,如果可用,重用已使用但已完成其工作并返回池以进行新分配的线程。我们并不担心创建了太多线程,因为我们的演示应用程序最多有三个工作线程,而且它们的寿命很短。

但是在应用程序没有它可能需要的工作线程的固定限制或者没有好的方法来预测线程可能需要多少内存或它可以执行多长时间的情况下,设置工作线程数的上限可以防止应用程序性能意外下降、内存不足或工作线程使用的任何其他资源耗尽。如果线程行为极其不可预测,则单个线程池可能是唯一的解决方案,可以选择使用自定义线程池执行器。但在大多数情况下,固定大小的线程池执行器是应用程序需求和代码复杂性之间的一个很好的实际折衷方案(在本节前面,我们列出了由 Executors 工厂类)。

将池的大小设置得太低可能会剥夺应用程序有效利用可用资源的机会。因此,在选择池大小之前,建议花一些时间监视应用程序,以识别应用程序行为的特性。事实上,deploy-monitor-adjust 循环必须在整个应用程序的生命周期中重复,以适应和利用代码或执行环境中发生的变化。

您要考虑的第一个特征 是系统中的 CPU 数量,因此线程池大小至少可以与 CPU 数量一样大。然后,您可以监控应用程序并查看每个线程占用 CPU 的时间以及它使用其他资源的时间(例如 input /output(I/O)操作)。如果不使用 CPU 所花费的时间与线程的总执行时间相当,那么您可以按以下比例增加池大小:未使用 CPU 的时间除以总执行时间,但这是在另一个资源(磁盘或数据库)不是线程之间争用的主题的情况。如果是后者,那么您可以使用该资源而不是 CPU 作为描述因素。

假设您的应用程序的工作线程不是太大或执行时间不是太长,并且属于在相当短的时间内完成工作的典型工作线程的主流群体,您可以通过添加 (四舍五入)所需的响应时间与线程使用 CPU 或其他最有争议的资源的时间的比率。这意味着,在期望的响应时间相同的情况下,线程使用 CPU 或其他并发访问的资源越少,池大小就应该越大。如果有争议的资源有自己的能力来提高并发访问(例如数据库中的连接池),请考虑首先使用该功能。

如果在不同情况下运行时所需的同时运行的线程数发生变化,您可以使池大小动态化并创建一个具有新大小的新池(在其所有线程完成后关闭旧池)。添加或删除可用资源后,可能还需要重新计算新池的大小。例如,您可以使用 Runtime.getRuntime().availableProcessors() 根据可用 CPU 的当前计数以编程方式调整池大小。

如果 Java 开发工具包 (JDK) 适合特定应用程序的需求,在从头开始编写线程管理代码之前,请尝试使用 java.util.concurrent.ThreadPoolExecutor先上课。它有几个重载的构造函数。

为了让您了解它的功能,这里是具有最多选项的构造函数:

ThreadPoolExecutor (int corePoolSize, 
                    int maximumPoolSize, 
                    long keepAliveTime, 
                    TimeUnit unit, 
                    BlockingQueue<Runnable> workQueue, 
                    ThreadFactory threadFactory, 
                    RejectedExecutionHandler handler)

这些是前面构造函数的参数

  • corePoolSize 是保留在池中的线​​程数,即使它们处于空闲状态,除非 allowCoreThreadTimeOut(boolean value) 方法使用 true 值调用。
  • maximumPoolSize 是池中允许的最大线程数。
  • keepAliveTime:当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
  • unitkeepAliveTime 参数的时间单位。
  • workQueue 是用于在执行任务之前保存任务的队列;此队列将仅保存 execute() 方法提交的 Runnable 对象。
  • threadFactory 是执行器创建新线程时使用的工厂。
  • handler 是由于达到线程边界和队列容量而阻塞执行时使用的处理程序。

前面的构造函数 的每个参数,除了workQueue 也可以在ThreadPoolExecutor 类已被创建,从而允许对现有池特性进行更大的灵活性和动态调整。

Getting results from a thread

到目前为止,在我们的示例中,我们使用了 ExecutorService 接口execute() 方法 启动一个线程。事实上,这个方法来自 Executor 基接口。同时,ExecutorService 接口还有其他方法(在前面的使用线程池部分列出)可以启动线程并获取返回线程执行的结果。

带回线程执行结果的对象是 Future 类型——一个具有以下方法的接口:

  • V get():阻塞直到线程结束;返回结果(如果可用
  • V get(long timeout, TimeUnit unit):阻塞直到线程完成或提供的超时时间到;返回结果(如果可用)
  • boolean isDone():如果线程已完成,则返回 true
  • boolean cancel(boolean mayInterruptIfRunning):尝试取消线程的执行;如果成功则返回 true;返回 false 在调用方法时线程已正常完成的情况下
  • boolean isCancelled():如果线程执行在正常完成之前被取消,则返回 true

get() 方法描述中的if available 说明原则上结果并不总是可用,即使当调用不带参数的 get() 方法。这完全取决于用于生成 Future 对象的方法。以下是返回 Future 对象的所有 ExecutorService 方法的列表:

  • 未来<?> submit(Runnable task):提交线程(任务)执行;返回代表 任务的Future 对象;返回的 Future 对象的 get() 方法返回 null。例如,让我们使用仅工作 100 毫秒的 MyRunnable 类,如下所示:
    class MyRunnable 实现 Runnable {    私有字符串名称;    public MyRunnable(字符串名称) {      this.name = name;    }    公共无效运行(){       试试{          System.out.println(this.name +                                    "正在工作...");          TimeUnit.MILLISECONDS.sleep(100);          System.out.println(this.name + "完成");       } catch (InterruptedException e) {          Thread.currentThread().interrupt();          System.out.println(this.name +                       “被打断了\n” + 这个名字 +             " Thread.currentThread().isInterrupted()=" +                 Thread.currentThread().isInterrupted());       }    } }

并且基于上一节的代码示例,让我们创建一个关闭池并在必要时终止所有线程的方法,如下所示:

void shutdownAndTerminate(ExecutorService pool){
  try {
    long timeout = 100;
    TimeUnit timeUnit = TimeUnit.MILLISECONDS;
    System.out.println("Waiting all threads " + 
             "completion for " + timeout + " " + 
                               timeUnit + "...");
     //Blocks until timeout or all threads complete
     // execution, or the current thread is
     // interrupted, whichever happens first.
    boolean isTerminated = 
          pool.awaitTermination(timeout, timeUnit);
    System.out.println("isTerminated()=" +
                                     isTerminated);
    if(!isTerminated) {
      System.out.println("Calling shutdownNow()...");
      List<Runnable> list = pool.shutdownNow();
      System.out.println(list.size() + 
                                 " threads running");
       isTerminated = 
            pool.awaitTermination(timeout, timeUnit);
       if (!isTerminated) {
          System.out.println("Some threads are still running");
       }
       System.out.println("Exiting");
     }
  } catch (InterruptedException ex) {
      ex.printStackTrace();
   }
}

我们将在 finally 块中使用前面的 shutdownAndTerminate() 方法来确保没有运行 线程被留下。这是我们要执行的代码:

ExecutorService pool = Executors.newSingleThreadExecutor();
Future future = pool.submit(new MyRunnable("One"));
System.out.println(future.isDone());         
                                          //prints: false
System.out.println(future.isCancelled());    
                                          //prints: false
try{
    System.out.println(future.get());        
                                           //prints: null
    System.out.println(future.isDone());     
                                           //prints: true
    System.out.println(future.isCancelled());
                                          //prints: false
} catch (Exception ex){
    ex.printStackTrace();
} finally {
    shutdownAndTerminate(pool);
}

您可以在以下屏幕截图中看到此代码的输出

JDK17 |java17学习 第 7 章 Java 标准和外部库

正如所料,Future 对象的 get() 方法返回 null因为 Runnablerun() 方法不返回任何内容。我们可以从返回的 Future 对象中得到的只是任务是否完成的信息。

  • 未来<T> submit(Runnable task, T result):提交线程(任务)执行;返回一个代表任务的 Future 对象,其中包含提供的 result;例如,我们将使用以下类作为结果:
    类结果 {     私有字符串名称;     私人双重结果;     公共结果(字符串名称,双重结果){         this.name = name;         this.result = 结果;     }     @Override     public String toString() {         return "结果{name=" + name +                 ", result=" + result + "}";     } }

以下代码片段演示了 Future 对象如何返回默认结果,该对象由 返回提交()方法:

ExecutorService pool = 
                Executors.newSingleThreadExecutor();
Future<Result> future = 
            pool.submit(new MyRunnable("Two"), 
                            new Result("Two", 42.));
System.out.println(future.isDone());   
                                     //prints: false
System.out.println(future.isCancelled());     
                                      //prints: false
try{
    System.out.println(future.get());         
                                       //prints: null
    System.out.println(future.isDone());      
                                       //prints: true
    System.out.println(future.isCancelled());
                                      //prints: false
} catch (Exception ex){
    ex.printStackTrace();
} finally {
    shutdownAndTerminate(pool);
}

如果我们执行前面的 代码,输出将如下所示:

JDK17 |java17学习 第 7 章 Java 标准和外部库

正如所料,Futureget() 方法返回作为参数传入的对象。

  • 未来<T> submit(Callable task) :提交线程(任务)执行;返回一个代表任务的 Future 对象,其结果由 V call() 方法产生并返回 Callable 接口,这是该接口唯一的 Callable 方法。这是一个例子:
    class MyCallable 实现 Callable {    私有字符串名称;    public MyCallable(字符串名称) {         this.name = name;    }    public 结果调用() {       试试{          System.out.println(this.name +                                         "正在工作...");          TimeUnit.MILLISECONDS.sleep(100);          System.out.println(this.name + "完成");          return new Result(name, 42.42);       } catch (InterruptedException e) {          Thread.currentThread().interrupt();          System.out.println(this.name +                       “被打断了\n” + 这个名字 +             " Thread.currentThread().isInterrupted()=" +                 Thread.currentThread().isInterrupted());       }       返回null;    }

上述代码 的结果如下所示:

JDK17 |java17学习 第 7 章 Java 标准和外部库

可以看到,Future对象get()方法返回 MyCallable 类的 call() 方法产生的值:

  • 列表<未来<T>> invokeAll(Collection :执行所提供集合的所有 Callable任务;返回 Future 对象的列表,其中包含由执行的 Callable 对象产生的结果
  • 列表<未来<T>> invokeAll(Collection :执行所提供集合的所有 Callable任务;返回 Future< /code> 对象与执行的 Callable 对象产生的结果或超时到期,以先发生者为准
  • T invokeAny(Collection<Callable<T>>tasks):执行所提供集合的所有Callable任务;返回已成功完成的结果(意思是,不抛出异常),如果有的话
  • T invokeAny(Collection :执行所有 Callable任务提供收藏;如果在提供的超时到期之前可用,则返回已成功完成的结果(意思是不抛出异常)

如您所见,有很多方法可以从线程中获取结果。您选择的方法取决于您的应用程序的特定需求。

Parallel versus concurrent processing

当我们听说正在工作的 线程同时执行时,我们会自动假设它们确实在执行它们被编程 要做的事情平行。只有在我们深入了解这样一个系统的底层之后,我们才会意识到只有当线程分别由不同的 CPU 执行时,这样的并行处理才是可能的。否则,它们分时共享相同的处理能力。我们认为它们同时工作只是因为它们使用的时间段非常短——只是我们日常生活中使用的时间单位的一小部分。当线程共享相同的资源时,在计算机科学中,我们说它们并发

Concurrent modification of the same resource

两个或多个线程修改相同的值 而其他线程读取它是对并发访问问题之一的最一般描述。更微妙的问题包括线程干扰和内存一致性错误,两者都会产生意想不到的结果 在看似良性的代码片段 中。在本节中,我们将演示此类情况以及避免它们的方法。

乍一看,解决方案似乎很简单:一次只允许一个线程修改/访问资源,就是这样。但是,如果访问需要很长时间,就会产生瓶颈,可能会消除许多线程并行工作的优势。或者,如果一个线程在等待访问另一个资源时阻塞了对一个资源的访问,而第二个线程在等待访问第一个资源时阻塞了对第二个资源的访问,它会创建一个称为死锁的问题。这是程序员在使用多线程时可能遇到的挑战的两个非常简单的例子。

首先,我们将重现由同时修改相同值引起的问题。让我们创建一个 Calculator 接口,如下:

interface Calculator {
    String getDescription();
    double calculate(int i);
}

我们将使用 getDescription() 方法来获取实现的描述。这是第一个实现:

class CalculatorNoSync implements Calculator{
    private double prop;
    private String description = "Without synchronization";
    public String getDescription(){ return description; }
    public double calculate(int i){
        try {
            this.prop = 2.0 * i;
            TimeUnit.MILLISECONDS.sleep(i);
            return Math.sqrt(this.prop);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Calculator was interrupted");
        }
        return 0.0;
    }
}

如您所见,calculate() 方法为 prop 属性分配一个新值,然后执行其他操作(我们模拟它通过调用 sleep() 方法),然后计算分配给 prop 属性的值的平方根。 "Without synchronization" 描述描述了 prop 属性的值在每次 calculate()方法——没有任何协调或同步,因为它在协调的情况下被调用< /a> 在线程同时修改同一资源时。

我们现在要在两个线程之间共享 这个对象,这意味着 prop 属性将被同时更新和使用.因此,围绕 prop 属性的某种线程同步是必要的,但我们决定我们的第一个实现不这样做。

这是我们在执行我们将要创建的每个 Calculator 实现时使用的方法:

void invokeAllCallables(Calculator c){
    System.out.println("\n" + c.getDescription() + ":");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Callable<Result>> tasks = 
                              List.of(new MyCallable("One", c), 
                                     new MyCallable("Two", c));
    try{
        List<Future<Result>> futures = pool.invokeAll(tasks);
        List<Result> results = new ArrayList<>();
        while (results.size() < futures.size()){
            TimeUnit.MILLISECONDS.sleep(5);
            for(Future future: futures){
                if(future.isDone()){
                    results.add((Result)future.get());
                }
            }
        }
        for(Result result: results){
            System.out.println(result);
        }
    } catch (Exception ex){
        ex.printStackTrace();
    } finally {
        shutdownAndTerminate(pool);
    }
}

如您所见,上述方法 执行以下操作:

  • 打印传入的 Calculator 实现的描述。
  • 为两个线程创建一个固定大小的池。
  • 创建两个 Callable 任务的列表——以下 MyCallable 类的对象:
    class MyCallable 实现 Callable
                       
                 
                   
                 
                   ; {     私有字符串名称;     私人计算器计算器;     public MyCallable(字符串名称,                       计算器){         this.name = name;         this.calculator = 计算器;     }     public 结果调用() {         双倍总和 = 0.0;         for(int i = 1; i < 20; i++){             sum +=calculate(i);         }         return new Result(name, sum);     } } 
                 
  • 任务列表 被传递到池的 invokeAll() 方法中,其中每个任务通过调用call() 方法;每个 call() 方法都应用传入的 Calculator< 的 calculate() 方法/code> 对象从 1 到 20 的 19 个数字中的每一个,并对结果求和。在 Result 对象中返回结果总和,以及 MyCallable 对象的名称。
  • 每个 Result 对象最终都会在 Future 对象中返回。
  • invokeAllCallables() 方法然后遍历 Future 对象列表并检查它们的每个任务是否已完成。当一个任务完成时,结果被添加到 List ;结果
  • 完成所有任务后,invokeAllCallables() 方法然后打印 List 的所有元素。结果 并终止池。

这是我们从 invokeAllCallables(new CalculatorNoSync()) 运行中得到的结果:

JDK17 |java17学习 第 7 章 Java 标准和外部库

每次我们运行前面的代码时,实际数字都会略有不同,但 One 任务的结果永远不会等于 的结果Two 任务。这是因为,在设置 prop 字段的值和在 calculate() 方法中返回其平方根之间的期间,另一个线程设法为 prop 分配不同的值。这是线程干扰的情况。

有几种方法可以解决这个问题。我们从原子变量开始,作为实现对属性的线程安全并发访问的一种方式。然后,我们还将演示两种线程同步的方法。

Atomic variable

原子变量只能在其当前值与预期匹配时更新一。在我们的例子中,这意味着如果 prop 值已被另一个线程更改,则不应使用它。

java.util.concurrent.atomic 包有十几个支持这种逻辑的类:AtomicBooleanAtomicIntegerAtomicReferenceAtomicIntegerArray 等等。这些类中的每一个都有许多方法,可用于不同的同步 需求。查看每个 的在线应用程序编程接口(API)文档类(https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/atomic/package-summary.html)。对于演示,我们将仅使用它们中存在的两种方法,如下所述:

  • V get():返回当前值
  • boolean compareAndSet(V expectedValue, V newValue):如果当前值通过 (==) 运算符 expectedValue 值;如果成功则返回 true 或如果实际值不等于预期值则返回 false

下面是如何使用 AtomicReference 类来解决访问 prop 属性时的线程干扰问题="literal">Calculator 对象同时使用这两种方法:

class CalculatorAtomicRef implements Calculator {
    private AtomicReference<Double> prop = 
                              new AtomicReference<>(0.0);
    private String description = "Using AtomicReference";
    public String getDescription(){ return description; }
    public double calculate(int i){
       try {
          Double currentValue = prop.get();
          TimeUnit.MILLISECONDS.sleep(i);
          boolean b = 
           this.prop.compareAndSet(currentValue, 2.0 * i);
          //System.out.println(b); 
                            //prints: true for one thread 
                            //and false for another thread
           return Math.sqrt(this.prop.get());
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         System.out.println("Calculator was interrupted");
       }
       return 0.0;
    }
}

如您所见,前面的 代码确保 currentValueprop 属性在线程休眠时不会更改。下面是我们运行 invokeAllCallables(new CalculatorAtomicRef()) 时产生的消息截图:

JDK17 |java17学习 第 7 章 Java 标准和外部库

现在,线程 产生的结果 是相同的。

java.util.concurrent 包的以下类也提供同步支持:

  • Semaphore:限制可以访问资源的线程数
  • CountDownLatch:允许一个或多个线程等待,直到其他线程中正在执行的一组操作完成
  • CyclicBarrier:允许线程集互相等待到达共同的屏障点
  • Phaser:提供更灵活的屏障形式,可用于控制多个线程之间的分阶段计算
  • Exchanger:允许两个线程在集合点交换对象,在多种管道设计中很有用

Synchronized method

解决问题的另一种方法 是使用同步方法。这是 Calculator 接口 的另一个实现,它使用这种解决线程干扰的方法:

class CalculatorSyncMethod implements Calculator {
    private double prop;
    private String description = "Using synchronized method";
    public String getDescription(){ return description; }
    synchronized public double calculate(int i){
       try {
           //there may be some other code here
           synchronized (this) {
              this.prop = 2.0 * i;
              TimeUnit.MILLISECONDS.sleep(i);
              return Math.sqrt(this.prop);
           }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         System.out.println("Calculator was interrupted");
       }
       return 0.0;
    }
}

我们刚刚在前面添加了 synchronized关键字="literal">calculate() 方法。现在,如果我们运行 invokeAllCallables(new CalculatorSyncMethod()),两个线程的结果总是相同的,如下所示:

JDK17 |java17学习 第 7 章 Java 标准和外部库

这是因为在当前线程(已经进入该方法的线程)退出之前,另一个线程无法进入同步方法。这可能是最简单的解决方案,但是如果方法执行时间过长,这种方法 可能会导致性能下降。在这种情况下,可以使用同步块,它只在原子操作中包装几行代码。

Synchronized block

下面是一个用于解决线程干扰问题的同步块的例子:

class CalculatorSyncBlock implements Calculator {
    private double prop;
    private String description = "Using synchronized block";
    public String getDescription(){
        return description;
    }
    public double calculate(int i){
        try {
            //there may be some other code here
            synchronized (this) {
                this.prop = 2.0 * i;
                TimeUnit.MILLISECONDS.sleep(i);
                return Math.sqrt(this.prop);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Calculator was interrupted");
        }
        return 0.0;
    }
}

可以看到,同步块 获取了 this 对象上的锁,该对象由两个线程共享,然后释放<一个 id="_idIndexMarker1042"> 它仅在线程退出块之后。在我们的演示代码中,block 覆盖了方法的所有代码,所以性能上没有区别。但是想象一下,方法中有更多代码(我们将位置注释为 这里可能还有一些其他代码)。如果是这种情况,则代码的同步部分更小,因此成为瓶颈的机会更少。

如果我们运行 invokeAllCallables(new CalculatorSyncBlock()),结果如下所示:

JDK17 |java17学习 第 7 章 Java 标准和外部库

如您所见,结果与前两个示例完全相同。 java.util.concurrent.locks 包中组装了针对不同需求和不同行为的不同类型的锁。

Java 中的每个对象都继承了 wait()notify()notifyAll()来自基 对象的 方法。这些方法也可以用来控制线程的行为和它们对锁的访问。

Concurrent collections 

解决并发问题的另一种方法 是使用 java.util 中的线程安全集合 .concurrent 包。在选择要使用的集合 之前,请阅读 Javadoc 文档(https://docs.oracle.com/en/java/javase/17/docs/api/index.html< /a>) 查看集合的限制对于您的应用程序是否可以接受。以下是这些收藏的列表和一些建议:

  • ConcurrentHashMap<K,V>:支持检索的全并发和更新的高预期并发;当并发要求非常苛刻并且您需要允许锁定写操作但不需要锁定元素时使用它。
  • ConcurrentLinkedQueue<E>:基于链接节点的线程安全队列;采用高效的非阻塞算法。
  • ConcurrentLinkedDeque<E>:基于链接节点的并发队列;当许多线程共享对公共集合的访问时,ConcurrentLinkedQuequeConcurrentLinkedDeque 都是合适的选择。
  • ConcurrentSkipListMap<K,V>:一个并发的ConcurrentNavigableMap接口实现。
  • ConcurrentSkipListSet<E>:基于 ConcurrentSkipListMap 类的并发 NavigableSet 实现. ConcurrentSkipListSetConcurrentSkipListMap 类,根据 Javadoc 文档,“< em class="italic">为包含、添加和删除操作及其变体提供预期的平均 log(n) 时间成本。升序视图及其迭代器比降序视图更快。”当您需要以特定顺序快速迭代元素时使用它们。
  • CopyOnWriteArrayList<E>ArrayList 的线程安全变体,其中所有可变操作(添加、设置等)通过制作底层数组的新副本来实现。根据 Javadoc 文档,CopyOnWriteArrayList 类“通常成本太高,但可能当遍历操作的数量远远超过突变时,它比替代方案更有效,并且在您不能或不想同步遍历但需要排除并发线程之间的干扰时很有用。”当您 不需要在不同位置添加新元素 并且不需要排序时使用它;否则,使用 ConcurrentSkipListSet
  • CopyOnWriteArraySet<E>:使用内部 CopyOnWriteArrayList 类进行所有操作的集合。
  • PriorityBlockingQueue:当自然顺序可接受并且您需要快速向尾部添加元素并从头部快速移除元素时,这是一个更好的选择 的队列。 阻塞表示队列在检索元素时等待变为非空,在存储元素时等待队列中的空间变为可用。
  • ArrayBlockingQueueLinkedBlockingQueueLinkedBlockingDeque有固定大小(有界);其他队列是无界的。

根据指南使用这些和类似的特征和建议,但在实现功能之前和之后执行全面的测试和性能测量。为了演示其中一些集合的功能,让我们使用 CopyOnWriteArrayList 。首先,让我们看看下面的代码片段,看看当我们尝试同时修改它时 ArrayList 的行为:

List<String> list = Arrays.asList("One", "Two");
System.out.println(list);
try {
    for (String e : list) {
        System.out.println(e);  //prints: One
        list.add("Three");      //UnsupportedOperationException
    }
} catch (Exception ex) {
    ex.printStackTrace();
}
System.out.println(list);       //prints: [One, Two]

正如预期的那样,在迭代 时尝试修改列表会产生异常,并且列表 保持不变。

现在,让我们在相同的情况下使用 CopyOnWriteArrayList<E>,如下所示:

List<String> list = 
       new CopyOnWriteArrayList<>(Arrays.asList("One", "Two"));
System.out.println(list);
try {
    for (String e : list) {
        System.out.print(e + " "); //prints: One Two
        list.add("Three");         //adds element Three
    }
} catch (Exception ex) {
    ex.printStackTrace();
}
System.out.println("\n" + list);
                             //prints: [One, Two, Three, Three]

此代码产生的输出如下所示:

JDK17 |java17学习 第 7 章 Java 标准和外部库

如您所见,列表被修改 无一例外,但不是当前迭代的副本。这就是您可以根据需要使用的行为

Addressing memory consistency errors

内存一致性错误 在多线程 环境中可能有多种形式和原因。在 java.util.concurrent 包的 Javadoc 文档中对它们进行了很好的讨论。在这里,我们将只提及最常见的情况,这是由于缺乏可见性造成的。

当一个线程更改属性值时,另一个线程可能不会立即看到更改,并且您不能将 synchronized 关键字用于原始类型。在这种情况下,请考虑对属性使用 volatile 关键字,因为这可以保证其在不同线程之间的读/写可见性。

并发问题不容易解决,这就是为什么越来越多的开发人员现在采取更激进的方法也就不足为奇了。他们更喜欢在一组无状态操作中处理数据,而不是管理对象状态。我们将在 第 13 章中看到此类代码的示例>函数式编程第 14 章Java 标准流。似乎 Java 和许多现代语言和计算机系统都在朝着这个方向发展。

Summary

在本章中,我们讨论了多线程处理、组织它的方法以及避免因并发修改共享资源而导致的不可预测的结果。我们已经向您展示了如何创建线程并使用线程池执行它们。我们还演示了如何从成功完成的线程中提取结果,并讨论了并行处理和并发处理之间的区别。

在下一章中,我们将让您更深入地了解 JVM 及其结构和流程,并详细讨论防止内存溢出的垃圾收集过程。在本章结束时,您将了解 Java 应用程序执行的构成、JVM 中的 Java 进程、垃圾收集以及 JVM 的一般工作原理。

Quiz

  1. 选择所有正确的陈述:
    1. JVM 进程可以有主线程。
    2. 主线程是主进程。
    3. 一个进程可以启动另一个进程。
    4. 一个线程可以启动另一个线程。
  2. 选择所有正确的陈述:
    1. 守护进程是一个用户线程。
    2. 第一个用户线程完成后,守护线程退出。
    3. 最后一个用户线程完成后,一个守护线程退出。
    4. 主线程是用户线程。
  3. 选择所有正确的陈述:
    1. 所有线程都有 java.lang.Thread 作为基类。
    2. 所有线程都扩展java.lang.Thread
    3. 所有线程都实现java.lang.Thread
    4. 守护线程不扩展 java.lang.Thread
  4. 选择所有正确的陈述:
    1. 任何类都可以实现Runnable接口。
    2. Runnable 接口实现是一个线程。
    3. Runnable 接口实现被线程使用。
    4. Runnable 接口只有一个方法。
  5. 选择所有正确的陈述:
    1. 线程名称必须是唯一的。
    2. 会自动生成线程 ID。
    3. 可以设置线程名称。
    4. 可以设置线程优先级。
  6. 选择所有正确的陈述:
    1. 线程池执行线程。
    2. 线程池重用线程。
    3. 一些线程池可以有固定的线程数。
    4. 一些线程池可以有无限数量的线程。
  7. 选择所有正确的陈述:
    1. Future 对象是从线程获取结果的唯一方法。
    2. Callable 对象是从线程获取结果的唯一方法。
    3. Callable 对象允许我们从线程中获取结果。
    4. Future 对象代表一个线程。
  8. 选择所有正确的陈述:
    1. 并发处理可以并行进行。
    2. 只有在计算机上有多个 CPU 或内核可用时才能进行并行处理。
    3. 并行处理是并发处理。
    4. 没有多个 CPU,并发处理是不可能的。
  9. 选择所有正确的陈述:
    1. 同时修改总是会导致错误的结果。
    2. 原子变量保护属性​​免受并发修改。
    3. 原子变量保护属性​​免受线程干扰。
    4. 原子变量是保护属性免受并发修改的唯一方法。
  10. 选择所有正确的陈述:
    1. synchronized 方法是避免线程干扰的最佳方法。
    2. synchronized 关键字可以应用于任何方法。
    3. 同步的方法会造成处理瓶颈。
    4. 同步的方法很容易实现。
  11. 选择所有正确的陈述:
    1. 同步的块只有在小于方法时才有意义。
    2. 同步的块需要共享锁。
    3. 每个 Java 对象都可以提供一个锁。
    4. 同步块是避免线程干扰的最佳方式。
  12. 选择所有正确的陈述:
    1. 使用并发集合优于使用非并发集合。
    2. 使用并发收集会产生一些开销。
    3. 并非每个并发集合都适合每个并发处理场景。
    4. 我们可以通过调用Collections.makeConcurrent()方法来创建并发集合。
  13. 选择所有正确的陈述:
    1. 避免内存一致性错误的唯一方法是声明 volatile 变量。
    2. 使用 volatile 关键字可确保值更改在所有线程中的可见性。
    3. 避免并发的一种方法是避免任何状态管理。
    4. 无状态实用程序方法不能有并发问题。