ScheduledThreadPoolExecutor Source

ScheduledThreadPoolExecutor 源码分析

Posted by Mickey on July 18, 2019

ScheduledThreadPoolExecutor 可以用于注册任务,这些任务会在将来的某些阶段被调度触发(这篇博客只讲主要的思路)

ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,对 ThreadPoolExecutor 感兴趣的可以移步ThreadPoolExecutor 源码分析,ScheduledThreadPoolExecutor 使用 DelayedWorkQueue 作为 BlockingQueue,DelayedWorkQueue 中存放的元素是 ScheduledFutureTask

DelayedWorkQueue 其实就是一个堆,按照 ScheduledFutureTask 的触发时间排序

schedule

schedule 方法会在 delay 时间之后,执行 command 的 run 方法,首先,生成一个 ScheduledFutureTask 实栗,然后调用 delayedExecute 方法将 ScheduledFutureTask 实例加入 DelayedWorkQueue 的堆,ensurePrestart 方法会在线程池中新启一个 Worker 线程

public ScheduledFuture<?> schedule(Runnable command,
                                    long delay,
                                    TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                        triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

ScheduledFutureTask

ScheduledFutureTask 是 DelayedWorkQueue 中存放的实栗,继承于 FutureTask,对 FutureTask 不熟悉的同学可以移步FutureTask 源码分析,我们知道 ThreadPoolExecutor 的 Worker 线程会从 BlockingQueue 中获取 task,调用 task 的 run 方法,这里 ScheduledFutureTask 的 run 方法会调用 FutureTask 的 run 方法,调用 Runnable 参数的 run 方法

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
    // Task 被触发的时间
    private long time;
    
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
    }

    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }
}

DelayedWorkQueue 的 take 方法

在 ThreadPoolExecutor 的 getTask 方法中,会调用 BlockingQueue 的 take/poll 方法,如下所示,可以看到,take 方法获取堆中下标为 0 的 ScheduledFutureTask 实例,判断是否到了触发的时间,没有则调用 available.await() 进行等待,到达了则执行 finishPoll 方法,将 ScheduledFutureTask 实例出堆并返回

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}