关注

如何使用LangGraph4j工作流改造项目流程实践

项目进行AI工作流改造

Demo演示

引入依赖:

<!-- LangGraph4j -->
<dependency>
  <groupId>org.bsc.langgraph4j</groupId>
  <artifactId>langgraph4j-core</artifactId>
  <version>1.6.0-rc2</version>
</dependency>

参考官方文档,编写LangGraph4j的demo,可以看看工作流的大概要实现的内容:

1.定义状态:

// Define the state for our graph
class SimpleState extends AgentState {
    public static final String MESSAGES_KEY = "messages";

    // Define the schema for the state.
    // MESSAGES_KEY will hold a list of strings, and new messages will be appended.
    public static final Map<String, Channel<?>> SCHEMA = Map.of(
        MESSAGES_KEY, Channels.appender(ArrayList::new)
    );

    public SimpleState(Map<String, Object> initData) {
        super(initData);
    }

    public List<String> messages() {
        return this.<List<String>>value("messages")
        .orElse( List.of() );
    }
}

2.定义请求节点和响应节点:

// Node that adds a greeting
class GreeterNode implements NodeAction<SimpleState> {
    @Override
    public Map<String, Object> apply(SimpleState state) {
        System.out.println("GreeterNode executing. Current messages: " + state.messages());
        return Map.of(SimpleState.MESSAGES_KEY, "Hello from GreeterNode!");
    }
}
// Node that adds a response
class ResponderNode implements NodeAction<SimpleState> {
    @Override
    public Map<String, Object> apply(SimpleState state) {
        System.out.println("ResponderNode executing. Current messages: " + state.messages());
        List<String> currentMessages = state.messages();
        if (currentMessages.contains("Hello from GreeterNode!")) {
            return Map.of(SimpleState.MESSAGES_KEY, "Acknowledged greeting!");
        }
        return Map.of(SimpleState.MESSAGES_KEY, "No greeting found.");
    }
}

3.最后定义工作流结构:

public class SimpleGraphApp {

    public static void main(String[] args) throws GraphStateException {
        // Initialize nodes
        GreeterNode greeterNode = new GreeterNode();
        ResponderNode responderNode = new ResponderNode();

        // Define the graph structure
        var stateGraph = new StateGraph<>(SimpleState.SCHEMA, initData -> new SimpleState(initData))
        .addNode("greeter", node_async(greeterNode))
        .addNode("responder", node_async(responderNode))
        // Define edges
        .addEdge(START, "greeter") // Start with the greeter node
        .addEdge("greeter", "responder")
        .addEdge("responder", END)   // End after the responder node
        ;
        // Compile the graph
        var compiledGraph = stateGraph.compile();

        // 打印出工作流可视化编排---文本绘图方法能力
        GraphRepresentation graph = stateGraph.getGraph(GraphRepresentation.Type.MERMAID, "demo");
        System.out.println(graph.toString());

        // Run the graph
        // The `stream` method returns an AsyncGenerator.
        // For simplicity, we'll collect results. In a real app, you might process them as they arrive.
        // Here, the final state after execution is the item of interest.

        for (var item : compiledGraph.stream( Map.of( SimpleState.MESSAGES_KEY, "Let's, begin!" ) ) ) {

            System.out.println( item );
        }

    }
}

执行查看效果:

工作流在项目中的运用改造

项目改造思路流程:

以AI生成代码网站场景为例,

用户输入提示词 -> Agent通过工具调用从不同的渠道获取图片素材 -> 内容图片pexels网页搜索 -> 插画图片undraw抓取 -> 文本绘图+上传到COS -> AI生成或MCP服务Logo等设计图片 -> 提示词增强关联图片内容到原始提示词 -> 智能路由Agent选择模式生成网站(原生HTML、多文件、Vue工程) -> 利用搜索到的图片和根据上一步确认的模式生成网站 -> 文件保存项目构建打包

工作流开发步骤:

定义工作流结构 -> 定义状态 -> 定义工作节点(先通过假数据进行模拟状态流转)-> 开发真实的工作节点 -> 工作流中使用节点,提供完整的工作流服务

可以使用AI生成工作流结构,提示词如下:

帮我生成 LangGraph4j 工作流的代码

## 工作流的流程描述

// ... 补充具体的流程

## 要求

先生成基础的工作流结构代码,每个工作节点中只输出一句信息就够了,不用真正实现具体的业务逻辑。

## 参考信息

官方文档:@https://langgraph4j.github.io/langgraph4j/core/low_level/
示例工作流实现:@https://github.com/langgraph4j/langgraph4j-examples/blob/main/langchain4j/adaptive-rag/src/main/java/dev/langchain4j/adaptiverag/AdaptiveRag.java

定义简单工作流图结构

代码如下:

import lombok.extern.slf4j.Slf4j;
import org.bsc.langgraph4j.CompiledGraph;
import org.bsc.langgraph4j.GraphRepresentation;
import org.bsc.langgraph4j.GraphStateException;
import org.bsc.langgraph4j.NodeOutput;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;

import java.util.Map;

import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;

/**
 * 简化版网站生成工作流应用 - 使用 MessagesState
 */
@Slf4j
public class SimpleWorkflowApp {

    /**
     * 创建工作节点的通用方法
     */
    static AsyncNodeAction<MessagesState<String>> makeNode(String message) {
        return node_async(state -> {
            log.info("执行节点: {}", message);
            return Map.of("messages", message);
        });
    }

    public static void main(String[] args) throws GraphStateException {
        // 创建工作流图
        CompiledGraph<MessagesState<String>> workflow = new MessagesStateGraph<String>()
        // 添加节点
        .addNode("image_collector", makeNode("获取图片素材"))
        .addNode("prompt_enhancer", makeNode("增强提示词"))
        .addNode("router", makeNode("智能路由选择"))
        .addNode("code_generator", makeNode("网站代码生成"))
        .addNode("project_builder", makeNode("项目构建"))

        // 添加边
        .addEdge(START, "image_collector")                // 开始 -> 图片收集
        .addEdge("image_collector", "prompt_enhancer")    // 图片收集 -> 提示词增强
        .addEdge("prompt_enhancer", "router")             // 提示词增强 -> 智能路由
        .addEdge("router", "code_generator")              // 智能路由 -> 代码生成
        .addEdge("code_generator", "project_builder")     // 代码生成 -> 项目构建
        .addEdge("project_builder", END)                  // 项目构建 -> 结束

        // 编译工作流
        .compile();

        log.info("开始执行工作流");

        GraphRepresentation graph = workflow.getGraph(GraphRepresentation.Type.MERMAID);
        log.info("工作流图: \n{}", graph.content());

        // 执行工作流
        int stepCounter = 1;
        for (NodeOutput<MessagesState<String>> step : workflow.stream(Map.of())) {
            log.info("--- 第 {} 步完成 ---", stepCounter);
            log.info("步骤输出: {}", step);
            stepCounter++;
        }

        log.info("工作流执行完成!");
    }
}

定义状态

官方提供的状态类默认是MessagesState消息列表结构,但是很明显我们是需要维护的状态是包含多个字段,应该是一个Map或类的结构,因此我们需要自定义一个WorkflowContext状态上下文类。而且为了和LangGraph4j工作流图需要的AgentState兼容,我们可以将WorkflowContext对象当作一个key/value存放到MessagesState中,需要时通过state.data().getKey获取。

定义图片资源对象和图片枚举类,以及自定义WorkflowContext上下文类,代码如下:

@Getter
public enum ImageCategoryEnum {

    CONTENT("内容图片", "CONTENT"),
    LOGO("LOGO图片", "LOGO"),
    ILLUSTRATION("插画图片", "ILLUSTRATION"),
    ARCHITECTURE("架构图片", "ARCHITECTURE");


    private final String text;

    private final String value;

    ImageCategoryEnum(String text, String value) {
        this.text = text;
        this.value = value;
    }

    /**
     * 根据 value 获取枚举
     *
     * @param value 枚举值的value
     * @return 枚举值
     */
    public static ImageCategoryEnum getEnumByValue(String value) {
        if (ObjUtil.isEmpty(value)) {
            return null;
        }
        for (ImageCategoryEnum anEnum : ImageCategoryEnum.values()) {
            if (anEnum.value.equals(value)) {
                return anEnum;
            }
        }
        return null;
    }
}
/**
 * 图片资源对象
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ImageResource implements Serializable {

    /**
     * 图片类别
     */
    private ImageCategoryEnum category;

    /**
     * 图片描述
     */
    private String description;

    /**
     * 图片地址
     */
    private String url;

    @Serial
    private static final long serialVersionUID = 1L;
}
/**
 * 工作流上下文 - 存储所有状态信息
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowContext implements Serializable {

    /**
     * WorkflowContext 在 MessagesState 中的存储key
     */
    public static final String WORKFLOW_CONTEXT_KEY = "workflowContext";

    /**
     * 当前执行步骤
     */
    private String currentStep;

    /**
     * 用户原始输入的提示词
     */
    private String originalPrompt;

    /**
     * 图片资源字符串
     */
    private String imageListStr;

    /**
     * 图片资源列表
     */
    private List<ImageResource> imageList;

    /**
     * 增强后的提示词
     */
    private String enhancedPrompt;

    /**
     * 代码生成类型
     */
    private CodeGenTypeEnum generationType;

    /**
     * 生成的代码目录
     */
    private String generatedCodeDir;

    /**
     * 构建成功的目录
     */
    private String buildResultDir;

    /**
     * 错误信息
     */
    private String errorMessage;

    @Serial
    private static final long serialVersionUID = 1L;

    // ========== 上下文操作方法 ==========

    /**
     * 从 MessagesState 中获取 WorkflowContext
     */
    public static WorkflowContext getContext(MessagesState<String> state) {
        return (WorkflowContext) state.data().get(WORKFLOW_CONTEXT_KEY);
    }

    /**
     * 将 WorkflowContext 保存到 MessagesState 中
     */
    public static Map<String, Object> saveContext(WorkflowContext context) {
        return Map.of(WORKFLOW_CONTEXT_KEY, context);
    }
}

修改工作流图结构,引入状态完善简单工作流图,不进行状态流转代码如下:

/**
 * 简化版带状态定义的工作流 - 只定义状态结构,不实现具体流转
 */
@Slf4j
public class SimpleStatefulWorkflowApp {

    /**
     * 创建带状态感知的工作节点
     */
    static AsyncNodeAction<MessagesState<String>> makeStatefulNode(String nodeName, String message) {
        return node_async(state -> {
            WorkflowContext context = WorkflowContext.getContext(state);
            log.info("执行节点: {} - {}", nodeName, message);
            // 只记录当前步骤,不做具体的状态流转
            if (context != null) {
                context.setCurrentStep(nodeName);
            }
            return WorkflowContext.saveContext(context);
        });
    }

    public static void main(String[] args) throws GraphStateException {
        // 创建工作流图
        CompiledGraph<MessagesState<String>> workflow = new MessagesStateGraph<String>()
        // 添加节点 - 使用带状态感知的节点
        .addNode("image_collector", makeStatefulNode("image_collector", "获取图片素材"))
        .addNode("prompt_enhancer", makeStatefulNode("prompt_enhancer", "增强提示词"))
        .addNode("router", makeStatefulNode("router", "智能路由选择"))
        .addNode("code_generator", makeStatefulNode("code_generator", "网站代码生成"))
        .addNode("project_builder", makeStatefulNode("project_builder", "项目构建"))

        // 添加边
        .addEdge(START, "image_collector")
        .addEdge("image_collector", "prompt_enhancer")
        .addEdge("prompt_enhancer", "router")
        .addEdge("router", "code_generator")
        .addEdge("code_generator", "project_builder")
        .addEdge("project_builder", END)

        // 编译工作流
        .compile();

        // 初始化 WorkflowContext - 只设置基本信息
        WorkflowContext initialContext = WorkflowContext.builder()
        .originalPrompt("创建一个小楼的个人博客网站")
        .currentStep("初始化")
        .build();

        log.info("初始输入: {}", initialContext.getOriginalPrompt());
        log.info("开始执行工作流");

        // 显示工作流图
        GraphRepresentation graph = workflow.getGraph(GraphRepresentation.Type.MERMAID);
        log.info("工作流图:\n{}", graph.content());

        // 执行工作流
        int stepCounter = 1;
        for (NodeOutput<MessagesState<String>> step : workflow.stream(Map.of(WorkflowContext.WORKFLOW_CONTEXT_KEY, initialContext))) {
            log.info("--- 第 {} 步完成 ---", stepCounter);
            // 显示当前状态
            WorkflowContext currentContext = WorkflowContext.getContext(step.state());
            if (currentContext != null) {
                log.info("当前步骤上下文: {}", currentContext);
            }
            stepCounter++;
        }
        log.info("工作流执行完成!");
    }
}

定义工作节点(并使用Mock假数据来模拟状态流转)

定义图片收集节点,代码如下:

@Slf4j
public class ImageCollectorNode {
    public static AsyncNodeAction<MessagesState<String>> create() {
        return node_async(state -> {
            WorkflowContext context = WorkflowContext.getContext(state);
            log.info("执行节点: 图片收集");

            // TODO: 实际执行图片收集逻辑

            // 简单的假数据
            List<ImageResource> imageList = Arrays.asList(
                ImageResource.builder()
                .category(ImageCategoryEnum.CONTENT)
                .description("假数据图片1")
                .url("https://www.codefather.cn/logo.png")
                .build(),
                ImageResource.builder()
                .category(ImageCategoryEnum.LOGO)
                .description("假数据图片2")
                .url("https://www.codefather.cn/logo.png")
                .build()
            );

            // 更新状态
            context.setCurrentStep("图片收集");
            context.setImageList(imageList);
            log.info("图片收集完成,共收集 {} 张图片", imageList.size());
            return WorkflowContext.saveContext(context);
        });
    }
}

定义提示词增强节点,代码如下

@Slf4j
public class PromptEnhancerNode {
    public static AsyncNodeAction<MessagesState<String>> create() {
        return node_async(state -> {
            WorkflowContext context = WorkflowContext.getContext(state);
            log.info("执行节点: 提示词增强");

            // TODO: 实际执行提示词增强逻辑

            // 简单的假数据
            String enhancedPrompt = "这是增强后的假数据提示词";

            // 更新状态
            context.setCurrentStep("提示词增强");
            context.setEnhancedPrompt(enhancedPrompt);
            log.info("提示词增强完成");
            return WorkflowContext.saveContext(context);
        });
    }
}

定义智能路由节点,代码如下:

@Slf4j
public class RouterNode {
    public static AsyncNodeAction<MessagesState<String>> create() {
        return node_async(state -> {
            WorkflowContext context = WorkflowContext.getContext(state);
            log.info("执行节点: 智能路由");

            // TODO: 实际执行智能路由逻辑

            // 简单的假数据
            CodeGenTypeEnum generationType = CodeGenTypeEnum.HTML;
            // 更新状态
            context.setCurrentStep("智能路由");
            context.setGenerationType(generationType);
            log.info("路由决策完成,选择类型: {}", generationType.getText());
            return WorkflowContext.saveContext(context);
        });
    }
}

定义代码生成节点,代码如下:

@Slf4j
public class CodeGeneratorNode {
    public static AsyncNodeAction<MessagesState<String>> create() {
        return node_async(state -> {
            WorkflowContext context = WorkflowContext.getContext(state);
            log.info("执行节点: 代码生成");

            // TODO: 实际执行代码生成逻辑

            // 简单的假数据
            String generatedCodeDir = "/tmp/generated/fake-code";
            // 更新状态
            context.setCurrentStep("代码生成");
            context.setGeneratedCodeDir(generatedCodeDir);
            log.info("代码生成完成,目录: {}", generatedCodeDir);
            return WorkflowContext.saveContext(context);
        });
    }
}

定义项目构建节点,代码如下:

@Slf4j
public class ProjectBuilderNode {
    public static AsyncNodeAction<MessagesState<String>> create() {
        return node_async(state -> {
            WorkflowContext context = WorkflowContext.getContext(state);
            log.info("执行节点: 项目构建");

            // TODO: 实际执行项目构建逻辑

            // 简单的假数据
            String buildResultDir = "/tmp/build/fake-build";

            // 更新状态
            context.setCurrentStep("项目构建");
            context.setBuildResultDir(buildResultDir);
            log.info("项目构建完成,结果目录: {}", buildResultDir);
            return WorkflowContext.saveContext(context);
        });
    }
}

工作流图应用各个工作节点(模拟流转)

@Slf4j
public class WorkflowApp {

    public static void main(String[] args) throws GraphStateException {
        // 创建工作流图
        CompiledGraph<MessagesState<String>> workflow = new MessagesStateGraph<String>()
        // 添加节点 - 使用真实的工作节点
        .addNode("image_collector", ImageCollectorNode.create())
        .addNode("prompt_enhancer", PromptEnhancerNode.create())
        .addNode("router", RouterNode.create())
        .addNode("code_generator", CodeGeneratorNode.create())
        .addNode("project_builder", ProjectBuilderNode.create())
        // 添加边
        .addEdge(START, "image_collector")
        .addEdge("image_collector", "prompt_enhancer")
        .addEdge("prompt_enhancer", "router")
        .addEdge("router", "code_generator")
        .addEdge("code_generator", "project_builder")
        .addEdge("project_builder", END)
        // 编译工作流
        .compile();

        // 初始化 WorkflowContext - 只设置基本信息
        WorkflowContext initialContext = WorkflowContext.builder()
        .originalPrompt("创建一个小楼的个人博客网站")
        .currentStep("初始化")
        .build();
        log.info("初始输入: {}", initialContext.getOriginalPrompt());
        log.info("开始执行工作流");

        // 显示工作流图
        GraphRepresentation graph = workflow.getGraph(GraphRepresentation.Type.MERMAID);
        log.info("工作流图:\n{}", graph.content());

        // 执行工作流
        int stepCounter = 1;
        for (NodeOutput<MessagesState<String>> step : workflow.stream(Map.of(WorkflowContext.WORKFLOW_CONTEXT_KEY, initialContext))) {
            log.info("--- 第 {} 步完成 ---", stepCounter);
            // 显示当前状态
            WorkflowContext currentContext = WorkflowContext.getContext(step.state());
            if (currentContext != null) {
                log.info("当前步骤上下文: {}", currentContext);
            }
            stepCounter++;
        }
        log.info("工作流执行完成!");
    }
}

依次开发真实工作节点

此处根据业务需求进行开发节点。

最后在工作流中使用节点

原本的WorkflowApp工作流是使用main进行模拟运行,现在使用真实工作节点进行工作流改造:

@Slf4j
public class CodeGenWorkflow {

    /**
     * 创建完整的工作流
     */
    public CompiledGraph<MessagesState<String>> createWorkflow() {
        try {
            return new MessagesStateGraph<String>()
            // 添加节点 - 使用完整实现的节点
            .addNode("image_collector", ImageCollectorNode.create())
            .addNode("prompt_enhancer", PromptEnhancerNode.create())
            .addNode("router", RouterNode.create())
            .addNode("code_generator", CodeGeneratorNode.create())
            .addNode("project_builder", ProjectBuilderNode.create())

            // 添加边
            .addEdge(START, "image_collector")
            .addEdge("image_collector", "prompt_enhancer")
            .addEdge("prompt_enhancer", "router")
            .addEdge("router", "code_generator")
            .addEdge("code_generator", "project_builder")
            .addEdge("project_builder", END)

            // 编译工作流
            .compile();
        } catch (GraphStateException e) {
            throw new BusinessException(ErrorCode.OPERATION_ERROR, "工作流创建失败");
        }
    }

    /**
     * 执行工作流
     */
    public WorkflowContext executeWorkflow(String originalPrompt) {
        CompiledGraph<MessagesState<String>> workflow = createWorkflow();

        // 初始化 WorkflowContext
        WorkflowContext initialContext = WorkflowContext.builder()
        .originalPrompt(originalPrompt)
        .currentStep("初始化")
        .build();

        GraphRepresentation graph = workflow.getGraph(GraphRepresentation.Type.MERMAID);
        log.info("工作流图:\n{}", graph.content());
        log.info("开始执行代码生成工作流");

        WorkflowContext finalContext = null;
        int stepCounter = 1;
        for (NodeOutput<MessagesState<String>> step : workflow.stream(
            Map.of(WorkflowContext.WORKFLOW_CONTEXT_KEY, initialContext))) {
            log.info("--- 第 {} 步完成 ---", stepCounter);
            // 显示当前状态
            WorkflowContext currentContext = WorkflowContext.getContext(step.state());
            if (currentContext != null) {
                finalContext = currentContext;
                log.info("当前步骤上下文: {}", currentContext);
            }
            stepCounter++;
        }
        log.info("代码生成工作流执行完成!");
        return finalContext;
    }
}

最后编写单元测试进行工作流效果测试:

@SpringBootTest
class CodeGenWorkflowTest {

    @Test
    void testTechBlogWorkflow() {
        WorkflowContext result = new CodeGenWorkflow().executeWorkflow("创建一个技术博客网站,需要展示编程教程和系统架构");
        Assertions.assertNotNull(result);
        System.out.println("生成类型: " + result.getGenerationType());
        System.out.println("生成的代码目录: " + result.getGeneratedCodeDir());
        System.out.println("构建结果目录: " + result.getBuildResultDir());
    }

    @Test
    void testCorporateWorkflow() {
        WorkflowContext result = new CodeGenWorkflow().executeWorkflow("创建企业官网,展示公司形象和业务介绍");
        Assertions.assertNotNull(result);
        System.out.println("生成类型: " + result.getGenerationType());
        System.out.println("生成的代码目录: " + result.getGeneratedCodeDir());
        System.out.println("构建结果目录: " + result.getBuildResultDir());
    }

    @Test
    void testVueProjectWorkflow() {
        WorkflowContext result = new CodeGenWorkflow().executeWorkflow("创建一个Vue前端项目,包含用户管理和数据展示功能");
        Assertions.assertNotNull(result);
        System.out.println("生成类型: " + result.getGenerationType());
        System.out.println("生成的代码目录: " + result.getGeneratedCodeDir());
        System.out.println("构建结果目录: " + result.getBuildResultDir());
    }

    @Test
    void testSimpleHtmlWorkflow() {
        WorkflowContext result = new CodeGenWorkflow().executeWorkflow("创建一个简单的个人主页");
        Assertions.assertNotNull(result);
        System.out.println("生成类型: " + result.getGenerationType());
        System.out.println("生成的代码目录: " + result.getGeneratedCodeDir());
        System.out.println("构建结果目录: " + result.getBuildResultDir());
    }
}

可以多测试几个看看效果:

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/TheChosenOnev/article/details/157980397

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--