TaskGraph – Runtime Graph Representation
TaskGraph is the runtime representation of a graph in AetherGraph. It combines:
- A structural spec (TaskGraphSpec) – nodes, dependencies, metadata.
- A mutable state (TaskGraphState) – node statuses, outputs, patches.
- Ephemeral runtime nodes (TaskNodeRuntime) – convenience wrappers used by the scheduler and tools.
This page documents the most commonly used APIs on TaskGraph.
You will typically not call TaskGraph methods directly except for inspection. Creating graphs with @graph_fn and graphify is preferred.
1. Construction & Core Attributes
Classmethods
TaskGraph.new_run(spec: TaskGraphSpec, *, run_id: str | None = None, **kwargs) -> TaskGraph
TaskGraph.from_spec(spec: TaskGraphSpec, *, state: TaskGraphState | None = None) -> TaskGraph
- new_run(...) – convenience to create a fresh run with a new run_id and an empty TaskGraphState (all nodes start in PENDING except the inputs node, which is set to DONE).
- from_spec(...) – construct a TaskGraph from an existing spec and optional state (used for resuming or inspecting previous runs).
Key attributes
- graph.spec: TaskGraphSpec – structural definition.
- graph.state: TaskGraphState – statuses, outputs, patches, bound inputs.
- graph.graph_id: str – alias for spec.graph_id.
- graph.nodes: list[TaskNodeRuntime] – list of runtime node wrappers.
- graph._runtime_nodes: dict[str, TaskNodeRuntime] – internal node table (id → runtime node).
Typical construction: new run vs resume
# New run from a spec
spec = ... # TaskGraphSpec from graphify / storage
G = TaskGraph.new_run(spec)
# Resume with existing state
state = ... # TaskGraphState loaded from storage
G_resumed = TaskGraph.from_spec(spec, state=state)
You rarely instantiate TaskGraph directly; use new_run or from_spec (or runner helpers) instead.
2. Node Access & Selection
Direct access
node(self, node_id: str) -> TaskNodeRuntime
@property
nodes(self) -> list[TaskNodeRuntime]
node_ids(self) -> list[str]
get_by_id(self, node_id: str) -> str
- node(node_id) – get the runtime node (raises if not found).
- nodes – list of all runtime nodes.
- node_ids() – list of node IDs.
- get_by_id() – returns the same ID or raises if missing (useful when normalizing selectors).
Indexed finders
get_by_alias(alias: str) -> str
find_by_label(label: str) -> list[str]
find_by_logic(logic_prefix: str, *, first: bool = False) -> list[str] | str | None
find_by_display(name_prefix: str, *, first: bool = False) -> list[str] | str | None
These use metadata created at build time (e.g., via call_tool(..., _alias=..., _labels=[...], _name=...)).
- get_by_alias("sum1") → node id for alias sum1, or raises KeyError.
- find_by_label("critical") → all node ids tagged with that label.
- find_by_logic("tool_name") → nodes whose logic name equals or starts with tool_name.
- find_by_display("My Step") → nodes whose display name equals or starts with "My Step".
Unified selector DSL
select(selector: str, *, first: bool = False) -> str | list[str] | None
pick_one(selector: str) -> str
pick_all(selector: str) -> list[str]
Selector mini‑DSL:
- "@alias" → by alias.
- "#label" → by label (may return many).
- "id:<id>" → exact id.
- "logic:<prefix>" → logic name prefix.
- "name:<prefix>" → display name prefix.
- "/regex/" → regex on node_id.
- anything else → prefix match on node_id.
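The dispatch rules above can be sketched as a small standalone function. This is only an illustration of the selector grammar, not the library's implementation: the `NODES` table, its field names, and `select_ids` are hypothetical, and using `re.search` for the regex form is an assumption.

```python
import re

# Hypothetical stand-in for build-time metadata: node id -> alias, labels, logic, display name.
NODES = {
    "sum_1": {"alias": "sum1", "labels": ["critical"], "logic": "add", "name": "Sum inputs"},
    "normalize_2": {"alias": None, "labels": [], "logic": "normalize_rows", "name": "Normalize"},
    "debug_3": {"alias": None, "labels": ["critical"], "logic": "dump", "name": "Debug dump"},
}


def select_ids(selector, nodes=NODES):
    """Resolve a selector string to the list of matching node ids."""
    if selector.startswith("@"):
        return [nid for nid, meta in nodes.items() if meta["alias"] == selector[1:]]
    if selector.startswith("#"):
        return [nid for nid, meta in nodes.items() if selector[1:] in meta["labels"]]
    if selector.startswith("id:"):
        return [nid for nid in nodes if nid == selector[3:]]
    if selector.startswith("logic:"):
        return [nid for nid, meta in nodes.items() if meta["logic"].startswith(selector[6:])]
    if selector.startswith("name:"):
        return [nid for nid, meta in nodes.items() if meta["name"].startswith(selector[5:])]
    if len(selector) >= 2 and selector.startswith("/") and selector.endswith("/"):
        # Assumption: the pattern may match anywhere in the node id.
        pattern = re.compile(selector[1:-1])
        return [nid for nid in nodes if pattern.search(nid)]
    # Fallback: prefix match on the node id.
    return [nid for nid in nodes if nid.startswith(selector)]


print(select_ids("#critical"))
print(select_ids("logic:normalize_"))
```

Checking the prefixed forms before the plain prefix fallback keeps a selector such as "id:sum_1" from being treated as a node id prefix.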
Selector examples
# Single node by alias
target_id = graph.pick_one("@sum1")
# All nodes with a label
critical_ids = graph.pick_all("#critical")
# First node whose logic name starts with "normalize_"
nid = graph.select("logic:normalize_", first=True)
# Regex on node id
debug_nodes = graph.pick_all("/debug_.*/")
Use selectors when building debug tooling, partial resets, or visualization filters.
3. Read‑only Views
view(self) -> GraphView
list_nodes(self, exclude_internal: bool = True) -> list[str]
- view() – returns a GraphView with:
  - graph_id, nodes (specs), node_status (derived map: node id → NodeStatus), metadata.
- list_nodes(exclude_internal=True) – list node ids, optionally excluding internal nodes (ids starting with _).
Inspecting a graph view
v = graph.view()
print(v.graph_id)
print(v.node_status) # {"node_1": NodeStatus.DONE, ...}
GraphView is a snapshot for inspection / APIs; it does not expose mutation methods.
4. Graph Mutation (Patches)
Dynamic graph edits are represented as patches in TaskGraphState.
patch_add_or_replace_node(node_spec: dict[str, Any]) -> None
patch_remove_node(node_id: str) -> None
patch_add_dependency(node_id: str, dependency_id: str) -> None
- patch_add_or_replace_node – add a new node or replace an existing one (payload is a plain dict convertible to TaskNodeSpec).
- patch_remove_node – remove a node by id.
- patch_add_dependency – add a new dependency edge.
Each method:
- appends a GraphPatch entry to state.patches and increments state.rev,
- notifies observers via on_patch_applied,
- rebuilds _runtime_nodes for the effective view.
These APIs are intended for advanced dynamic graph editing and patch flows; many users won’t need them directly.
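The append, increment, notify, rebuild sequence can be illustrated with a toy patch log. Everything here is hypothetical: `Patch`, `PatchLog`, the `op` strings, and the `node_id` / `dependencies` payload keys are stand-ins, since the fields of the real GraphPatch are not described on this page. The sketch also recomputes the effective view on demand rather than caching it.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Patch:
    """Hypothetical patch record; the real GraphPatch may look different."""
    op: str
    payload: dict


@dataclass
class PatchLog:
    base_nodes: dict
    patches: list = field(default_factory=list)
    rev: int = 0
    observers: list = field(default_factory=list)

    def record(self, op: str, **payload: Any) -> None:
        """Append a patch, bump the revision, then notify interested observers."""
        patch = Patch(op, payload)
        self.patches.append(patch)
        self.rev += 1
        for observer in self.observers:
            hook = getattr(observer, "on_patch_applied", None)
            if callable(hook):
                hook(self, patch)

    def effective_nodes(self) -> dict:
        """Replay all patches over a copy of the base nodes."""
        nodes = {
            nid: {"dependencies": list(spec.get("dependencies", []))}
            for nid, spec in self.base_nodes.items()
        }
        for patch in self.patches:
            p = patch.payload
            if patch.op == "add_or_replace_node":
                nodes[p["node_id"]] = {"dependencies": list(p.get("dependencies", []))}
            elif patch.op == "remove_node":
                nodes.pop(p["node_id"], None)
            elif patch.op == "add_dependency":
                deps = nodes[p["node_id"]]["dependencies"]
                if p["dependency_id"] not in deps:
                    deps.append(p["dependency_id"])
        return nodes


class Recorder:
    """Observer that remembers (revision, op) for each applied patch."""

    def __init__(self) -> None:
        self.seen = []

    def on_patch_applied(self, graph, patch) -> None:
        self.seen.append((graph.rev, patch.op))


log = PatchLog(base_nodes={"a": {"dependencies": []}})
recorder = Recorder()
log.observers.append(recorder)
log.record("add_or_replace_node", node_id="b", dependencies=[])
log.record("add_dependency", node_id="b", dependency_id="a")
print(log.rev, log.effective_nodes())
```

Keeping the base nodes untouched and replaying patches means the original spec stays recoverable, which is one plausible reason to model edits as a patch list rather than in-place mutation.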
5. Topology & Subgraphs
dependents(node_id: str) -> list[str]
topological_order() -> list[str]
get_subgraph_nodes(start_node_id: str) -> list[str]
get_upstream_nodes(start_node_id: str) -> list[str]
- dependents(nid) – all nodes that list nid as a dependency.
- topological_order() – a topological sort of all nodes (raises on cycles).
- get_subgraph_nodes(start) – start plus all nodes reachable downstream (dependents).
- get_upstream_nodes(start) – start plus all nodes it depends on (upstream).
Working with subgraphs
# All nodes that can be affected if you change `node_a`
forward = graph.get_subgraph_nodes("node_a")
# All nodes that must run before `node_b`
upstream = graph.get_upstream_nodes("node_b")
These helpers are typically used for partial reset, impact analysis, or visualization filters.
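For intuition, the four topology helpers can be modelled on a plain dependency table. This is a minimal sketch under stated assumptions: `DEPS` and the function names are hypothetical, Kahn's algorithm is used for the sort, and the traversal order of the real methods may differ.

```python
from collections import deque

# Hypothetical dependency table: node id -> ids it depends on.
DEPS = {
    "load": [],
    "clean": ["load"],
    "stats": ["clean"],
    "plot": ["clean"],
    "report": ["stats", "plot"],
}


def dependents(deps, node_id):
    """All nodes that list node_id as a dependency."""
    return [nid for nid, ups in deps.items() if node_id in ups]


def topological_order(deps):
    """Kahn's algorithm; raises ValueError if the graph contains a cycle."""
    remaining = {nid: len(ups) for nid, ups in deps.items()}
    ready = deque(nid for nid, count in remaining.items() if count == 0)
    order = []
    while ready:
        nid = ready.popleft()
        order.append(nid)
        for child in dependents(deps, nid):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(deps):
        raise ValueError("cycle detected")
    return order


def _closure(start, neighbours):
    """Breadth-first walk that returns start plus everything reachable from it."""
    seen = [start]
    queue = deque([start])
    while queue:
        for nxt in neighbours(queue.popleft()):
            if nxt not in seen:
                seen.append(nxt)
                queue.append(nxt)
    return seen


def downstream(deps, start):
    return _closure(start, lambda nid: dependents(deps, nid))


def upstream(deps, start):
    return _closure(start, lambda nid: deps[nid])


print(topological_order(DEPS))
print(downstream(DEPS, "clean"))
print(upstream(DEPS, "report"))
```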
6. State Mutation & Reset
async def set_node_status(self, node_id: str, status: NodeStatus) -> None
async def set_node_outputs(self, node_id: str, outputs: dict[str, Any]) -> None
async def reset_node(self, node_id: str, *, preserve_outputs: bool = False)
async def reset(
self,
node_ids: list[str] | None = None,
*,
recursive: bool = True,
direction: str = "forward",
preserve_outputs: bool = False,
) -> dict[str, Any]
- set_node_status – update a node’s status and notify observers (on_node_status_change).
- set_node_outputs – update a node’s outputs and notify observers (on_node_output_change).
- reset_node – reset a single node to PENDING, optionally keeping outputs.
- reset – reset all or part of the graph:
  - node_ids=None → reset all nodes (except the synthetic inputs node).
  - recursive=True, direction="forward" → also reset all dependents.
  - recursive=True, direction="backward" → reset upstream dependencies.
Partial reset patterns
# Reset a node and everything that depends on it
await graph.reset(node_ids=["step_3"], recursive=True, direction="forward")
# Reset only a single node, keeping its outputs
await graph.reset(node_ids=["step_3"], recursive=False, preserve_outputs=True)
# Reset entire graph (except inputs)
await graph.reset(node_ids=None)
These methods are used by runners / UIs to implement retry, rerun from here, and what-if operations.
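To make the `recursive` and `direction` options concrete, the sketch below computes which node ids a reset would touch on a toy dependency table. `plan_reset`, `CHAIN`, and the `inputs_node` name are hypothetical. Skipping the inputs node in every mode is an assumption of this sketch; the page only states it for `node_ids=None`.

```python
# Hypothetical dependency table: node id -> ids it depends on.
CHAIN = {
    "inputs": [],
    "step_1": ["inputs"],
    "step_2": ["step_1"],
    "step_3": ["step_2"],
    "step_4": ["step_3"],
}


def plan_reset(deps, node_ids=None, *, recursive=True, direction="forward",
               inputs_node="inputs"):
    """Return the set of node ids a reset call would set back to PENDING."""
    if direction not in ("forward", "backward"):
        raise ValueError(f"unknown direction: {direction!r}")
    if node_ids is None:
        # Whole-graph reset: everything except the synthetic inputs node.
        return {nid for nid in deps if nid != inputs_node}
    targets = set(node_ids)
    frontier = list(node_ids) if recursive else []
    while frontier:
        current = frontier.pop()
        if direction == "forward":
            neighbours = [nid for nid, ups in deps.items() if current in ups]
        else:
            neighbours = list(deps[current])
        for nid in neighbours:
            if nid not in targets:
                targets.add(nid)
                frontier.append(nid)
    # Assumption: the inputs node is never reset, whatever the direction.
    targets.discard(inputs_node)
    return targets


print(sorted(plan_reset(CHAIN, ["step_3"])))
print(sorted(plan_reset(CHAIN, ["step_3"], direction="backward")))
print(sorted(plan_reset(CHAIN, ["step_3"], recursive=False)))
```

Computing the target set first and applying status changes afterwards makes it easy to preview a "rerun from here" operation before committing to it.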
7. IO Definition & Binding
IO APIs
declare_inputs(
*,
required: Iterable[str] | None = None,
optional: dict[str, Any] | None = None,
) -> None
expose(name: str, value: Ref | Any) -> None
require_outputs(*names: str) -> None
io_signature(include_values: bool = False) -> dict[str, Any]
- declare_inputs(...) – declares graph-level inputs:
  - required – names that must be provided when binding inputs.
  - optional – names with default values (modeled via ParamSpec).
- expose(name, value) – declare a graph output:
  - value can be a Ref (to node outputs or inputs) or a literal.
- require_outputs(...) – sanity check for required outputs (uses internal _io_outputs).
- io_signature(include_values=False) – summarized IO description:
  - inputs.required / inputs.optional (names and defaults).
  - outputs.keys – names of exposed outputs.
  - outputs.bindings – raw bindings (Refs or literals).
  - outputs.values – optional resolved values (when include_values=True).
Binding of actual input values happens via the runner, which calls the internal _validate_and_bind_inputs(...) helper.
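The required/optional split can be illustrated with a small binding function. This is a sketch of the general idea, not the internal helper: `bind_inputs` is a hypothetical name, and rejecting unknown input names is a choice made here for the example rather than documented behavior.

```python
def bind_inputs(provided, *, required=(), optional=None):
    """Check required names, fill optional defaults, and return the bound inputs."""
    required = list(required)
    defaults = dict(optional or {})
    missing = [name for name in required if name not in provided]
    if missing:
        raise ValueError(f"missing required inputs: {missing}")
    # Assumption of this sketch: names that were never declared are rejected.
    unknown = [name for name in provided
               if name not in required and name not in defaults]
    if unknown:
        raise ValueError(f"unknown inputs: {unknown}")
    # Provided values override the declared defaults.
    return {**defaults, **provided}


bound = bind_inputs({"x": 1}, required=["x"], optional={"scale": 2.0})
print(bound)
```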
Inspect IO signature
sig = graph.io_signature()
print(sig["inputs"]["required"])
print(sig["outputs"]["keys"])
# After a run, you can inspect resolved output values
full = graph.io_signature(include_values=True)
print(full["outputs"]["values"])
The IO signature is useful for APIs, UIs, and tooling that needs to describe how to call a graph without inspecting internals.
8. Observers & Notifications
add_observer(observer: Any) -> None
Observers are objects that can implement any of the following methods:
- on_node_status_change(runtime_node)
- on_node_output_change(runtime_node)
- on_inputs_bound(graph)
- on_patch_applied(graph, patch)
They are invoked whenever the corresponding events occur.
Lightweight observer usage
class PrintObserver:
def on_node_status_change(self, node):
print("status", node.node_id, node.state.status)
graph.add_observer(PrintObserver())
Observers are the main extension point for logging, metrics, and live UI updates.
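Because observers may implement any subset of the hooks, the dispatching side has to tolerate missing methods. One common way to do that is a `getattr` lookup, sketched below. `Notifier` and the two observer classes are hypothetical, and the graph's actual dispatch code may differ.

```python
class Notifier:
    """Hypothetical stand-in for the graph's observer plumbing."""

    def __init__(self):
        self._observers = []

    def add_observer(self, observer):
        self._observers.append(observer)

    def notify(self, event, *args):
        """Call `event` on every observer that defines it; return the call count."""
        called = 0
        for observer in self._observers:
            hook = getattr(observer, event, None)
            if callable(hook):
                hook(*args)
                called += 1
        return called


class StatusLogger:
    """Implements only the status hook."""

    def __init__(self):
        self.lines = []

    def on_node_status_change(self, runtime_node):
        self.lines.append(f"status {runtime_node}")


class PatchCounter:
    """Implements only the patch hook."""

    def __init__(self):
        self.count = 0

    def on_patch_applied(self, graph, patch):
        self.count += 1


notifier = Notifier()
status_logger = StatusLogger()
patch_counter = PatchCounter()
notifier.add_observer(status_logger)
notifier.add_observer(patch_counter)

status_calls = notifier.notify("on_node_status_change", "node_1")
patch_calls = notifier.notify("on_patch_applied", "graph", {"op": "remove_node"})
bound_calls = notifier.notify("on_inputs_bound", "graph")  # nobody implements this
print(status_calls, patch_calls, bound_calls)
```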
9. Diff & Persistence Helpers
Diffing
diff(other: TaskGraph) -> dict[str, Any]
- Compare two graphs with the same graph_id.
- Returns a dict with:
  - "added": list of node ids present only in other.
  - "removed": list of node ids present only in self.
  - "modified": node ids whose dependencies or metadata differ.
Basic diff usage
# self is graph_v2 and other is graph_v1, so "added" lists ids present only in graph_v1
d = graph_v2.diff(graph_v1)
print("added", d["added"])
print("modified", d["modified"])
Useful for visualizing evolution, reviewing patches, or migration tooling.
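The added/removed/modified semantics can be reproduced on plain dictionaries. This is a sketch only: `diff_nodes` and the node table layout are hypothetical, and sorting the id lists is an assumption made for stable output.

```python
def diff_nodes(self_nodes, other_nodes):
    """Compare two node tables (node id -> {"dependencies": [...], "metadata": {...}})."""
    self_ids, other_ids = set(self_nodes), set(other_nodes)

    def fingerprint(spec):
        # Only dependencies and metadata take part in the comparison.
        return (list(spec.get("dependencies", [])), dict(spec.get("metadata", {})))

    return {
        "added": sorted(other_ids - self_ids),      # present only in other
        "removed": sorted(self_ids - other_ids),    # present only in self
        "modified": sorted(
            nid for nid in self_ids & other_ids
            if fingerprint(self_nodes[nid]) != fingerprint(other_nodes[nid])
        ),
    }


v1 = {
    "a": {"dependencies": [], "metadata": {}},
    "b": {"dependencies": ["a"], "metadata": {}},
}
v2 = {
    "a": {"dependencies": [], "metadata": {}},
    "b": {"dependencies": ["a"], "metadata": {"label": "critical"}},
    "c": {"dependencies": ["b"], "metadata": {}},
}

print(diff_nodes(v1, v2))
print(diff_nodes(v2, v1))
```

Note that the result depends on which side is `self`: swapping the arguments swaps "added" and "removed" while "modified" stays the same.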
Spec serialization
spec_json(self) -> dict[str, Any]
- Returns a JSON‑safe representation of the spec (TaskGraphSpec) using _dataclass_to_plain.
- Storage/layout is left to callers (file, DB, etc.).
10. Debug & Visualization
Human‑readable summary
pretty(self, *, max_nodes: int = 20, max_width: int = 100) -> str
__str__(self) -> str
- pretty(...) – a compact, human‑friendly summary including:
  - graph id, node count, observer count;
  - IO signature summary;
  - state summary;
  - a small table of nodes with id, type, status, dependencies count, and logic.
- __str__ – uses pretty(max_nodes=12, max_width=96) for print(graph).
Quick debug print
print(graph) # uses __str__, i.e. pretty(max_nodes=12, max_width=96)
print(graph.pretty()) # default limits: max_nodes=20, max_width=100
This is the fastest way to get an overview of a graph in a REPL or log.
Visualization helpers
At the bottom of the module, these are attached as methods:
TaskGraph.to_dot = to_dot
TaskGraph.visualize = visualize
TaskGraph.ascii_overview = ascii_overview
- graph.to_dot(...) – export a DOT representation.
- graph.visualize(...) – high‑level helper for rich visualizations (see Visualization docs).
- graph.ascii_overview(...) – ASCII summary for terminals / logs.
High‑level usage (shape only)
dot_str = graph.to_dot()
print(graph.ascii_overview())
# graph.visualize(...) # see visualization docs for options
Exact options and rendering backends are described on the Visualization page.
11. Summary
- TaskGraph ties together the spec, the state, and the runtime node table.
- Use new_run / from_spec to construct graphs; use selectors (pick_one, pick_all) to locate nodes.
- IO is declared via declare_inputs / expose and inspected via io_signature.
- Topology helpers (dependents, get_subgraph_nodes, get_upstream_nodes) support partial reset and analysis.
- State mutation APIs (set_node_status, set_node_outputs, reset) underpin runners and interactive tooling.
- Patches, diff, observers, and visualization helpers are advanced tools for dynamic graphs, UIs, and diagnostics.