为什么使用线程池?

  诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方式可能是通过网络协议(例如 HTTP、FTP 或 POP)、通过 JMS 队列或者可能通过轮询数据库。不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。

  构建服务器应用程序的一个简单模型是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。实际上对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。每个请求对应一个线程(thread-per-request)方法的不足之一是:为每个请求创建一个新线程的开销很大;为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。

  除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或“切换过度”。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目。

  线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

四种线程池

Java通过Executors提供四种线程池,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一定时的定长线程池,支持定时及周期性任务执行.
  • newSingleThreadExecutor 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

newCachedThreadPool

 创建一个可缓存线程池如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}

  线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

  1. 该线程池的核心线程数量是0,线程的数量最高可以达到Integer 类型最大值

  2. 创建ThreadPoolExecutor实例时传过去的参数是一个SynchronousQueue实例,说明在创建任务时,若存在空闲线程就复用它,没有的话再新建线程

  3. 线程处于闲置状态超过60s的话,就会被销毁

newFixedThreadPool

  创建一个定长线程池可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}

  因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。

  定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。

  1. 线程池的最大线程数等于核心线程数,并且线程池的线程不会因为闲置超时被销毁。

  2. 使用的列队是LinkedBlockingQueue表示如果当前线程数小于核心线程数,那么即使有空闲线程也不会复用线程去执行任务,而是创建新的线程去执行任务。如果当前执行任务数量大于核心线程数,此时再提交任务就在队列中等待,直到有可用线程。

newScheduledThreadPool

  创建一个延迟/周期执行线程池支持延迟及周期性任务执行

  延迟执行示例代码如下:

1
2
3
4
5
6
7
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);

  表示延迟3秒执行。

  定期执行示例代码如下:

1
2
3
4
5
6
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);

  表示延迟1秒后每3秒执行一次。ScheduledExecutorService比Timer更安全,功能更强大

  1. 该线程池可以设置核心线程数量,最大线程数与newCachedThreadPool一样,都是Integer.MAX_VALUE。

  2. 该线程池采用的队列是DelayedWorkQueue,具有延迟和定时的作用。

newSingleThreadExecutor

  创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}

  结果依次输出,相当于顺序执行各个任务。

  现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。

使用线程池的好处

  1. 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
  2. 运用线程池能有效的控制线程最大并发数,可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
  3. 对线程进行一些简单的管理,比如:延时执行、定时循环执行的策略等,运用线程池都能进行很好的实现

线程池的主要组件

一个线程池包括以下四个基本组成部分:

  1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2. 工作线程(WorkThread):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

ThreadPoolExecutor类

  讲到线程池,要重点介绍java.uitl.concurrent.ThreadPoolExecutor类,ThreadPoolExecutor线程池中最核心的一个类,ThreadPoolExecutor在JDK中线程池常用类UML类关系图如下:

我们可以通过ThreadPoolExecutor来创建一个线程池

1
2
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, 
milliseconds,runnableTaskQueue, threadFactory,handler);

创建一个线程池需要输入的几个参数

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。
  • ThreadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
  • RejectedExecutionHandler(拒绝策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。n AbortPolicy:直接抛出异常。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

对于新的任务,如果此时线程池里没有空闲线程,线程池会毫不犹豫的创建一条新的线程去处理这个任务。因为corePoolSize是0,当前线程数肯定大于等于corePoolSize,而workQueue是SynchronousQueue,前面说了,SynchronousQueue是不存放东西的,它只移交,所以你可以认为它的队列一直是满的,最后,maxPoolSize是无穷大,再继续创建也不会达到最大线程数,所以线程池会创建一条新的线程去处理这个任务;
keepAliveTime是60s,你可以认为这就是线程的失效时间。新创建的线程如果60s内都没有任务要执行(缓存没有命中),那么就会被销毁,而如果在这60s内,线程分配到任务了(缓存命中),那么就可以直接拿这条创建好的线程过去用;
corePoolSize设置成0还有一个好处,那就是当有一大段时间,线程池都没有接收到新的任务时,线程池里的线程会逐渐被销毁,直到线程池中线程数量降为0,这样整个线程池也就不会占用什么资源了,这个特性,使得CachedThreadPool特别适合处理具有周期性的,并且执行时间短(short-lived)的任务,比如晚上十二点时,会有一波业务过来处理,其他时间段,业务很少甚至没有,这种情况就很适合使用CachedThreadPool

向线程池提交任务

我们可以通过execute()submit()两个方法向线程池提交任务,不过它们有所不同

  • execute()方法没有返回值,所以无法判断任务知否被线程池执行成功
1
2
3
4
5
6
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
  • submit()方法返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
Future<String> taskret = executorService.submit(new CallTask());
//主线程可以继续执行自己的业务逻辑
try {
//在需要返回值时,调用get方法获取(可能阻塞),如果该任务已经执行完成 主线程不会阻塞。
String ob= taskret.get();
System.out.println(ob);
} catch (Exception e) {
e.printStackTrace();
}
}
}

class CallTask implements Callable<String>{
@Override
public String call() throws Exception {
return "call任务执行";
}
}

线程池的关闭

我们可以通过shutdown()shutdownNow()方法来关闭线程池,不过它们也有所不同

  • shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
  • shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

ThreadPoolExecutor执行的策略

  1. 线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务
  2. 线程数量达到了corePools,则将任务移入队列等待
  3. 队列已满,新建线程(非核心线程)执行任务
  4. 队列已满,总线程数又达到了maximumPoolSize,就会由(RejectedExecutionHandler)抛出异常

新建线程 -> 达到核心数 -> 加入队列 -> 新建线程(非核心) -> 达到最大数 -> 触发拒绝策略

四种拒绝策略

  1. AbortPolicy(中止):不执行新任务,直接抛出异常,提示线程池已满,线程池默认策略
1
2
3
4
5
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
  1. DiscardPolicy(抛弃):不执行新任务,也不抛出异常,基本上为静默模式。
1
2
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  1. DisCardOldSetPolicy(抛弃最旧的):将消息队列中的第一个任务替换为当前新进来的任务执行
1
2
3
4
5
6
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
  1. CallerRunPolicy(调用者运行):用于被拒绝任务的处理程序,将任务分给调用线程来执行;如果执行程序已关闭,则会丢弃该任务。
1
2
3
4
5
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static class Worker implements Runnable {
public void run() {
System.out.println(Thread.currentThread().getName() + " is running");
}
}

public static void main(String[] args) {

int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10);

 //拒绝策略1:将抛出 RejectedExecutionException.

RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 
keepAliveTime, TimeUnit.SECONDS, queue, handler);

for(int i=0; i<100; i++) {
executor.execute(new Worker());
}
executor.shutdown();

}

为什么不建议使用Executors创建线程池?

JDK为我们提供了Executors线程池工具类,里面有默认的线程池创建策略,大概有以下几种:

  1. FixedThreadPool:线程池线程数量固定,即corePoolSize和maximumPoolSize数量一样。
  2. SingleThreadPool:单个线程的线程池。
  3. CachedThreadPool:初始核心线程数量为0,最大线程数量为Integer.MAX_VALUE,线程空闲时存活时间为60秒,并且它的阻塞队列为SynchronousQueue,它的初始长度为0,这会导致任务每次进来都会创建线程来执行,在线程空闲时,存活时间到了又会释放线程资源。
  4. ScheduledThreadPool:创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。

  用Executors工具类虽然很方便,我依然不推荐大家使用以上默认的线程池创建策略,阿里巴巴开发手册也是强制不允许使用Executors来创建线程池,我们从JDK源码中寻找一波答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java.util.concurrent.Executors:
// FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

// SingleThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

// CachedThreadPool
public static ExecutorService newCachedThreadPool() {
// 允许创建线程数为Integer.MAX_VALUE
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

// ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 允许创建线程数为Integer.MAX_VALUE
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public LinkedBlockingQueue() {
// 允许队列长度最大为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}

  从JDK源码可看出,Executors工具类无非是把一些特定参数进行了封装,并提供一些方法供我们调用而已,我们并不能灵活地填写参数,策略过于简单,不够友好

  CachedThreadPool和ScheduledThreadPool最大线程数为Integer.MAX_VALUE,如果线程无限地创建,会造成内存溢出(OOM)异常。

  LinkedBlockingQueue基于链表的FIFO队列,是无界的,默认大小是Integer.MAX_VALUE,因此FixedThreadPool和SingleThreadPool的阻塞队列长度为Integer.MAX_VALUE,如果此时队列被无限地堆积任务,会造成OOM异常。

联系我

评论