释义

线程池框架

提供一个线程任务执行的容器,它主要管理、控制着线程资源,包括如果处理一系列提交的任务。

线程池工具类的底层实现:

Executors.newFixedThreadPool

Executors.newSingleThreadExecutor;

Executors.newCachedThreadPool;

当一个任务被提交时,会先检查当前线程池中线程数量是否超过corePoolSize,如果小于,则直接创建一个新线程(不管线程池中其他线程是否空闲),如果大于,则会检查workQueue是否满,如果未满,则丢到workQueue中,如果已满,则会新建线程,但线程池中总线程的数量不会超过maxiumPoolSize;

keepAliveTime

当线程池中的线程数量大于corePoolSize时,如果某个线程的idleTime超过keepAliveTime时,这个线程就会转变为终止态,这样就能通过终止个别线程,来保持线程池中线程数量在合理的范围内。allowCoreThreadTimeOut是否允许corePool内的线程超时,如果为true,则keepAliveTime对corePool中的线程也是生效的。

workQueue

currPoolSize < corePoolSize

  • 不使用queue,直接创建线程进行任务处理

currPoolSize > corePoolSize

  • pool中的线程从queue中取出任务,进行处理。
  • 如果queue未满,则将新提交的任务加入到queue中
  • 若queue已满,则新建线程直接处理任务,但总线程的数量不会超过maxiumPoolSize;

构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* corePoolSize:core工作线程数量
* maximumPoolSize:工作线程最大数量(上限)
* keepAliveTime:工作线程过期时间,从等待队列中获取任务的等待时间
* unit:keepAliveTime设置时的时间单位
* workQueue:等待队列
* threadFactory:线程工厂
* handler:拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

线程池状态信息(ctl)

ThreadPoolExecutor中,ctl中存储跟当前线程池状态和线程数量的信息,采用的是前3位表示状态,后29位表示数量。这个思想是cpu执行指令很类似(地址信息+数据信息)。

在了解线程池状态前,我们先看个特殊的值:

COUNT_BITS = Integer.SIZE - 3; 即29,我们前面所说的(3位状态+29位数量)。

CAPACTIY = 1 << COUNT_BITS -1 = 0001 1111 1111 1111 1111 1111 1111 1111

利用CAPACITY和ctl,就可以分别获取到状态和数量。

线程池的几种状态介绍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-1 = 1111 1111 1111 1111 1111 1111 1111 1111
0 = 0000 0000 0000 0000 0000 0000 0000 0000
1 = 0000 0000 0000 0000 0000 0000 0000 0001
2 = 0000 0000 0000 0000 0000 0000 0000 0010
3 = 0000 0000 0000 0000 0000 0000 0000 0011

RUNNING = -1 << 29 = 111...
SHUTDOWN = 0 << 29 = 000...
STOP = 1 << 29 = 001...
TIDING = 2<< 29 = 010...
TERMINATED = 3 << 29 = 011...

各个状态说明:
RUNNING:线程池正常运行的状态,处理任务,并接受新的任务
SHUTDOWN:线程池即将关闭,正常处理正在进行的任务,workQueue中的任务也会陆续处理,但不接受新提交的任务
STOP:中断正在进行的任务,同时不会处理workQueue中的任务
TIDING:所有任务都已中止,并且workerCount也为0,即线程池已空,即将执行terminate()方法
TERMINATED:terminate()方法执行完成后的状态
  • 线程池信息获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//状态获取,传入的参数一般就是ctl.get();
//CAPACITY取反后,高3位为1,低29位就是0,这样就可以获取到状态
private static int runStateOf(int c) { return c & ~CAPACITY; }

//线程池中线程数量获取
//CAPACITY的高3位为0,低29位为1,取与,就可以获取到当前线程池中线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }

//线程池信息的生成,即将状态与线程数量信息组合,生成线程池信息。
//即状态数据与数量数据取或,就得到线程池信息
//该方法主要是后续在改变或设置线程池的信息时使用。
private static int ctlOf(int rs, int wc) { return rs | wc; }

//线程池状态c在何种状态s之上
//c一般是指线程池当前的状态,s一般为代码中写死的某种状态,流程逻辑中使用
private static boolean runStateLessThan(int c, int s) {
return c < s;
}

//线程池状态c在保种状态s之下(包含)
//c指当前状态,s为某个指定的已经的状态,流程逻辑中使用
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

//特殊判断逻辑,即当前线程是否是RUNNING状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
  • 线程池状态的变更
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//增加线程池中工作线程数量
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

//减少线程池中工作线程数量
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

//减少线程池中工作线程数量,直至更新成功
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

//更新线程池的信息,更新成目标状态,线程数量保持原来的数据
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

我们都知道ctl是原子数,原子数都有其自增或自减的操作,为什么不直接用ctl.incrementAndGet()/ctl.decrementAndGet()直接进行线程数量增加或减少的操作呢。

这里要说明下:代码中的操作,采用CAS的方式进行,这样就会存在着false的可能,通过这种方式,可以起到一种锁的机制,用于控制具体的流程。后面我们会在代码中详细说明为何要这样写。

增加工作线程addWorker

方法如下:

1
2
3
4
5
6
7
//firstTask为null时,表示向线程池中增加工作线程,
//但该线程在start时,会从workQueue中取任务,即getTask()方法获取到的任务,
//不为null时,则该线程在start时,执行的就是该任务。
//core为true时,表示添加的线程数不会超过corePoolSize,为false表示添加的线程数不会超过maximumPoolSize;
private boolean addWorker(Runnable firstTask, boolean core){
//代码略。。。
}
  • 主体流程

image

  • 获取增加资格

流程如图:

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private boolean addWorker(Runnable firstTask, boolean core) {

//在增加工作线程时,

//首先要保证当前线程池所处的状态是允许增加线程,比如RUNNING或者SHUTDOWN的部分情况下,是允许的。

//SHUTDOWN状态下,正在执行或在等待队列中的任务是要逐个处理的,此时可能会存在着需要增加工作线程的情况,这种情况会在后续详细说明。

//其次,线程池中工作线程的数量不能超过限制,core为true时,不能超过corePoolSize,core为false时,不能超过maximumPoolSize。

//最后,由于addWorker有多线程竞争资源的问题,所以需要成功进行线程池中工作线程数量加1,方可进行后续的操作,这样保证的线程资源在控制范围内。

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//增加工作线程部分省略。。。
return workerStarted;
}
  • 增加工作线程

具体流程如下:

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* Worker内部类的构造方法
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//firstTask即为提交的任务
this.firstTask = firstTask;
//getThreadFactory就是ThreadPoolExecutor的传入参数ThreadFactory
this.thread = getThreadFactory().newThread(this);
}
/**
* 在此说明下,new Worker的构造方法中,thread的值是由ThreadFactory的newThread方法获取的,
* 由于ThreadFactory具体实现可由用户自己定义,所以可能会存在着返回已经执行完的线程
* 示例如下:
* ThreadFactory returnsTerminatedThread = runnableIgnored -> {
* Thread thread = new Thread(() -> {});
* thread.start();
* try { thread.join(); }
* catch (InterruptedException ex) { throw new Error(ex); }
* return thread;
* };
* 它就会返回一个已经执行完的线程
* 这个也就对应了为什么在addWorker时,需要对待添加的工作线程的状态进行判断的原因。
*/

private boolean addWorker(Runnable firstTask, boolean core) {
//...获取资格部分省略

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建个worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//注:mainLock为ThreadPoolExecutor对象所拥有的,
//但在对workers进行操作时,存在着多线程同时操作的情况,
//所以需要加锁,保证workers信息的线程安全。同时也包含largestPoolSize数据的准确性。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//recheck状态,保证在正确的状态下操作。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//这里就是我们上面所说的,判断待添加工作线程的状态。
//注:此处是个bug,具体可见https://bugs.openjdk.java.net/browse/JDK-8221892
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//更新线程池的统计信息
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//将上述获取到的资格还回,即将ctl中的数量减1,这里采用的类似decrementAndGet的操作,
//do{}while(decrementCompareAndSet(ctl.get()))
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

增加工作线程任务:

  1. 根据线程池的状态信息,发放加入资格
  2. 建立worker对象
  3. 根据线程池状态,将worker对象放入池中

它主要实现了对线程池中工作线程添加的控制。

工作线程启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//Worker的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//调用的是其外部类ThreadPoolExecutor的runWorker方法。
runWorker(this);
}

//ThreadPoolExecutor
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//这个地方没明白什么用途,后续再看
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果该工作线程有任务,则进行后续操作
//如果该工作线程没有任务,则会从workQueue中取任务,
//注意,这里是个while循环,如果workQueue一直有任务,则它会不断取任务
//如果取不到任务,则会退出while循环,进入后续的工作线程退出流程
while (task != null || (task = getTask()) != null) {
//因为也会有其他线程操作worker,所以此处需要加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//此处可能会存在着多个线程操作资源,会存在着race condition,所以w.lock()是非常有必要的。
//interruptIdleWorkers
wt.interrupt();
try {
//beforeExecute,afterExecute可由子类来实现,比如记日志之类的动作,这是ThreadPoolExecutor
//预留的一些方法。
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行,注意,此处使用的run方法,而不是start方法
//这就相当于,多线程任务委托给了线程池中的线程来执行。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//因while条件不满足,而导致的worker需要退出
//若while代码块内发生异常,则此参数将会为默认值(true),进入finnally块中执行。
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}


/**
* 该方法只在runWorker方法的代码块中调用
* completedAbruptly为true,表示异常导致线程退出
* 为false,表示在getTask返回为null的情况下,导致退出while循环
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//true -- 若为异常退出,则工作线程数应减少1
//false -- 若为while循环退出,即getTask返回的是null,
//我们看getTask代码,可以看到,当返回null时,之前一定已经操作过workerCount数量的更新。
//为什么不统一在这里操作,疑问????????
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
//因为涉及对公共资源的操作,需要获取公共资源锁。
mainLock.lock();
try {
//统计下当前线程池已完成的任务数
completedTaskCount += w.completedTasks;
//将工作线程从线程池中移出
workers.remove(w);
} finally {
mainLock.unlock();
}

//检测当前线程池是否可以terminated
tryTerminate();

int c = ctl.get();

if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//completedAbruptly为false情况下,就是说该线程正常退出,无任务可做
//allowCoreThreadTimeOut
//true表示core线程是允许超时销毁的
//false表示core线程是不需要超期销毁
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//若此时发现workQueue中又有任务了,则只需要core线程的最小值为1,当前线程池中工作线程数量只需大于此值即可
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//当前线程池中线程数量已经超过容许的最小值,则调用当前方法的线程就可以销毁
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//只要还是RUNNING或SHUTDOWN状态,再向线程池中重新添加一个工作线程。
addWorker(null, false);
}
}

runWorker是ThreadPoolExecutor的方法,该方法作用:

  1. 将用户提交的任务交由工作线程来执行

  2. 管理控制线程池中工作线程的数量

    • 重新生成一个工作线程,来回补当前生命周期结束的工作线程
    • 在当前工作线程数量已经超过corePoolSize时,不进行回补的方式,来逐渐减少工作线程

注:allowCoreThreadTimeOut,

当它的值为true时,表示线程池中core工作线程会等待keepAliveTime长的时间,如果还没有任务,则它会被销毁(即:不会重新生成工作线程),

当它的值为false时,表示线程池中的core工作线程只会发生替换,保持池内线程数量稳定。

但在这里,我们想一种场景,当线程池中只有core工作线程在工作,在core工作线程在退出时,会不会发生池内的线程不断替换,因为要保持线程数量的稳定,这样不就违反了线程池的规则了嘛。但实际情况是,这样的情况不会出现。这里我们就需要去分析下工作线程时在run时的逻辑,以上只是讲了runWorker的大概流程,这里我们看下其中一个细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 可看到getTask方法是在一个无限循环中,上面我们所说的情况,只有在getTask返回null时才会出现工作线程进入processWorkerExit块
* getTask返回null的几种场景
* 1. STOP(包含)以上的状态,或者SHUTDOWN状态下workQueue为空的情况
* 2. 当前工作线程数超过最大容许线程数
* 或者说工作线程允许销毁(设置了allowCoreThreadTimeOut或当前工作线程数超过corePoolSize),且上一次poll超时,且此时有工作线程或workQueue为空
* 以上两种情况,会返回null,其他情况,会在for循环中不断执行,直至获取到task
* 所以对于上面我们指出的一种场景,正常工作的线程池中,corePoolSize内的工作线程,在getTask时,不会出现下述返回null,
* 它只会在些阻塞,直至获取到task,这样就能保持corePoolSize下的工作线程数量的稳定了,防止频繁创建销毁了
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

支持的其他操作

  • 更改corePoolSize大小
1
2
3
4
5
6
7
8
9
public void setCorePoolSize(int corePoolSize);

//1.判定参数合法性
//2.设置corePoolSize
//3.如果corePoolSize变小,则释放一定量的core工作线程
//4.如果corePoolSize变大,在原有的规则下,增加工作线程数
// 原有的规则指:如果当前工作线程数量小于改变后的corePoolSize,
// 但若此时的woreQueue为空,则不需要增加线程数,
// 如果workeQueue不为空,则增加相应的线程数,但增加后的总数不会超过改变后的corePoolSize
  • 更改maxPoolSize大小
1
2
3
4
5
public void setMaximumPoolSize(int maximumPoolSize)
//1.判定参数合法性,不小于0,同时要大于corePoolSize
//2.设置maximumPoolSize的大小
//3.如果maximumPoolSize变小,
// 若此时工作线程总数大于改变后的 maximumPoolSize,则会去打断部分空闲线程
  • shutdown/shutdownNow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

 评论