分享

【阿里AgentScope框架】4. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 循环结构

 小张学AI 2024-05-14 发布于山东

前面我们已经初步学会了使用AgentScope中的Pipeline模块,并深入源码,阅读了其中的顺序结构Pipeline和条件分支Pipeline的实现代码。今天,我们来学习另一种经典的Pipeline结构 - 循环结构Pipeline。同样深入源码,了解其背后的实现逻辑。

目前AgentScope还处于快速迭代阶段,本文源码版本为:Successfully installed agentscope-0.0.4a0

0. 推荐前置阅读

(1)Pipeline入门 & 顺序结构Pipeline源码详解:【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互

(2)Pipeline基类 & 条件结构Pipeline源码详解:【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支

1. ForLoopPipeline

这个Pipeline用来实现类似编程语言中的 for 循环结构。

1.1 初始化

class ForLoopPipeline(PipelineBase):
    def __init__(
        self,
        loop_body_operators: Operators,
        max_loop: int,
        break_func: Callable[[dict], bool] = lambda _: False,
    
):
        self.loop_body_operators = loop_body_operators
        self.max_loop = max_loop
        self.break_func = break_func
        self.participants = [self.loop_body_operators]

    def __call__(self, x: Optional[dict] = None) -> dict:
        return forlooppipeline(
            loop_body_operators=self.loop_body_operators,
            max_loop=self.max_loop,
            break_func=self.break_func,
            x=x,
        )

该Pipeline的初始化接收三个参数:

  • · loop_body_operators:需要循环的operators

  • · max_loop:最大循环次数

  • · break_func:跳出循环的条件

1.2 实现原理

重写__call__函数,调用了 forlooppipeline 函数:

def forlooppipeline(
    loop_body_operators: Operators,
    max_loop: int,
    break_func: Callable[[dict], bool] = lambda _: False,
    x: Optional[dict] = None,
) -> dict:
    for _ in range(max_loop):
        # loop body
        x = _operators(loop_body_operators, x)
        # check condition
        if break_func(x):
            break
    return x  # type: ignore[return-value]

实现原理比较简单,一个for循环,for循环中为 x = _operators(loop_body_operators, x),关于 _operators函数,我们在上篇文章已经看过源码,它就是将 loop_body_operators 执行 sequentialpipeline顺序结构。

然后if break_func(x)语句,用来判断是否提前到达了停止条件,如果到达了,则直接跳出循环。

2. WhileLoopPipeline

这个Pipeline用来实现类似编程语言中的 while 循环结构。

1.1 初始化

class WhileLoopPipeline(PipelineBase):
    def __init__(
        self,
        loop_body_operators: Operators,
        condition_func: Callable[[int, dict], bool] = lambda _, __: False,
    
):
        self.condition_func = condition_func
        self.loop_body_operators = loop_body_operators
        self.participants = [self.loop_body_operators]

    def __call__(self, x: Optional[dict] = None) -> dict:
        return whilelooppipeline(
            loop_body_operators=self.loop_body_operators,
            condition_func=self.condition_func,
            x=x,
        )

该Pipeline的初始化接收两个参数:

  • · loop_body_operators:需要循环的operators

  • · condition_func:跳出循环的条件

1.2 实现原理

重写__call__函数,调用了 whilelooppipeline 函数:

def whilelooppipeline(
    loop_body_operators: Operators,
    condition_func: Callable[[int, Any], bool] = lambda _, __: False,
    x: Optional[dict] = None,
) -> dict:
    i = 0
    while condition_func(i, x):
        # loop body
        x = _operators(loop_body_operators, x)
        # check condition
        i += 1
    return x  # type: ignore[return-value]

实现原理也比较简单,一个while循环,进入while循环的条件是 condition_func。循环中为 x = _operators(loop_body_operators, x),即将 loop_body_operators 执行 sequentialpipeline顺序结构。

里面的 i 变量,没看懂其存在的意义是什么,只是用来计数吗?但是又没有最大循环次数设置进来,这个 i 变量也没有传递出去。

3. 总结

本文我们学习了AgentScope框架Pipeline模块中的两种循环Pipeline,其实现原理都是比较简单的,简单理解下,可以将循环内的operators理解成一系列函数,这些函数放在了for循环或while循环中。有过一点编程经验的同学相信很容易理解。

如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~点击上方公众号,关注↑↑↑


  • · 大家好,我是 同学小张,日常分享AI知识和实战案例

  • · 欢迎 点赞 + 关注 👏,持续学习持续干货输出

  • · +v: jasper_8017 一起交流💬,一起进步💪。

公众号内文章一览

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多