-
Notifications
You must be signed in to change notification settings - Fork 176
Expand file tree
/
Copy path11-triggerflow-04_data_flow.py
More file actions
62 lines (51 loc) · 2.03 KB
/
Copy path11-triggerflow-04_data_flow.py
File metadata and controls
62 lines (51 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData
async def triggerflow_state_flow() -> None:
    """Demonstrate sharing values between chunks through execution state.

    Builds a three-chunk flow, starts one execution with the input
    ``"deploy"``, closes it, and prints the ``"summary"`` entry of the state
    returned by ``async_close()``.
    """
    pipeline = TriggerFlow(name="step-04-state-flow")

    async def prepare_user(data: TriggerFlowRuntimeData):
        # Write the user record under the "user" state key, then hand the
        # incoming input back unchanged.
        user_record = {"id": "u-001", "role": "admin"}
        await data.async_set_state("user", user_record)
        return data.input

    async def prepare_env(data: TriggerFlowRuntimeData):
        # Write the environment record under the "env" state key, then hand
        # the incoming input back unchanged.
        env_record = {"name": "prod"}
        await data.async_set_state("env", env_record)
        return data.input

    async def summarize(data: TriggerFlowRuntimeData):
        # Combine this chunk's input with the "user" and "env" state entries
        # and store the result under the "summary" state key.
        snapshot = {
            "input": data.input,
            "user": data.get_state("user"),
            "env": data.get_state("env"),
        }
        await data.async_set_state("summary", snapshot)

    # Attach the chunks one link at a time: user -> env -> summarize.
    chain = pipeline.to(prepare_user)
    chain = chain.to(prepare_env)
    chain.to(summarize)

    run = pipeline.create_execution()
    await run.async_start("deploy")
    final_state = await run.async_close()

    # Sanity check for the demo: the summary must carry the user written by
    # prepare_user.
    assert final_state["summary"]["user"]["id"] == "u-001"
    print(final_state["summary"])
if __name__ == "__main__":
    # Script entry point: create the demo coroutine and run it to completion.
    demo = triggerflow_state_flow()
    asyncio.run(demo)
# Expected output:
# {'input': 'deploy', 'user': {'id': 'u-001', 'role': 'admin'}, 'env': {'name': 'prod'}}
#
# How it works:
# State is a flat dict shared across the entire execution. Any chunk can read values
# written by earlier chunks via data.get_state("key"). prepare_user and prepare_env
# each write one key, then summarize reads both and assembles a combined snapshot.
# This pattern avoids threading intermediate results through return values when multiple
# upstream chunks each contribute one piece of context.
#
# Flow:
# async_start("deploy")
# |
# v
# prepare_user -> state["user"] = {"id": "u-001", "role": "admin"} (returns data.input)
# |
# v
# prepare_env -> state["env"] = {"name": "prod"} (returns data.input)
# |
# v
# summarize -> reads state["user"] + state["env"] via data.get_state()
# state["summary"] = {"input": "deploy", "user": {...}, "env": {...}}
# |
# v
# async_close() -> returns the final state; the script then prints state["summary"]