· 大家好,我是 同学小张 ,日常分享AI知识和实战案例
· 欢迎 点赞 + 关注 👏,持续学习 ,持续干货输出 。
· +v: jasper_8017 一起交流💬,一起进步💪
上篇文章(【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互 )中我们简单使用了一下AgentScope的Pipeline模块,方便地实现了一个多智能体交互应用。今天我们来深入Pipeline的源码,来看下AgentScope都提供了哪些类型的Pipeline,以及它的实现原理是什么。
0. Pipeline概念简介 我的个人简单理解:AgentScope为了方便大家对智能体间交互逻辑的编排,特地封装了 Pipeline 模块,其中包含了一系列地 Pipeline ,就像编程语言中的控制结构:顺序结构、条件分支、循环结构等。利用这些 Pipeline ,大家可以很方便地实现多智能体间的交互逻辑控制。
1. Pipeline的基类:PipelineBase
class PipelineBase ( Operator ): r """Base interface of all pipelines. The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them. """ def __init__ ( self ) -> None : self . participants : List [ Any ] = [] @abstractmethod def __call__ ( self , x : Optional [ dict ] = None ) -> dict : """Define the actions taken by this pipeline. Args: x (Optional[`dict`], optional): Dialog history and some environment information Returns: `dict`: The pipeline's response to the input. """
基类只是实现了一个Pipeline的基本框架,所有类型的Pipeline都继承自这个基类,然后重写自己的__call__
函数。
从这个基类的说明中也可以看到AgentScope对Pipeline的定义:The pipeline is a special kind of operator that includes multiple operators and the interaction logic among them.
。Pipeline组合了一系列的operators和这些operators之间的交互逻辑。所谓的operators,其实就是我们所认识的Agent。
2. SequentialPipeline
这个Pipeline用来实现类似编程语言中的顺序执行结构。上篇文章中已经写过其源码了,所以在这里不再展开,感兴趣的可以去看一下:【AI Agent系列】【阿里AgentScope框架】2. Pipeline模块入门:使用Pipeline模块实现最简单的多智能体交互
3. IfElsePipeline
这个Pipeline用来实现类似编程语言中的 if-else
分支结构。
3.1 初始化参数 class IfElsePipeline ( PipelineBase ): def __init__ ( self , condition_func : Callable [[ dict ], bool ], if_body_operators : Operators , else_body_operators : Operators = placeholder , ) -> None : self . condition_func = condition_func self . if_body_operator = if_body_operators self . else_body_operator = else_body_operators self . participants = [ self . if_body_operator ] + [ self . else_body_operator ] def __call__ ( self , x : Optional [ dict ] = None ) -> dict : return ifelsepipeline ( condition_func = self . condition_func , if_body_operators = self . if_body_operator , else_body_operators = self . else_body_operator , x = x , )
其初始化接收三个参数,比较好理解:
3.2 实现原理 重写__call__
函数,调用了 ifelsepipeline
函数:
def ifelsepipeline ( condition_func : Callable , if_body_operators : Operators , else_body_operators : Operators = placeholder , x : Optional [ dict ] = None , ) -> dict : if condition_func ( x ): return _operators ( if_body_operators , x ) else : return _operators ( else_body_operators , x )
里面 if condition_func(x)
来判断是否满足设置的判断条件,然后选择执行 if_body_operators
还是 else_body_operators
。
这里的 if_body_operators
或者 else_body_operators
可以是一系列的 operators
,通过 _operators
函数来进行判断和执行:
def _operators ( operators : Operators , x : Optional [ dict ] = None ) -> dict : """Syntactic sugar for executing a single operator or a sequence of operators.""" if isinstance ( operators , Sequence ): return sequentialpipeline ( operators , x ) else : return operators ( x )
从这个函数的实现可以看到,如果operators
是一系列operator
,则会对它们执行 sequentialpipeline
顺序结构。
4. SwitchPipeline
这个Pipeline用来实现类似编程语言中的 switch-case
分支结构。
4.1 初始化参数 class SwitchPipeline ( PipelineBase ): def __init__ ( self , condition_func : Callable [[ dict ], Any ], case_operators : Mapping [ Any , Operators ], default_operators : Operators = placeholder , ) -> None : self . condition_func = condition_func self . case_operators = case_operators self . default_operators = default_operators self . participants = list ( self . case_operators . values ()) + [ self . default_operators , ] def __call__ ( self , x : Optional [ dict ] = None ) -> dict : return switchpipeline ( condition_func = self . condition_func , case_operators = self . case_operators , default_operators = self . default_operators , x = x , )
该Pipeline的初始化接收三个参数:
4.2 实现原理 重写__call__
函数,调用了 switchpipeline
函数:
def switchpipeline ( condition_func : Callable [[ Any ], Any ], case_operators : Mapping [ Any , Operators ], default_operators : Operators = placeholder , x : Optional [ dict ] = None , ) -> dict : target_case = condition_func ( x ) if target_case in case_operators : return _operators ( case_operators [ target_case ], x ) else : return _operators ( default_operators , x )
首先是 target_case = condition_func(x)
,根据判断条件找出当前的case条件,然后根据case条件找出需要执行的operators(case_operators[target_case]
),通过 _operators
函数来进行顺序执行。
5. 总结 今天这篇文章我们主要通过阅读源码,学习了AgentScope中Pipeline模块的基类、顺序Pipeline和条件Pipeline的实现。所谓的顺序Pipeline就是将Agent按顺序执行,消息按顺序传递。条件Pipeline就是用户给出判定条件,以及每种条件下应该运行的Agents,然后在满足某种条件的时候顺序执行该条件下的Agents。
如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~点击上方公众号,关注↑↑↑
· 大家好,我是 同学小张 ,日常分享AI知识和实战案例
· 欢迎 点赞 + 关注 👏,持续学习 ,持续干货输出 。
· +v: jasper_8017 一起交流💬,一起进步💪。
公众号内文章一览