并发编程补充知识之标准线程池
大家好,我是浮生君。夯实基础,长足进步,和浮生君一起复习不得不说的并发编程补充知识~
今天的内容是StandardThreadExecutorWithNameAndTime,借鉴了tomcat源码的线程池设计,对ThreadFactory、Queue、ThreadPoolExecutor进行了扩展,使得调用层尽量屏蔽底层的代码实现。
一、功能
1、定义线程池名称
2、指定阻塞队列长度
3、限制并发数量
4、参数使用逻辑
ThreadPoolExecutor的判断逻辑前面的内容提到过,如图
StandardThreadExecutorWithNameAndTime:如果线程数超过 corePoolSize,则会增加thread直到 maxThreads,然后才放入阻塞队列中。
二、源码
构造函数
public StandardThreadExecutorWithNameAndTime(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,int taskQueueCapacity, RejectedExecutionHandler handler, String name,Integer semaphoreNumber) {// 定义信号量,最大为corePoolSize,控制并发量if(semaphoreNumber != null){if(semaphoreNumber < corePoolSize) {semaphoreNumber = corePoolSize;}semaphore = new Semaphore(semaphoreNumber);}// 定义阻塞队列,底层使用LinkebBlockingQueueTaskQueue taskqueue = new TaskQueue(taskQueueCapacity==0?maxQueueSize:taskQueueCapacity);// 定义线程池的名称TaskThreadFactory tf = new TaskThreadFactory(StringUtils.isBlank(name)?namePrefix:name);executor = new ThreadExecutorWithNameAndTime(corePoolSize,maximumPoolSize, keepAliveTime, unit,taskqueue, tf,handler) {protected void afterExecute(Runnable r, Throwable t) {try {// todo 执行任务结束后相关操作,比如打印日志等}catch (Exception e){throw new RuntimeException("ThreadExecutorWithNameAndTime afterExecute error",e);}finally {//释放锁if(semaphore != null){semaphore.release();}}}};// 定义阻塞队列的parent为当前线程池,当前线程池关闭后,阻塞队列不再接收数据taskqueue.setParent( (ThreadPoolExecutor) executor);}
阻塞队列
// 阻塞队列 继承LinkedBlockingQueueclass TaskQueue extends LinkedBlockingQueue<Runnable> {/**阻塞队列默认长度设置为Integer.MAX_VALUE; */private static final long serialVersionUID = 8395648427240834739L;ThreadPoolExecutor parent = null;public TaskQueue() {super();}public TaskQueue(int initialCapacity) {super(initialCapacity);}public TaskQueue(Collection<? extends Runnable> c) {super(c);}public void setParent(ThreadPoolExecutor tp) {parent = tp;}public boolean force(Runnable o) {// 线程池关闭之后,拒绝数据再次存入if ( parent.isShutdown() ) {throw new RejectedExecutionException("Executor not running, can't force a command into the queue");}return super.offer(o); // forces the item onto the queue, to be used if the task is rejected}//这部分的实现逻辑是,当线程池的线程数小于最大线程数时,不往阻塞队列放数据public boolean offer(Runnable o) {// we can't do any checksif (parent==null) {return super.offer(o);}// 线程池的线程数已经等于最大线程数,往消息队列中存放数据int poolSize = parent.getPoolSize();// we are maxed out on threads, simply queue the objectif (poolSize == parent.getMaximumPoolSize()){return super.offer(o);}// we have idle threads, just add it to the queue// note that we don't use getActiveCount(), see BZ 49730AtomicInteger submittedTasksCount = StandardThreadExecutorWithNameAndTime.this.submittedTasksCount;if(submittedTasksCount!=null) {if (submittedTasksCount.get()<=poolSize) {return super.offer(o);}}// 线程池的线程数小于最大线程数,返回false// if we have less threads than maximum force creation of a new threadif (poolSize<parent.getMaximumPoolSize()){return false;}// if we reached here, we need to add it to the queuereturn super.offer(o);}}
线程工厂
class TaskThreadFactory implements ThreadFactory {final ThreadGroup group;final AtomicInteger threadNumber = new AtomicInteger(1);final String namePrefix;TaskThreadFactory(String namePrefix) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();this.namePrefix = namePrefix;}public Thread newThread(Runnable r) {// 定义线程池中线程的名称Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());t.setDaemon(daemon);t.setPriority(getThreadPriority());return t;}}
三、实例
1、线程池结合CountDownLatch,等待所有数据处理完成
private void addData(List<String> data) throws InterruptedException {// 定义线程池final StandardThreadExecutorWithNameAndTime taskPool = new StandardThreadExecutorWithNameAndTime(2, 5, 60L,TimeUnit.SECONDS, 100000,new ThreadPoolExecutor.DiscardOldestPolicy(), "addData");// 定义倒数计数器CountDownLatch cdl = new CountDownLatch(data.size());for (int i = 0; i < data.size(); i++) {final int count = i;taskPool.submit(() -> {String print = data.get(count);try {// 子线程处理的业务逻辑System.out.println(print);} catch (Exception e) {System.out.println("error:" + e);} finally {cdl.countDown();}});}// 所有的数据处理完成后关闭线程池cdl.await();taskPool.shutdown();}
并发编程所有的内容就到这里啦~
“浮生若梦,为欢几何。” May the force be with you~
