LlamaIndex Basics 07: Workflow

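This post shows how to use the Workflow system in the llama_index library to build and run complex flow logic. By defining event types, step functions, and a context object, you can implement patterns ranging from simple linear flows to concurrent execution, event collection, and nested workflows.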

Sequential Workflows

A workflow is implemented by subclassing the Workflow class. The class contains one or more steps, and each step is a method decorated with @step.

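```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class MyWorkflow(Workflow):
    # A single step: it listens for StartEvent and ends the run with StopEvent
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result="hello world")


w = MyWorkflow(timeout=10, verbose=False)
# Top-level await works in Jupyter; in a plain script, wrap the call in asyncio.run(...)
result = await w.run()
print(result)
```

```
hello world
```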

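The workflow above defines an entry step (it accepts StartEvent) and an end marker (StopEvent). This matters: a workflow needs at least one step that accepts StartEvent and at least one step that returns StopEvent.
LlamaIndex can also visualize a workflow with the code below. The diagram is written to an HTML file that you can open in a browser (draw_all_possible_flows ships in the separate llama-index-utils-workflow package).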

```python
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")
```


Next, we define several Event types to build a multi-step workflow.

```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
)


# Define the events
class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str


# Define the workflow
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        # Keyword arguments passed to run() are available as attributes of StartEvent
        print(ev.first_input)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = MyWorkflow(timeout=10, verbose=False)
result = await w.run(first_input="Start the workflow.")
print(result)
```

```
Start the workflow.
First step complete.
Second step complete.
Workflow complete.
```

Summary

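  • A Workflow is a flow that starts with a StartEvent and ends with a StopEvent
  • Multi-step workflows are built by defining different Event types
  • Data is passed between steps as fields on the events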
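The three summary points can be mimicked in a few lines of plain asyncio. This is a toy, not LlamaIndex code: the class names only echo the example above, and `run_toy_workflow` with its explicit `listeners` table is hypothetical. It builds a StartEvent from keyword arguments, hands each returned event to the step registered for its type, and stops at the first StopEvent.

```python
import asyncio


class Event:
    """Toy event (not llama_index's): keyword arguments become attributes."""

    def __init__(self, **fields):
        self.__dict__.update(fields)


class StartEvent(Event):
    pass


class StopEvent(Event):
    pass


class FirstEvent(Event):
    pass


class SecondEvent(Event):
    pass


async def step_one(ev):
    return FirstEvent(first_output=f"got: {ev.first_input}")


async def step_two(ev):
    return SecondEvent(second_output=ev.first_output + " -> two")


async def step_three(ev):
    return StopEvent(result=ev.second_output + " -> three")


# Hypothetical listener table: which step handles which event type
listeners = {StartEvent: step_one, FirstEvent: step_two, SecondEvent: step_three}


async def run_toy_workflow(**kwargs):
    ev = StartEvent(**kwargs)  # run() kwargs end up on the start event
    trace = []
    while not isinstance(ev, StopEvent):
        step = listeners[type(ev)]  # route by the event's type
        trace.append(step.__name__)
        ev = await step(ev)
    return ev.result, trace


result, trace = asyncio.run(run_toy_workflow(first_input="hi"))
print(trace)   # ['step_one', 'step_two', 'step_three']
print(result)  # got: hi -> two -> three
```

In LlamaIndex the listener table is not written by hand; it is derived from the step signatures.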

Branching and Looping Workflows

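The examples above only show sequential workflows. How do we build branching and looping workflows? The approach is the same: first define the events that steps listen for, then connect steps in different ways through the events they accept and return.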

```python
import random

# Event, Workflow, StartEvent, StopEvent, step, FirstEvent, SecondEvent and
# draw_all_possible_flows come from the previous examples


# Loop workflow
class LoopEvent(Event):
    loop_output: str


# Define the workflow
class LoopWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        # Returning LoopEvent routes back to step_one itself, forming a loop
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = LoopWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

draw_all_possible_flows(LoopWorkflow, filename="loop_workflow.html")
```
```
Bad thing happened
Bad thing happened
Bad thing happened
Good thing happened
First step complete.
Second step complete.
Workflow complete.
<class 'NoneType'>
<class '__main__.FirstEvent'>
<class '__main__.LoopEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
loop_workflow.html
```

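Run the code several times and you will see two possible paths:

  • A: one[FirstEvent] -> two[SecondEvent] -> three[StopEvent]
  • B: one[LoopEvent] repeated a random number of times, then A

In other words, which step runs next depends on the event the current step returns and on the event types each step listens for. For example, step_two listens for FirstEvent, so whenever a step returns a FirstEvent, step_two runs next. This is similar to how MetaGPT defines its workflows.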

```python
# Branch workflow (random, Event, Workflow, step, StartEvent, StopEvent and
# draw_all_possible_flows come from the previous examples)
class BranchA1Event(Event):
    payload: str


class BranchA2Event(Event):
    payload: str


class BranchB1Event(Event):
    payload: str


class BranchB2Event(Event):
    payload: str


class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        # The returned event type decides which branch runs next
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")


w = BranchWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

draw_all_possible_flows(BranchWorkflow, filename="branch_workflow.html")
```
```
Go to branch A
Branch A
Branch A
Branch A complete.
<class 'NoneType'>
<class '__main__.BranchA1Event'>
<class '__main__.BranchB1Event'>
<class '__main__.BranchA2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.BranchB2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
branch_workflow.html
```

Workflow Context

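In the examples above, state is passed from step to step through events. If you need to pass data between steps that are not directly connected, you have to thread it through every step in between, which makes the code harder to read and maintain.
To solve this, workflows provide a Context object that every step in a run can access.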
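Conceptually, Context behaves like a key-value store shared by all steps of one run, with async get/set. The sketch below mimics that shape with a hypothetical `ToyContext` class; its `get(key, default=...)` and `set(key, value)` signatures mirror the calls in the example that follows, but this is not LlamaIndex's implementation. It replays the StatefulFlow idea: the first lookup finds nothing, a setup step stores the data, and a later step reads it without it passing through the events.

```python
import asyncio


class ToyContext:
    """Hypothetical per-run shared state; a lock keeps concurrent steps from racing."""

    def __init__(self):
        self._data = {}
        self._lock = asyncio.Lock()

    async def get(self, key, default=None):
        async with self._lock:
            return self._data.get(key, default)

    async def set(self, key, value):
        async with self._lock:
            self._data[key] = value


async def main():
    ctx = ToyContext()
    log = []
    # "start": nothing loaded yet, so route to "setup"
    if await ctx.get("some_database") is None:
        log.append("need to load data")
        await ctx.set("some_database", [1, 2, 3])  # "setup" stores the data
    # "step_two": reads the data straight from the shared context
    data = await ctx.get("some_database")
    log.append(f"data is {data}")
    return log


log = asyncio.run(main())
print(log)  # ['need to load data', 'data is [1, 2, 3]']
```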

```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)


class SetupEvent(Event):
    query: str


class StepTwoEvent(Event):
    query: str


class StatefulFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> SetupEvent | StepTwoEvent:
        # Note: newer llama-index releases expose this as ctx.store.get / ctx.store.set
        db = await ctx.get("some_database", default=None)
        if db is None:
            print("Need to load data")
            return SetupEvent(query=ev.query)

        # do something with the query
        return StepTwoEvent(query=ev.query)

    @step
    async def setup(self, ctx: Context, ev: SetupEvent) -> StartEvent:
        # load data into the shared context, then go back to start
        await ctx.set("some_database", [1, 2, 3])
        return StartEvent(query=ev.query)

    @step
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        # do something with the data
        print("Data is ", await ctx.get("some_database"))

        return StopEvent(result=await ctx.get("some_database"))


w = StatefulFlow(timeout=10, verbose=False)
result = await w.run(query="Some query")
print(result)
```

```
Need to load data
Data is [1, 2, 3]
[1, 2, 3]
```

Streaming Events

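In LlamaIndex, a step can also use its Context to write events to a stream, so the caller can consume progress updates while the workflow is still running.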
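At its core, streaming is a producer/consumer pattern: steps push progress events into a queue while the caller iterates over it, and the stream ends when the run finishes. The sketch below reproduces that pattern with a plain `asyncio.Queue`. `ToyHandler`, its `close()` method, and the `None` end-of-stream sentinel are hypothetical; only the method names `write_event_to_stream` and `stream_events` echo the real API used in the example.

```python
import asyncio


class ToyHandler:
    """Hypothetical stand-in for a run handler: a queue between steps and the caller."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def write_event_to_stream(self, msg):
        self._queue.put_nowait(msg)

    def close(self):
        self._queue.put_nowait(None)  # sentinel: no more events

    async def stream_events(self):
        while (msg := await self._queue.get()) is not None:
            yield msg


async def toy_workflow(handler):
    try:
        handler.write_event_to_stream("Step one is happening")
        for token in ["Why", " did", " the", " tomato", " blush", "?"]:
            await asyncio.sleep(0)  # stands in for waiting on the LLM
            handler.write_event_to_stream(token)
        handler.write_event_to_stream("Step three is happening")
        return "Workflow complete."
    finally:
        handler.close()  # end the stream even if a step fails


async def main():
    handler = ToyHandler()
    run = asyncio.create_task(toy_workflow(handler))  # the run proceeds in the background
    received = [msg async for msg in handler.stream_events()]
    return received, await run


received, result = asyncio.run(main())
print(received[0], "|", "".join(received[1:-1]), "|", received[-1])
# Step one is happening | Why did the tomato blush? | Step three is happening
print(result)  # Workflow complete.
```

Closing the stream in `finally` is a design choice of this sketch: without it, a failing step would leave the consumer waiting forever.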

```python
import nest_asyncio

# Lets asyncio.run() work inside Jupyter's already-running event loop
nest_asyncio.apply()

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.llms.ollama import Ollama
```
```python
class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = Ollama(
            model="qwen2.5:latest",
            request_timeout=360.0,
            base_url="http://localhost:11434",
        )

        # Prompt: "Tell me 2 jokes in Chinese, each under 15 characters"
        generator = await llm.astream_complete(
            "用中文给我讲2个笑话,每个笑话不超过15个字"
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
        # After the loop, the last chunk's text holds the accumulated response
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")
```
```python
async def main():
    w = MyWorkflow(timeout=30, verbose=True)
    handler = w.run(first_input="Start the workflow.")

    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await handler
    print("Final result", final_result)

    draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step step_one
Step step_one produced event FirstEvent
Running step step_two
Step one is happening
1
.

做饭

一下

2
.

why
did
the
tomato
turn
red
?
Because
it
saw
the
salad
dressing
!
Step step_two produced event SecondEvent
Running step step_three
Step step_three produced event StopEvent
Step three is happening
Final result Workflow complete.
<class 'NoneType'>
<class '__main__.FirstEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
streaming_workflow.html
```

Concurrent Execution

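LlamaIndex workflows can run steps concurrently: a step can emit several events at once with ctx.send_event, and a step declared with @step(num_workers=N) can process up to N of those events at the same time.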
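One way to picture `num_workers` is as a cap on how many invocations of the same step may be in flight at once. The sketch below models that cap with an `asyncio.Semaphore` and, like the ParallelFlow example that follows, lets the first finished query end the run and cancels the rest. It is a plain-asyncio analogy, not LlamaIndex's scheduler; `NUM_WORKERS` and `slow_query` are hypothetical, the cap is set to 2 (instead of 4) so that it actually bites with three queries, and fixed delays replace `random.randint` so the outcome is predictable.

```python
import asyncio

NUM_WORKERS = 2  # hypothetical cap, playing the role of @step(num_workers=...)


async def main():
    sem = asyncio.Semaphore(NUM_WORKERS)
    running = 0
    peak = 0

    async def slow_query(query, delay):
        nonlocal running, peak
        async with sem:  # at most NUM_WORKERS bodies run at the same time
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(delay)
            running -= 1
            return query

    tasks = [
        asyncio.create_task(slow_query("Query 1", 0.09)),
        asyncio.create_task(slow_query("Query 2", 0.01)),
        asyncio.create_task(slow_query("Query 3", 0.05)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in pending:  # like a StopEvent: the first result ends the run
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result(), peak


first, peak = asyncio.run(main())
print(first, peak)  # Query 2 2
```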

```python
import asyncio
import random

# Event, Context, Workflow, StartEvent, StopEvent and step were imported above


# Emit several events at once
class StepTwoEvent(Event):
    query: str


class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        # send_event dispatches events directly, so this step returns nothing
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    # Up to 4 invocations of this step may run at the same time
    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        # The first StopEvent produced ends the whole run
        return StopEvent(result=ev.query)
```
```python
async def main():
    w = ParallelFlow(timeout=30, verbose=True)
    await w.run()

    draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step start
Step start produced no event
Running step step_two
Running slow query Query 1
Running step step_two
Running slow query Query 2
Running step step_two
Running slow query Query 3
Step step_two produced event StopEvent
<class 'NoneType'>
<class '__main__.StepTwoEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
parallel_workflow.html
```

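In the flow above, step_two runs once for every event it receives, and the first query to finish returns a StopEvent that ends the run. That suits some cases, but often you need to wait until all calls have finished before continuing; for that, use ctx.collect_events to check whether all expected events have arrived.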
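The bookkeeping behind `collect_events` can be sketched as a buffer: each call stores the incoming event and returns `None` until enough events of every expected type are available, then hands back the complete batch. `ToyCollector` below is a hypothetical simplification (LlamaIndex's own implementation differs in detail, for example in how it keys its buffers); it only shows why step_three runs three times but produces a StopEvent on the last call alone.

```python
from collections import defaultdict, deque


class StepThreeEvent:
    """Toy stand-in for the event class in the example."""

    def __init__(self, result):
        self.result = result


class ToyCollector:
    """Hypothetical buffer mimicking the behaviour of ctx.collect_events."""

    def __init__(self):
        self._buffer = defaultdict(deque)  # event type -> events waiting to be collected

    def collect_events(self, ev, expected):
        self._buffer[type(ev)].append(ev)
        needed = defaultdict(int)
        for event_type in expected:
            needed[event_type] += 1
        if any(len(self._buffer[t]) < n for t, n in needed.items()):
            return None  # keep waiting; the events stay buffered
        # Pop one event per expected slot, in the order of `expected`
        return [self._buffer[t].popleft() for t in expected]


ctx = ToyCollector()
calls = []
for query in ["Query 1", "Query 2", "Query 3"]:
    batch = ctx.collect_events(StepThreeEvent(query), [StepThreeEvent] * 3)
    calls.append(None if batch is None else [e.result for e in batch])

print(calls)  # [None, None, ['Query 1', 'Query 2', 'Query 3']]
```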

```python
class StepThreeEvent(Event):
    result: str


class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events; until then collect_events returns None
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        # do something with all 3 results together
        print("result: ", result)
        return StopEvent(result="Done")


async def main():
    w = ConcurrentFlow(timeout=30, verbose=True)
    await w.run()

    draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step start
Step start produced no event
Running step step_two
Running query Query 1
Running step step_two
Running query Query 2
Running step step_two
Running query Query 3
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
Step step_three produced no event
Running step step_three
Step step_three produced no event
Step step_two produced event StepThreeEvent
Running step step_three
result:  [StepThreeEvent(result='Query 1'), StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3')]
Step step_three produced event StopEvent
<class 'NoneType'>
<class '__main__.StepTwoEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.StepThreeEvent'>
concurrent_workflow.html
```

The example above waits for three events of the same type, but you can also wait for events of different types.
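For comparison, the same fan-out/fan-in shape (three independent steps of different kinds, then one step that needs all three results) is what `asyncio.gather` provides in plain Python. This is not how LlamaIndex implements it; `step_a`, `step_b`, and `step_c` are hypothetical stand-ins. In the event version, `collect_events` with `[StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent]` plays the role of the `gather` call: it waits until every expected result has arrived, whatever order they finish in.

```python
import asyncio


# Hypothetical stand-ins for step_a / step_b / step_c, with different durations
async def step_a(query):
    await asyncio.sleep(0.09)  # slowest
    return ("A", query)


async def step_b(query):
    await asyncio.sleep(0.01)  # fastest
    return ("B", query)


async def step_c(query):
    await asyncio.sleep(0.05)
    return ("C", query)


async def main():
    finished = []

    async def tracked(coro):
        result = await coro
        finished.append(result[0])  # record completion order
        return result

    # Fan out three different steps, then fan in once all of them have finished
    results = await asyncio.gather(
        tracked(step_a("Query 1")),
        tracked(step_b("Query 2")),
        tracked(step_c("Query 3")),
    )
    return finished, results


finished, results = asyncio.run(main())
print(finished)  # ['B', 'C', 'A'] (completion order)
print(results)   # [('A', 'Query 1'), ('B', 'Query 2'), ('C', 'Query 3')] (argument order)
```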

```python
class StepAEvent(Event):
    query: str


class StepBEvent(Event):
    query: str


class StepCEvent(Event):
    query: str


class StepACompleteEvent(Event):
    result: str


class StepBCompleteEvent(Event):
    result: str


class StepCCompleteEvent(Event):
    result: str


class ConcurrentFlow2(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive one event of each of the 3 types
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")


async def main():
    w = ConcurrentFlow2(timeout=30, verbose=True)
    await w.run()

    draw_all_possible_flows(ConcurrentFlow2, filename="concurrent2_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
```
Running step start
Step start produced no event
Running step step_a
Doing something A-ish
Step step_a produced event StepACompleteEvent
Running step step_b
Doing something B-ish
Step step_b produced event StepBCompleteEvent
Running step step_c
Doing something C-ish
Step step_c produced event StepCCompleteEvent
Running step step_three
Received event Query 1
Step step_three produced no event
Running step step_three
Received event Query 2
Step step_three produced no event
Running step step_three
Received event Query 3
Step step_three produced event StopEvent
<class 'NoneType'>
<class '__main__.StepAEvent'>
<class '__main__.StepBEvent'>
<class '__main__.StepCEvent'>
<class '__main__.StepACompleteEvent'>
<class '__main__.StepBCompleteEvent'>
<class '__main__.StepCCompleteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
concurrent2_workflow.html
```

Subclassing a Custom Workflow

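A custom workflow is a subclass of Workflow, and it can in turn serve as a base class: you implement a basic workflow once, then subclass it to build a more complex workflow, overriding existing steps or adding new ones.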
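Why does overriding work? A reasonable mental model is that the engine gathers every step-decorated method visible on the class, including inherited ones, so a subclass replaces a step by reusing its name and adds steps by defining new ones. The sketch below demonstrates that model with a hypothetical `toy_step` decorator and `collect_steps` helper built on ordinary Python attribute lookup; it is not LlamaIndex's code, and event names are returned as strings for brevity.

```python
import asyncio
import inspect


def toy_step(fn):
    """Hypothetical stand-in for @step: it only tags the function."""
    fn._is_step = True
    return fn


def collect_steps(cls):
    """Map step name -> function as resolved through the MRO (subclass wins)."""
    return {
        name: fn
        for name, fn in inspect.getmembers(cls, inspect.isfunction)
        if getattr(fn, "_is_step", False)
    }


class MainWorkflow:
    @toy_step
    async def start(self, ev):
        return "Step2Event"

    @toy_step
    async def step_two(self, ev):
        return "Step3Event"

    @toy_step
    async def step_three(self, ev):
        return "StopEvent"


class CustomWorkflow(MainWorkflow):
    @toy_step
    async def step_two(self, ev):  # same name: replaces the inherited step
        return "Step2BEvent"

    @toy_step
    async def step_two_b(self, ev):  # new step added by the subclass
        return "Step3Event"


base = collect_steps(MainWorkflow)
child = collect_steps(CustomWorkflow)
print(sorted(base))   # ['start', 'step_three', 'step_two']
print(sorted(child))  # ['start', 'step_three', 'step_two', 'step_two_b']
print(asyncio.run(child["step_two"](CustomWorkflow(), None)))  # Step2BEvent
```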

```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)


class Step2Event(Event):
    query: str


class Step3Event(Event):
    query: str


class MainWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> Step2Event:
        print("Starting up")
        return Step2Event(query=ev.query)

    @step
    async def step_two(self, ev: Step2Event) -> Step3Event:
        print("Sending an email")
        return Step3Event(query=ev.query)

    @step
    async def step_three(self, ev: Step3Event) -> StopEvent:
        print("Finishing up")
        return StopEvent(result=ev.query)


w = MainWorkflow(timeout=10, verbose=False)
result = await w.run(query="Initial query")
print(result)
```

```
Starting up
Sending an email
Finishing up
Initial query
```
```python
class Step2BEvent(Event):
    query: str


class CustomWorkflow(MainWorkflow):
    # Overrides the parent's step_two: it now emits Step2BEvent instead of Step3Event
    @step
    async def step_two(self, ev: Step2Event) -> Step2BEvent:
        print("Sending an email")
        return Step2BEvent(query=ev.query)

    # New step between step_two and the inherited step_three
    @step
    async def step_two_b(self, ev: Step2BEvent) -> Step3Event:
        print("Also sending a text message")
        return Step3Event(query=ev.query)


w = CustomWorkflow(timeout=10, verbose=False)
result = await w.run(query="Initial query")
print(result)
```

```
Starting up
Sending an email
Also sending a text message
Finishing up
Initial query
```

Nested Workflows

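You can also embed one workflow inside another: a step declares a parameter typed as Workflow, the sub-workflow instance is supplied with add_workflows(), and the step awaits the sub-workflow's run().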
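The mechanism amounts to dependency injection: the parent step declares a parameter for the sub-workflow, the instance is registered on the parent ahead of time, and the step simply awaits the child's `run()`. A plain-asyncio sketch of that idea follows; `ToyWorkflow`, its `add_workflows`, and the keyword-based injection are hypothetical simplifications modeled on the example's calls, not LlamaIndex internals.

```python
import asyncio


class ToyWorkflow:
    """Hypothetical base class: stores sub-workflows to inject into steps."""

    def __init__(self):
        self._subflows = {}

    def add_workflows(self, **subflows):
        # Sub-workflows are later passed to steps by parameter name
        self._subflows.update(subflows)


class ReflectionFlow(ToyWorkflow):
    async def run(self, query):
        return f"Improved {query}"


class MainWorkflow(ToyWorkflow):
    async def start(self, query, reflection_workflow):
        # The nested run is just an awaited call inside the parent step
        return await reflection_workflow.run(query=query)

    async def run(self, query):
        improved = await self.start(query, **self._subflows)
        return f"Query is {improved}"


w = MainWorkflow()
w.add_workflows(reflection_workflow=ReflectionFlow())
result = asyncio.run(w.run(query="query"))
print(result)  # Query is Improved query
```

Because the sub-workflow is injected rather than hard-coded, another implementation can be registered under the same name (for example in a test) without touching the parent step.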

```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows


class Step2Event(Event):
    query: str


class MainWorkflow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent, reflection_workflow: Workflow
    ) -> Step2Event:
        print("Need to run reflection")
        res = await reflection_workflow.run(query=ev.query)

        return Step2Event(query=res)

    @step
    async def step_two(self, ctx: Context, ev: Step2Event) -> StopEvent:
        print("Query is ", ev.query)
        # do something with the query here
        return StopEvent(result=ev.query)


class ReflectionFlow(Workflow):
    @step
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing custom reflection")
        return StopEvent(result="Improved query")


w = MainWorkflow(timeout=10, verbose=False)
# Supply the sub-workflow; it is passed to the step parameter with the same name
w.add_workflows(reflection_workflow=ReflectionFlow())
result = await w.run(query="Initial query")
print(result)
```

```
Need to run reflection
Doing custom reflection
Query is Improved query
Improved query
```