关注

Reactive编程:数据流和观察者

在这里插入图片描述

在这里插入图片描述

第二部分:Reactive编程核心概念

2.1 数据流(Data Stream)

2.1.1 数据流的基本概念

数据流(Data Stream)是Reactive编程的核心概念之一,它代表一系列按时间顺序排列的事件或数据项的集合。在传统的命令式编程中,数据通常是静态的,程序通过顺序执行指令来处理数据。而在Reactive编程中,数据被视为动态的、持续变化的流,程序通过订阅这些流来响应数据的变化。

数据流的特点包括:

  1. 时间连续性:数据流中的事件是按时间顺序发生的。
  2. 异步性:数据项可能在任何时间点到达,不受程序控制流约束。
  3. 不可变性:流中的数据一旦发出就不能更改,只能通过转换操作生成新流。
  4. 可组合性:多个数据流可以通过操作符(Operators)进行组合、转换和过滤。

2.1.2 数据流的类型

在Reactive编程中,数据流可以分为以下几种类型:

(1) 冷流(Cold Stream)
  • 特点:数据流的生成由订阅者触发,每个订阅者都会收到完整的数据序列。
  • 示例:从文件读取数据、HTTP请求返回的响应流。
  • 代码示例(RxJS)
    import { of } from 'rxjs';
    
    const coldStream$ = of(1, 2, 3); // 冷流
    coldStream$.subscribe(value => console.log(`Subscriber 1: ${value}`));
    coldStream$.subscribe(value => console.log(`Subscriber 2: ${value}`));
    
    // 输出:
    // Subscriber 1: 1
    // Subscriber 1: 2
    // Subscriber 1: 3
    // Subscriber 2: 1
    // Subscriber 2: 2
    // Subscriber 2: 3
    
(2) 热流(Hot Stream)
  • 特点:数据流独立于订阅者存在,订阅者只能收到订阅后发出的数据。
  • 示例:鼠标移动事件、WebSocket实时消息。
  • 代码示例(RxJS)
    import { fromEvent, interval } from 'rxjs';
    
    const hotStream$ = fromEvent(document, 'click'); // 热流(鼠标点击事件)
    setTimeout(() => {
      hotStream$.subscribe(event => console.log(`Subscriber: Click at ${event.timeStamp}`));
    }, 3000);
    
    // 3秒后订阅,只能收到3秒后的点击事件
    
(3) 有限流(Finite Stream)
  • 特点:数据流会在某个时刻结束(如HTTP请求完成)。
  • 示例:API请求返回的单个响应。
(4) 无限流(Infinite Stream)
  • 特点:数据流可能永远不会结束(如传感器数据、实时股票行情)。
  • 示例interval 生成的定时数据流。

2.1.3 数据流的操作符

Reactive编程提供丰富的操作符(Operators)来处理数据流,常见的操作符包括:

类别操作符示例功能描述
创建流of, from, interval从静态数据、数组或时间间隔创建流
转换流map, scan, buffer对数据项进行转换或累积计算
过滤流filter, take, skip根据条件筛选或限制数据项
组合流merge, concat, zip合并多个流的数据
错误处理catchError, retry捕获和处理流中的错误
高级调度debounceTime, throttle控制数据流的发射频率

代码示例(RxJS 操作符组合)

import { fromEvent } from 'rxjs';
import { map, filter, debounceTime } from 'rxjs/operators';

const input = document.querySelector('input');
fromEvent(input, 'input')
  .pipe(
    map(event => event.target.value), // 提取输入值
    filter(text => text.length > 3), // 过滤长度≤3的输入
    debounceTime(500) // 防抖(500ms内无新输入才发射)
  )
  .subscribe(value => console.log(`Search for: ${value}`));

2.1.4 数据流的实际应用

(1) 实时搜索建议
// 使用RxJS实现搜索框自动补全
searchInput$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => fetch(`/api/search?q=${query}`))
).subscribe(results => updateUI(results));
(2) 游戏开发(角色移动)
// 使用键盘事件流控制角色移动
const keyPress$ = fromEvent(document, 'keydown');
const move$ = keyPress$.pipe(
  filter(key => ['ArrowUp', 'ArrowDown'].includes(key.code)),
  map(key => key.code === 'ArrowUp' ? 1 : -1)
);

move$.subscribe(delta => character.y += delta * SPEED);
(3) 金融交易数据聚合
// 合并多个股票行情流
const stock1$ = stockFeed('AAPL');
const stock2$ = stockFeed('GOOG');

merge(stock1$, stock2$)
  .pipe(bufferTime(1000)) // 每1秒聚合一次
  .subscribe(prices => calculatePortfolioValue(prices));

2.1.5 数据流的背压问题(Backpressure)

当数据生产速度超过消费速度时,系统可能因资源耗尽而崩溃。Reactive编程通过以下策略处理背压:

  1. 丢弃策略throttlesample
  2. 缓冲策略bufferwindow
  3. 反馈控制:响应式流规范(Reactive Streams)的request(n)机制

代码示例(背压处理)

// Reactor(Java)中的背压控制
Flux.range(1, 1000)
    .onBackpressureBuffer(100) // 缓冲100个元素
    .subscribe(
        value -> processSlowly(value),
        err -> handleError(err),
        () -> System.out.println("Done")
    );

2.2 观察者模式(Observer Pattern)

2.2.1 观察者模式基础

观察者模式是Reactive编程的底层设计模式,它定义了一对多的依赖关系:当一个对象(Subject)的状态改变时,所有依赖它的对象(Observers)会自动收到通知并更新。

模式角色
角色描述
Subject维护观察者列表,提供注册/注销方法,通知状态变化
Observer定义更新接口,接收Subject的通知并执行响应逻辑
ConcreteSubject具体的被观察对象,存储状态并在变化时通知观察者
ConcreteObserver具体的观察者,实现更新逻辑

2.2.2 观察者模式实现

经典实现(Java)
// 主题接口
interface Subject {
    void registerObserver(Observer o);
    void removeObserver(Observer o);
    void notifyObservers();
}

// 具体主题(温度传感器)
class TemperatureSensor implements Subject {
    private List<Observer> observers = new ArrayList<>();
    private float temperature;

    public void setTemperature(float temp) {
        this.temperature = temp;
        notifyObservers();
    }

    @Override
    public void registerObserver(Observer o) {
        observers.add(o);
    }

    @Override
    public void notifyObservers() {
        for (Observer o : observers) {
            o.update(temperature);
        }
    }
}

// 观察者接口
interface Observer {
    void update(float temperature);
}

// 具体观察者(温度显示器)
class TemperatureDisplay implements Observer {
    @Override
    public void update(float temp) {
        System.out.println("当前温度: " + temp + "°C");
    }
}

// 使用示例
public class Main {
    public static void main(String[] args) {
        TemperatureSensor sensor = new TemperatureSensor();
        sensor.registerObserver(new TemperatureDisplay());

        sensor.setTemperature(25.5f); // 输出:当前温度: 25.5°C
    }
}
Reactive扩展(RxJS)
// 创建一个可观察对象(Subject)
const subject = new rxjs.Subject();

// 订阅观察者
const subscription1 = subject.subscribe(
  value => console.log(`Observer 1: ${value}`)
);

const subscription2 = subject.subscribe(
  value => console.log(`Observer 2: ${value}`)
);

// 发送数据
subject.next('Hello');
subject.next('World');

// 输出:
// Observer 1: Hello
// Observer 2: Hello
// Observer 1: World
// Observer 2: World

2.2.3 观察者模式与Reactive编程的关系

  1. 数据流即Subject:Reactive框架中的Observable本质上是增强版的Subject,支持多播和操作符。
  2. 观察者即Subscriber:订阅者通过subscribe()方法注册回调,相当于观察者模式的update()
  3. 扩展能力
    • 多播(Multicast):一个数据流被多个观察者共享
    • 生命周期管理:complete()error()通知
    • 操作符链式调用:mapfilter等转换操作

2.2.4 观察者模式的实际应用

(1) 用户界面事件处理
// 按钮点击事件观察
const button = document.getElementById('myButton');
const click$ = fromEvent(button, 'click');

click$.subscribe(event => {
  console.log('Button clicked at:', event.timeStamp);
});
(2) 状态管理(Redux Store)
// Redux的Store本质上是一个Subject
const store = createStore(reducer);
store.subscribe(() => {
  console.log('State changed:', store.getState());
});

store.dispatch({ type: 'INCREMENT' });
(3) WebSocket实时通信
const socket = new WebSocket('ws://example.com');
const message$ = new Subject();

socket.onmessage = event => {
  message$.next(event.data);
};

message$.subscribe(data => {
  console.log('Received:', data);
});

2.2.5 观察者模式的优缺点

优点
  • 松耦合:Subject和Observer之间无直接依赖
  • 动态关系:可运行时添加/删除观察者
  • 广播通信:一次状态变更可通知多个观察者
缺点
  • 内存泄漏风险:未正确注销观察者会导致引用残留
  • 通知顺序不可控:观察者被调用的顺序可能影响系统行为
  • 调试困难:数据流动的隐式传播可能增加调试复杂度

2.2.6 观察者模式的变体

变体描述Reactive实现
发布-订阅模式引入事件通道,解耦发布者和订阅者RxJS的Subject
响应式属性(Binding)自动同步对象属性变化(如Vue的v-model)MobX的observable
数据总线(Event Bus)全局事件中心,任意组件可发布/订阅事件Vue的EventEmitter

总结

  • 数据流是Reactive编程的核心抽象,代表随时间变化的事件序列,可通过操作符进行灵活转换。
  • 观察者模式是Reactive系统的底层机制,通过订阅/通知机制实现数据变化的自动传播。
  • 两者结合使得Reactive编程能够高效处理异步、实时数据,适用于从UI交互到分布式系统的广泛场景。

转载自CSDN-专业IT技术社区

版权声明:本文为博主原创文章,遵循 CC 4.0 BY 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/sixpp/article/details/146636931

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--