12观察者模式

一个对象以来多个对象,一个对象状态改变时,通知多个对象
一个对象成为被观察者,多个对象称为观察者

不同实现方式:
进程内实现:同步阻塞、异步非阻塞(Google Guava EventBus事件总线)
跨进程实现:消息队列

经典实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface Subject {

void registerObserver(Observer observer);//attach
void removeObserver(Observer observer);//detach
void notifyObservers(Message message);
}
public interface Observer {
void update(Message message);
}
public class ConcreteSubject implements Subject {
private List<Observer> observers = new ArrayList<Observer>();
@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyObservers(Message message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}
public class ConcreteObserverOne implements Observer {
@Override
public void update(Message message) {
//TODO: 获取消息通知,执行自己的逻辑...
System.out.println("ConcreteObserverOne is notified.");
}
}
public class ConcreteObserverTwo implements Observer {
@Override
public void update(Message message) {
//TODO: 获取消息通知,执行自己的逻辑...
System.out.println("ConcreteObserverTwo is notified.");
}
}
public class Demo {
public static void main(String[] args) {
ConcreteSubject subject = new ConcreteSubject();
subject.registerObserver(new ConcreteObserverOne());
subject.registerObserver(new ConcreteObserverTwo());
subject.notifyObservers(new Message());
}
}

EventBus使用

EventBus、AsyncEventBus
EventBus#register()注册
EventBus#unregister()删除某个观察者
EventBus#post()发消息
@Subscribe注解,定义接收事件类型

1
2
3
4
5
6
7
8
9
10
public DObserver {
//...省略其他属性和方法...

@Subscribe
public void f1(PMsg event) { //... }

@Subscribe
public void f2(QMsg event) { //... }
}

实现

Subscribe

标明观察者中的哪个函数可以接收消息

1
2
3
4
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}

ObserverAction

表示@Subscribe 注解的方法
target表示观察者类,method表示方法。主要用在ObserverRegistry观察者注册表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ObserverAction {
private Object target;
private Method method;
public ObserverAction(Object target, Method method) {
this.target = Preconditions.checkNotNull(target);
this.method = method;
this.method.setAccessible(true);
}
public void execute(Object event) { // event是method方法的参数
try {
method.invoke(target, event);
} catch (InvocationTargetException | IllegalAccessException e) {
e.printStackTrace();
}
}
}

ObserverRegistry

Observer注册表
CopyOnWriteArraySet解决并发写问题

EventBus

实现阻塞同步的观察者模式
MoreExecutors.directExecutor()是Google Guava提供的工具类是单线程。这么实现是为了跟AsyncEventBus统一代码逻辑,代码复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EventBus {
private Executor executor;
private ObserverRegistry registry = new ObserverRegistry();
public EventBus() {
this(MoreExecutors.directExecutor());
}
protected EventBus(Executor executor) {
this.executor = executor;
}
public void register(Object object) {
registry.register(object);
}
public void post(Object event) {
List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
for (ObserverAction observerAction : observerActions) {
executor.execute(new Runnable() {
@Override
public void run() {
observerAction.execute(event);
}
});
}
}
}

AsyncEventBus

实现异步非阻塞的观察者模式

1
2
3
4
5
public class AsyncEventBus extends EventBus {
public AsyncEventBus(Executor executor) {
super(executor);
}
}