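This script shows how to use the Workflow system in the llama_index library to build and run complex process logic. By defining different event types, step functions, and a context object, you can implement a range of workflow patterns, from simple linear flows to concurrent execution, event collection, and nested workflows.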
Sequential workflows
A workflow is implemented by subclassing the Workflow class. The class contains several steps, and each step is decorated with @step.
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result="hello world")


w = MyWorkflow(timeout=10, verbose=False)
# Top-level await works in Jupyter; in a plain script, wrap the call in asyncio.run()
result = await w.run()
print(result)
```
hello world
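The workflow above defines a start event (StartEvent) and an end event (StopEvent), which is essential. LlamaIndex can visualize a workflow with the code below (draw_all_possible_flows comes from the llama-index-utils-workflow package); the result is written to an HTML file that you can open in a browser.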
```python
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")
```
Next, we build a multi-step workflow by defining custom Event types.
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
)


class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        # Keyword arguments passed to run() are available on the StartEvent
        print(ev.first_input)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = MyWorkflow(timeout=10, verbose=False)
result = await w.run(first_input="Start the workflow.")
print(result)
```
Start the workflow.
First step complete.
Second step complete.
Workflow complete.
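Summary:
- A Workflow is a flow that starts with a StartEvent and ends with a StopEvent.
- Defining different Event types lets you build a multi-step workflow.
- Data can be passed from step to step within a workflow.
Branching and looping workflows
The examples above only show sequential workflows. How do you build more complex workflows with branches and loops? The approach is the same: first define the Events that steps listen for, then connect the steps in different ways through the events they accept and return.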
```python
import random


class LoopEvent(Event):
    loop_output: str


class LoopWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        # Randomly loop back to this same step or move on to step_two
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = LoopWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

draw_all_possible_flows(LoopWorkflow, filename="loop_workflow.html")
```
```
Bad thing happened
Bad thing happened
Bad thing happened
Good thing happened
First step complete.
Second step complete.
Workflow complete.
<class 'NoneType'>
<class '__main__.FirstEvent'>
<class '__main__.LoopEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
loop_workflow.html
```
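Running the code several times shows that the workflow follows one of two paths:
- A: step_one [FirstEvent] -> step_two [SecondEvent] -> step_three [StopEvent]
- B: step_one [LoopEvent], repeated a random number of times, then path A
In other words, which step runs next depends on the event returned by the current step and on the events each step listens for. For example, step_two listens for FirstEvent, so whenever the current step returns a FirstEvent, step_two runs next. This is similar to how MetaGPT defines workflows.
Branches are built the same way: a step can return one of several event types, and each type leads to a different step.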
```python
class BranchA1Event(Event):
    payload: str


class BranchA2Event(Event):
    payload: str


class BranchB1Event(Event):
    payload: str


class BranchB2Event(Event):
    payload: str


class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")


w = BranchWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

draw_all_possible_flows(BranchWorkflow, filename="branch_workflow.html")
```
```
Go to branch A
Branch A
Branch A
Branch A complete.
<class 'NoneType'>
<class '__main__.BranchA1Event'>
<class '__main__.BranchB1Event'>
<class '__main__.BranchA2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.BranchB2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
branch_workflow.html
```
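To make the routing rule concrete, here is a toy, framework-free model of it in plain Python. It is a sketch under stated assumptions, not LlamaIndex's implementation: steps are ordinary synchronous functions, the event classes are stand-in dataclasses, step_two and step_three are collapsed into one step, and `build_routes`/`run` are hypothetical helpers. Each step's `ev` annotation is read with `typing.get_type_hints`, a `Union` annotation registers the step for every member type (as `StartEvent | LoopEvent` does in LoopWorkflow), and the loop keeps dispatching on `type(ev)` until a StopEvent appears.

```python
import random
import typing
from dataclasses import dataclass


# Stand-in event types (plain dataclasses, not LlamaIndex Events)
@dataclass
class StartEvent:
    pass


@dataclass
class StopEvent:
    result: str


@dataclass
class FirstEvent:
    first_output: str


@dataclass
class LoopEvent:
    loop_output: str


def step_one(ev: typing.Union[StartEvent, LoopEvent]):
    # Same coin flip as LoopWorkflow.step_one
    if random.randint(0, 1) == 0:
        return LoopEvent(loop_output="Back to step one.")
    return FirstEvent(first_output="First step complete.")


def step_two(ev: FirstEvent):
    return StopEvent(result="Workflow complete.")


def build_routes(steps):
    """Map every event type a step accepts to that step."""
    routes = {}
    for fn in steps:
        hint = typing.get_type_hints(fn)["ev"]
        # A Union annotation registers the step for each of its members
        accepted = typing.get_args(hint) or (hint,)
        for event_type in accepted:
            routes[event_type] = fn
    return routes


def run(steps, max_iterations=100):
    """Dispatch on the type of each returned event until a StopEvent appears."""
    routes = build_routes(steps)
    ev = StartEvent()
    trace = []
    for _ in range(max_iterations):
        if isinstance(ev, StopEvent):
            return ev.result, trace
        fn = routes[type(ev)]
        trace.append(fn.__name__)
        ev = fn(ev)
    raise RuntimeError("workflow did not reach a StopEvent")
```

Calling `run([step_one, step_two])` returns the final result plus a trace made of one or more `step_one` calls followed by `step_two`; seeding `random` makes the trace reproducible.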
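Workflow context
In the workflows above, state is passed along with the events from step to step. To share data between steps that are not directly connected, you would have to relay it through every step in between, which makes the code harder to read and maintain. To solve this, workflows provide a Context object that every step can access.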
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)


class SetupEvent(Event):
    query: str


class StepTwoEvent(Event):
    query: str


class StatefulFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> SetupEvent | StepTwoEvent:
        # Look up shared state; on the first pass nothing has been stored yet
        db = await ctx.get("some_database", default=None)
        if db is None:
            print("Need to load data")
            return SetupEvent(query=ev.query)

        return StepTwoEvent(query=ev.query)

    @step
    async def setup(self, ctx: Context, ev: SetupEvent) -> StartEvent:
        # Store the data in the Context, then restart the flow
        await ctx.set("some_database", [1, 2, 3])
        return StartEvent(query=ev.query)

    @step
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        # Read the shared data without it being passed through the events
        print("Data is ", await ctx.get("some_database"))
        return StopEvent(result=await ctx.get("some_database"))


w = StatefulFlow(timeout=10, verbose=False)
result = await w.run(query="Some query")
print(result)
```
Need to load data
Data is [1, 2, 3]
[1, 2, 3]
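Streaming events
The Context of a LlamaIndex Workflow can also write events to a stream, so the caller can consume intermediate progress while the workflow is still running. The example below streams tokens from a local Ollama model.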
```python
import nest_asyncio

# Allow asyncio.run() inside an already running event loop (e.g. Jupyter)
nest_asyncio.apply()

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.llms.ollama import Ollama
```
```python
class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = Ollama(
            model="qwen2.5:latest",
            request_timeout=360.0,
            base_url="http://localhost:11434",
        )
        # Prompt: "Tell me 2 jokes in Chinese, each no longer than 15 characters"
        generator = await llm.astream_complete(
            "用中文给我讲2个笑话,每个笑话不超过15个字"
        )
        async for response in generator:
            # Forward each streamed token to the caller as it arrives
            ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")
```
```python
async def main():
    w = MyWorkflow(timeout=30, verbose=True)
    handler = w.run(first_input="Start the workflow.")

    # Consume progress events while the workflow is still running
    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await handler
    print("Final result", final_result)

    draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step step_one
Step step_one produced event FirstEvent
Running step step_two
Step one is happening
1 . 老 婆 做饭 咸 , 加 点 水 稀 释 一下 。
2 . why did the tomato turn red ? Because it saw the salad dressing !
Step step_two produced event SecondEvent
Running step step_three
Step step_three produced event StopEvent
Step three is happening
Final result Workflow complete.
<class 'NoneType'>
<class '__main__.FirstEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
streaming_workflow.html
```
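The streaming pattern is essentially a producer/consumer queue: the running workflow pushes events, the caller iterates over them, and then awaits the final result. The sketch below models that with `asyncio.Queue` and a sentinel object; `ToyHandler` and `my_workflow` are hypothetical names, and the hard-coded tokens stand in for the LLM's `response.delta` values. It does not claim to mirror how LlamaIndex builds its handler internally.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream


class ToyHandler:
    """Hypothetical handler: runs a workflow coroutine and exposes its events."""

    def __init__(self, workflow):
        self._queue = asyncio.Queue()
        self._task = asyncio.ensure_future(self._run(workflow))

    async def _run(self, workflow):
        try:
            # The workflow receives a callback playing the role of write_event_to_stream
            return await workflow(self._queue.put_nowait)
        finally:
            self._queue.put_nowait(_DONE)  # always close the stream

    async def stream_events(self):
        while True:
            ev = await self._queue.get()
            if ev is _DONE:
                return
            yield ev

    def __await__(self):
        # Awaiting the handler waits for the workflow's final result
        return self._task.__await__()


async def my_workflow(write_event_to_stream):
    write_event_to_stream("Step one is happening")
    for token in ["Hello", ", ", "world"]:  # stands in for streamed LLM deltas
        write_event_to_stream(token)
        await asyncio.sleep(0)
    write_event_to_stream("Step three is happening")
    return "Workflow complete."


async def main():
    handler = ToyHandler(my_workflow)
    seen = [ev async for ev in handler.stream_events()]
    final = await handler
    return seen, final
```

Because the queue is FIFO, the caller sees events in exactly the order the workflow wrote them, and the final result is only read after the stream is drained, as in the `main()` above.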
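Concurrent execution
LlamaIndex workflows can run steps concurrently. A step can emit several events with ctx.send_event, and @step(num_workers=...) lets several instances of the receiving step run in parallel.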
```python
import random


class StepTwoEvent(Event):
    query: str


class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        # Emit three events; the step itself returns nothing
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        # Up to 4 instances of this step can run at the same time
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StopEvent(result=ev.query)
```
```python
async def main():
    w = ParallelFlow(timeout=30, verbose=True)
    await w.run()
    draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step start
Step start produced no event
Running step step_two
Running slow query Query 1
Running step step_two
Running slow query Query 2
Running step step_two
Running slow query Query 3
Step step_two produced event StopEvent
<class 'NoneType'>
<class '__main__.StepTwoEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
parallel_workflow.html
```
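The ParallelFlow log suggests two things: `num_workers=4` lets the three queries run at once, and the first StopEvent produced ends the run. A plain asyncio sketch of that behavior uses an `asyncio.Semaphore` to cap concurrency and `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` to take the first result. The function names are hypothetical, the sleep times are shortened, and this is a model of the observed behavior, not LlamaIndex's scheduler.

```python
import asyncio
import random


async def slow_query(query, sem, started):
    async with sem:  # at most num_workers queries run at the same time
        started.append(query)
        await asyncio.sleep(random.uniform(0.01, 0.05))
        return query


async def run_parallel(queries, num_workers=4):
    sem = asyncio.Semaphore(num_workers)
    started = []
    tasks = [asyncio.ensure_future(slow_query(q, sem, started)) for q in queries]
    # The first finished task plays the role of the first StopEvent
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # remaining work is abandoned once a result exists
    await asyncio.gather(*pending, return_exceptions=True)
    first = next(iter(done)).result()
    return first, started
```

With three queries and four workers, all three start immediately, and whichever finishes first becomes the result.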
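In ParallelFlow, step_two runs as soon as each event arrives, and the first StopEvent ends the workflow. That suits some cases, but in others you need to wait until all the calls have finished before continuing. For that, use ctx.collect_events, which returns None until all the expected events have arrived.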
```python
class StepThreeEvent(Event):
    result: str


class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # Buffer events until three StepThreeEvents have arrived
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        print("result: ", result)
        return StopEvent(result="Done")


async def main():
    w = ConcurrentFlow(timeout=30, verbose=True)
    await w.run()
    draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step start
Step start produced no event
Running step step_two
Running query Query 1
Running step step_two
Running query Query 2
Running step step_two
Running query Query 3
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
Step step_three produced no event
Running step step_three
Step step_three produced no event
Step step_two produced event StepThreeEvent
Running step step_three
result: [StepThreeEvent(result='Query 1'), StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3')]
Step step_three produced event StopEvent
<class 'NoneType'>
<class '__main__.StepTwoEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.StepThreeEvent'>
concurrent_workflow.html
```
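The collect_events behavior seen in the log (return None while waiting, then hand back the buffered events once the set is complete) can be modeled with a small buffer. `ToyCollector` and the dataclass events are hypothetical illustrations, not the library's code. The sketch matches expected types as a multiset, so it covers both `[StepThreeEvent] * 3` and a list of distinct types like the one in ConcurrentFlow2; returning events in the order of `expected` is a choice of this sketch.

```python
from collections import Counter
from dataclasses import dataclass


# Stand-in event types (plain dataclasses, not LlamaIndex Events)
@dataclass
class StepThreeEvent:
    result: str


@dataclass
class StepACompleteEvent:
    result: str


@dataclass
class StepBCompleteEvent:
    result: str


@dataclass
class StepCCompleteEvent:
    result: str


class ToyCollector:
    """Buffers events until every expected type has arrived (toy model)."""

    def __init__(self):
        self._buffer = []

    def collect(self, ev, expected):
        self._buffer.append(ev)
        need = Counter(expected)
        have = Counter(type(e) for e in self._buffer)
        if any(have[t] < n for t, n in need.items()):
            return None  # still waiting, like the step returning None
        # Take one buffered event per expected slot, in the order of `expected`
        result = []
        for t in expected:
            idx = next(i for i, e in enumerate(self._buffer) if type(e) is t)
            result.append(self._buffer.pop(idx))
        return result
```

The first two calls return None and the third returns the full batch, which matches the two "produced no event" lines for step_three before the StopEvent.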
ConcurrentFlow waits for three events of the same type, but collect_events can also wait for events of different types.
```python
class StepAEvent(Event):
    query: str


class StepBEvent(Event):
    query: str


class StepCEvent(Event):
    query: str


class StepACompleteEvent(Event):
    result: str


class StepBCompleteEvent(Event):
    result: str


class StepCCompleteEvent(Event):
    result: str


class ConcurrentFlow2(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # Wait until one event of each type has arrived, whatever the arrival order
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        return StopEvent(result="Done")


async def main():
    w = ConcurrentFlow2(timeout=30, verbose=True)
    await w.run()
    draw_all_possible_flows(ConcurrentFlow2, filename="concurrent2_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step start
Step start produced no event
Running step step_a
Doing something A-ish
Step step_a produced event StepACompleteEvent
Running step step_b
Doing something B-ish
Step step_b produced event StepBCompleteEvent
Running step step_c
Doing something C-ish
Step step_c produced event StepCCompleteEvent
Running step step_three
Received event Query 1
Step step_three produced no event
Running step step_three
Received event Query 2
Step step_three produced no event
Running step step_three
Received event Query 3
Step step_three produced event StopEvent
<class 'NoneType'>
<class '__main__.StepAEvent'>
<class '__main__.StepBEvent'>
<class '__main__.StepCEvent'>
<class '__main__.StepACompleteEvent'>
<class '__main__.StepBCompleteEvent'>
<class '__main__.StepCCompleteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
concurrent2_workflow.html
```
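Subclassing custom workflows
A custom workflow is a subclass of Workflow, and it can in turn serve as the base class for other workflows. In effect, you implement a base workflow and then subclass it to build a more elaborate child workflow. Below, MainWorkflow is defined first; CustomWorkflow then overrides step_two and adds a new step, step_two_b, while inheriting start and step_three.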
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)


class Step2Event(Event):
    query: str


class Step3Event(Event):
    query: str


class MainWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> Step2Event:
        print("Starting up")
        return Step2Event(query=ev.query)

    @step
    async def step_two(self, ev: Step2Event) -> Step3Event:
        print("Sending an email")
        return Step3Event(query=ev.query)

    @step
    async def step_three(self, ev: Step3Event) -> StopEvent:
        print("Finishing up")
        return StopEvent(result=ev.query)


w = MainWorkflow(timeout=10, verbose=False)
result = await w.run(query="Initial query")
print(result)
```
Starting up
Sending an email
Finishing up
Initial query
```python
class Step2BEvent(Event):
    query: str


class CustomWorkflow(MainWorkflow):
    # Overrides the parent's step_two so that it emits Step2BEvent instead
    @step
    async def step_two(self, ev: Step2Event) -> Step2BEvent:
        print("Sending an email")
        return Step2BEvent(query=ev.query)

    # New step inserted between step_two and the inherited step_three
    @step
    async def step_two_b(self, ev: Step2BEvent) -> Step3Event:
        print("Also sending a text message")
        return Step3Event(query=ev.query)


w = CustomWorkflow(timeout=10, verbose=False)
result = await w.run(query="Initial query")
print(result)
```
Starting up
Sending an email
Also sending a text message
Finishing up
Initial query
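The override relies on ordinary Python method resolution: defining `step_two` in CustomWorkflow hides the parent's version, while `start` and `step_three` are inherited unchanged. The toy below shows this with a hypothetical `step` marker decorator and a `steps()` helper that lists tagged methods via `dir()`. It is not how LlamaIndex registers steps, only a model of the inheritance effect.

```python
def step(fn):
    """Hypothetical marker decorator: tags a function as a workflow step."""
    fn._is_step = True
    return fn


class ToyWorkflow:
    @classmethod
    def steps(cls):
        # dir() walks the whole MRO, so a subclass override hides the parent's method
        return sorted(
            name for name in dir(cls) if getattr(getattr(cls, name), "_is_step", False)
        )

    @classmethod
    def owner(cls, name):
        # Name of the class whose body defined the method that will actually run
        return getattr(cls, name).__qualname__.split(".")[0]


class MainWorkflow(ToyWorkflow):
    @step
    def start(self):
        return "Starting up"

    @step
    def step_two(self):
        return "Sending an email"

    @step
    def step_three(self):
        return "Finishing up"


class CustomWorkflow(MainWorkflow):
    @step
    def step_two(self):  # overrides MainWorkflow.step_two
        return "Sending an email"

    @step
    def step_two_b(self):  # new step that exists only in the subclass
        return "Also sending a text message"
```

`CustomWorkflow.steps()` lists four steps, and `CustomWorkflow.owner("step_two")` reports the subclass while `owner("start")` reports MainWorkflow.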
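Nested workflows
You can embed one workflow inside an existing workflow. Below, the start step of MainWorkflow takes an extra reflection_workflow parameter, and the sub-workflow is attached with add_workflows.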
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows


class Step2Event(Event):
    query: str


class MainWorkflow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent, reflection_workflow: Workflow
    ) -> Step2Event:
        print("Need to run reflection")
        # Run the nested workflow and use its result as the new query
        res = await reflection_workflow.run(query=ev.query)
        return Step2Event(query=res)

    @step
    async def step_two(self, ctx: Context, ev: Step2Event) -> StopEvent:
        print("Query is ", ev.query)
        return StopEvent(result=ev.query)


class ReflectionFlow(Workflow):
    @step
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing custom reflection")
        return StopEvent(result="Improved query")


w = MainWorkflow(timeout=10, verbose=False)
# Attach the sub-workflow under the same name as the extra parameter of start()
w.add_workflows(reflection_workflow=ReflectionFlow())
result = await w.run(query="Initial query")
print(result)
```
Need to run reflection
Doing custom reflection
Query is Improved query
Improved query
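In the nested example, the keyword passed to `add_workflows` (`reflection_workflow`) matches the name of the extra parameter on `start`, which suggests injection by parameter name. A minimal sketch of that idea, built on `inspect.signature`, follows; `ToySubflow` and `call_step` are hypothetical, and the matching rule is an assumption inferred from the example rather than documented library behavior.

```python
import asyncio
import inspect


class ToySubflow:
    """Hypothetical sub-workflow with an async run(), like ReflectionFlow above."""

    async def run(self, query):
        return "Improved query"


async def start(query, reflection_workflow):
    # The sub-workflow arrives as an ordinary argument and is simply awaited
    return await reflection_workflow.run(query=query)


async def call_step(fn, registry, **event_fields):
    """Call fn, filling any parameter whose name matches a registered sub-workflow."""
    params = inspect.signature(fn).parameters
    injected = {name: wf for name, wf in registry.items() if name in params}
    return await fn(**event_fields, **injected)


registry = {"reflection_workflow": ToySubflow()}
```

Steps that do not declare the parameter are called without it, so only steps that ask for a sub-workflow by name receive one.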