vlambda博客
学习文章列表

读书笔记《hands-on-high-performance-with-spring-5》多线程和并发编程

Multithreading and Concurrent Programming

在上一章中,我们学习了如何优化 Spring 消息传递。我们还学习了各种配置技巧和窍门,帮助我们提高应用程序的性能。我们还研究了 JMS 和 RabbitMQ 的监控和配置以获得最佳性能。

在本章中,我们将介绍 Java 线程的核心概念,然后将讨论 java.util.concurrent 包提供的高级线程支持。对于这个包,我们将看到帮助我们编写多线程和并发编程的各种类和接口。我们还将学习如何使用 Java ThreadPool 来提高性能。我们将介绍 Spring Framework 提供的有用功能,例如任务执行、调度和异步运行。最后,我们将研究带有线程的 Spring 事务管理和线程的各种最佳编程实践。

本章将涵盖以下主题:

  • Java classical threads
  • The java.util.concurrent package
  • Using the thread pools for asynchronous processing
  • Spring task execution and scheduling
  • Spring Async
  • Spring and threads—transactions
  • Java threads best programming practices

Java classical threads

Java 应用程序通过线程执行,线程是程序中独立的执行路径。任何 Java 程序都至少有一个线程,称为主线程,由 Java 虚拟机 (JVM) 创建。 Java 是一个多线程应用程序,它允许在任何特定时间执行多个线程,并且这些线程可以异步或同步地并发运行。当多个线程正在执行时,每个线程的路径可能与其他线程的路径不同。

JVM 为每个线程提供了自己的堆栈,以防止线程相互干扰。单独的堆栈帮助线程跟踪它们要执行的下一条指令,这可能因线程而异。堆栈还为线程提供了它自己的方法参数、局部变量和返回值的副本。

线程存在于进程中,并与进程的其他线程共享它们的资源,例如内存和打开的文件。在不同线程之间共享资源的能力使它们更更容易出错,其中性能是一个重要的要求。 Java 中的每个线程都是由 java.lang.Thread 类和 java.lang.Runnable 接口创建和控制的。

Creating threads

线程是 Java 语言中的对象。可以使用以下机制创建它们:

  • Create a class that implements the Runnable interface
  • Create a class that extends the Thread class

有两种方法可以创建 Runnable 对象。第一种方法是创建一个实现Runnable接口的类,如下:

public class ThreadExample {
  public static void main(String[] args) {
    Thread t = new Thread(new MyThread());
    t.start();
  }
}
class MyThread implements Runnable {
  private static final Logger LOGGER =     
  Logger.getLogger(MyThread.class);
  public void run() {
    //perform some task
    LOGGER.info("Hello from thread...");
  }
}

在 Java 8 之前,我们只有这种方式来创建 Runnable 对象。但是从 Java 8 开始,我们可以使用 Lambda 表达式创建一个 Runnable 对象。

创建 Runnable 对象后,我们需要将它传递给一个 Thread 构造函数,该构造函数接收 Runnable 对象作为参数:

Runnable runnable = () -> LOGGER.info("Hello from thread...");
Thread t = new Thread(runnable);

一些构造函数不将 Runnable 对象作为参数,例如 Thread()。在这种情况下,我们需要采取另一种方法来创建线程:

public class ThreadExample1 {
  public static void main(String[] args) {
    MyThread t = new MyThread1();
    t.start();
  }

}
class MyThread1 extends Thread {
  private static final Logger LOGGER = 
  Logger.getLogger(MyThread1.class);
  public void run() {
    LOGGER.info("Hello from thread...");
  }
}

Thread life cycle and states

在使用线程和多线程环境时,了解线程生命周期和状态非常重要。在前面的示例中,我们看到了如何使用 Thread 类和 Runnable 接口创建 Java 线程对象。但是要启动线程,我们首先要创建线程对象并调用它的start()方法来作为线程执行run()方法。

以下是Java中线程生命周期的不同状态:

  • New: The thread is in the new state when it is created with a new operator. At this stage, the thread is not alive.
  • Runnable: The thread is in the runnable state when we call the start() method of the thread object. At this stage, the thread scheduler still does not pick it for running.
  • Running: The thread state is changed from runnable to running when the thread scheduler has selected it.
  • Blocked/waiting: The thread state is blocked/waiting when it is currently not eligible to run.
  • Terminated/dead: The thread state is terminated/dead when it executes its run method. At this stage, it's considered to be not alive.

More advanced thread tasks

我们已经看到了线程生命周期及其状态,但线程也支持一些高级任务,例如休眠、加入和中断。让我们讨论一下:

  • Sleeping: The sleep() thread method can be used to pause the execution of the current thread for the specified amount of time.
  • Joining: The join() thread method can be used to pause the execution of the current thread until the thread it joins completes its task.
  • Interrupting: The interrupt() thread method can be used to break out the sleeping or waiting state of the thread. It throws InterruptedException if the thread is in the sleeping or waiting state, otherwise, it doesn't interrupt the thread but sets the interrupted flag to true.

Synchronizing threads

在多线程应用程序中,可能存在多个线程尝试访问共享资源并产生错误和意外结果的情况。我们需要确保资源一次只能被一个线程使用,这可以通过同步来实现。 synchronized关键字用于实现同步;当我们在 Java 中定义任何同步块时,只有一个线程可以访问该块,并且其他线程被阻塞,直到块内的线程退出该块。

synchronized 关键字可用于以下不同类型的块:

  • Instance methods
  • Static methods
  • Code blocks inside instance methods
  • Code blocks inside static methods

在 Java 中,同步块会降低性能。我们必须在需要的时候使用 synchronized 关键字,否则,我们应该只在需要的地方使用临界区的同步块。

Issues with multithreading

多线程是一种非常强大的机制,可以帮助我们更好地利用系统资源,但是我们在读写多线程共享的数据时需要特别小心。多线程编程有两个基本问题——可见性和访问问题。当一个线程的效果可以被另一个线程看到时,就会出现可见性问题。当多个线程同时访问相同的共享资源时,可能会出现访问问题。

由于可见性和访问问题,程序不再做出反应,并导致死锁或生成不正确的数据。

The java.util.concurrent package

在上一节中,我们重点介绍了 Java 对线程的低级支持。在本节中,我们将继续查看 java.util.concurrent 包提供的 Java 高级线程支持。这个包有各种类和接口,它们提供了非常有用的功能来帮助我们实现多线程和并发编程。在本节中,我们将主要关注这个包的一些最有用的实用程序。

下图显示了 java.util.concurrent API 的高级概述:

读书笔记《hands-on-high-performance-with-spring-5》多线程和并发编程

让我们详细讨论接口。

Executors

Executor为所有内部线程管理任务提供一个抽象层,管理线程的整个并发执行流程。 Executor 是执行提供的任务的对象。

Java 并发 API 为执行器提供了以下三个基本接口:

  • Executor: This is a simple interface that is used to launch a new task. It does not strictly require the execution to be asynchronous.
  • ExecutorService: This is a subinterface of the Executor interface. It allows us to pass a task to be executed by a thread asynchronously. It provides methods to manage the termination of previously sublimed tasks through shutdown(), shutdownNow(), and awaitTermination(long timeout, TimeUnit unit). It also provides methods that return the Future object for tracking the progress of one or more asynchronous tasks.
  • ScheduledExecutorService: This is a subinterface of ExecutorService. It provides various key methods, such as schedule(), scheduleAtFixedRate() and scheduleWithFixedDelay(). All schedule methods can accept relative delays and periods as arguments, and this helps us to schedule tasks to execute after a given delay or period.

以下是一个简单示例,展示了如何创建 Executor 以执行 Runnable 任务:

public class ExecutorExample {
    private static final Logger LOGGER = 
    Logger.getLogger(ExecutorExample.class);

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
 
            Runnable task = new Runnable() {
            public void run() {
                LOGGER.info(Thread.currentThread().getName());
            }
        }; 
 
        pool.execute(task); 
        pool.shutdown();
    }
}

在前面的示例中,Runnable 对象由匿名类创建,并通过单线程 Executor 接口执行任务。当我们编译并运行前面的类时,我们将得到以下输出:

pool-1-thread-1

ThreadFactory

ThreadFactory 接口用于按需创建新线程,也帮助我们消除了创建线程的大量样板代码。

以下示例显示了我们如何使用 ThreadFactory 接口来创建新线程:

public class ThreadFactoryExample implements ThreadFactory {
  private static final Logger LOGGER =   
  Logger.getLogger(ThreadFactoryExample.class);

  public static void main(String[] args) {
    ThreadFactoryExample factory = new ThreadFactoryExample();

    Runnable task = new Runnable() {
      public void run() {
        LOGGER.info(Thread.currentThread().getName());
      }
    };
    for (int i = 0; i < 5; i++) {
      Thread t = factory.newThread(task);
      t.start();
    }
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r);
    return t;
  }
}

当我们编译并运行前面的类时,我们将得到以下输出:

Thread-0
Thread-1

Synchronizers

Java 提供了synchronized 关键字来编写同步代码,但是仅通过synchronized 关键字很难正确编写同步代码。 java.util.concurrent 包提供了各种实用程序类,例如 CountDownLatchCyclicBarrierExchangerSemaphorePhaser,它们被称为同步器。同步器是不使用 wait()notify() 方法提供线程同步的并发实用程序。让我们看一下以下类:

  • CountDownLatch: This allows one thread to wait for one or more threads to complete before it can start processing.
  • CyclicBarrier: This is very similar to CountdownLatch, but it allows multiple threads to wait for each other before they can start processing.
  • Semaphore: This maintains a set of permits for restricting the number of threads that can access a shared resource. Threads require a permit from Semaphore before accessing a shared resource. It provides two main methods, acquire() and release(), for getting and releasing permits, respectively.
  • Exchanger: This provides a synchronization point where threads can exchange objects.
  • Phaser: This provides a thread synchronization mechanism similar to CyclicBarrier and CountDownLatch, but it supports more flexible usage. It allows a group of threads to wait on a barrier and then proceed after the last thread arrives, and it also supports multiple phases of execution.

Concurrent collection classes

并发集合类提供了比其他集合类(例如 HashMapHashtable)更高的可伸缩性和性能。以下是 java.util.concurrent 包中提供的有用并发类:

  • ConcurrentHashMap: This is similar to HashMap and Hashtable, but it has been designed to work in concurrent programming without the need for explicit synchronization. Hashtable and ConcurrentHashMap are both thread-safe collections, but ConcurrentHashMap is more advanced than Hashtable. It does not lock the entire collection for synchronization, so it is very useful when there are a lot of updates and fewer concurrent reads.
  • BlockingQueue: The producer-consumer pattern is the most common design pattern in asynchronous programming, and the BlockingQueue data structure can be very useful in these asynchronous scenarios.
  • DelayQueue: This is an infinite size blocking queue of elements where an element can only be taken when its delay has expired. If multiple elements delay expiry, then the element with the longest delay expiration will be taken first.

Lock

Lock 接口提供了比 同步 块更高级的锁定机制。 同步块和Lock的主要区别在于同步块完全包含在一个方法中,而Lock接口有单独的方法,< span> lock() unlock(), 那个可以用不同的方法调用。

Callable and Future

Callable接口类似于 Runnable 对象,但是它可以返回任何类型的对象,这对我们有帮助从 Callable 任务中获取结果或状态。

Callable 任务返回 Future 对象,用于获取异步操作的结果。它的用途包括提供一些方法来检查异步执行是否完成以及检索计算结果。

Atomic variables

原子变量是 java.util.concurrent.atomic 包中引入的非阻塞算法。使用原子变量的主要好处是我们不需要担心同步。原子变量是多线程环境中避免数据不一致的必要条件。它支持对单个变量的无锁、线程安全操作。

Using thread pools for asynchronous processing

线程池是多线程编程中的一个核心概念,它为可用于执行任务的空闲线程集合提供服务。线程池可以重用之前创建的线程来执行当前任务,使线程在请求​​到达时就已经可用,这样可以减少线程创建的时间,提高应用程序的性能。通常,线程池可以在 Web 服务器中用于处理客户端请求并维护与数据库的开放连接。

我们可以配置池中的最大并发线程数,这对于防止过载很有用。如果所有线程都忙于执行任务,则将新任务放入队列中并等待线程可用。

Java 并发 API 支持以下类型的线程池:

  • Fixed-thread pool: A thread pool with a fixed number of threads. A task will only execute if a thread is available, otherwise, it is waiting in a queue. The Executors.newFixedThreadPool() method is used to create a fixed-thread pool.
  • Cached-thread pool: A thread pool where we can create new threads as required, but also reuse previously created threads. A thread will be terminated and removed from the pool if it is ideal for 60 seconds. The Executors.newCachedThreadPool() method is used to create a cached-thread pool.
  • Single-thread pool: A thread pool with one thread. It executes tasks one by one. The Executors.newSingleThreadExecutor() method is used to create a single-thread pool.
  • Fork/join pool: A thread pool that is used to perform heavy tasks faster, by splitting the task into smaller pieces recursively. To create a fork/join pool, we need to create an instance of the ForkJoinPool class.

下面是一个固定线程池的简单例子:

public class ThreadPoolExample {
  private static final Logger LOGGER = 
  Logger.getLogger(ThreadPoolExample.class);
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);

    for (int i = 1; i <= 6; i++) {
      Runnable task = new Task(" " + i);
      executor.execute(task);
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
    }
    LOGGER.info("All threads finished");
  }
}

下面演示了任务是如何实现的:

public class Task implements Runnable {
  private static final Logger LOGGER = Logger.getLogger(Task.class);
  private String taskNumber;

  public Task(String taskNumber) {
    this.taskNumber = taskNumber;
  }

  @Override
  public void run() {
    LOGGER.info(Thread.currentThread().getName() + ", Execute Task = " 
    + taskNumber);
    taskProcess();
    LOGGER.info(Thread.currentThread().getName() + ", End");
  }

  private void taskProcess() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

在前面的示例中,我们创建了一个最多包含三个并发线程的池,并将 6 个任务提交给 executor 对象。当我们编译并运行前面的类时,我们知道只有三个线程执行任务。

这是输出:

pool-1-thread-1, Execute Task = 1
pool-1-thread-2, Execute Task = 2
pool-1-thread-3, Execute Task = 3
pool-1-thread-1, End
pool-1-thread-1, Execute Task = 4
pool-1-thread-3, End
pool-1-thread-2, End
pool-1-thread-2, Execute Task = 5
pool-1-thread-3, Execute Task = 6
pool-1-thread-1, End
pool-1-thread-2, End
pool-1-thread-3, End
All threads finished

Spring task execution and scheduling

当我们处理长时间运行的任务时,在任何 Web 应用程序中使用线程都不是一件容易的事。有时,我们需要异步或在特定延迟后运行任务,这可以通过 Spring 的任务执行和调度来完成。 Spring 框架通过 TaskExecutorTaskScheduler 接口引入了用于异步执行和调度任务的抽象。

TaskExecutor

Spring 提供了 TaskExecutor 接口作为处理 Executor 的抽象。 TaskExecutor的实现类如下:

  • SimpleAsyncTaskExecutor: This starts a new thread and executes it asynchronously. It does not reuse the thread.
  • SyncTaskExecutor: This executes each task synchronously in the calling thread. It does not reuse the thread.
  • ConcurrentTaskExecutor: This exposes bean properties for configuring java.util.concurrent.Executor.
  • SimpleThreadPoolTaskExecutor: This is a subclass of SimpleThreadPool of Quartz, which listens to Spring's life cycle callbacks.
  • ThreadPoolTaskExecutor: This exposes bean properties for configuring java.util.concurrent.ThreadPoolExecutor and wraps it in TaskExecutor.
  • TimerTaskExecutor: This implements a single TimerTask class as its backing implementation. It executes methods as synchronous in a separate thread.
  • WorkManagerTaskExecutor: This uses the CommonJ WorkManager interface as its backing implementation.

让我们看一个在 Spring 应用程序中使用 SimpleAsyncTaskExecutor 执行任务的简单示例。它为每个任务提交创建一个新线程并以异步方式运行。

这是配置文件:

@Configuration
public class AppConfig {
  @Bean
  AsyncTask myBean() {
    return new AsyncTask();
  }
  @Bean
  AsyncTaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor t = new SimpleAsyncTaskExecutor();
    return t;
  }
}

这是我们将 5 任务分配给 TaskExecutor 的 bean 类:

public class AsyncTask {
  @Autowired
  private AsyncTaskExecutor executor;
  public void runTasks() throws Exception {
    for (int i = 1; i <= 5; i++) {
      Runnable task = new Task(" " + i);
      executor.execute(task);
    }
  }
}

以下是从 main 方法执行任务的代码:

public class TaskExecutorExample {
  public static void main(String[] args) throws Exception {
    ApplicationContext context = new 
    AnnotationConfigApplicationContext(AppConfig.class);
    AsyncTask bean = context.getBean(AsyncTask.class);
    bean.runTasks();
  }
}

当我们编译并运行前面的类时,我们将得到以下输出。在这里,我们可以看到创建了五个线程,它们异步执行任务:

SimpleAsyncTaskExecutor-1, Execute Task = 1
SimpleAsyncTaskExecutor-4, Execute Task = 4
SimpleAsyncTaskExecutor-3, Execute Task = 3
SimpleAsyncTaskExecutor-2, Execute Task = 2
SimpleAsyncTaskExecutor-5, Execute Task = 5
SimpleAsyncTaskExecutor-2, End
SimpleAsyncTaskExecutor-1, End
SimpleAsyncTaskExecutor-4, End
SimpleAsyncTaskExecutor-3, End
SimpleAsyncTaskExecutor-5, End

TaskScheduler

有时,我们需要以固定的时间间隔执行一项任务,这可以通过 Spring 调度程序框架来实现。在本节中,我们将了解如何使用一些注释在 Spring 中安排任务。

让我们看一个在 Spring 应用程序中调度任务的简单示例:

@Configuration
@EnableScheduling
public class SpringSchedulingExample {
    private static final Logger LOGGER =                                                     
    Logger.getLogger(SpringSchedulingExample.class);
    @Scheduled(fixedDelay = 2000)
    public void scheduledTask() {
        LOGGER.info("Execute task " + new Date());
    }

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new 
        AnnotationConfigApplicationContext(
        SpringSchedulingExample.class);
        String scheduledAnnotationProcessor =         
        "org.springframework.context.annotation.
        internalScheduledAnnotationProcessor";
        LOGGER.info("ContainsBean : " + scheduledAnnotationProcessor + 
        ": " + context.containsBean(scheduledAnnotationProcessor));
        try {
            Thread.sleep(12000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
    }
} 

在 Spring 中,我们可以借助 @EnableScheduling 注解启用任务调度。一旦启用了任务调度,Spring 将自动注册一个内部 bean 后处理器,它将在 Spring 管理的 bean 上找到 @Scheduled 注释方法。

在前面的示例中,我们使用 @Scheduled 注释来注释 scheduledTask() 方法,并使用 fixedDelay 属性在每个 2 调用 秒。我们还可以使用其他属性,例如 fixedRatecron

@Scheduled(fixedRate = 2000)
@Scheduled(cron = "*/2 * * * * SAT,SUN,MON")

当我们编译并运行上一个类时,我们会得到如下输出:

Execute task Thu May 10 20:18:04 IST 2018
ContainsBean : org.springframework.context.annotation.internalScheduledAnnotationProcessor: true
Execute task Thu May 10 20:18:06 IST 2018
Execute task Thu May 10 20:18:08 IST 2018
Execute task Thu May 10 20:18:10 IST 2018
Execute task Thu May 10 20:18:12 IST 2018
Execute task Thu May 10 20:18:14 IST 2018

Spring Async

在本节中,我们将看到 Spring 中的异步执行支持。在某些情况下,我们需要异步执行一些任务,因为该任务的结果不需要用户,因此我们可以在单独的线程中处理该任务。异步编程的主要好处是我们可以提高应用程序的性能和响应能力。

Spring 通过 @EnableAsync@Async 为异步方法执行提供注解支持。让我们详细讨论它们。

The @EnableAsync annotation

我们可以通过简单地将 @EnableAsync 添加到配置类来启用异步处理,如下所示:

@Configuration
@EnableAsync
public class AppConfig {
  @Bean
  public AsyncTask asyncTask() {
    return new AsyncTask();
  }
}

在前面的代码中,我们没有将 TaskExecutor 作为 bean 提供,因此 Spring 将隐式使用默认的 SimpleAsyncTaskExecutor

The @Async annotation

启用异步处理后,使用 @Async 注释注释的方法将异步执行。

以下是 @Async 注释的简单示例:

public class AsyncTask {
  private static final Logger LOGGER = 
  Logger.getLogger(AsyncTask.class);
  @Async
  public void doAsyncTask() {
    try {
      LOGGER.info("Running Async task thread : " + 
      Thread.currentThread().getName());
    } catch (Exception e) {
    }
  }
}

我们也可以将@Async注解注解到带有返回类型的方法上,如下:

@Async
  public Future<String> doAsyncTaskWithReturnType() {
    try 
    {
      return new AsyncResult<String>("Running Async task thread : " + 
      Thread.currentThread().getName());
    } 
    catch (Exception e) { 
    }
    return null;
  }

在前面的代码中,我们使用了 AsyncResult 类,它实现了 Future。这可用于获取异步方法的执行结果。

以下是从 main 方法调用异步方法的代码:

public class asyncExample {
  private static final Logger LOGGER = 
  Logger.getLogger(asyncExample.class);
  public static void main(String[] args) throws InterruptedException {
    AnnotationConfigApplicationContext ctx = new 
    AnnotationConfigApplicationContext();
    ctx.register(AppConfig.class);
    ctx.refresh();
    AsyncTask task = ctx.getBean(AsyncTask.class);
    LOGGER.info("calling async method from thread : " + 
    Thread.currentThread().getName());
    task.doAsyncTask();
    LOGGER.info("Continue doing something else. ");
    Thread.sleep(1000);
  }
}

当我们编译并运行前面的类时,我们将得到以下输出:

calling async method from thread : main
Continue doing something else. 
Running Async Task thread : SimpleAsyncTaskExecutor-1

@Async with CompletableFuture

在上一节中,我们看到了如何使用 java.util.Future 来获取异步方法执行的结果。它提供了一个 isDone() 方法来检查计算是否完成,以及一个 get() 方法来在计算完成时检索计算结果。但是使用 Future API 有一定的限制:

  • Suppose we have written code to fetch the latest product price from an e-commerce system through a remote API. This task is time-consuming, so we need to run it asynchronously and use Future to get the result of that task. Now, the problem will occur when the remote API service is down. At that time, we need to complete Future manually by the last cached price of the product and that is not possible with Future.
  • Future only provides a get() method that notifies us when a result is available. We cannot attach a callback function to Future and have it get called automatically when the Future result is available.
  • Sometimes we have requirements, such as the result of the long-running task is needed to send another long-running task. We can't create such asynchronous workflow with Future.
  • We cannot run multiple Future in parallel.
  • The Future API does not have any exception handling.

由于这些限制,Java 8 引入了比 java.util.Future 更好的抽象,称为 CompletableFuture。我们可以简单地使用以下无参数构造函数来创建 CompletableFuture

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

以下是CompletableFuture提供的方法列表,帮助我们解决Future的限制:

  • The complete() method is used to complete the task manually.
  • The runAsync() method is used to run background tasks asynchronously that do not return anything. It takes a Runnable object and returns CompletableFuture<Void>.
  • The supplyAsync() method is used to run background tasks asynchronously and return a value. It takes Supplier<T> and returns CompletableFuture<T>, where T is the type of the value given by the supplier.
  • The thenApply(), thenAccept(), and thenRun() methods are used to attach a callback to CompletableFuture.
  • The thenCompose() method is used to combine two dependent CompletableFuture together.
  • The thenCombine() method is used to combine two independent CompletableFuture together.
  • The allOf() and anyOf() methods are used to combine multiple CompletableFuture together.
  • The exceptionally() method is used to get the generated error from Future. We can log the error and set a default value.
  • The handle() method is used to handle the exception.

Spring and threads – transactions

Spring Framework 为数据库事务管理提供了广泛的 API。 Spring 负责所有基本的事务管理控制,并为不同的事务 API 提供一致的编程模型,例如 JDBC、Hibernate、Java Transaction API (JTA)、 Java 持久性 API (JPA) 和 Java 数据对象 (JDO)。 Spring 提供了两种类型的事务:一种是声明式的,另一种是程序化的事务管理。声明式是非常高级的,而程序式则更高级但更灵活。

Spring 事务管理在单线程上工作得很好。但它不能跨多个线程管理事务。如果我们尝试将事务与多个线程一起使用,我们的程序会给出运行时错误或意外结果。

要理解为什么一个 Spring 事务在多线程时会失败,首先,我们需要了解事务如何与 Spring 一起工作。 Spring 将所有事务信息存储在 org.springframework.transaction.support.TransactionSynchronizationManager 类中的 ThreadLocal 变量中:

public abstract class TransactionSynchronizationManager {
  private static final Log logger =         
  LogFactory.getLog(TransactionSynchronizationManager.class);
  private static final ThreadLocal<Map<Object, Object>> resources = new  
  NamedThreadLocal("Transactional resources");
  private static final ThreadLocal<Set<TransactionSynchronization>> 
  synchronizations = new NamedThreadLocal("Transaction 
  synchronizations");
  private static final ThreadLocal<String> currentTransactionName = new 
  NamedThreadLocal("Current transaction name");
  private static final ThreadLocal<Boolean> currentTransactionReadOnly 
  = new NamedThreadLocal("Current transaction read-only status");
  private static final ThreadLocal<Integer> 
  currentTransactionIsolationLevel = new NamedThreadLocal("Current 
  transaction isolation level");
  private static final ThreadLocal<Boolean> actualTransactionActive = 
  new NamedThreadLocal("Actual transaction active");
}

线程的局部变量仅保存单个线程的特定事务的信息,并且不能被另一个线程访问。因此,正在进行的事务的信息不会传递给新创建的线程。结果将是指示事务丢失的错误。

现在我们能够理解具有多个线程的 Spring 事务的问题。 Spring 无法从新创建的线程维护到旧线程的事务状态。为了解决多线程的事务问题,我们需要手动将线程的局部变量值传递给新创建的线程。

Java threads best programming practices

使用多线程和并发编程的目的是提高性能,但我们需要始终记住,速度是在正确性之后。 Java 编程语言提供了从语言到 API 级别的大量同步和并发支持,但这取决于个人编写无错误 Java 并发代码的专业知识。以下是Java并发和多线程的最佳实践,帮助我们用Java编写更好的并发代码:

  • Use immutable classes: We should always prefer the immutable class in multithreading programming because immutable classes make sure that values are not changed in the middle of an operation without using synchronized blocks. For example, in an immutable class, such as java.lang.String, any modification on String, such as adding something or converting into uppercase, always creates another string object, keeping the original object unbroken.
  • Use local variables: Always try to use local variables instead of an instance or class-level variables because local variables are never shared between threads.
  • Use thread pool: Thread pool can reuse previously created threads and eliminate the time of thread creation, which improves the performance of the application.
  • Use the synchronization utility: Here, we can use the synchronization utility instead of the wait and notify methods. The java.util.concurrent package provides better synchronization utilities, such as CycicBariier, CountDownLatch, Sempahore, and BlockingQueue. It is very easy to wait for five threads using CountDownLatch to complete its task instead of implementing the same utility using the wait and notify methods. It is also easier to implement the producer-consumer design with the help of BlockingQueue rather than the wait and notify methods.
  • Use concurrent collections instead of synchronized collection: Concurrent collections are implemented with the new locking mechanism provided by the Lock interface and designed in such a way that we can take advantage of the native concurrency construct provided by the underlying hardware and JVM. Concurrent collections give more scalability and performance than their synchronized counterparts. ConcurrentHashMap provides better performance than synchronized HashMap or Hashtable classes if there are many updates and fewer reads concurrently.
  • Minimize locking scope: We should always try to reduce the locking scope as much as possible because locking block will not be executed concurrently and it impacts the application's performance. We should always first try to use atomic and volatile variables to achieve our synchronization requirement if our requirement is not satisfied with them, and then we need to use the functionality provided by the Lock interface. We can also reduce the locking scope to use a synchronized block instead of the synchronized method.
  • Use Java Executor framework: It provides an abstraction layer on the Java threading framework and provides better control in terms of creating and executing threads in a multithreaded environment.

Summary

在本章中,我们探索了 Java 线程并学习了如何在 java.util.concurrent 包的帮助下实现多线程和并发编程。我们还了解了如何在应用程序中使用线程池来提高性能。我们看到了 Spring 提供的任务执行和调度功能,还了解了 Spring 对 @Async 的支持,这可以提高我们应用程序的性能和响应能力。我们回顾了 Spring 事务管理在使用多线程时如何产生问题,还研究了多线程和并发编程的最佳编程实践。

在下一章中,我们将学习分析应用程序以了解应用程序的性能。它对于识别性能问题非常有用。我们还将了解日志记录,它是识别应用程序问题的重要工具。