介绍 线程 和 线程池 在 Java 中的基本概念、创建方式、核心参数和底层源码
chou403
/ Thread
/ c:
/ u:
/ 27 min read
Java 中线程的状态
5 种状态一般是针对传统的线程状态来说(操作系统层面)
Java 中给线程准备了 6 种状态
NEW: Thread 对象被创建出来,但是还没有执行 start 方法
RUNNABLE: Thread 对象调用了 start 方法,就为 RUNNABLE 状态(CPU 调度/没有调度)
BLOCKED: synchronized 没有拿到同步锁,被阻塞的情况
WAITING: 调用 wait 方法就会处于 WAITING 状态,需要被手动唤醒
TIMED_WAITING: 调用 sleep 方法或者 join 方法,会被自动唤醒,无需手动唤醒
TERMINATED: run 方法执行完毕,线程生命周期到头了
1)新建状态(NEW): 当我们创建一个新的Thread对象时,该线程就处于新建状态,例如: Thread t = new Thread();
2)可运行状态(RUNNABLE): 当线程对象调用start()方法后,线程进入可运行状态。在这个状态下,线程已经做好了准备,随时等待CPU调度执行,这个状态包括了”就绪”和”运行”状态。
3)阻塞状态(BLOCKED): 线程在等待获取一个锁以进入或重新进入同步代码块时,它会进入阻塞状态。只有当该锁被释放并且线程被调度去获取这个锁,线程才能转换到RUNNABLE状态。
4)等待状态(WAITING): 线程进入等待状态,是因为它调用了其它线程的join方法,或者调用了无参数的wait方法。在这种情况下,线程会等待另一个线程的操作完成或者等待notify/notifyAll消息。
5)定时等待状态(TIMED_WAITING): 线程进入定时等待状态,是因为它调用了sleep或者带有指定时间的wait或join方法。在指定的时间过去之后,线程会自动返回RUNNABLE状态。如果它是由于调用wait或join方法进入的定时等待状态,还需要等待notify/notifyAll消息或者等待join的线程终止。
6)终止状态(TERMINATED): 线程任务执行完毕或者由于异常而结束,线程就会进入终止状态。在这个状态下,线程的生命周期实际上已经结束了,它不能再转换到其他任何状态。
BLOCKED,WAITING,TIMED_WAITING: 都可以理解为阻塞,等待状态,因为处在这三种状态下,CPU 不会调度当前线程
NEW:
Thread thread = new Thread(()->{
});
System.out.println(thread.getState());
RUNNABLE:
Thread thread = new Thread(()->{
while (true) {
}
});
thread.start();
System.out.println(thread.getState());
BLOCKED:
Object object = new Object();
Thread thread = new Thread(()->{
synchronized (object) {
}
});
synchronized (object) {
thread.start();
Thread.sleep(500);
System.out.println(thread.getState());
}
WAITING:
Object object = new Object();
Thread thread = new Thread(() -> {
synchronized (object) {
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
Thread.sleep(500);
System.out.println(thread.getState());
TIMED_WAITING:
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
Thread.sleep(500);
System.out.println(thread.getState());
TERMINATED:
Thread thread = new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
Thread.sleep(1000);
System.out.println(thread.getState());
ThreadPool
JDK 提供的线程池
newFixedThreadPool
这个线程的特点是线程数是固定的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构建时,需要给 newFixedThreadPool 方法提供一个 nThreads 的属性,而这个属性就是当前线程池线程的个数,当前线程池的本质其实就是使用 ThreadPoolExecutor。
构建好当前线程池后,线程个数已经固定好(线程是懒加载,在构建之初,线程并没有构建出来,而是随着任务的提交才会将线程在线程池中构建出来)。如果线程没有构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到 LinkedBlockingQueue 无界队列中存放,等待线程从 LinkedBlockingQueue 中去 take 出去,然后执行。
newSingleThreadExecutor
单例线程池,该线程池只有一个工作线程在处理任务,如果业务是顺序消费们可以采用该线程池。
这种线程池比较简单,它内部只有一个线程,会用唯一的工作线程来执行任务。它的原理和固定线程数量的线程池的原理是一样的,只不过这个时候它的线程数量就直接被设置为 1,也就是只有一个线程。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
private static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
@SuppressWarnings("deprecation")
protected void finalize() {
super.shutdown();
}
}
单例线程池,线程池中只有一个工作线程在处理任务。
如果业务中涉及了顺序消费,可以采用 newSingleThreadExecutor。
newCachedThreadPool
缓存线程池,corePoolSize为0,当第一次提交任务到线程池时,会直接构建一个工作线程,Integer.MAX_VALUE: 意味着线程数量可以无限大,keepAliveTime为60S,60秒内没有任务进来,意味着线程空闲时间超过60S就会被杀死;如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行,特点是: 任务只要提交到该线程池就必然有工作线程处理。
它是可缓存的线程池,并且还会回收。它会把任务交给我们的线程,而且线程不够用的话,就会创建线程。如果线程过多,就会把这些线程给回收回来。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newScheduledThreadPool
定时线程池,创建一个定时线程池,即按一定的周期执行任务,即定时任务,或者设置定时时间,延迟执行任务,由于该线程池是继承ThreadPoolExecutor,所以本质上还是ThreadPoolExecutor线程池,只不过是在原来的基础上添加了定时功能,其原理是基于DelayQueue实现延迟执行,周期性执行,任务执行完毕,再次扔会到阻塞队列。
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
newWorkStealingPool
newWorkStealingPool简单翻译是任务窃取线程池。和别的4种不同,它用的是ForkJoinPool。使用ForkJoinPool的好处是,把1个任务拆分成多个”小任务”,把这些”小任务”分发到多个线程上执行。这些”小任务”都执行完成后,再将结果合并。之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行。可以简单理解为”窃取”。一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从头开始执行,一个从尾部开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
核心参数
为什么要自定义线程池, 首先 ThreadPoolExecutor 中,一种提供了 7 个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用 JDK 提供的方式去构建,可以设置的参数最多两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐去自定义线程池。手动的去 new ThreadPoolExecutor 设置它的一些核心属性。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。
自定义线程池七大参数
-
int corePoolSize
核心线程数,创建线程池后不会立即创建核心线程,当有任务到达时才触发核心线程的创建,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。核心线程在allowCoreThreadTimeout被设置为true时会超时并被回收,默认情况下不会被回收。
-
int maximumPoolSize
最大线程数,代表当前线程池中,一共可以有多少个工作线程。当线程数大于或等于corePoolSize,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会按照rejectedExecutionHandler配置的处理策略进行处理线程。
-
long keepAliveTime
非核心工作线程在阻塞队列位置等待的时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
-
TimeUnit unit
非核心工作线程在阻塞队列位置等待时间的单位。
-
BlockingQueue<Runnable> workQueue
任务没有在核心工作线程处理时,任务先扔到阻塞队列中。阻塞队列,用来存储等待执行的任务,新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。这里的阻塞队列有以下几种选择:
- ArrayBlockingQueue: 基于数组的有界阻塞队列,按FIFO排序
- LinkedBlockingQueue: 基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序
- PriorityBlockingQueue: 具有优先级的无界阻塞队列,优先级通过参数Comparator实现
- SynchronousQueue: 一个不缓存任务的阻塞队列,也就是说新任务进来时,不会缓存,而是直接被调度执行该任务
-
ThreadFactory threadFactory
线程工厂,创建一个新线程时使用的工厂,可以用来设定线程名,是否为daemon线程等等。在构建线程的线程工作,可以设置thread 的一些信息。
-
RejectedExecutionHandler handler
当前线程池无法处理投递过来的任务时,执行当前的拒绝策略 。拒绝策略有以下:
- AbortPolicy: 当前拒绝策略在无法处理任务时,会抛出一个异常
- CallerRunPolicy: 当前拒绝策略在线程池无法处理任务时,会将任务交给调用者处理
- DiscardPolicy: 当前拒绝策略在无法处理任务时,会直接将任务丢掉
- DiscardOldestPolicy: 当前拒绝策略在无法处理任务时,将队列中最早的任务丢掉,将当前再次尝试交给线程池处理
- 自定义Policy: 根据自己的任务,将任务扔到数据库,也可以做其他操作。
线程池的状态
线程池的5种状态: RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
// 是个int类型的数值 表达了两个意思 1: 声明当前线程池的状态 2: 声明线程池中的线程数
// 高3位是线程池状态 低29位是线程池中线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29 方便后面的位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 通过位运算得出最大容量
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS; // 111 代表线程池为RUNNING,代表正常接收任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 代表线程池为SHUTDOWN,不接收新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也会正常处理
private static final int STOP = 1 << COUNT_BITS; // 001 代表线程池为STOP,不接收新任务,也不会处理阻塞队列中的任务,同时会中断正在进行的任务
private static final int TIDYING = 2 << COUNT_BITS; // 010 代表线程池为TIDYING,过渡的状态,代表当前线程池即将 Game Over
private static final int TERMINATED = 3 << COUNT_BITS; // 011 代表线程池TERMINATED,代表当前线程池已经 Game Over,要执行terminated()
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 得到线程池的状态
private static int workerCountOf(int c) { return c & COUNT_MASK; } // 得到当前线程池的线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 得到上面提到的 32 位 int类型的数值
-
RUNNING
线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。
-
SHUTDOWN
不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。
-
STOP
不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。
-
TIDYING
- SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态,会执行 terminated() 方法。线程池中的 terminated() 方法是空实现,可以重写该方法进行相应的处理。
- 线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。
- 线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。
-
TERMINATED
线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。
线程池执行流程
ThreadPoolExecutor 的 execute() 是提任务到线程池的核心方法,很重要。
public void execute(Runnable command) {
// 提交的任务不能为 null
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取核心线程 ctl,用于后面的判断
int c = ctl.get();
// 如果工作线程个数小于核心线程数
// 满足要求,添加核心工作线程
if (workerCountOf(c) < corePoolSize) {
// addWorker(任务,是核心线程吗)
// addWorker返回 true,代表添加工作线程成功
// addWorker返回 false,代表添加工作线程失败
// addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程
if (addWorker(command, true))
// 工作线程构建出来了,任务也交给 command 去处理了
return;
// 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次 ctl
c = ctl.get();
}
// 添加核心工作线程失败,调用一下
// 判断线程池状态是否是 RUNNING,如果是,正常基于阻塞队列的 offer 方法,将任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 如果任务添加到阻塞队列成功,走 if 内部
// 如果任务在扔到阻塞队列之前,线程池状态突然改变了
// 重新获取 ctl
int recheck = ctl.get();
// 如果线程池的状态不是 RUNNING,将任务从阻塞队列中移除
if (! isRunning(recheck) && remove(command))
// 并且直接拒绝策略
reject(command);
// 在这,说明阻塞队列有我刚刚放进去的任务
// 查看一下工作线程数是不是 0 个
// 如果工作线程为 0 个,需要添加一个非核心工作线程去处理阻塞队列中的任务
// 发生这种情况有两种:
// 1.构建线程池时,核心线程数为 0 个
// 2.即便有核心线程,可以设置核心线程也允许超时,设置 allowCoreThreadTimeOut 为 true,代表核心线程也可以关闭
else if (workerCountOf(recheck) == 0)
// 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理
addWorker(null, false);
}
// 任务添加到阻塞队列失败
// 构建一个非核心工作线程
// 如果添加非核心工作线程成功,直接完事
else if (!addWorker(command, false))
// 添加失败,执行拒绝策略
reject(command);
}
addWorker() 中主要分成两大部分去看
- 校验线程池的状态以及工作线程个数
- 添加工作线程并且启动工作线程
校验线程池的状态以及工作线程个数
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 循环检查状态
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
// 检查线程池的运行状态是否 shutdown,任务队列是否为空等状态是否正常
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 循环更新状态,和线程数量的更新,都是使用 CAS 的模式更新
for (;;) {
// 检查运行的线程数是否超标
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// CAS 方式更新线程数量
if (compareAndIncrementWorkerCount(c))
// 更新成功后直接跳转到方法第一行,并且不在进入这些 for 循环中
// 因为状态啥的已经更新成功了
break retry;
c = ctl.get(); // Re-read ctl
// 如果 CAS 增加线程数失败,检查下状态是否还是和之前一样
if (runStateAtLeast(c, SHUTDOWN))
// 不一样了,会跳转到第一行,并会重新进入 for 循环中走一遍流程
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一个 worker,内部包含线程
w = new Worker(firstTask);
// 获取内部的线程对象
final Thread t = w.thread;
if (t != null) {
// 加锁,锁定
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 二次检查线程池的状态
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 池的状态没有问题,检查线程是否存活
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// worker 放入池的 hashset 中
workers.add(w);
workerAdded = true;
int s = workers.size();
// 并将 worker 的数量赋予池中的 largestPoolSize 成员变量
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
// worker 创建成功并加入 workers 成功
if (workerAdded) {
// 启动线程
t.start();
// 修改状态
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 启动线程失败,进行失败处理
addWorkerFailed(w);
}
return workerStarted;
}
// 如果可以在不超过队列容量的情况下立即插入指定的元素,则在该队列的尾部插入该元素;如果成功,则返回true;如果该队列已满,则返回false
public boolean offer(E e) {
// 不允许元素为空
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁 保证调用 offer 方法的时候只有一个线程
try {
// 队列满
if (count == items.length)
return false;
else {
// 队列未满
enqueue(e);
return true;
}
} finally {
// 释放锁 让其他线程可以调用 offer 方法
lock.unlock();
}
}
// 在当前放置位置插入元素,前进和发出信号。只有在锁定的情况下才能呼叫。
private void enqueue(E e) {
final Object[] items = this.items;
// 元素添加到数组中
items[putIndex] = e;
// 当索引满了 修改为0
if (++putIndex == items.length) putIndex = 0;
count++;
// 使用条件对象 notEmpty 通知 比如使用 take 方法的时候队列中没有数据 被阻塞 这个时候队列中插入了数据 需要调用 signal() 进行通知
notEmpty.signal();
}
Woker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 创建 worker 线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
private boolean isHeldExclusively() {
return getState() != 0;
}
private boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
private boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 任务不为空 执行任务 如果任务为空 通过getTask()从阻塞队列中获取任务
while (task != null || (task = getTask()) != null) {
// 加锁 避免被 SHUTDOWN 任务也不会中断
w.lock();
// 如果线程池状态大于等于STOP,请确保线程被中断;如果没有,请确保线程没有中断。这需要在第二种情况下重新检查,以处理关闭在清除中断时无竞争
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断
wt.interrupt();
try {
// 执行任务前的操作
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
// 执行任务后的操作
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}