关注

响应式编程之Flow框架

一、技术背景与产生原因

1.1 响应式编程的兴起

在分布式系统和大数据时代,应用程序需要处理:

  • 高并发请求和海量数据流
  • 异步和非阻塞I/O操作
  • 实时数据处理需求

1.2 响应式流规范(Reactive Streams)

Java Flow API 是 Java 9 对响应式流规范(Reactive Streams Specification)的实现,该规范定义了:

  • 异步流处理的标准
  • 非阻塞背压(backpressure)机制
  • 四个核心接口:Publisher、Subscriber、Subscription、Processor

1.3 解决的问题

  • 背压管理:防止快速生产者淹没慢速消费者
  • 资源控制:允许消费者控制数据流量
  • 统一标准:提供跨库的互操作性

1.4 响应式编程

  1. 底层: 基于数据缓冲队列 + 消息驱动模型 + 异步回调机制(事件驱动)
  2. 编码: 流式编程 + 链式调用 + 声明式API
  3. 效果: 优雅全异步 + 消息实时处理 + 高吞量 + 占用少量资源

二、Flow API核心组件

2.1 核心概念

  • Publisher: 发布者,负责发布一系列元素给订阅者。
  • Subscriber: 订阅者,接收来自发布者的元素。
  • Subscription: 订阅关系,定义了发布者和订阅者之间的联系,包括请求元素的数量和取消订阅的操作。
  • Processor: 处理器,同时作为发布者和订阅者,可以修改或转换数据流中的元素。
组件角色核心方法作用
Flow.Publisher数据生产者void subscribe(Subscriber<? super T> subscriber)接收订阅者,负责生产并发送数据
Flow.Subscriber数据消费者onSubscribe(Subscription s)onNext(T item)onError(Throwable t)onComplete()接收并处理数据,通过Subscription反馈处理能力
Flow.Subscription订阅关系void request(long n)void cancel()管理生产者与消费者的关系,实现背压(通过request(n)请求 n 个数据)
Flow.Processor数据处理器继承PublisherSubscriber同时作为消费者和生产者,用于数据转换、过滤等中间处理

2.2 接口关系图

subscribe
onSubscribe
request
onNext
onError/onComplete
cancel
Publisher
Subscriber
Subscription
Processor
Publisher
Subscriber

2.2 接口详解

Flow 框架由四个核心接口构成,遵循“发布-订阅”模式:

1. Publisher<T> 数据发布者

发布者:负责生成数据并发送给订阅者

@FunctionalInterface
public static interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

Publisher 是数据的源头,通过 subscribe() 方法注册订阅者。当有新的订阅时,会调用订阅者的 onSubscribe() 方法并传入 Subscription 对象。

另外它还有一个较为常用的实现类SubmissionPublisher

public class SubmissionPublisher<T> implements Publisher<T>,
                                               AutoCloseable {}

2. Subscriber<T> 数据订阅者

订阅者:接收并处理数据。

public interface Subscriber<T> {
    // 订阅建立时调用,接收Subscription用于控制流
    void onSubscribe(Subscription subscription);
    
    // 接收数据项
    void onNext(T item);
    
    // 错误通知
    void onError(Throwable throwable);
    
    // 完成通知
    void onComplete();
}

3. Subscription 订阅控制

订阅关系:连接发布者和订阅者,支持背压控制

public interface Subscription {
    // 请求n个数据项
    void request(long n);
    
    // 取消订阅
    void cancel();
}
  1. Processor<T,R> 处理器(既是发布者又是订阅者)

处理器:既是订阅者又是发布者,用于数据转换。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    // 同时具备 Subscriber 和 Publisher 的方法
}

Processor 是数据处理的中间节点,既可以订阅上游数据,也可以向下游发布处理后的数据

2.3 背压机制

背压(Backpressure)是 Flow API 的核心特性,允许**订阅者控制接收数据的速率,防止被数据淹没。**通过 Subscription.request(n) 方法,订阅者可以明确告知发布者自己准备接收多少数据。

Publisher Subscriber Subscription subscribe(Subscriber) onSubscribe(Subscription) request(n) 内部记录请求 onNext(item) 处理数据 request(1) // 请求下一个 loop [数据处理循环] onComplete() cancel() Publisher Subscriber Subscription

在异步数据流中,如果数据生产的速度超过了消费的速度,可能会导致系统资源被耗尽,如内存溢出等问题。背压机制提供了一种方式,使得消费者能够向生产者发出信号,告诉生产者减慢数据生产的速率或者暂时停止发送数据,直到消费者准备好处理更多的数据为止。

背压工作原理

  • 流量控制:背压机制本质上是一种流量控制的方法。它允许消费者根据自己的处理能力来调整从生产者那里接收数据的速率。这可以防止因快速产生的数据淹没消费者而导致的问题。
  • 反馈环路:当消费者的处理能力达到上限时,它会通过某种机制向生产者发送一个信号,表明自己不能再接受更多的数据。这个信号构成了一个反馈环路,允许生产者知道何时应该放慢数据产生速度或暂停发送数据。
  • 策略选择:不同的响应式编程库或框架可能提供了不同的背压策略供开发者选择,例如:
    • 缓冲:临时存储无法立即处理的数据项。
    • 丢弃:简单地丢弃超出消费者处理能力的数据项。
    • 错误通知:当超过一定的阈值时,向生产者发送错误通知。

三、完整示例

3.1 入门示例

package cn.tcmeta.flow;

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * @author: laoren
 * @description: 基础发布-订阅(含背压)
 * @version: 1.0.0
 */
public class BasicFlowExample {
    static void main() {
        // 1. 创建发布者
        try(SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // 2. 创建订阅者
            Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
                // 订阅控制
                private Flow.Subscription subscription;
                // 第一步:建立连接
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    System.out.println("✅ 订阅成功,请求 2 个数据");
                    subscription.request(2); // 背压: 先要2个数据
                }

                // 第二步:接收数据
                @Override
                public void onNext(String item) {
                    System.out.println("📩 收到数据: " + item);

                    // 模拟处理耗时操作
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }

                    System.out.println("👉 请求下一个");
                    subscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    System.err.println("❌ 错误: " + throwable.getMessage());
                }

                @Override
                public void onComplete() {
                    System.out.println("🎉 数据流结束");
                }
            };

            // 3. 订阅
            publisher.subscribe(subscriber);

            // 4. 发送数据
            List<String> dataList = List.of("A", "B", "C", "D", "E", "F");
            for (String data : dataList) {
                // 发布数据, 返回值: 订阅者之间的估计最大延迟
                int accepted = publisher.submit(data);
                System.out.println("数据发布: " + data + ", 延迟: " + accepted);
            }

            // 5. 关闭发布者(触发onComplete)
            // publisher.close();
        } // 6. 自动调用publisher.close()
        
        try {
            TimeUnit.MILLISECONDS.sleep(3000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

🔑 关键点

  • 订阅者通过 request(n) 控制消费速度(背压)
  • submit() 非阻塞,可能失败(缓冲区满)
  • try-with-resources 自动关闭发布者 → 触发 onComplete

3.2 基础发布-订阅示例

定义发布者

package cn.tcmeta.flow;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author: laoren
 * @description: 简单数据发布者示例
 * @version: 1.0.0
 */
public class SimplePublisher {
    private final SubmissionPublisher<Integer> publisher;

    public SimplePublisher() {
        this.publisher = new SubmissionPublisher<>(
                Runnable::run,
                Flow.defaultBufferSize()
        );
    }

    public void startPublishing(){
        // 发布100个数据项
        IntStream.range(1, 101).forEach(i -> {
            System.out.println("数据发布: " + i);
            publisher.submit(i);

            try {
                TimeUnit.MILLISECONDS.sleep(30);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        });

        publisher.close();
    }

    public Flow.Publisher<Integer> getPublisher(){
        return publisher;
    }
}

定义订阅者

package cn.tcmeta.flow;

import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

/**
 * @author: laoren
 * @description: Flow.Subscriber 订阅者
 * @version: 1.0.0
 */
public class SimpleSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;
    private final String name;
    private final long requestSize;
    private long receivedCount  = 0;

    public SimpleSubscriber(String name, long requestSize) {
        this.name = name;
        this.requestSize = requestSize;
    }

    /**
     * 订阅事件源, 在事件发生时,会回调这个函数
     * @param subscription a new subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(name + ": 订阅建立!");
        this.subscription = subscription;
        // 初始请求一批数据
        subscription.request(requestSize);
    }

    /**
     * 接收到数据时,会回调这个函数
     * @param item the item
     */
    @Override
    public void onNext(Integer item) {
        receivedCount ++;
        System.out.println("-----------------------------------------");
        System.out.println(name + ": 接收到数据: " + item);

        // 每处理requestSize个数据后请求下一批
        if(receivedCount % requestSize == 0){
            System.out.println(name + ": 请求下一批数据 - (" + requestSize + " )" + " 个");
            subscription.request(requestSize);
        }

        // 模拟处理延迟
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    /**
     * 当数据处理异常时调用
     * @param throwable the exception
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println(name + ": 发生错误: " + throwable.getMessage());
    }

    /**
     * 当数据处理完成时调用
     */
    @Override
    public void onComplete() {
        System.out.println(name + ": 数据处理完成,共接收: " + receivedCount + "个数据");
    }
}

测试代码

package cn.tcmeta.flow;

import java.util.concurrent.TimeUnit;

/**
 * @author: laoren
 * @description: TODO
 * @version: 1.0.0
 */
public class FlowBaseExample {
    static void main() {
        // 1. 创建发布者
        SimplePublisher publisher = new SimplePublisher();

        // 2. 创建订阅者
        SimpleSubscriber simpleSubscriber1 = new SimpleSubscriber("订阅者1", 5);
        SimpleSubscriber simpleSubscriber2 = new SimpleSubscriber("订阅者2", 3);

        // 3. 建立订阅关系
        publisher.getPublisher().subscribe(simpleSubscriber1);
        publisher.getPublisher().subscribe(simpleSubscriber2);

        // 4. 启动发布者
        publisher.startPublishing();
        try {
            TimeUnit.MILLISECONDS.sleep(4000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

创建SubmissionPublisher
创建SimpleSubscriber
注册订阅者到发布者
发布消息
调用onSubscribe方法
调用subscription.request(1)
调用onNext方法处理消息
再次调用subscription.request(1)
完成所有消息处理后调用onComplete方法
SubmissionPublisher SimpleSubscriber onSubscribe() request(1) onNext("Hello, Reactive World!") request(1) onComplete() SubmissionPublisher SimpleSubscriber
发布者(Publisher) 订阅关系(Subscription) 消费者(Subscriber) 建立订阅关系 订阅(subscribe) 创建Subscription并调用onSubscribe(S) 调用request(1)(请求1个数据) 数据交互过程 调用onNext(数据1) 处理数据1 调用request(1)(请求下一个数据) 调用onNext(数据2) 处理数据2 调用request(1) ...更多数据... loop [数据发送循环] 处理完成 数据发送完毕 调用onComplete() 执行完成逻辑 发布者(Publisher) 订阅关系(Subscription) 消费者(Subscriber)

应用场景

  1. 实时数据处理:例如股票市场数据、社交媒体更新等需要即时处理的信息流。
  2. 高并发Web应用:使用Flow API可以有效地管理大量的并发连接,减少服务器资源消耗。
  3. 事件驱动系统:适用于需要监听并响应特定事件的应用程序

实践经验

  • 避免阻塞操作:确保在onNext, onSubscribe, onErroronComplete方法中不执行耗时或阻塞操作,以免影响整个流的性能。
  • 合理设置请求数量:通过request(long n)方法控制从发布者获取元素的速度,防止内存溢出或性能下降。
  • 错误处理机制:良好的错误处理策略是保证系统稳定性的关键,应该正确地利用onError方法处理异常情况。

3.3 带背压控制的完整示例

背压是 Flow 框架的核心特性,允许消费者根据自身处理能力控制生产者的发送速度。下面示例展示当消费者处理速度慢于生产者时,背压如何避免数据积压。

package cn.tcmeta.flow;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureExample {
     static void main(String[] args) throws InterruptedException {
        // 创建发布者(使用JDK提供的SubmissionPublisher)
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {

            // 创建慢消费者(处理速度慢于生产速度)
            SlowSubscriber subscriber = new SlowSubscriber();
            publisher.subscribe(subscriber);

            // 快速生产数据(生产速度快于消费速度)
            System.out.println("开始快速生产数据...");
            for (int i = 1; i <= 10; i++) {
                // submit()返回当前未处理的消息数(背压作用下会逐渐增加)
                int pending = publisher.submit(i);
                System.out.println("生产数据: " + i + ",当前未处理数: " + pending);
                TimeUnit.MILLISECONDS.sleep(100);  // 快速生产(100ms/个)
            }

            // 等待所有数据处理完成
            while (publisher.hasSubscribers()) {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }

    // 慢消费者(处理速度慢,展示背压效果)
    static class SlowSubscriber implements Flow.Subscriber<Integer> {
        private Flow.Subscription subscription;
        private int bufferSize = 2;  // 消费者缓冲区大小(一次最多处理2个数据)

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            // 初始请求bufferSize个数据(告知生产者自己的处理能力)
            System.out.println("消费者初始化,请求" + bufferSize + "个数据");
            subscription.request(bufferSize);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("消费者开始处理数据: " + item);

            // 模拟慢速处理(500ms/个,慢于生产速度)
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            System.out.println("消费者完成处理: " + item);

            // 每处理1个数据,再请求1个(维持缓冲区大小)
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("处理错误: " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("所有数据处理完成");
        }
    }
}


在这里插入图片描述

生产者 缓冲区 消费者 初始化阶段 订阅并请求2个数据 发送数据1 发送数据2 传递数据1 传递数据2 背压生效阶段 尝试发送数据3(缓冲区满) 缓冲区已满,暂停生产 处理数据1(耗时500ms) 处理完成,请求1个数据 有空间了,可继续生产 发送数据3 传递数据3 处理数据2(耗时500ms) 处理完成,请求1个数据 有空间了,可继续生产 发送数据4 传递数据4 ...循环直到所有数据处理完成... 生产者 缓冲区 消费者

3.4 Processor处理器示例

使用Processor时行数据转换

package cn.tcmeta.flow;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * @author: laoren
 * @description: 自定义 Processor:将 Integer 转为 String
 * @version: 1.0.0
 */
public class NumberToStringProcessor extends SubmissionPublisher<String>
        implements Flow.Processor<Integer, String> {
    // 绑定关系
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 向上游请求数据
    }

    @Override
    public void onNext(Integer item) {
        // 转换数据发布
        submit("Number: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }
}

测试基本功能

package cn.tcmeta.flow;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author: laoren
 * @description: Processor
 * @version: 1.0.0
 */
public class ProcessorExample2 {
    static void main() {
        try (var publisher = new SubmissionPublisher<Integer>();
             var processor = new NumberToStringProcessor()) {

            // 构建数据流:Publisher → Processor → Subscriber
            // 这一步是将发布者(publisher)与处理器(processor)关联起来,从而实现数据流转。
            publisher.subscribe(processor);

            // Processor -> Subscriber
            processor.subscribe(new Flow.Subscriber<>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    this.subscription = s;
                    s.request(1);
                    System.out.println("Processor -> Subscriber");
                }

                @Override
                public void onNext(String item) {
                    System.out.println("✅ " + item);
                    this.subscription.request(1);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("✨ 完成");
                }
            });

            // 发送消息
            IntStream.rangeClosed(1, 5).forEach(a -> {
                publisher.submit(a);
                System.out.println("数据发送了~~~ " + a);
            });
        } // 自动关闭

        try {
            TimeUnit.MILLISECONDS.sleep(2000);
        } catch (InterruptedException e) {
        }
    }
}

在这里插入图片描述

Processor是连接生产者和消费者的中间组件,同时实现PublisherSubscriber接口,用于数据转换、过滤或聚合。下面示例实现一个数据过滤和转换的 Processor。

package cn.tcmeta.flow;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ProcessorExample {
     static void main(String[] args) throws InterruptedException {
        // 1. 创建原始数据发布者(发布整数)
        try (SubmissionPublisher<Integer> sourcePublisher = new SubmissionPublisher<>()) {
            
            // 2. 创建处理器(过滤偶数,并转换为字符串)
            FilterAndConvertProcessor processor = new FilterAndConvertProcessor();
            
            // 3. 建立发布者->处理器->消费者的连接
            sourcePublisher.subscribe(processor);  // 发布者订阅到处理器
            SimpleStringSubscriber subscriber = new SimpleStringSubscriber();
            processor.subscribe(subscriber);  // 处理器订阅到消费者
            
            // 4. 发布原始数据(1-10的整数)
            for (int i = 1; i <= 10; i++) {
                sourcePublisher.submit(i);
                System.out.println("发布原始数据: " + i);
                TimeUnit.MILLISECONDS.sleep(100);
            }
            
            // 等待处理完成
            while (sourcePublisher.hasSubscribers() || processor.hasSubscribers()) {
                TimeUnit.MILLISECONDS.sleep(200);
            }
        }
    }
    
    // 自定义处理器:过滤偶数,将奇数转换为字符串
    static class FilterAndConvertProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
        private Flow.Subscription subscription;  // 上游订阅关系
        
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            // 向上游请求数据(初始请求3个)
            subscription.request(3);
        }
        
        @Override
        public void onNext(Integer item) {
            System.out.println("处理器接收原始数据: " + item);
            
            // 过滤:只处理奇数
            if (item % 2 != 0) {
                // 转换:整数->字符串
                String converted = "奇数-" + item;
                // 向下游发布处理后的数据
                this.submit(converted);
            }
            
            // 每处理1个,再向上游请求1个
            subscription.request(1);
        }
        
        @Override
        public void onError(Throwable throwable) {
            System.err.println("处理器出错: " + throwable.getMessage());
            // 向下游传递错误
            this.closeExceptionally(throwable);
        }
        
        @Override
        public void onComplete() {
            System.out.println("处理器完成处理");
            // 向下游传递完成信号
            this.close();
        }
    }
    
    // 字符串消费者(接收处理器转换后的数据)
    static class SimpleStringSubscriber implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;
        
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            // 向处理器请求2个数据
            subscription.request(2);
        }
        
        @Override
        public void onNext(String item) {
            System.out.println("消费者接收处理后的数据: " + item);
            // 处理完成后再请求1个
            subscription.request(1);
        }
        
        @Override
        public void onError(Throwable throwable) {
            System.err.println("消费者出错: " + throwable.getMessage());
        }
        
        @Override
        public void onComplete() {
            System.out.println("消费者处理完成");
        }
    }
}

在这里插入图片描述

流程图

发布原始数据
1. 过滤(保留奇数)
2. 转换(整数→字符串)
发布处理后数据
请求数据
请求数据
源发布者(Publisher)
处理器(Processor)
消费者(Subscriber)

时序图

源发布者 处理器 消费者 建立连接 订阅 调用onSubscribe() 请求3个数据 订阅 调用onSubscribe() 请求2个数据 数据处理流程 发送数据1 过滤(1是奇数)→转换为"奇数-1" 发送"奇数-1" 请求1个数据 发送数据2 过滤(2是偶数)→丢弃 请求1个数据 发送数据3 过滤(3是奇数)→转换为"奇数-3" 发送"奇数-3" 请求1个数据 ...更多数据处理... 发送完成信号 发送完成信号 执行完成逻辑 源发布者 处理器 消费者

四、应用场景与实践

4.1 适用场景

  1. 实时数据流处理
    • 金融市场价格流
    • IoT设备数据采集
    • 实时日志处理
  2. 大数据处理管道
    • ETL数据处理
    • 数据转换和 enrichment
    • 流水线式计算
  3. 高并发消息系统
    • 消息队列消费者
    • 事件驱动架构
    • 微服务间通信
  4. 响应式Web应用
    • Server-Sent Events
    • WebSocket数据流
    • 实时UI更新

4.2 实践经验总结

4.2.1 背压策略选择

// 策略1: 丢弃最新数据(适合实时性要求高的场景)
class DropNewestStrategy implements Flow.Subscriber<Data> {
    public void onNext(Data item) {
        if (!isOverloaded()) {
            process(item);
            subscription.request(1);
        }
        // 否则丢弃数据,不请求新数据
    }
}

// 策略2: 缓冲数据(适合数据完整性要求高的场景)
class BufferStrategy implements Flow.Subscriber<Data> {
    private final Queue<Data> buffer = new ConcurrentLinkedQueue<>();
    
    public void onNext(Data item) {
        buffer.offer(item);
        if (canProcess()) {
            process(buffer.poll());
        }
        subscription.request(1);
    }
}

4.2.2 错误处理最佳实践

class ResilientSubscriber implements Flow.Subscriber<Data> {
    private Subscription subscription;
    private int retryCount = 0;
    
    public void onError(Throwable throwable) {
        if (retryCount < MAX_RETRIES) {
            retryCount++;
            System.out.println("重试 #" + retryCount);
            // 重新建立订阅
            // 需要保存对原始Publisher的引用
        } else {
            System.err.println("达到最大重试次数");
        }
    }
}

4.2.3 性能监控指标

class MonitoredPublisher<T> implements Flow.Publisher<T> {
    private final AtomicLong publishCount = new AtomicLong();
    private final AtomicLong requestCount = new AtomicLong();
    
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // 包装订阅者以收集指标
        originalPublisher.subscribe(new Flow.Subscriber<T>() {
            public void onNext(T item) {
                publishCount.incrementAndGet();
                subscriber.onNext(item);
            }
            
            public void onSubscribe(Flow.Subscription subscription) {
                subscriber.onSubscribe(new Flow.Subscription() {
                    public void request(long n) {
                        requestCount.addAndGet(n);
                        subscription.request(n);
                    }
                });
            }
        });
    }
    
    public double getBackpressureRatio() {
        return (double) requestCount.get() / publishCount.get();
    }
}

4.2.4 资源管理建议

  • 总是使用try-with-resources或确保调用close()
  • 监控订阅者处理时间,避免阻塞onNext()
  • 为不同的流类型使用不同的线程池

4.3 与其它技术对比

特性Java Flow APIReactorRxJavaJava Streams
背压支持✅ 内置✅ 内置✅ 内置❌ 无
异步支持❌ 同步
操作符丰富度基础丰富极其丰富中等
学习曲线平缓中等陡峭平缓
与Java集成最好(标准库)很好最好

转载自CSDN-专业IT技术社区

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/ldcigame/article/details/150963565

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--