@@ -8,15 +8,15 @@ Netty 的时间轮 HashedWheelTimer 给出了一个粗略的定时器实现,
88
99### HashedWheelTimer 内部的数据结构
1010
11- ``` Java
12- private final HashedWheelBucket [] wheel;
11+ ``` java
12+ private final HashedWheelBucket [] wheel;
1313```
1414
1515HashedWheelTimer 的主体数据结构 wheel 是一个由多个链表所组成的数组,默认情况下该数组的大小为 512。当定时任务准备加入到时间轮中的时候,将会以其等待执行的时间为依据选择该数组上的一个具体槽位上的链表加入。
1616
17- ``` Java
18- private HashedWheelTimeout head;
19- private HashedWheelTimeout tail;
17+ ``` java
18+ private HashedWheelTimeout head;
19+ private HashedWheelTimeout tail;
2020```
2121
2222在这个 wheel 数组中,每一个槽位都是一条由 HashedWheelTimeout 所组成的链表,其中链表中的每一个节点都是一个等待执行的定时任务。
@@ -25,115 +25,117 @@ HashedWheelTimer 的主体数据结构 wheel 是一个由多个链表所组成
2525
2626在 HashedWheelTimer 中,其内部是一个单线程的 worker 线程,通过类似 eventloop 的工作模式进行定时任务的调度。
2727
28- ``` Java
28+ ``` java
2929@Override
30- public void run() {
31- // Initialize the startTime.
32- startTime = System . nanoTime();
33- if (startTime == 0 ) {
34- // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
35- startTime = 1 ;
36- }
37-
38- // Notify the other threads waiting for the initialization at start().
39- startTimeInitialized. countDown();
40-
41- do {
42- final long deadline = waitForNextTick();
43- if (deadline > 0 ) {
44- transferTimeoutsToBuckets();
45- HashedWheelBucket bucket =
46- wheel[(int ) (tick & mask)];
47- bucket. expireTimeouts(deadline);
48- tick++ ;
49- }
50- } while (WORKER_STATE_UPDATER . get(HashedWheelTimer . this ) == WORKER_STATE_STARTED );
51-
52- // Fill the unprocessedTimeouts so we can return them from stop() method.
53- for (HashedWheelBucket bucket: wheel) {
54- bucket. clearTimeouts(unprocessedTimeouts);
55- }
56- for (;;) {
57- HashedWheelTimeout timeout = timeouts. poll();
58- if (timeout == null ) {
59- break ;
60- }
61- unprocessedTimeouts. add(timeout);
62- }
30+ public void run() {
31+ // Initialize the startTime.
32+ startTime = System . nanoTime();
33+ if (startTime == 0 ) {
34+ // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
35+ startTime = 1 ;
36+ }
37+
38+ // Notify the other threads waiting for the initialization at start().
39+ startTimeInitialized. countDown();
40+
41+ do {
42+ final long deadline = waitForNextTick();
43+ if (deadline > 0 ) {
44+ transferTimeoutsToBuckets();
45+ HashedWheelBucket bucket =
46+ wheel[(int ) (tick & mask)];
47+ bucket. expireTimeouts(deadline);
48+ tick++ ;
49+ }
50+ } while (WORKER_STATE_UPDATER . get(HashedWheelTimer . this ) == WORKER_STATE_STARTED );
51+
52+ // Fill the unprocessedTimeouts so we can return them from stop() method.
53+ for (HashedWheelBucket bucket: wheel) {
54+ bucket. clearTimeouts(unprocessedTimeouts);
55+ }
56+ for (;;) {
57+ HashedWheelTimeout timeout = timeouts. poll();
58+ if (timeout == null ) {
59+ break ;
6360 }
61+ unprocessedTimeouts. add(timeout);
62+ }
63+ }
6464```
6565
66- 简单看到 HashedWheelTimer 内部的 woker 线程的 run()方法,在其首先会记录启动时间作为 startTime 作为接下来调度定时任务的时间依据,而之后会通过 CountDownLatch 来通知所有外部线程当前 worker 工作线程已经初始化完毕。之后的循环体便是当时间轮持续生效的时间里的具体调度逻辑。时间刻度是时间轮的一个重要属性,其默认为 100ms,此处的循环间隔便是时间轮的时间刻度,默认情况下就是间隔 100ms 进行一次调度循环。工作线程会维护当前工作线程具体循环了多少轮,用于定位具体执行触发时间轮数组上的哪一个位置上的链表。当时间轮准备 shutdown 的阶段,最后的代码会对未执行的任务整理到未执行的队列中。
66+ 简单看到 HashedWheelTimer 内部的 woker 线程的 run()方法,在其首先会记录启动时间作为 startTime 作为接下来调度定时任务的时间依据,而之后会通过 CountDownLatch 来通知所有外部线程当前 worker 工作线程已经初始化完毕。之后的循环体便是当时间轮持续生效的时间里的具体调度逻辑。时间刻度是时间轮的一个重要属性,其默认为 100ms,此处的循环间隔便是时间轮的时间刻度,默认情况下就是间隔 100ms 进行一次调度循环。工作线程会维护当前工作线程具体循环了多少轮,用于定位具体执行触发时间轮数组上的哪一个位置上的链表。当时间轮准备 shutdown 的阶段,最后的代码会对未执行的任务整理到未执行的队列中。
67+
6768由此可见,worker 线程的 run()方法中基本定义了工作线程的整个生命周期,从初始的初始化到循环体中的具体调度,最后到未执行任务的具体清理。整体的调度逻辑便主要在这里执行。值得注意的是,在这里的前提下,每个 HashedWheelTimer 时间轮都会有一个工作线程进行调度,所以不需要在 netty 中在每一个连接中单独使用一个 HashedWheelTimer 来进行定时任务的调度,否则可能将对性能产生影响。
6869
6970### 向 HashedWheelTimer 加入一个定时任务的流程
7071
7172当调用 HashedWheelTimer 的 newTimeout()方法的时候,即是将定时任务加入时间轮中的 api。
7273
73- ``` Java
74- @Override
75- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
76- if (task == null ) {
77- throw new NullPointerException (" task" );
78- }
79- if (unit == null ) {
80- throw new NullPointerException (" unit" );
81- }
82- start();
83-
84- long deadline = System . nanoTime() + unit. toNanos(delay) - startTime;
85- HashedWheelTimeout timeout = new HashedWheelTimeout (this , task, deadline);
86- timeouts. add(timeout);
87- return timeout;
74+ ``` java
75+ @Override
76+ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
77+ if (task == null ) {
78+ throw new NullPointerException (" task" );
79+ }
80+ if (unit == null ) {
81+ throw new NullPointerException (" unit" );
8882 }
83+ start();
84+
85+ long deadline = System . nanoTime() + unit. toNanos(delay) - startTime;
86+ HashedWheelTimeout timeout = new HashedWheelTimeout (this , task, deadline);
87+ timeouts. add(timeout);
88+ return timeout;
89+ }
8990```
9091
9192当此次是首次向该时间轮加入定时任务的时候,将会通过 start()方法开始执行上文所述的 worker 工作线程的启动与循环调度逻辑,这里暂且不提。之后计算定时任务触发时间相对于时间轮初始化时间的相对时间间隔 deadline,并将其包装为一个链表节点 HashedWheelTimeout ,投入到 timeouts 队列中,等待 worker 工作线程在下一轮调度循环中将其加入到时间轮的具体链表中等待触发执行,timeouts 的实现是一个 mpsc 队列,关于 mpsc 队列可以查看[ 此文] ( https://github.com/doocs/source-code-hunter/blob/master/docs/Netty/Netty%E6%8A%80%E6%9C%AF%E7%BB%86%E8%8A%82%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/MpscLinkedQueue%E9%98%9F%E5%88%97%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90.md ) ,这里也符合多生产者单消费者的队列模型。
9293
9394### HashedWheelTimer 中工作线程的具体调度
9495
95- ``` Java
96- do {
97- final long deadline = waitForNextTick();
98- if (deadline > 0 ) {
99- transferTimeoutsToBuckets();
100- HashedWheelBucket bucket =
101- wheel[(int ) (tick & mask)];
102- bucket. expireTimeouts(deadline);
103- tick++ ;
104- }
105- } while (WORKER_STATE_UPDATER . get(HashedWheelTimer . this ) == WORKER_STATE_STARTED );
96+ ``` java
97+ do {
98+ final long deadline = waitForNextTick();
99+ if (deadline > 0 ) {
100+ transferTimeoutsToBuckets();
101+ HashedWheelBucket bucket =
102+ wheel[(int ) (tick & mask)];
103+ bucket. expireTimeouts(deadline);
104+ tick++ ;
105+ }
106+ } while (WORKER_STATE_UPDATER . get(HashedWheelTimer . this ) == WORKER_STATE_STARTED );
106107```
107108
108109在 HashedWheelTimer 中的工作线程 run()方法的主要循环中,主要分为三个步骤。
109110
110- - 首先 worker 线程会通过 waitForNextTick()方法根据时间轮的时间刻度等待一轮循环的开始,在默认情况下时间轮的时间刻度是 100ms,那么此处 worker 线程也将在这个方法中 sleep 相应的时间等待下一轮循环的开始。此处也决定了时间轮的定时任务时间精度。
111- - 当 worker 线程经过相应时间间隔的 sleep 之后,也代表新的一轮调度开始。此时,会通过 transferTimeoutsToBuckets()方法将之前刚刚加入到 timeouts 队列中的定时任务放入到时间轮具体槽位上的链表中。
112-
113- ``` Java
114- for (int i = 0 ; i < 100000 ; i++ ) {
115- HashedWheelTimeout timeout = timeouts. poll();
116- if (timeout == null ) {
117- // all processed
118- break ;
119- }
120- if (timeout. state() == HashedWheelTimeout . ST_CANCELLED
121- || ! timeout. compareAndSetState(HashedWheelTimeout . ST_INIT , HashedWheelTimeout . ST_IN_BUCKET )) {
122- timeout. remove();
123- continue ;
124- }
125- long calculated = timeout. deadline / tickDuration;
126- long remainingRounds = (calculated - tick) / wheel. length;
127- timeout. remainingRounds = remainingRounds;
128-
129- final long ticks = Math . max(calculated, tick); // Ensure we don't schedule for past.
130- int stopIndex = (int ) (ticks & mask);
131-
132- HashedWheelBucket bucket = wheel[stopIndex];
133- bucket. addTimeout(timeout);
134- }
111+ 首先 worker 线程会通过 waitForNextTick()方法根据时间轮的时间刻度等待一轮循环的开始,在默认情况下时间轮的时间刻度是 100ms,那么此处 worker 线程也将在这个方法中 sleep 相应的时间等待下一轮循环的开始。此处也决定了时间轮的定时任务时间精度。
112+
113+ 当 worker 线程经过相应时间间隔的 sleep 之后,也代表新的一轮调度开始。此时,会通过 transferTimeoutsToBuckets()方法将之前刚刚加入到 timeouts 队列中的定时任务放入到时间轮具体槽位上的链表中。
114+
115+ ``` java
116+ for (int i = 0 ; i < 100000 ; i++ ) {
117+ HashedWheelTimeout timeout = timeouts. poll();
118+ if (timeout == null ) {
119+ // all processed
120+ break ;
121+ }
122+ if (timeout. state() == HashedWheelTimeout . ST_CANCELLED
123+ || ! timeout. compareAndSetState(HashedWheelTimeout . ST_INIT , HashedWheelTimeout . ST_IN_BUCKET )) {
124+ timeout. remove();
125+ continue ;
126+ }
127+ long calculated = timeout. deadline / tickDuration;
128+ long remainingRounds = (calculated - tick) / wheel. length;
129+ timeout. remainingRounds = remainingRounds;
130+
131+ final long ticks = Math . max(calculated, tick); // Ensure we don't schedule for past.
132+ int stopIndex = (int ) (ticks & mask);
133+
134+ HashedWheelBucket bucket = wheel[stopIndex];
135+ bucket. addTimeout(timeout);
136+ }
135137```
136138
137139首先,在每一轮的调度中,最多只会从 timeouts 队列中定位到时间轮 100000 个定时任务,这也是为了防止在这里耗时过久导致后面触发定时任务的延迟。在这里会不断从 timeouts 队列中获取刚加入的定时任务。具体的计算流程便是将定时任务相对于时间轮初始化时间的相对间隔与时间轮的时间刻度相除得到相对于初始化时间的具体轮数,之后便在减去当前轮数得到还需要遍历几遍整个时间轮数组得到 remainingRounds,最后将轮数与时间轮数组长度-1 相与,得到该定时任务到底应该存放到时间轮上哪个位置的链表。用具体的数组举个例子,该时间轮初始化时间为 12 点,时间刻度为 1 小时,时间轮数组长度为 8,当前时间 13 点,当向时间轮加入一个明天 13 点执行的任务的时候,首先得到该任务相对于初始化的时间间隔是 25 小时,也就是需要 25 轮调度,而当前 13 点,当前调度轮数为 1,因此还需要 24 轮调度,就需要再遍历 3 轮时间轮,因此 remainingRounds 为 3,再根据 25 与 8-1 相与的结果为 1,因此将该定时任务放置到时间轮数组下标为 1 的链表上等待被触发。这便是一次完整的定时任务加入到时间轮具体位置的计算。
138140
139- - 在 worker 线程的最后,就需要来具体执行定时任务了,首先通过当前循环轮数与时间轮数组长度-1 相与的结果定位具体触发时间轮数组上哪个位置上的链表,再通过 expireTimeouts()方法依次对链表上的定时任务进行触发执行。这里的流程就相对很简单,链表上的节点如果 remainingRounds 小于等于 0,那么就可以直接执行这个定时任务,如果 remainingRounds 大于 0,那么显然还没有到达触发的时间点,则将其-1 等待下一轮的调度之后再进行执行。在继续回到上面的例子,当 14 点来临之时,此时工作线程将进行第 2 轮的调度,将会把 2 与 8-1 进行相与得到结果 2,那么当前工作线程就会选择时间轮数组下标为 2 的链表依次判断是否需要触发,如果 remainingRounds 为 0 将会直接触发,否则将会将 remainingRounds-1 等待下一轮的执行。
141+ 在 worker 线程的最后,就需要来具体执行定时任务了,首先通过当前循环轮数与时间轮数组长度-1 相与的结果定位具体触发时间轮数组上哪个位置上的链表,再通过 expireTimeouts()方法依次对链表上的定时任务进行触发执行。这里的流程就相对很简单,链表上的节点如果 remainingRounds 小于等于 0,那么就可以直接执行这个定时任务,如果 remainingRounds 大于 0,那么显然还没有到达触发的时间点,则将其-1 等待下一轮的调度之后再进行执行。在继续回到上面的例子,当 14 点来临之时,此时工作线程将进行第 2 轮的调度,将会把 2 与 8-1 进行相与得到结果 2,那么当前工作线程就会选择时间轮数组下标为 2 的链表依次判断是否需要触发,如果 remainingRounds 为 0 将会直接触发,否则将会将 remainingRounds-1 等待下一轮的执行。
0 commit comments