Java线程池浅析

Java线程池

合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

一、简单介绍

最简单的新建一个线程的方式就是new Thread

1
2
3
4
5
6
7
new Thread(new Runnable() {
@Overrider
public void run(){
//do something
}
}
).start();

虽然简单,但是有很多缺点:

  1. 频繁的使用new Thread 来新建线程会造成新建对象性能差
  2. 新建的线程缺乏统一的管理,并且新建线程没有限制,互相竞争,可能占用系统资源导致oom
  3. 功能单一

既然不能new Thread不能满足多样的需求变化,那么Java提供的线程池就很好的解决了这些问题。Java提供四种线程池,下面一一介绍。

二、Java线程池

优点:

  1. 重用线程池中的线程,避免因频繁创建和销毁线程带来的性能的开销
  2. 能有效控制线程池的最大并发数,避免大量的线程之间因互相抢占系统资源而导致阻塞的现象
  3. 能够对线程进行简单的管理,并提供定时执行及指定间隔循环执行的功能

Java中的线程池的具体实现是在ThreadPoolExecutor,通过配置不同的参数来实现不同功能的线程池。

ThreadPoolExecutor构造函数有四个,源码实现最终都是调用了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造函数的各个参数将会影响到线程池的功能特性,

  • corePoolSize: 线程池中的核心线程池数量,除非设置了allowCoreThreadTimeOut为true,否则即使处于空闲状态也一直保留在线程池中。
  • maximumPoolSize:最大线程数,当活动的线程数达到最大,后续新的任务将会被阻塞
  • keepAliveTime:非核心线程空闲时的超时时间,空闲超过该时间将会被回收,当allowCoreThreadTimeOut为true也会作用于核心线程
  • unit:超时时间的单位
  • workQueue:线程池中的任务队列,通过线程池的execute方法提交的Runnable会存储在这个参数中在JDK中提供了如下阻塞队列:
    1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
    3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    4、priorityBlockingQuene:具有优先级的无界阻塞队列;

  • threadFactory:线程工厂,为线程池提供创建新线程的功能

  • handler:RejectedExecutionHandler(饱和策略),当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
    1、AbortPolicy:直接抛出异常。
    2、CallerRunsPolicy:只用调用者所在线程来运行任务。
    3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    4、DiscardPolicy:不处理,丢弃掉。
    当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

三、线程池的分类

常见的线程池有四种:

  1. FixedThreadPool
  2. CachedThreadPool
  3. ScheduledThreadPool
  4. SingleThreadExecutor

1.FixedThreadPool

FixedThreadPool是通过Executors的newCachedThreadPool()方法来创建的。它是一种线程数量固定的线程池,并且全都是核心线程。当该线程池中所有的线程都被占用了,新任务都会处于等待状态,直到有空闲线程。FixedThreadPool只有核心线程,并且没有超时机制,任务队列无限大。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

2.CacheThreadPool

CacheThreadPool是通过Executors的newCacheThreadPool()来创建的。它是线程数量不定的线程池,最大数量可达到Integer.MAX_VALUE,当该线程池中的线程都被占用了,有新的任务将会新建线程来处理,该线程池中的线程有超时机制,超时时长为60s,任务队列与FixedThreadPool不同的是该任务队列相当于一个空集合,那么任务将会马上执行,在这种情况下,SynchronousQueue可以理解为无法插入任务。从这些特性看来,CacheThreadPool是比较适合执行大量的耗时较少的任务的。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

3.ScheduledThreadPool

ScheduledThreadPool是通过Executors的newScheduledThreadPool(int corePoolSize)来创建,核心线程是固定的,非核心线程是无限制的,超时时间默认10s,这类线程池主要用于执行定时任务和具有固定周期的重复任务。

1
2
3
4
5
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

4.SingleThreadExecutor

SingleThreadExecutor是通过newSingleThreadExecutor()来创建的,该线程池中只有一个核心线程,没有超时时间,它确保所有的任务都在同一线程中按顺序执行。会统一所有外界的任务到同一个线程按序执行,这些任务不需考虑同步的问题。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

四、任务队列

1
BlockingQueue<Runnable> workQueue

即用于保存等待执行的任务的阻塞队列。

BlockingQueue是个接口,有如下实现类:

  1. ArrayBlockQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。创建其对象必须明确大小,像数组一样。最大的特点是可以防止资源被耗尽。

  2. LinkedBlockQueue:一个可改变大小的阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。创建其对象如果没有明确大小,默认值是Integer.MAX_VALUE。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

  3. PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。

  4. SynchronousQueue:同步队列。同步队列没有任何容量,每个插入必须等待另一个线程移除,反之亦然。一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue

五、饱和策略

RejectedExecutionHandler(饱和策略),当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

RejectedExecutionHandler提供了四种方式来处理任务拒绝策略:

  1. DiscardPolicy(直接丢弃)
  2. DiscardOldestPolicy(丢弃队列中最老的任务)
  3. ABortPolicy(抛异常)
  4. CallerRunsPolicy(将任务分给调用线程来执行)

六、基本使用

向线程池提交任务

1
2
3
4
5
6
7
8
Runnable cacheThread = new Runnable() {
@Override
public void run() {
System.out.println("Test CacheThreadPool");
}
};
ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
cacheThreadPool.execute(cacheThread);

七、整体分析

整体流程大致如图:

threadpoolpic

了解了整体流程,那么再去看源码就会很清楚明了了。

Neil Liu wechat
个人微信,欢迎交流
让我感受下知识的力量~