Concurrency, Fan‑In/Fan‑Out & Graph‑Level Orchestration¶
AetherGraph provides Python‑first concurrency that works from reactive agents to scheduled DAGs. You can orchestrate parallelism naturally in Python, while the runtime enforces safe scheduling and per‑run concurrency caps.
1. @graph_fn — Pythonic Concurrency for Reactive Agents¶
@graph_fn functions execute through normal Python async semantics. Plain Python awaits run directly on the event loop, while any @tool calls inside a @graph_fn become implicit nodes managed by the agent’s internal scheduler.
Example: bounded fan‑out using a semaphore
```python
import asyncio

from aethergraph import graph_fn

sem = asyncio.Semaphore(4)  # cap concurrent jobs (user-managed)

async def run_capped(fn, **kw):
    async with sem:
        return await fn(**kw)

@graph_fn(name="batch_agent")
async def batch_agent(items: list[str], *, context):
    async def one(x):
        await context.channel().send_text(f"processing {x}")
        return {"y": x.upper()}

    # fan-out with manual cap
    tasks = [run_capped(one, x=v) for v in items]
    results = await asyncio.gather(*tasks)
    # fan-in
    return {"ys": [r["y"] for r in results]}
```
Notes:
- Plain Python steps execute immediately; they are not capped by the scheduler.
- `@tool` calls are scheduled and counted toward the agent’s concurrency cap via `max_concurrency` (default = 4).
- You can override per-run limits by passing `max_concurrency=<int>` to `run()` or `run_async()`, or by using `graph_fn(..., max_concurrency=<int>)`.
- For nested or composed agents, effective concurrency multiplies; use semaphores or pools to control load.
- Ideal for reactive, exploratory agents or mixed I/O + compute logic.
2. @graphify — Scheduler‑Controlled Static DAGs¶
In static DAGs built with @graphify, every @tool call becomes a node in a TaskGraph. Concurrency is automatically managed by the runtime scheduler, respecting per‑run limits.
Minimal fan‑in/fan‑out example:
```python
from aethergraph import graphify, tool

@tool(outputs=["result"])
async def pick(items: list[int], index: int):
    return {"result": items[index]}

@tool(outputs=["out"])
async def work(x: int):
    print(f"Working on {x}...")
    return {"out": x * 2}

@tool(outputs=["sum"])
async def reduce_sum(xs: list[int]):
    return {"sum": sum(xs)}

@graphify(name="map_reduce", inputs=["vals"], outputs=["sum"])
def map_reduce(vals):
    # vals is a graph reference, not a concrete list, so a tool is
    # needed to extract individual values from it
    results = [pick(items=vals, index=i) for i in range(len(vals))]
    outs = [work(x=v.result) for v in results]    # fan-out
    total = reduce_sum(xs=[o.out for o in outs])  # fan-in
    return {"sum": total.sum}
```
Key points:
- The scheduler enforces `max_concurrency` automatically (default = 4).
- You can override per-run limits by passing `max_concurrency=<int>` to `run()` or `run_async()`.
- Static DAG concurrency is global and consistent across all tool nodes.
- Each node runs once its dependencies resolve; no explicit `await` is required.
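The "runs once dependencies resolve" behavior can be sketched in plain asyncio. This is a toy scheduler, not AetherGraph's implementation: each node gets a future, waits on its upstream futures, and a semaphore plays the role of `max_concurrency`. The names `run_dag`, `value`, `double`, and `total` are illustrative:

```python
import asyncio

async def run_dag(nodes, max_concurrency=4):
    # nodes: {name: (async fn, [dependency names])}
    sem = asyncio.Semaphore(max_concurrency)
    futures = {name: asyncio.get_running_loop().create_future() for name in nodes}

    async def run_node(name, fn, deps):
        args = [await futures[d] for d in deps]  # fan-in: wait for upstream nodes
        async with sem:                          # scheduler-style concurrency cap
            futures[name].set_result(await fn(*args))

    await asyncio.gather(*(run_node(n, f, d) for n, (f, d) in nodes.items()))
    return {name: fut.result() for name, fut in futures.items()}

def value(v):
    async def fn():
        return v
    return fn

async def double(x):
    return x * 2

async def total(*xs):
    return sum(xs)

# Mirror the map_reduce shape: two sources, fan-out to double, fan-in to sum.
nodes = {
    "v1": (value(1), []),
    "v2": (value(2), []),
    "d1": (double, ["v1"]),
    "d2": (double, ["v2"]),
    "sum": (total, ["d1", "d2"]),
}
out = asyncio.run(run_dag(nodes))
```

No node is explicitly awaited by the caller; the dependency edges alone determine execution order, which is the property the key points above describe.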
3. Graph‑Level Orchestration Patterns¶
All orchestration in AetherGraph is just Python. You can run sequentially or concurrently using standard async primitives.
A) Sequential orchestration (plain Python)¶
```python
res1 = await graph_fn1(a=1, max_concurrency=N)  # graph-level concurrency
res2 = await graph_fn2(b=2, max_concurrency=N)
```
B) Concurrent graph_fn runs (async‑friendly)¶
```python
res1, res2 = await asyncio.gather(
    graph_fn1(a=1, max_concurrency=N),
    graph_fn2(b=2, max_concurrency=N),
)
```
C) Concurrent graph runner (works for both graph_fn and graphify)¶
```python
from aethergraph.runner import run_async

res1, res2 = await asyncio.gather(
    run_async(graph1, inputs={"a": 1}, max_concurrency=8),
    run_async(graph2, inputs={"b": 2}, max_concurrency=2),
)
```
Default concurrency for each graph is 4, but you can override it per call with `max_concurrency` in either `run()` or `run_async()`. Be careful of the aggregate load when many graphs run at once; use semaphores or pools to control it. Do not use the blocking `runner.run()` for concurrent graph runs.
4. Concurrency Comparison¶
| Aspect | @graph_fn (Reactive) | @graphify (Static) |
|---|---|---|
| Concurrency Control | Automatic via scheduler (`max_concurrency`) | Automatic via scheduler (`max_concurrency`) |
| Default Limit | Default 4 per run; multiplies with nested calls | Default 4 per run |
| Plain Python Awaitables | Run immediately, outside the scheduler | Not applicable (only tool nodes) |
| Nested Calls | Supported | Not yet supported |
| Failure Behavior | Caught at runtime; user decides | Scheduler stops on first error (configurable) |
| Use Case | Agents, exploration, hybrid control | Pipelines, batch workflows, reproducible DAGs |
Takeaways¶
- Reactive vs Deterministic: `graph_fn` for interactive exploration; `graphify` for reproducible pipelines.
- Fan‑In/Fan‑Out: async patterns in `graph_fn`; data edges in `graphify`.
- Concurrency Control: default cap = 4; override per run with `max_concurrency`.
- Scalability: local schedulers per agent; a global scheduler orchestrates multiple runs.
- Everything is Python: the runtime extends standard async execution into persistent, inspectable DAG scheduling.