vlambda博客
学习文章列表

使用线程池时一定要注意的五个点

一、 使用线程池在流量突发期间能够平滑地服务降级

很多场景下应用程序必须能够处理一系列传入请求,简单的处理方式是通过一个线程顺序的处理这些请求,如下图:

单线程策略的优势和劣势都非常明显:

优势:设计和实现简单;劣势:这种方式会带来处理效率的问题,单线程的处理能力是有限,不能发挥多核处理器优势。

在这种场景下我们就需要考虑并发,一个简单的并发策略就是Thread-Per-Message模式,即为每个请求使用一个新的线程。Thread-Per-Message策略的优势和劣势也非常明显:

优势:设计和实现比较简单,能够同时处理多个请求,提升响应效率;

劣势:主要在两个方面

1.资源消耗 引入了在串行执行中所没有的开销,包括线程创建和调度,任务处理,资源分配和回收以及频繁上下文切换所需的时间和资源。2.安全

  • 攻击者可以通过一次进行大量请求使系统瘫痪并且拒绝服务 (DoS),从而导致系统立即不响应而不是平滑地退出。
  • 从安全角度来看,一个组件可能由于连续的错误而耗尽所有资源,因此使所有其他组件无法获得资源。

有没有一种方式可以并发执行又可以克服Thread-Per-Message的问题?

采用线程池的策略,线程池通过控制并发执行的工作线程的最大数量来解决Thread-Per-Message带来的问题。可见下图,请求来临时先放入线程池的队列

线程池可以接受一个RunnableCallable<T>任务,并将其存储在临时队列中,当有空闲线程时可以从队列中拿到一个任务并执行。

反例(使用 Thread-Per-Message 策略)

class Helper {
    public void handle(Socket socket) {
        // do something
    }
}

final class RequestHandler {
    private final Helper helper = new Helper();

    //......
    private RequestHandler(int port) throws IOException {
        //do something
    }

    public void handleRequest() {
        new Thread(new Runnable() {
            public void run() {
                try {
                    helper.handle(server.accept());
                } catch (IOException e) {
                    // Forward to handler
                }
            }
        }).start();
    }
}

正例(使用 线程池 策略)

class Helper {
    public void handle(Socket socket) {
        // do something
    }
}

final class RequestHandler {
    private final Helper helper = new Helper();
    private final ServerSocket server;
    private final ExecutorService exec;

    private RequestHandler(int port, int poolSize) throws IOException {
        server = new ServerSocket(port);
        exec = Executors.newFixedThreadPool(poolSize);
    }

    public static RequestHandler newInstance(int poolSize) throws IOException {
        return new RequestHandler(0, poolSize);
    }

    public void handleRequest() {
        Future<?> future = exec.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    helper.handle(server.accept());
                } catch (IOException e) {
                    // Forward to handler
                }
            }
        });
    }
    // ... Other methods such as shutting down the thread pool
    // and task cancellation ...
}

JAVA 中(JDK 1.5+)线程池的种类:

  • newFixedThreadPool()

  • newCachedThreadPool()

  • newSingleThreadExecutor()

  • newScheduledThreadPool()

    线程池的详细使用方法可参见Java API文档

二、不要在有界线程池中执行相互依赖的任务

程序不能使用来自有界线程池的线程来执行依赖于线程池中其他任务的任务。

有两个场景:

  1. 当线程池中正在执行的线程阻塞在依赖于线程池中其他任务的完成上,这样就会出现称为线程饥饿(threadstarvation)死锁的死锁形式。
  2. 线程饥饿死锁还会发生在当前执行的任务向线程池提交其他任务并等待这些任务完成的时候,然而此时线程池缺乏一次容纳所有任务的能力。

要缓解上面两个场景产生的问题有两个简单的办法:

  1. 扩大线程池中的线程数,以容纳更多的任务,但 决定一个线程池合适的大小可能是困难的甚至不可能的。
  2. 线程池中的队列改为无界,由于系统资源有限,无界队列只能说是尽可能容纳任务 但饥饿死锁的现象无法消除。

真正解决此类方法还是需要梳理线程池执行业务流程,不要在有界线程池中执行相互依赖的任务,防止出现竞争和死锁。

三、确保提交到线程池的任务可中断

向线程池提交的任务需要支持中断。从而保证线程可以中断,线程池可以关闭。线程池支持 java.util.concurrent.ExecutorService.shutdownNow() 方法,该方法尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务的列表。

但是 shutdownNow() 除了尽力尝试停止处理主动执行的任务之外不能保证一定能够停止。例如,典型的实现是通过Thread.interrupt()来停止,因此任何未能响应中断的任务可能永远不会终止,也就造成线程池无法真正的关闭。

反例:

public final class Worker implements Runnable // Thread‐safe class
    private AtomBoolean flag = new AtomBoolean(true);

    public Worker() throws IOException {
        //do something
    }

    // Only one thread can use the socket at a particular time
    @Override
    public void run() {
        try {
            while (flag.get()) {
                // do something
            }
        } catch (IOException ie) {
            // Forward to handler
        }
    }

    public void shutdown() {
        this.flag.set(false);
    }
}

正例:

public final class Worker implements Runnable // Thread‐safe class
    public Worker() throws IOException {
        //do something
    }

    // Only one thread can use the socket at a particular time
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // do something
            }
        } catch (IOException ie) {
            // Forward to handler
        }
    }
}

四、确保在线程池中执行的任务不能悄无声息地失败

线程池中的所有任务必须提供机制,如果它们异常终止,则需要通知应用程序.

如果不这样做不会导致资源泄漏,但由于池中的线程仍然被会重复使用,使故障诊断非常困难或不可能。

在应用程序级别处理异常的最好方法是使用异常处理。异常处理可以执行诊断操作,清理和关闭Java虚拟机,或者只是记录故障的详细信息。

也就是说在线程池里执行的任务也需要能够抛出异常并被捕获处理。

任务恢复或清除操作可以通过重写 java.util.concurrent.ThreadPoolExecutor 类的 afterExecute() 钩子来执行。

当任务通过执行其 run() 方法中的所有语句并且成功结束任务,或者由于异常而导致任务停止时,将调用此钩子。

可以通过自定义 ThreadPoolExecutor 服务来重载 afterExecute() 钩子。

还可以通过重载 terminated() 方法来释放线程池获取的资源,就像一个finally块。

反例:

final class PoolService {
    private final ExecutorService pool = Executors.newFixedThreadPool(10);

    public void doSomething() {
        pool.execute(new Task());
    }
}

final class Task implements Runnable {
    @Override
    public void run() {
        // do something
        throw new NullPointerException();
    }
}

任务意外终止时作为一个运行时异常,无法通知应用程序。此外,它缺乏恢复机制。因此,如果Task抛出一个NullPointerException ,异常将被忽略。

正例:

class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    // ... Constructor ...
    public CustomThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, BlockingQueue<Runnable> workQueue)
 
{
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                }
    @Override
    public void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            // Exception occurred, forward to handler
        }
            // ... Perform task‐specific cleanup actions
        }
    @Override
    public void terminated() {
        super.terminated();
        // ... Perform final clean‐up actions
    }
}

另外一种方式是使用 ExecutorService.submit() 方法(代替 execute() 方法)将任务提交到线程池并获取 Future 对象。

当通过 ExecutorService.submit() 提交任务时,抛出的异常并未到达未捕获的异常处理机制,因为抛出的异常被认为是返回状态的一部分,因此被包装在ExecutionException ,并由Future.get() 返回。

Future<?> future = threadPool.submit(new Task());
try {
    future.get();
catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // Reset interrupted status
catch (ExecutionException e) {
    Throwable exception = e.getCause();
    // Forward to exception reporter
}

五、确保在使用线程池时重新初始化ThreadLocal变量

java.lang.ThreadLocal 类提供线程内的本地变量。根据Java API

这些变量与其它正常变量不同,每个线程访问(通过其get或set方法)都有其属于各自线程的,独立初始化的变量拷贝。ThreadLocal实例通常是一些希望将状态与线程(例如,用户ID或事务ID)相关联的类中的私有静态字段。

ThreadLocal对象需要关注那些对象被线程池中的多个线程执行的类。

线程池缓存技术允许线程重用以减少线程创建开销,或者当创建无限数量的线程时可以降低系统的可靠性。

当 ThreadLocal 对象在一个线程中被修改,随后变得可重用时,在重用的线程上执行的下一个任务将能看到该线程上执行过的上一个任务修改的ThreadLocal 对象的状态。

所以要在使用线程池时重新初始化的ThreadLocal对象实例。

反例:

public enum Day {
    MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY;
}

public final class Diary {
    private static final ThreadLocal<Day> days = new ThreadLocal<Day>() {
        // Initialize to Monday
        protected Day initialValue() {
            return Day.MONDAY;
        }
    };
    private static Day currentDay() {
        return days.get();
    }
    public static void setDay(Day newDay) {
        days.set(newDay);
    }
    // Performs some thread‐specific task
    public void threadSpecificTask() {
        // Do task ...
    }
}

public final class DiaryPool {
    final int numOfThreads = 2// Maximum number of threads allowed in pool
    final Executor exec;
    final Diary diary;

    DiaryPool() {
        exec = (Executor) Executors.newFixedThreadPool(numOfThreads);
        diary = new Diary();
    }

    public void doSomething1() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                diary.setDay(Day.FRIDAY);
                diary.threadSpecificTask();
            }
        });
    }

    public void doSomething2() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                diary.threadSpecificTask();
            }
        });
    }

    public static void main(String[] args) {
        DiaryPool dp = new DiaryPool();
        dp.doSomething1(); // Thread 1, requires current day as Friday
        dp.doSomething2(); // Thread 2, requires current day as Monday
        dp.doSomething2(); // Thread 3, requires current day as Monday
    }
}

DiaryPool类创建了一个线程池,它可以通过一个共享的无界的队列来重用固定数量的线程。

在任何时候,不超过numOfThreads个线程正在处理任务。如果在所有线程都处于活动状态时提交其他任务,则 它们在队列中等待,直到线程可用。

当线程循环时,线程的线程局部状态仍然存在。

下表显示了可能的执行顺序:

时间 任务 线程池 提交方法 日期
1 t1 1 doSomething1() 星期五
2 t2 2 doSomething2() 星期一
3 t3 1 doSomething3() 星期五

在这个执行顺序中,期望从doSomething2() 开始的两个任务( t 2和t 3 doSomething2() 将当天视为星 期一。然而,因为池线程1被重用,所以t 3观察到星期五。

解决方案(try-finally条款)

符合规则的方案removeDay() 方法添加到Diary类,并在try‐finally 块中的实现doSomething1() 类的doSomething1() 方法的语句。finally 块通过删除当前线程中的值来恢复threadlocal类型的days对象的初始状态。

public final class Diary {
    // ...
    public static void removeDay() {
        days.remove();
    }
}

public final class DiaryPool {
    // ...
    public void doSomething1() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Diary.setDay(Day.FRIDAY);
                    diary.threadSpecificTask();
                } finally {
                    Diary.removeDay(); // Diary.setDay(Day.MONDAY)
                    // can also be used
                }
            }
        });
    }
// ...
}

如果threadlocal变量再次被同一个线程读取,它将使用initialValue()方法重新初始化 ,除非任务已经明确设置了变量的值。这个解决方案将维护的责任转移到客户端( DiaryPool ),但是当Diary类不能被修改时是一个好的选择。

解决方案(beforeExecute())

使用一个自定义ThreadPoolExecutor 来扩展 ThreadPoolExecutor 并覆盖beforeExecute() 方法。beforeExecute() 方法在 Runnable 任务在指定线程中执行之前被调用。该方法在线程 “t” 执行任务 “r” 之前重新初始化 threadlocal 变量。

class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTime, TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue)
 
{
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void beforeExecute(Thread t, Runnable r) {
        if (t == null || r == null) {
            throw new NullPointerException();
        }
        Diary.setDay(Day.MONDAY);
        super.beforeExecute(t, r);
    }
}

public final class DiaryPool {
    // ...
    DiaryPool() {
        exec = new CustomThreadPoolExecutor(NumOfthreads, NumOfthreads,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        diary = new Diary();
    }
    // ...
}