Home
img of docs

介绍 线程 和 线程池 在 Java 中的基本概念、创建方式、核心参数和底层源码

chou403

/ Thread

/ c:

/ u:

/ 27 min read


Java 中线程的状态

5 种状态一般是针对传统的线程状态来说(操作系统层面)

image-20230823173134948

Java 中给线程准备了 6 种状态

image-20230823173427644

image-20230823173819528

NEW: Thread 对象被创建出来,但是还没有执行 start 方法

RUNNABLE: Thread 对象调用了 start 方法,就为 RUNNABLE 状态(CPU 调度/没有调度)

BLOCKED: synchronized 没有拿到同步锁,被阻塞的情况

WAITING: 调用 wait 方法就会处于 WAITING 状态,需要被手动唤醒

TIMED_WAITING: 调用 sleep 方法或者 join 方法,会被自动唤醒,无需手动唤醒

TERMINATED: run 方法执行完毕,线程生命周期到头了

1)新建状态(NEW): 当我们创建一个新的Thread对象时,该线程就处于新建状态,例如: Thread t = new Thread();

2)可运行状态(RUNNABLE): 当线程对象调用start()方法后,线程进入可运行状态。在这个状态下,线程已经做好了准备,随时等待CPU调度执行,这个状态包括了”就绪”和”运行”状态。

3)阻塞状态(BLOCKED): 线程在等待获取一个锁以进入或重新进入同步代码块时,它会进入阻塞状态。只有当该锁被释放并且线程被调度去获取这个锁,线程才能转换到RUNNABLE状态。

4)等待状态(WAITING): 线程进入等待状态,是因为它调用了其它线程的join方法,或者调用了无参数的wait方法。在这种情况下,线程会等待另一个线程的操作完成或者等待notify/notifyAll消息。

5)定时等待状态(TIMED_WAITING): 线程进入定时等待状态,是因为它调用了sleep或者带有指定时间的wait或join方法。在指定的时间过去之后,线程会自动返回RUNNABLE状态。如果它是由于调用wait或join方法进入的定时等待状态,还需要等待notify/notifyAll消息或者等待join的线程终止。

6)终止状态(TERMINATED): 线程任务执行完毕或者由于异常而结束,线程就会进入终止状态。在这个状态下,线程的生命周期实际上已经结束了,它不能再转换到其他任何状态。

BLOCKED,WAITING,TIMED_WAITING: 都可以理解为阻塞,等待状态,因为处在这三种状态下,CPU 不会调度当前线程

   NEW:
Thread thread = new Thread(()->{

});
System.out.println(thread.getState());

RUNNABLE:
Thread thread = new Thread(()->{
    while (true) {

    }
});
thread.start();
System.out.println(thread.getState());

BLOCKED:
Object object = new Object();
Thread thread = new Thread(()->{
    synchronized (object) {

    }
});
synchronized (object) {
    thread.start();
    Thread.sleep(500);
    System.out.println(thread.getState());
}

WAITING:
Object object = new Object();
Thread thread = new Thread(() -> {
    synchronized (object) {
        try {
            object.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
thread.start();
Thread.sleep(500);
System.out.println(thread.getState());

TIMED_WAITING:
Thread thread = new Thread(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
thread.start();
Thread.sleep(500);
System.out.println(thread.getState());

TERMINATED:
Thread thread = new Thread(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
thread.start();
Thread.sleep(1000);
System.out.println(thread.getState());

ThreadPool

image-20230411174505779

JDK 提供的线程池

newFixedThreadPool

这个线程的特点是线程数是固定的。

   public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

构建时,需要给 newFixedThreadPool 方法提供一个 nThreads 的属性,而这个属性就是当前线程池线程的个数,当前线程池的本质其实就是使用 ThreadPoolExecutor。

构建好当前线程池后,线程个数已经固定好(线程是懒加载,在构建之初,线程并没有构建出来,而是随着任务的提交才会将线程在线程池中构建出来)。如果线程没有构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到 LinkedBlockingQueue 无界队列中存放,等待线程从 LinkedBlockingQueue 中去 take 出去,然后执行。

newSingleThreadExecutor

单例线程池,该线程池只有一个工作线程在处理任务,如果业务是顺序消费们可以采用该线程池。

这种线程池比较简单,它内部只有一个线程,会用唯一的工作线程来执行任务。它的原理和固定线程数量的线程池的原理是一样的,只不过这个时候它的线程数量就直接被设置为 1,也就是只有一个线程。

   public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    @SuppressWarnings("deprecation")
    protected void finalize() {
        super.shutdown();
    }
}

单例线程池,线程池中只有一个工作线程在处理任务。

如果业务中涉及了顺序消费,可以采用 newSingleThreadExecutor。

newCachedThreadPool

缓存线程池,corePoolSize为0,当第一次提交任务到线程池时,会直接构建一个工作线程,Integer.MAX_VALUE: 意味着线程数量可以无限大,keepAliveTime为60S,60秒内没有任务进来,意味着线程空闲时间超过60S就会被杀死;如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行,特点是: 任务只要提交到该线程池就必然有工作线程处理。

它是可缓存的线程池,并且还会回收。它会把任务交给我们的线程,而且线程不够用的话,就会创建线程。如果线程过多,就会把这些线程给回收回来。

   public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
newScheduledThreadPool

定时线程池,创建一个定时线程池,即按一定的周期执行任务,即定时任务,或者设置定时时间,延迟执行任务,由于该线程池是继承ThreadPoolExecutor,所以本质上还是ThreadPoolExecutor线程池,只不过是在原来的基础上添加了定时功能,其原理是基于DelayQueue实现延迟执行,周期性执行,任务执行完毕,再次扔会到阻塞队列。

   public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
newWorkStealingPool

newWorkStealingPool简单翻译是任务窃取线程池。和别的4种不同,它用的是ForkJoinPool。使用ForkJoinPool的好处是,把1个任务拆分成多个”小任务”,把这些”小任务”分发到多个线程上执行。这些”小任务”都执行完成后,再将结果合并。之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行。可以简单理解为”窃取”。一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从头开始执行,一个从尾部开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。

   public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

核心参数

为什么要自定义线程池, 首先 ThreadPoolExecutor 中,一种提供了 7 个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。

但是如果直接采用 JDK 提供的方式去构建,可以设置的参数最多两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐去自定义线程池。手动的去 new ThreadPoolExecutor 设置它的一些核心属性。

自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。

自定义线程池七大参数
  • int corePoolSize

    核心线程数,创建线程池后不会立即创建核心线程,当有任务到达时才触发核心线程的创建,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。核心线程在allowCoreThreadTimeout被设置为true时会超时并被回收,默认情况下不会被回收。

  • int maximumPoolSize

    最大线程数,代表当前线程池中,一共可以有多少个工作线程。当线程数大于或等于corePoolSize,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会按照rejectedExecutionHandler配置的处理策略进行处理线程。

  • long keepAliveTime

    非核心工作线程在阻塞队列位置等待的时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

  • TimeUnit unit

    非核心工作线程在阻塞队列位置等待时间的单位。

  • BlockingQueue<Runnable> workQueue

    任务没有在核心工作线程处理时,任务先扔到阻塞队列中。阻塞队列,用来存储等待执行的任务,新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。这里的阻塞队列有以下几种选择:

    • ArrayBlockingQueue: 基于数组的有界阻塞队列,按FIFO排序
    • LinkedBlockingQueue: 基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序
    • PriorityBlockingQueue: 具有优先级的无界阻塞队列,优先级通过参数Comparator实现
    • SynchronousQueue: 一个不缓存任务的阻塞队列,也就是说新任务进来时,不会缓存,而是直接被调度执行该任务
  • ThreadFactory threadFactory

    线程工厂,创建一个新线程时使用的工厂,可以用来设定线程名,是否为daemon线程等等。在构建线程的线程工作,可以设置thread 的一些信息。

  • RejectedExecutionHandler handler

    当前线程池无法处理投递过来的任务时,执行当前的拒绝策略 。拒绝策略有以下:

    • AbortPolicy: 当前拒绝策略在无法处理任务时,会抛出一个异常
    • CallerRunPolicy: 当前拒绝策略在线程池无法处理任务时,会将任务交给调用者处理
    • DiscardPolicy: 当前拒绝策略在无法处理任务时,会直接将任务丢掉
    • DiscardOldestPolicy: 当前拒绝策略在无法处理任务时,将队列中最早的任务丢掉,将当前再次尝试交给线程池处理
    • 自定义Policy: 根据自己的任务,将任务扔到数据库,也可以做其他操作。

线程池的状态

线程池的5种状态: RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

   // 是个int类型的数值 表达了两个意思 1: 声明当前线程池的状态 2: 声明线程池中的线程数
// 高3位是线程池状态 低29位是线程池中线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29 方便后面的位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 通过位运算得出最大容量
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 线程池状态
private static final int RUNNING    = -1 << COUNT_BITS; // 111 代表线程池为RUNNING,代表正常接收任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 代表线程池为SHUTDOWN,不接收新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也会正常处理
private static final int STOP       =  1 << COUNT_BITS; // 001 代表线程池为STOP,不接收新任务,也不会处理阻塞队列中的任务,同时会中断正在进行的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 010 代表线程池为TIDYING,过渡的状态,代表当前线程池即将 Game Over
private static final int TERMINATED =  3 << COUNT_BITS; // 011 代表线程池TERMINATED,代表当前线程池已经 Game Over,要执行terminated()

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~COUNT_MASK; } // 得到线程池的状态
private static int workerCountOf(int c)  { return c & COUNT_MASK; } // 得到当前线程池的线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 得到上面提到的 32 位 int类型的数值
  • RUNNING

    线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。

  • SHUTDOWN

    不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。

  • STOP

    不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。

  • TIDYING

    • SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态,会执行 terminated() 方法。线程池中的 terminated() 方法是空实现,可以重写该方法进行相应的处理。
    • 线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。
    • 线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。
  • TERMINATED

    线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。

image-20230411173800910

线程池执行流程

ThreadPoolExecutor 的 execute() 是提任务到线程池的核心方法,很重要。

   public void execute(Runnable command) {
  // 提交的任务不能为 null
  if (command == null)
    throw new NullPointerException();
  /*
    * Proceed in 3 steps:
    *
    * 1. If fewer than corePoolSize threads are running, try to
    * start a new thread with the given command as its first
    * task.  The call to addWorker atomically checks runState and
    * workerCount, and so prevents false alarms that would add
    * threads when it shouldn't, by returning false.
    *
    * 2. If a task can be successfully queued, then we still need
    * to double-check whether we should have added a thread
    * (because existing ones died since last checking) or that
    * the pool shut down since entry into this method. So we
    * recheck state and if necessary roll back the enqueuing if
    * stopped, or start a new thread if there are none.
    *
    * 3. If we cannot queue task, then we try to add a new
    * thread.  If it fails, we know we are shut down or saturated
    * and so reject the task.
    */
  // 获取核心线程 ctl,用于后面的判断
  int c = ctl.get();
  // 如果工作线程个数小于核心线程数
  // 满足要求,添加核心工作线程
  if (workerCountOf(c) < corePoolSize) {
    // addWorker(任务,是核心线程吗)
    // addWorker返回 true,代表添加工作线程成功
    // addWorker返回 false,代表添加工作线程失败
    // addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程
    if (addWorker(command, true))
        // 工作线程构建出来了,任务也交给 command 去处理了
        return;
    // 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次 ctl
    c = ctl.get();
  }
  // 添加核心工作线程失败,调用一下
  // 判断线程池状态是否是 RUNNING,如果是,正常基于阻塞队列的 offer 方法,将任务添加到阻塞队列
  if (isRunning(c) && workQueue.offer(command)) {
    // 如果任务添加到阻塞队列成功,走 if 内部
    // 如果任务在扔到阻塞队列之前,线程池状态突然改变了
    // 重新获取 ctl
    int recheck = ctl.get();
    // 如果线程池的状态不是 RUNNING,将任务从阻塞队列中移除
    if (! isRunning(recheck) && remove(command))
      // 并且直接拒绝策略
      reject(command);
    // 在这,说明阻塞队列有我刚刚放进去的任务
    // 查看一下工作线程数是不是 0 个
    // 如果工作线程为 0 个,需要添加一个非核心工作线程去处理阻塞队列中的任务
    // 发生这种情况有两种:
    // 1.构建线程池时,核心线程数为 0 个
    // 2.即便有核心线程,可以设置核心线程也允许超时,设置 allowCoreThreadTimeOut 为 true,代表核心线程也可以关闭
    else if (workerCountOf(recheck) == 0)
      // 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理
      addWorker(null, false);
  }
  // 任务添加到阻塞队列失败
  // 构建一个非核心工作线程
  // 如果添加非核心工作线程成功,直接完事
  else if (!addWorker(command, false))
    // 添加失败,执行拒绝策略
    reject(command);
}

image-20230828000746127

addWorker() 中主要分成两大部分去看

  • 校验线程池的状态以及工作线程个数
  • 添加工作线程并且启动工作线程

校验线程池的状态以及工作线程个数

   private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  // 循环检查状态
  for (int c = ctl.get();;) {
    // Check if queue empty only if necessary.
    // 检查线程池的运行状态是否 shutdown,任务队列是否为空等状态是否正常
    if (runStateAtLeast(c, SHUTDOWN)
      && (runStateAtLeast(c, STOP)
        || firstTask != null
        || workQueue.isEmpty()))
      return false;

    // 循环更新状态,和线程数量的更新,都是使用 CAS 的模式更新
    for (;;) {
      // 检查运行的线程数是否超标
      if (workerCountOf(c)
        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
        return false;
      // CAS 方式更新线程数量
      if (compareAndIncrementWorkerCount(c))
        // 更新成功后直接跳转到方法第一行,并且不在进入这些 for 循环中
        // 因为状态啥的已经更新成功了
        break retry;
      c = ctl.get();  // Re-read ctl
      // 如果 CAS 增加线程数失败,检查下状态是否还是和之前一样
      if (runStateAtLeast(c, SHUTDOWN))
        // 不一样了,会跳转到第一行,并会重新进入 for 循环中走一遍流程
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // 新建一个 worker,内部包含线程
    w = new Worker(firstTask);
    // 获取内部的线程对象
    final Thread t = w.thread;
    if (t != null) {
      // 加锁,锁定
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int c = ctl.get();

        // 二次检查线程池的状态
        if (isRunning(c) ||
          (runStateLessThan(c, STOP) && firstTask == null)) {
          // 池的状态没有问题,检查线程是否存活
          if (t.getState() != Thread.State.NEW)
            throw new IllegalThreadStateException();
          // worker 放入池的 hashset 中
          workers.add(w);
          workerAdded = true;
          int s = workers.size();
          // 并将 worker 的数量赋予池中的 largestPoolSize 成员变量
          if (s > largestPoolSize)
            largestPoolSize = s;
        }
      } finally {
        mainLock.unlock();
      }
      // worker 创建成功并加入 workers 成功
      if (workerAdded) {
        // 启动线程
        t.start();
        // 修改状态
        workerStarted = true;
      }
    }
  } finally {
    if (! workerStarted)
      // 启动线程失败,进行失败处理
      addWorkerFailed(w);
  }
  return workerStarted;
}
   // 如果可以在不超过队列容量的情况下立即插入指定的元素,则在该队列的尾部插入该元素;如果成功,则返回true;如果该队列已满,则返回false
public boolean offer(E e) {
    // 不允许元素为空
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁 保证调用 offer 方法的时候只有一个线程
    try {
        // 队列满
        if (count == items.length)
            return false;
        else {
            // 队列未满
            enqueue(e);
            return true;
        }
    } finally {
        // 释放锁 让其他线程可以调用 offer 方法
        lock.unlock();
    }
}
   // 在当前放置位置插入元素,前进和发出信号。只有在锁定的情况下才能呼叫。
private void enqueue(E e) {
    final Object[] items = this.items;
    // 元素添加到数组中
    items[putIndex] = e;
    // 当索引满了 修改为0
    if (++putIndex == items.length) putIndex = 0;
    count++;
    // 使用条件对象 notEmpty 通知 比如使用 take 方法的时候队列中没有数据 被阻塞 这个时候队列中插入了数据 需要调用 signal() 进行通知
    notEmpty.signal();
}

Woker

   private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
  /**
   * This class will never be serialized, but we provide a
   * serialVersionUID to suppress a javac warning.
   */
  private static final long serialVersionUID = 6138294804551838833L;

  /** Thread this worker is running in.  Null if factory fails. */
  final Thread thread;
  /** Initial task to run.  Possibly null. */
  Runnable firstTask;
  /** Per-thread task counter */
  volatile long completedTasks;

  // TODO: switch to AbstractQueuedLongSynchronizer and move
  // completedTasks into the lock word.

  /**
   * Creates with given first task and thread from ThreadFactory.
   * @param firstTask the first task (null if none)
   */
  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 创建 worker 线程
    this.thread = getThreadFactory().newThread(this);
  }

  /** Delegates main run loop to outer runWorker. */
  public void run() {
    runWorker(this);
  }

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  private boolean isHeldExclusively() {
    return getState() != 0;
  }

  private boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  private boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock() {
    acquire(1);
  }

  public boolean tryLock() {
    return tryAcquire(1);
  }

  public void unlock() {
    release(1);
  }

  public boolean isLocked() {
    return isHeldExclusively();
  }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}
   final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 任务不为空 执行任务 如果任务为空 通过getTask()从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            // 加锁 避免被 SHUTDOWN 任务也不会中断
            w.lock();
            // 如果线程池状态大于等于STOP,请确保线程被中断;如果没有,请确保线程没有中断。这需要在第二种情况下重新检查,以处理关闭在清除中断时无竞争
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中断
                wt.interrupt();
            try {
                // 执行任务前的操作
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 执行任务后的操作
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}