跳至主要內容

观察者模式

Mr.Hope...大约 7 分钟

观察者是一种行为设计模式, 允许一个对象将其状态的改变通知其他对象

观察者模式提供了一种作用于任何实现了订阅者接口的对象的机制, 可对其事件进行订阅和取消订阅。

使用示例: 观察者模式在 Java 代码中很常见, 特别是在 GUI 组件中。 它提供了在不与其他对象所属类耦合的情况下对其事件做出反应的方式。

这里是核心 Java 程序库中该模式的一些示例:

识别方法: 该模式可以通过将对象存储在列表中的订阅方法, 和对于面向该列表中对象的更新方法的调用来识别。

观察者模式其实也是发布订阅模式。 针对不同的观察者需要有不同的实现方式,所以先创建一个管理者的接口,将其定义为一个抽象概念,方便后续扩展。 这个接口相当于-群(管理者)

/**
 * 观察者的顶层接口
 * @param <T>
 */
public interface ObserverInterface<T> {
    //注册监听者
    public void registerListener(T t);
    //移除监听者
    public void removeListener(T t);
    //通知监听者
    public void notifyListener(DataEvent t);
}

定义抽象的监听者接口 这个接口相当于-群成员(监听者)

/**
 * Listener的顶级接口,为了抽象Listener而存在
 */
public interface MyListener {
    void onEvent(DataEvent event);
}

定义抽象的事件接口 这个接口相当于群里面发布的通知

@Data
public abstract class DataEvent {
    private String msg;
}

创建管理者的实现类,相当于具体的群(如微信群,钉钉群)

/**
 * 循环调用方式的观察者(同步)
 */
@Component
public class LoopObserverImpl implements ObserverInterface<MyListener> {
    //监听者的注册列表
    private List<MyListener> listenerList = new ArrayList<>();
    @Override
    public void registerListener(MyListener listener) {
        listenerList.add(listener);
    }

    @Override
    public void removeListener(MyListener listener) {
        listenerList.remove(listener);
    }

    @Override
    public void notifyListener(DataEvent event) {
        for (MyListener myListener : listenerList) {
            myListener.onEvent(event);
        }
    }
}

创建两个event的实现类,一个是积分事件,一个是短信事件

/**
 * 积分事件类
 */
public class ScoreDataEvent extends DataEvent {
    private Integer score;
}

/**
 * 短信事件类
 */
public class SmsDataEvent extends DataEvent {
    private String phoneNum;
}

创建两个listener的实现类,一个是处理积分的,一个是处理短信的

/**
 * MyListener的实现类,分数监听者
 */
@Component
public class MyScoreListener implements MyListener {
    @Override
    public void onEvent(DataEvent dataEvent) {
        if (dataEvent instanceof ScoreDataEvent) {
            //...省略业务逻辑
            System.out.println("积分处理:" + dataEvent.getMsg());
        }
    }
}

/**
 * MyListener的实现类,短信监听者
 */
@Component
public class MySmsListener implements MyListener {
    @Override
    public void onEvent(DataEvent dataEvent) {
        if (dataEvent instanceof SmsDataEvent) {
            //...省略短信处理逻辑
            System.out.println("短信处理");
        }
    }
}

观察者模式的要素就到齐了,我们在main方法里面跑一下

public class Operator {
    public static void main(String[] args) {
        //通过spring的AnnotationConfigApplicationContext将com.example.demo.user.admin.design路径下的所有加了spring注解的类都扫描放入spring容器
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.example.demo.user.admin.design");
        //从spring容器中获取对应bean的实例
        LoopObserverImpl loopObserver = context.getBean(LoopObserverImpl.class);
        MyScoreListener scoreL = context.getBean(MyScoreListener.class);
        MySmsListener smsL = context.getBean(MySmsListener.class);

        //向观察者中注册listener
        loopObserver.registerListener(scoreL);
        loopObserver.registerListener(smsL);
        ScoreDataEvent scoreData = new ScoreDataEvent();
        scoreData.setMsg("循环同步观察者");
        //发布积分事件,通知监听者
        loopObserver.notifyListener(scoreData);

        /*******************************************/
        //从spring容器获取QueueObserverImpl观察者
        
        QueueObserverImpl queueObserver = context.getBean(QueueObserverImpl.class);
        //向观察者中注册listener
        queueObserver.registerListener(scoreL);
        queueObserver.registerListener(smsL);
        ScoreDataEvent scoreData1 = new ScoreDataEvent();
        scoreData1.setMsg("队列异步观察者");
        //发布积分事件,通知监听者
        queueObserver.notifyListener(scoreData1);
    }
}

接下来看看下面这个新的观察者实现类和上面示例中的的观察者实现类LoopObserverImpl有什么不同吗

/**
 * 启动一个线程循环阻塞队列的观察者,可以实现解耦异步。
 */
@Component
public class QueueObserverImpl implements ObserverInterface<MyListener> {
    //监听者的注册列表
    private List<MyListener> listenerList = new ArrayList<>();
    //创建一个大小为10的阻塞队列
    private BlockingQueue<DataEvent> queue = new LinkedBlockingQueue<>(10);
    //创建一个线程池
    private ExecutorService executorService = new ScheduledThreadPoolExecutor(1, r -> {
        Thread t = new Thread(r);
        t.setName("com.kangarooking.observer.worker");
        t.setDaemon(false);
        return t;
    });
//    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    @Override
    public void registerListener(MyListener listener) {
        listenerList.add(listener);
    }

    @Override
    public void removeListener(MyListener listener) {
        listenerList.remove(listener);
    }

    @Override
    public void notifyListener(DataEvent event) {
        System.out.println("向队列放入DataMsg:" + event.getMsg());
        queue.offer(event);
    }

    @PostConstruct
    public void initObserver() {
        System.out.println("初始化时启动一个线程");
        executorService.submit(() -> {
            while (true) {
                try {
                    System.out.println("循环从阻塞队列里面获取数据,take是阻塞队列没有数据就会阻塞住");
                    DataEvent dataMsg = queue.take();
                    System.out.println("从阻塞队列获取到数据:" + dataMsg.getMsg());
                    eventNotify(dataMsg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    private void eventNotify(DataEvent event) {
        System.out.println("循环所有的监听者");
        for (MyListener myListener : listenerList) {
            myListener.onEvent(event);
        }
    }
}

不同之处就是引入了阻塞队列,让通知这个操作变成异步操作,既只需要将event时间放入阻塞队列之后就可以直接返回了。不用像LoopObserverImpl要等到listener注册表循环完毕才能返回。这样就实现了通知操作和循环listener注册表的解耦和异步。

举例说明异步实现和同步实现的区别: 同步:还是团建群的例子,假如领导是保姆型领导,通知下来任务之后可能不太放心,要挨个问,小张你准备什么表演阿,大概多久能准备好鸭。小红你呢→_→。。。 异步:假如是甩手掌柜型领导,发布完消息之后他就不管了。 上面就是同步和异步的区别,同步就是领导是个保姆,挨个问挨个了解情况之后这个事情才算完。异步就是领导发布完消息就完事儿。

开源框架的实现

同步方式

spring的发布订阅就是基于同步的观察者模式: 简单来说就是将所有的监听者注册到一个列表里面,然后当发布事件时,通过循环监听者列表,在循环里面调用每个监听者的onEvent方法,每个监听者实现的在onEvent方法里面判断传入的event是否属于当前需要的event,属于就处理该事件,反之不处理。

spring的ApplicationEventMulticaster就是示例讲的观察者顶层接口

image-20240428161700260

ApplicationListener就是示例代码的监听者顶层接口

image-20240428163132977

ApplicationEventMulticastermulticastEvent方法就是通知方法,这里就是循环监听者注册表,调用每个监听者的onApplicationEvent方法(这里的invokeListener方法里面最终会调用到listener.onApplicationEvent(event);

image-20240428163851397image-20240428163932735

异步方式

nacos中有很多地方都使用到了观察者模式,如client端和server端建立连接,发布连接事件,相关监听者做相应的处理,断开连接也是一样。

在server端接收到client端的注册请求后,会发布一个注册事件的通知

在nacos-server启动的时候也是会开启一个线程做死循环,循环的去queue里面take数据,如果没有的话就会阻塞。所以死循环只有在queue里面一直有数据的时候才会一直循环,当queue里面没有数据的时候就会阻塞在queue.take();方法处。

image-20240428164306039

我们看看receiveEvent(event);方法里面做了什么,这里就体现了框架里面设计的精妙:在上面我们自己的设计中,这里应该是需要循环调用所有的listener的onApplicationEvent方法,但是当注册表中listener太多的时候就会出现(有些event可能会有多个listener需要处理)循环调用太慢的问题,这里使用多线程的处理方式,让这些调用并行处理,大大的提高了框架的事件处理效率。

image-20240428164329178