Tutorial 2: Static Graphs with @graphify
`@graphify` turns a plain Python function into a graph builder. Instead of executing immediately (like `@graph_fn`), it builds a deterministic `TaskGraph` from `@tool` calls — a DAG you can inspect, persist, and run later.
🧠 Mental Model

- `@graph_fn` → executes now (reactive, context-rich)
- `@graphify` → builds first, runs later (deterministic DAG)
- Each `@tool` call becomes a node in the DAG.
- Edges are formed by data flow and by optional ordering via `_after=[…]`.
- You get reproducibility, inspectability, and clean fan-in/fan-out.

Note: Access runtime services (`channel`, `llm`, `memory`) through tools in static graphs. If you need direct `context.*` calls inline, use `@graph_fn`.
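The build-first/run-later split can be illustrated with a small self-contained sketch. This is plain Python with no Aethergraph imports; the `Node` class and `run_dag` helper are hypothetical stand-ins for the idea, not the library's API:

```python
# Minimal stand-in for "build first, run later": the build phase only
# records nodes and edges; a separate runner executes them later.
class Node:
    def __init__(self, fn, deps):
        self.fn, self.deps = fn, deps  # deps: upstream Node objects

def run_dag(sink):
    """Execute a node and its dependencies exactly once, deps first."""
    cache = {}
    def visit(node):
        if node not in cache:
            cache[node] = node.fn(*[visit(d) for d in node.deps])
        return cache[node]
    return visit(sink)

# "Build" phase: nothing executes yet, we only wire nodes together.
a = Node(lambda: 7, [])
b = Node(lambda x: x * 2, [a])   # data edge a -> b
c = Node(lambda x: x + 10, [b])  # data edge b -> c

# "Run" phase: a runner walks the DAG.
print(run_dag(c))  # 7 * 2 + 10 = 24
```

The same wiring can be re-run, inspected, or persisted because the build phase produced only a data structure, not side effects.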
1. Key Rules (short)

- Only `@tool` calls are allowed as steps in a `@graphify` builder. Use plain Python only to wire values or shape the graph (no side effects); such code will not appear as nodes.
- Build ≠ Run. Calling a `@graphify` function returns a `TaskGraph`. Use a runner to execute it.
- Async supported. Tools can be sync or async; the runner provides both sync and async entry points.
- Resumption requires stable IDs. Give important nodes a fixed `*_id` and reuse the same `run_id` when resuming.
- Outputs: return a dict of JSON-serializable values for resumption. Large/binary data → save via `artifacts()` and return a reference. (Full rules live in the API page.)

Related: `@graph_fn` can also emit an implicit graph when you call `@tool`s inside it. Use `_after` to enforce ordering there too, and inspect the last run's captured graph with `graph_fn.last_graph`.
2. Shapes (tools & graphify)

@tool shape (suggested)

```python
from aethergraph import tool

@tool(name="load_csv", outputs=["rows"])  # names become handle fields; name is optional when running locally
def load_csv(path: str) -> dict:  # return a dict matching `outputs`
    # ... load and parse ...
    return {"rows": rows}
```

- `@tool` functions can be sync or async; all run as async internally.
- Declare `outputs=[...]`. The returned dict must contain those keys.
@graphify shape (suggested)

```python
from aethergraph import graphify

@graphify(name="etl", inputs=["csv_path"], outputs=["nrows"])  # declarative I/O
def etl(csv_path: str):
    raw = load_csv(path=csv_path)  # node
    # ... add more tool calls ...
    return {"nrows": len(raw.rows)}  # JSON-serializable outputs
```

- A `graphify` builder is always a sync function. No `await` allowed inside the builder.
- Use `_after=...` to force ordering when no data edge exists.
- Calling `etl()` builds a `TaskGraph`; it does not run.
- Run using `run(...)` / `run_async(...)` with `inputs={...}`.
3. Minimal Example — Build → Run

```python
from aethergraph import graphify, tool
from aethergraph.runner import run

@tool(outputs=["doubled"])
def double(x: int) -> dict:
    return {"doubled": x * 2}

@tool(outputs=["shifted"])
def add_ten(x: int) -> dict:
    return {"shifted": x + 10}

@graphify(name="tiny_pipeline", inputs=["x"], outputs=["y"])
def tiny_pipeline(x: int):
    a = double(x=x)           # node A
    b = add_ten(x=a.doubled)  # node B depends on A via a data edge
    return {"y": b.shifted}

# Build (no execution yet)
G = tiny_pipeline()  # → TaskGraph

# Run (sync helper, handy in a Jupyter notebook)
result = run(G, inputs={"x": 7})
print(result)  # {'y': 24}
```

Try `max_concurrency=1` vs `>1` in the runner if your tools are async and parallelizable.
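What a `max_concurrency` setting typically does can be sketched with plain `asyncio`: a semaphore caps how many node coroutines run at once. This illustrates the general pattern, not Aethergraph's runner internals:

```python
import asyncio

async def run_nodes(tasks, max_concurrency):
    # A semaphore caps how many node coroutines execute concurrently.
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(coro_fn):
        async with sem:
            return await coro_fn()

    return await asyncio.gather(*(guarded(t) for t in tasks))

async def node(i):
    await asyncio.sleep(0.01)  # stand-in for real async work
    return i * 2

# With max_concurrency > 1, independent nodes overlap in time.
results = asyncio.run(run_nodes([lambda i=i: node(i) for i in range(4)],
                                max_concurrency=2))
print(results)  # [0, 2, 4, 6]
```

With `max_concurrency=1` the same code degrades to strictly sequential execution, which is useful for debugging ordering issues.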
4. Ordering Without Data Edges — _after

```python
@tool(outputs=["ok"])
def fetch() -> dict: ...

@tool(outputs=["done"])
def train() -> dict: ...

@graphify(name="seq", inputs=[], outputs=["done"])
def seq():
    a = fetch()
    b = train(_after=a)  # force run-after without wiring data
    return {"done": b.done}
```

- `_after` accepts a single node or a list: `_after=[a, b]`.
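For scheduling purposes, a control edge from `_after` behaves just like a data edge. A self-contained sketch of that ordering logic (Kahn's algorithm over the combined data and control edges; an illustration, not the library's implementation):

```python
from collections import deque

def topological_order(nodes, edges):
    """edges: iterable of (upstream, downstream) pairs, data or control alike."""
    indeg = {n: 0 for n in nodes}
    out = {n: [] for n in nodes}
    for u, v in edges:
        out[u].append(v)
        indeg[v] += 1
    ready = deque(n for n in nodes if indeg[n] == 0)
    order = []
    while ready:
        n = ready.popleft()
        order.append(n)
        for m in out[n]:  # releasing n unblocks its downstream nodes
            indeg[m] -= 1
            if indeg[m] == 0:
                ready.append(m)
    return order

# 'train' consumes no data from 'fetch'; the _after control edge
# still forces it to be scheduled afterwards.
print(topological_order(["fetch", "train"], [("fetch", "train")]))
# ['fetch', 'train']
```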
5. Resume a Run — Stable _id + run_id

Resumption lets you continue a partially completed graph without redoing finished nodes. This is useful for flaky I/O or long pipelines.

- Assign deterministic IDs to nodes with `_id="..."` in your tool calls.
- Reuse the same `run_id` when invoking the runner again.
- Indefinite waits (e.g., human input) are supported via dedicated wait tools and are covered in the Channels & Wait Models tutorial; this section uses a non-channel example.

```python
from aethergraph import graphify, tool
from aethergraph.core.runtime.graph_runner import run_async
import random

@tool(outputs=["ok"])
def prepare() -> dict:
    # Pretend to set up workspace/artifacts
    return {"ok": True}

@tool(outputs=["value"])
def flaky_compute(x: int) -> dict:
    # Simulate a transient failure half the time
    if random.random() < 0.5:
        raise RuntimeError("transient error — try resuming")
    return {"value": x * 2}

@tool(outputs=["ok"])
def finalize(v: int) -> dict:
    # Commit the final result (e.g., write an artifact)
    return {"ok": True}

@graphify(name="resumable_pipeline", inputs=["x"], outputs=["y"])
def resumable_pipeline(x: int):
    s1 = prepare(_id="prepare_1")
    s2 = flaky_compute(x=x, _after=s1, _id="flaky_2")       # may fail on the first run
    s3 = finalize(v=s2.value, _after=s2, _id="finalize_3")
    return {"y": s2.value}

# The first run may fail while computing 'flaky_2'...
# await run_async(resumable_pipeline(), inputs={"x": 21}, run_id="run-abc")

# Re-run with the SAME run_id to resume from the failed node (prepare_1 is skipped):
# await run_async(resumable_pipeline(), inputs={"x": 21}, run_id="run-abc")
```

Keep `_id`s stable so the engine can match nodes across runs. If a node fails or is interrupted, resuming with the same `run_id` continues from the last successful checkpoint. Use JSON-serializable outputs in `@tool` so that Aethergraph can reload previous outputs; otherwise resumption may fail.
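The resumption mechanics can be sketched with a toy checkpoint store: completed node outputs are keyed by (`run_id`, node `_id`), so a re-run with the same `run_id` skips finished nodes and retries only the failed one. This is an illustrative pattern, not Aethergraph's internals:

```python
checkpoints = {}  # (run_id, node_id) -> saved output; stands in for persistent storage

def run_node(run_id, node_id, fn, *args):
    key = (run_id, node_id)
    if key in checkpoints:      # node already completed in a prior run
        return checkpoints[key]
    result = fn(*args)          # may raise; nothing is checkpointed then
    checkpoints[key] = result   # JSON-serializable outputs reload cleanly
    return result

attempts = {"n": 0}
def flaky(x):
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("transient error")
    return x * 2

run_node("run-abc", "prepare_1", lambda: True)
try:
    run_node("run-abc", "flaky_2", flaky, 21)   # first run fails here
except RuntimeError:
    pass

# Resume with the SAME run_id: prepare_1 is served from the checkpoint,
# flaky_2 is retried.
run_node("run-abc", "prepare_1", lambda: True)
print(run_node("run-abc", "flaky_2", flaky, 21))  # 42
```

This is also why stable `_id`s matter: if a node's ID changed between runs, its checkpoint key would never match and the work would be redone.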
6. Inspect Before/After Running

Once you have a `TaskGraph` (e.g., `G = tiny_pipeline()`), you can:

```python
print(G.pretty())            # readable node table
print(G.ascii_overview())    # compact topology
print(G.topological_order())

# Graph metadata
sig = tiny_pipeline.io()     # declared inputs/outputs
spec = tiny_pipeline.spec()  # full GraphSpec (nodes, edges, metadata)

# Export (if enabled)
dot = G.to_dot()   # Graphviz DOT text
# G.visualize()    # render to an image if your environment supports it
```
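The DOT text returned by `to_dot()` can be rendered with standard Graphviz tooling (e.g., `dot -Tpng`). As a rough idea of the format, here is a generic DOT-generation sketch for the tiny pipeline; the `to_dot` helper below is a hypothetical stand-in, and the library's exact output may differ:

```python
def to_dot(name, edges):
    """Build Graphviz DOT text from (upstream, downstream) node pairs."""
    lines = [f'digraph "{name}" {{']
    lines += [f'  "{u}" -> "{v}";' for u, v in edges]
    lines.append("}")
    return "\n".join(lines)

dot = to_dot("tiny_pipeline", [("double", "add_ten")])
print(dot)
# digraph "tiny_pipeline" {
#   "double" -> "add_ten";
# }
```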
7. Practical Tips

- Keep nodes small and typed: expose clear outputs (e.g., `outputs=["clean"]`).
- Use JSON-serializable returns; store big/binary data as artifacts.
- Prefer `_after` for control edges instead of fake data plumb-through.
- No nested static graphs (don't call one `@graphify` from another). Use tools or run graphs separately.
- Async tools + `max_concurrency` unlock parallel speedups.
8. Summary

- `@graphify` materializes a static DAG from `@tool` calls.
- Build with the function call; run with the runner (sync or async).
- For resumption, use a stable `_id` per node and replay with the same `run_id`.
- Inspect graphs via `pretty()`, `ascii_overview()`, `.io()`, `.spec()`, and `to_dot()`.

Use `@graphify` for pipelines and reproducible experiments; stick with `@graph_fn` for interactive, context-heavy agents.