2018-11-28 · Develop

定时任务调度 Timer

什么是定时任务: 在指定时间指定时间间隔 执行 指定任务 指定的次数 。时间间隔的单位可以是分钟、小时、日、月、周及以上的任意组合。

schedule-job-java-timer

一个极简的栗子:

    public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        System.out.println(sdf.format(System.currentTimeMillis()));

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println(sdf.format(System.currentTimeMillis()));
            }
        }, 2 * 1000L, 1000L);
    }

上面是一个定时器的极简栗子,实现每隔一秒打印当前时间。
TimerTask 继承至 Runable ,主要定义时间到时需要执行的任务。
Timer 是一个定时调度器,在指定时间和间隔去执行一个任务 TimerTask

看下定时器 Timer 的主要方法:

java-timer-uml

  1. public void schedule(TimerTask task, long delay) 经过 delay(ms) 后执行一次 task
  2. public void schedule(TimerTask task, Date time) 到达指定时间 time 后执行一次 task
  3. public void schedule(TimerTask task, long delay, long period) 经过 delay(ms) 后以 period(ms) 作为时间间隔循环执行 task
  4. public void schedule(TimerTask task, Date firstTime, long period) 到达指定时间 time 后以 period(ms) 作为时间间隔循环执行 task task
  5. public void scheduleAtFixedRate(TimerTask task, long delay, long period) 同方法3,具体不同后面分析
  6. public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) 同方法4,具体不同后面分析
  7. public void cancel() 取消所有的任务
  8. public int purge() 从 TaskQueue 中移除所有 cancel 了的 Task

schedule 与 scheduleAtFixedRate 的区别:

不同之处主要体现在两种情况下:

测试代码:

    public static void main(String[] args) {

        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        System.out.println(sdf.format(System.currentTimeMillis()));

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("schedule => " + sdf.format(System.currentTimeMillis()));
            }
        }, new Date(System.currentTimeMillis() - 6 * 1000L), 1000L);

        new Timer().scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                System.out.println("scheduleAtFixedRate => " + sdf.format(System.currentTimeMillis()));
            }
        }, new Date(System.currentTimeMillis() - 6 * 1000L), 1000L);
    }

结果输出:

2018-09-12 23:21:54
schedule => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
scheduleAtFixedRate => 2018-09-12 23:21:54
schedule => 2018-09-12 23:21:55
scheduleAtFixedRate => 2018-09-12 23:21:55
schedule => 2018-09-12 23:21:56
scheduleAtFixedRate => 2018-09-12 23:21:56
scheduleAtFixedRate => 2018-09-12 23:21:57
schedule => 2018-09-12 23:21:57
scheduleAtFixedRate => 2018-09-12 23:21:58
schedule => 2018-09-12 23:21:58
...

从输出结果可以看出在首次计划执行的时间早于当前时间时:
schedule 以当前时间为首次执行时间
scheduleAtFixedRate 会试图在首次执行时将前面需要执行的次数补齐,及首次执行了 1 + 6 = 7 次,这样就需要考虑并发的情况。

测试代码:

    public static void main(String[] args) {

        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        System.out.println(sdf.format(System.currentTimeMillis()));

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3 * 1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("schedule => " + sdf.format(scheduledExecutionTime()));
            }
        }, new Date(), 1000L);

        new Timer().scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3 * 1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("scheduleAtFixedRate => " + sdf.format(scheduledExecutionTime()));
            }
        }, new Date(), 1000L);
    }

执行结果:

2018-09-12 23:48:09
scheduleAtFixedRate => 2018-09-12 23:48:09
schedule => 2018-09-12 23:48:09
scheduleAtFixedRate => 2018-09-12 23:48:10
schedule => 2018-09-12 23:48:12
scheduleAtFixedRate => 2018-09-12 23:48:11
schedule => 2018-09-12 23:48:15
scheduleAtFixedRate => 2018-09-12 23:48:12
schedule => 2018-09-12 23:48:18
scheduleAtFixedRate => 2018-09-12 23:48:13
schedule => 2018-09-12 23:48:21
scheduleAtFixedRate => 2018-09-12 23:48:14
schedule => 2018-09-12 23:48:24
...

可以看出 schedule 的预计时间间隔是3s,而 scheduleAtFixedRate 的预计时间间隔是1s
schedule 下一次执行的时间是上次实际执行完成的时间点,执行时间会不断的延后。
scheduleAtFixedRate 下一次执行的时间相对于上一次开始的时间点,因此执行时间一般不会延后,需要考虑并发的情况。

TimerTask 的主要方法

java-timer-task-uml

  1. public boolean cancel() 取消当前任务
  2. public long scheduledExecutionTime() 获取计算执行时间

源码解读

TimerTask

TimerTask 继承 Runnable 除了上述两个方法外,就只有 Task 的状态:

    /**
     * This task has not yet been scheduled.
     */
    static final int VIRGIN = 0;

    /**
     * This task is scheduled for execution.  If it is a non-repeating task,
     * it has not yet been executed.
     */
    static final int SCHEDULED   = 1;

    /**
     * This non-repeating task has already executed (or is currently
     * executing) and has not been cancelled.
     */
    static final int EXECUTED    = 2;

    /**
     * This task has been cancelled (with a call to TimerTask.cancel).
     */
    static final int CANCELLED   = 3;

VIRGIN 表示 Task 刚刚被创建
SCHEDULED 表示 Task 已经被加入 TaskQueue 中,等待调度
EXECUTED 表示 Task 已经被执行
CANCELLED 表示 Task 已经被取消

TaskQueue

TaskQueue 是用来存储 Task 的队列,内部使用数组实现的最小堆,最近需要执行的 Task 在堆顶,在进行任务调度时只需判断堆顶元素时间是否满足即可,所以效率极高

    /**
     * Adds a new task to the priority queue.
     */
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    /**
     * Establishes the heap invariant (described above) assuming the heap
     * satisfies the invariant except possibly for the leaf-node indexed by k
     * (which may have a nextExecutionTime less than its parent's).
     *
     * This method functions by "promoting" queue[k] up the hierarchy
     * (by swapping it with its parent) repeatedly until queue[k]'s
     * nextExecutionTime is greater than or equal to that of its parent.
     */
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    /**
     * Establishes the heap invariant (described above) in the subtree
     * rooted at k, which is assumed to satisfy the heap invariant except
     * possibly for node k itself (which may have a nextExecutionTime greater
     * than its children's).
     *
     * This method functions by "demoting" queue[k] down the hierarchy
     * (by swapping it with its smaller child) repeatedly until queue[k]'s
     * nextExecutionTime is less than or equal to those of its children.
     */
    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }

TimerThread

TimerThread 是调度和执行 Task 的线程

    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

    /**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }

TimerThread 循环判断 queue 是否为空:

  1. 如果为空和可能有新 task 加入队列,则等待。
  2. 如果 wait 被唤醒后还是为空,则表示 wait 是被 timer 的 cancel 唤醒,跳出循环。
  3. 如果 queue 不为空,从 queue 中取出最近即将到时间的 task,判断 task 的状态。
    • 如果 task 是取消状态, 移除 task 继续循环。
    • 如果 task 是其他状态, 则判断 task 的执行时间是否已经到了。
      • 如果还没到,则计算目标调度时间和当前时间的差值 delta,继续 wait delta 毫秒,wait 时间到之后会结束本次循环,在下一次循环中。
      • 如果没有新的更早的 task 加入,则当前的 task 将会被执行。

Timer

Timer 主要注意以下几个方面:

当向Timer中增加新的Task:

    /**
     * Schedule the specified timer task for execution at the specified
     * time with the specified period, in milliseconds.  If period is
     * positive, the task is scheduled for repeated execution; if period is
     * zero, the task is scheduled for one-time execution. Time is specified
     * in Date.getTime() format.  This method checks timer state, task state,
     * and initial execution time, but not period.
     *
     * @throws IllegalArgumentException if <tt>time</tt> is negative.
     * @throws IllegalStateException if task was already scheduled or
     *         cancelled, timer was cancelled, or timer thread terminated.
     * @throws NullPointerException if {@code task} is null
     */
    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        // Constrain value of period sufficiently to prevent numeric
        // overflow while still being effectively infinitely large.
        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;

        synchronized(queue) {
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");

            synchronized(task.lock) {
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }

            queue.add(task);
            if (queue.getMin() == task)
                queue.notify();
        }
    }

将 task 加入 queue 中后判断if (queue.getMin() == task)最近的 task 是否是新加入的 task, 如果是新加入的 task, 则需要唤醒 TimeThread 的 wait。如果不唤醒的话, 等 TimeThread 自然结束的话, 新加入的 task 有可能会过期。

Timer 的 cancel 也会唤醒 TaskThread 的 wait:

    /**
     * Terminates this timer, discarding any currently scheduled tasks.
     * Does not interfere with a currently executing task (if it exists).
     * Once a timer has been terminated, its execution thread terminates
     * gracefully, and no more tasks may be scheduled on it.
     *
     * <p>Note that calling this method from within the run method of a
     * timer task that was invoked by this timer absolutely guarantees that
     * the ongoing task execution is the last task execution that will ever
     * be performed by this timer.
     *
     * <p>This method may be called repeatedly; the second and subsequent
     * calls have no effect.
     */
    public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
            queue.notify();  // In case queue was already empty.
        }
    }

将 newTasksMayBeScheduled 设置为 false 的同时,调用 queue 的 notify 方法,唤醒正在等待 queue 的线程。 如果不调用 notify() 的话,
在 queue 为空的情况下, TimeThread 会造成死循环的情况。

TimeThread ==> mainLoop() ==>
while (true) {
    ...
    // Wait for queue to become non-empty
    while (queue.isEmpty() && newTasksMayBeScheduled)
        queue.wait();
    if (queue.isEmpty())
        break; // Queue is empty and will forever remain; die
    ...
}

**Timer 引用过期和 queue 为空,在 GC 前唤醒 wait **

    /**
     * This object causes the timer's task execution thread to exit
     * gracefully when there are no live references to the Timer object and no
     * tasks in the timer queue.  It is used in preference to a finalizer on
     * Timer as such a finalizer would be susceptible to a subclass's
     * finalizer forgetting to call it.
     */
    private final Object threadReaper = new Object() {
        protected void finalize() throws Throwable {
            synchronized(queue) {
                thread.newTasksMayBeScheduled = false;
                queue.notify(); // In case queue is empty.
            }
        }
    };

在 timer 没有对象引用的时候,加上 queue 中没有 task 时, timer 对象会被 GC 回收, 所以对象 threadReaper 也会也会被回收, 回收前调用 finalize 方法, 把 newTasksMayBeScheduled 设置为 false ,并且唤起正在 wait 的 TimerThread 线程。


参考文档: