0%

Android-EventBus修改纪实(三)-线程调度

Android-EventBus修改纪实(三)

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第 5 天,点击查看活动详情


前言

在上一篇 Android-EventBus修改纪实(二) 中笔者简单分析了 EventBus 提供的 5 种线程模型的作用及各模型的使用场景与注意事项,特别是在 POSTING 线程模型下,要谨慎的使用黏性事件和必达事件。

上一篇中未对线程模型的具体实现做分析,本篇文章分析下线程模型是如何做线程调度的。

本篇文章只讨论发布普通事件的线程调度,粘性事件和必达事件的线程调度与普通事件略有不同。

线程模型

先简单回顾下上篇文章中关于线程模型的分析:

  • POSTING:对于普通事件来说,事件发布与订阅方法将在同一个线程,这是该线程调度模型的本意
  • MAIN:在主线程来分发事件,根据是否在 Android 上使用,处理逻辑不同
  • MAIN_ORDER:在主线程依次分发事件
  • BACKGROUND:在后台线程来分发事件,根据是否在 Android 上使用,处理逻辑不同。使用单线程处理,尽量不要进行耗时操作以免阻塞后台线程
  • ASYNC:在异步线程来分发事件,使用线程池处理

事件分发器

EventBus 使用 事件分发器 对事件进行线程调度,除 POSTING 线程模型外,其他 4 种线程模型都有 事件分发器 的身影,事件分发器 主要在postToSubscription 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
// 处理POSTING
case POSTING:
// 直接分发事件
invokeSubscriber(subscription, event);
break;
// 处理MAIN
case MAIN:
// 是否是主线程,在非 Android 平台上时,isMainThread 也是 true
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
// 此时 mainThreadPoster != null,可以认为是在 Android 平台上
mainThreadPoster.enqueue(subscription, event);
}
break;
// 处理MAIN_ORDERED
case MAIN_ORDERED:
// 不管是否是主线程,总是先判断是否可以入队
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
// 否则直接分发事件
invokeSubscriber(subscription, event);
}
break;
// 处理BACKGROUND
case BACKGROUND:
// 如果是主线程则入队,backgroundPoster使用单个后台线程依次分发事件,订阅方法应尽快返回以免阻塞后台线程
// 在非 Android 平台上时,isMainThread 也是 true,即在非 Android 平台上时,始终在后台线程分发事件
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
// 否则直接分发事件
invokeSubscriber(subscription, event);
}
break;
// 处理ASYNC
case ASYNC:
// 始终异步分发事件
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

postToSubscription 方法中可以看出各线程模型对应分别对应哪种 事件分发器

  • MAIN,MAIN_ORDERED:mainThreadPoster,mainThreadPoster 在Android 平台上的类型是 HandlerPoster
  • BACKGROUND:backgroundPoster,backgroundPoster 的类型是 BackgroundPoster
  • ASYNC:asyncPoster,asyncPoster 的类型是 AsyncPoster

以上事件发布器都实现了 Poster 接口:

1
2
3
4
5
6
7
8
9
10
public interface Poster {

/**
* 待发布事件入队
*
* @param subscription 订阅者和订阅方法
* @param event 待发布事件
*/
void enqueue(Subscription subscription, Object event);
}

Poster 接口是对事件分发器行为的抽象。

Poster 接口中唯一的接口方法名称 enqueue 来看,可以大概猜测到事件分发器的实现思想为:事件循环机制

既然是 事件循环,接口方法名称又是 enqueue,猜测有队列相关的实现。

待发布事件队列

每个事件分发器都有一个待发布事件队列,将待发布的事件存储在队列中,待发布事件队列是通过一种先进先出(FIFO)的单向链表实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final class PendingPostQueue {
// 链表头部元素
private PendingPost head; // 真链表

// 链表尾部元素
private PendingPost tail; // 假链表,真链表尾部元素的引用

// 插入链表尾部
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}

// 尾部元素不为空,认为链表中有数据
if (tail != null) {
// 新的元素插入链表尾部
tail.next = pendingPost;

// 更新尾部元素引用
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}

// 通知所有等待的线程,用于`poll(int)`中`wait()`
notifyAll();
}

// 链表头部出队
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}

// 链表头部出队
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}

待发布事件队列中有两个字段,其中 head 是真链表,标识链表的头部元素,tail 是假链表,它是真链表尾部元素的引用,标识链表的尾部元素,它的作用是方便在链表尾部插入元素,元素数据中有 next 字段指向下一个元素,下图展示了元素入队时真假链表的变化情况:

箭头向右表示元素入队

20220603122155865

  1. 初始时,headtail 均为 null
  2. 当第一个元素入队时,headtail 均为 event1
  3. 当第二个元素入队时,event2 插入 head 的尾部,tail 的引用更新为 event2
  4. 当第三个元素入队时,event3 插入 head 的尾部,tail 的引用更新为 event3

当插入更多元素时依次类推,head 为链表的头部元素,tail 为链表的尾部元素。

元素出队的情况,正好与元素入队相反,下图展示了元素出队时真假链表的变化情况:

箭头向左表示元素出队

20220603124949865

  1. 当链表中有三个元素时,出队一个元素,即取出 head 头部元素 event1,然后把 headnext 元素 event2 指向为 head,判断 head 是否为 null,如果是,则认为链表中没有数据,将 headtail 均置为 null, 否则 tail 尾部元素不变
  2. 当链表中有两个元素时,出队一个元素,即取出 head 头部元素 event2,然后把 headnext 元素 event3 指向为 head,判断 head 是否为 null,如果是,则认为链表中没有数据,将 headtail 均置为 null, 否则 tail 尾部元素不变
  3. 当链表中有一个元素时,出队一个元素,即取出 head 头部元素 event3,然后把 headnext 元素 null 指向为 head,判断 head 是否为 null,如果是,则认为链表中没有数据,将 headtail 均置为 null, 否则 tail 尾部元素不变

待发布事件队列元素

待发布事件队列的元素是 PendingPost,它是单向链表结构,也是订阅者和订阅方法与事件的包装类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event;
Subscription subscription;
PendingPost next;

// 私有构造方法
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}

// 获取一个元素
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}

// 释放一个元素
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}

看了 PendingPost 的源码,笔者想到了 Android 中 Message 的实现,我们在 Android 中使用 Handler 发消息时,是不是也经常使用 handler.obtainMessage() 或者使用 Message.obtain() 去获取一个 Message 的实例,Handler 使用完 Message 后会自动调用 Message#recycle() 方法回收利用这个 Message 实例,正好对应 PendingPost#releasePendingPost() 方法。Message 使用了「享元模式」达到循环利用对象,避免重复创建的目的,看来 PendingPost 也是使用了「享元模式」,在第一篇 Android-EventBus修改纪实 中我们也提到 FindState 也是使用了「享元模式」。

HandlerPoster

HandlerPoster 用于 MAINMAIN_ORDERED 线程模型,将事件分发到主线程处理。

HandlerPoster 继承自 Handler 传入 MainLooper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class HandlerPoster extends Handler implements Poster {

// 待发布事件队列
private final PendingPostQueue queue;

// 单轮最大处理时长,默认10毫秒
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;

// 是否活跃,可以理解为队列是否还有数据,并发修改由 this 守护
private boolean handlerActive;

public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

// 事件入队
public void enqueue(Subscription subscription, Object event) {
// 获取队列元素(事件包装类型)
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 元素入队
queue.enqueue(pendingPost);

// 不活跃时,发送一个空的消息给当前Handler
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

// 分发事件
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
// 获取处理此轮消息的开始时间
long started = SystemClock.uptimeMillis();
while (true) {
// 不加锁,快速取出头部元素,类似使用双重校验锁获取单例
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
// 如果为空再加锁获取头部元素
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 还是为空,重置活跃状态,退出循环
handlerActive = false;
return;
}
}
}

// 分发事件,调用订阅方法
eventBus.invokeSubscriber(pendingPost);

// 每次分发事件后都判断本轮事件分发耗时是否超过单轮最大处理时长,避免 while 死循环阻塞主线程
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
// 超过单轮最大处理时长,重新发送一个空的消息给当前Handler等待调度
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}

HandlerPoster 实现 Poster 接口中的 enqueue 方法用于事件入队,在 handleMessage 方法中调用 EventBus#invokeSubscriber 方法传入待发布事件队列元素分发事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;

// 此处回收元素
PendingPost.releasePendingPost(pendingPost);

// 判断订阅关系是否活跃
if (subscription.active) {
// 反射调用订阅方法
invokeSubscriber(subscription, event);
}
}

HandlerPoster 为避免阻塞主线程,默认单轮事件分发最多执行 10 毫秒,即 10 毫秒内可以分发多个事件,多个订阅方法的执行耗时之和超时 10 毫秒后,就会停止本轮事件分发,重新发送一个空消息给 Handler 等待执行下轮事件分发。这里的处理思路,我们可以借鉴。

BackgroundPoster

BackgroundPoster 用于 BACKGROUND 线程模型,将事件分发到后台线程。

BackgroundPoster 实现了 Runnable 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
final class BackgroundPoster implements Runnable, Poster {

// 待发布事件队列
private final PendingPostQueue queue;
private final EventBus eventBus;

// 是否活跃,可以理解为队列是否还有数据,volatile 禁止重排序,保证多线程可见性
private volatile boolean executorRunning;

BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

// 事件入队
public void enqueue(Subscription subscription, Object event) {

// 获取队列元素(事件包装类型)
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {

// 元素入队
queue.enqueue(pendingPost);

// 不活跃时,由线程池执行此 Runnable
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}

// 事件分发
@Override
public void run() {
try {
try {
while (true) {
// 最长等待1000毫秒,取出头部元素
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
// 如果为空再加锁获取头部元素
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 还是为空,重置活跃状态,退出循环
executorRunning = false;
return;
}
}
}

// 分发事件,调用订阅方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
// 忽略中断事件
}
} finally {
// 此时已退出 while 循环,即认为待发布事件队列为空
executorRunning = false;
}
}
}

BackgroundPoster 同样实现了 Poster 接口中的 enqueue 方法用于事件入队,不过 BackgroundPoster 使用线程池进行事件分发,所以在 run 方法中调用 EventBus#invokeSubscriber 方法传入待发布事件队列元素分发事件。

BackgroundPoster 在处理时间分发时,第一次获取元素时调用了 poll(int) 方法,如果队列为空,最长等待1000毫秒后再次取头部元素,为何要这样实现?

这里笔者猜测是想提高事件分发效率与线程利用率,因为 BackgroundPoster 是在子线程中分发事件,如果队列为空,可以接收最长等待1000毫秒后再次取头部元素,减小在第二次加锁取头部元素还为空的几率,及减小线程切换导致的资源开销,充分利用线程资源,提高事件分发效率。

BackgroundPoster 默认使用 EventBus 中的线程池 CachedThreadPool在事件分发时,BackgroundPoster 忽略了 InterruptedException ,即没有响应线程中断,如果我们使用自定义的线程池,我们在外边关闭线程池后,这可能导致事件分发还在继续。

最后,BACKGROUND 线程模型描述中说使用单线程进行事件分发,为何在 BackgroundPoster 中却是使用的线程池呢?

其实这个问题我们很容易就解答了。因为在 BackgroundPoster 中使用了 executorRunning 变量,这个变量保证执行单轮事件分发时,都使用线程池中的一个线程,单轮事件分发完成后,新的一轮事件分发可能就使用其他的线程了。

AsyncPoster

AsyncPoster 用于 ASYNC 线程模型,将事件分发到异步线程。

AsyncPoster 实现了 RunnablePoster 接口,使用线程池实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class AsyncPoster implements Runnable, Poster {

// 待发布事件队列
private final PendingPostQueue queue;
private final EventBus eventBus;

AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

// 事件入队
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
// 由线程池执行此 Runnable
eventBus.getExecutorService().execute(this);
}

// 事件分发
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}

AsyncPoster 的实现比较简单,调用 enqueue 方法将事件入队并交由线程池进行事件分发,run 方法处理事件分发。

AsyncPoster 默认使用 EventBus 中的线程池 CachedThreadPoolCachedThreadPool 没有核心线程,允许创建 Integer.MAX_VALUE 个工作线程且线程空闲时的存活时长为 60 秒,如果发布多个耗时较短的事件或多个耗时较长的时间,都会导致创建多个工作线程而浪费资源。

至此,EventBus 的线程调度分析完成。

总结

本文对 EventBus 的线程调度进行了分析,最后我们做下总结:

PendingPostQueue

PendingPostQueue 中,我们学到了如何使用单链表实现先进先出(FIFO)队列。

PendingPost

PendingPost 中,我们学到了「享元模式」的使用以及如何定义单链表数据结构。

HandlerPoster

HandlerPoster 中判断单轮事件分发最大处理时长机制,及时让出主线程的执行权,避免阻塞主线程;不过这个机制适用于耗时短的订阅方法,如果订阅方法耗时较长,一样会阻塞主线程。

BackgroundPoster

BackgroundPoster 中调用 poll(int) 提高事件分发效率和线程利用率,不过 BackgroundPoster 忽略了线程中断异常,使用自定义线程池时需要注意这里,最后我们解答了 BACKGROUND 线程模型描述中说使用单线程进行事件分发,而 BackgroundPoster 却是使用的线程池问题。

AsyncPoster

AsyncPoster 比较简单,一般简单的地方反而意味着是比较危险的,我们通常会看不到它的危险性,因为我们认为它足够简单。

Poster

Poster 说一下吧,设计框架时尽量使用接口或抽象类。

最后,还是那句话:无论使用哪种线程模型,在订阅方法中都应该尽量避免进行耗时操作。

happy~,希望可以帮你更好的使用 EventBus

欢迎关注我的其它发布渠道