Launching Agent Runs with the context Method
Aethergraph provides a flexible interface for managing agent runs through the NodeContext.runner() object. This approach goes beyond simple nested calls to graph_fn, allowing you to spawn, monitor, and control runs with fine-grained operations. The runner() method is designed for general use with any agent, and it supplies metadata that supports seamless integration with the AG web UI.
Async Run
You can initiate an agent run asynchronously using the spawn_run method. This "fire and forget" approach lets you start a run without waiting for its completion.
spawn_run(graph_id, *, inputs, session_id, tags, ...)
Submit a child run and return immediately with its run id.
This method calls run_manager.submit_run(...) and applies defaults from
this facade when optional arguments are not provided.
Examples:
Spawn with default context identity/session:
run_id = await runner().spawn_run(
    "my-graph",
    inputs={"prompt": "hello"},
)
Spawn with explicit metadata overrides:
run_id = await runner().spawn_run(
    "my-graph",
    inputs={"payload": {"a": 1}},
    tags=["batch", "priority"],
    visibility=RunVisibility.inline,
    agent_id="agent-123",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `graph_id` | `str` | Registered graph identifier to execute. | required |
| `inputs` | `dict[str, Any]` | Graph input payload for the child run. | required |
| `session_id` | `str \| None` | Optional session id override. | `None` |
| `tags` | `list[str] \| None` | Optional run tags. | `None` |
| `visibility` | `RunVisibility \| None` | Optional visibility override. | `None` |
| `origin` | `RunOrigin \| None` | Optional origin override. | `None` |
| `importance` | `RunImportance \| None` | Optional importance override. | `None` |
| `agent_id` | `str \| None` | Optional agent id override. | `None` |
| `app_id` | `str \| None` | Optional app id override. | `None` |
| `run_id` | `str \| None` | Optional explicit run id. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str` | Created child run id. |
Notes
If origin is not provided, it defaults to agent when an effective
agent id exists; otherwise app.
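The origin default described in the note can be sketched as a small function. This is only an illustration of the stated rule, not Aethergraph's implementation: `default_origin` is a hypothetical helper, and plain strings stand in for the `RunOrigin` values.

```python
from typing import Optional


def default_origin(origin: Optional[str], effective_agent_id: Optional[str]) -> str:
    """Hypothetical helper mirroring the documented rule; not Aethergraph code.

    Strings stand in for RunOrigin members (an assumption made for this sketch).
    """
    if origin is not None:
        # An explicit override is used as given.
        return origin
    # No override: "agent" when an agent id is in effect, otherwise "app".
    return "agent" if effective_agent_id else "app"
```

Under this reading, passing `agent_id="agent-123"` without `origin` would produce an agent-origin run, while a call with neither would be attributed to the app.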
To monitor the progress or completion of an asynchronous run, use the wait_run method. This allows you to block until the run finishes or a timeout occurs.
wait_run(run_id, *, timeout_s, return_outputs)
Wait for a run to reach a terminal state.
This method delegates to run_manager.wait_run(...).
Examples:
Wait for a run record:
record = await runner().wait_run(run_id)
Wait and also collect outputs:
record, outputs = await runner().wait_run(
    run_id,
    timeout_s=30,
    return_outputs=True,
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `run_id` | `str` | Run identifier to wait on. | required |
| `timeout_s` | `float \| None` | Optional timeout in seconds. | `None` |
| `return_outputs` | `bool` | If true, return a `(record, outputs)` tuple instead of the record alone. | `False` |
Returns:
| Type | Description |
|---|---|
| `RunRecord \| tuple[RunRecord, dict[str, Any] \| None]` | Final run record, or a `(record, outputs)` tuple when `return_outputs` is true. |
Notes
Output availability depends on in-process execution context.
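A common way to combine `spawn_run` and `wait_run` is a fan-out: spawn several child runs first, then wait on all of them. The sketch below shows that pattern against an in-memory stand-in so it can run without Aethergraph. `StubRunner`, `fan_out`, the dict-shaped record, and the `"succeeded"` status are all hypothetical; only the method names and keyword arguments follow the signatures documented above.

```python
import asyncio
import itertools
from typing import Any, Optional


class StubRunner:
    """In-memory stand-in for the object returned by runner(); hypothetical."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._tasks = {}

    async def spawn_run(self, graph_id: str, *, inputs: dict) -> str:
        run_id = f"run-{next(self._ids)}"
        # Start the fake work in the background and return immediately.
        self._tasks[run_id] = asyncio.create_task(self._execute(graph_id, inputs))
        return run_id

    async def _execute(self, graph_id: str, inputs: dict) -> dict:
        await asyncio.sleep(0.01)  # pretend the graph does some work
        return {"graph_id": graph_id, "echo": inputs}

    async def wait_run(
        self,
        run_id: str,
        *,
        timeout_s: Optional[float] = None,
        return_outputs: bool = False,
    ) -> Any:
        # shield() keeps a timed-out wait from cancelling the run itself.
        outputs = await asyncio.wait_for(
            asyncio.shield(self._tasks[run_id]), timeout_s
        )
        record = {"run_id": run_id, "status": "succeeded"}  # stand-in for RunRecord
        return (record, outputs) if return_outputs else record


async def fan_out(runner: Any, graph_id: str, batches: list) -> dict:
    """Spawn one child run per batch, then wait for all of them together."""
    run_ids = [await runner.spawn_run(graph_id, inputs=b) for b in batches]
    results = await asyncio.gather(
        *(runner.wait_run(r, timeout_s=5, return_outputs=True) for r in run_ids)
    )
    return {record["run_id"]: outputs for record, outputs in results}
```

Inside a real graph, `fan_out` would presumably receive the object returned by `runner()` instead of the stub. Since outputs may be `None` depending on the execution context, callers should be prepared for that case.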
Kick off a Run and Wait
If you need to start a run and wait for its result in a single step, use the run_and_wait method. Note that this method blocks until the child run completes and does not count against the concurrency limit.
run_and_wait(graph_id, *, inputs, session_id, tags, ...)
Run a child graph as a tracked run and wait for completion.
This method delegates to run_manager.run_and_wait(...) and returns the
completed run id plus execution outputs and wait metadata.
Examples:
Wait for a child graph:
run_id, outputs, has_waits, continuations = await runner().run_and_wait(
    "my-graph",
    inputs={"x": 1},
)
Wait with explicit run metadata:
run_id, outputs, has_waits, continuations = await runner().run_and_wait(
    "my-graph",
    inputs={"x": 1},
    tags=["child"],
    run_id="run-custom-001",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `graph_id` | `str` | Registered graph identifier to execute. | required |
| `inputs` | `dict[str, Any]` | Graph input payload for the child run. | required |
| `session_id` | `str \| None` | Optional session id override. | `None` |
| `tags` | `list[str] \| None` | Optional run tags. | `None` |
| `visibility` | `RunVisibility \| None` | Optional visibility override. | `None` |
| `origin` | `RunOrigin \| None` | Optional origin override. | `None` |
| `importance` | `RunImportance \| None` | Optional importance override. | `None` |
| `agent_id` | `str \| None` | Optional agent id override. | `None` |
| `app_id` | `str \| None` | Optional app id override. | `None` |
| `run_id` | `str \| None` | Optional explicit run id. | `None` |
Returns:
| Type | Description |
|---|---|
| `tuple[str, dict[str, Any] \| None, bool, list[dict[str, Any]]]` | `(run_id, outputs, has_waits, continuations)`: the completed run id, the execution outputs, and wait metadata. |
Notes
This method uses count_slot=False to avoid nested deadlock behavior
in orchestration paths.
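To see why counting a nested run against the concurrency limit can deadlock, consider a toy scheduler with a single slot. This is a conceptual sketch only: `run_graph`, `parent_with_child`, and the semaphore-based limit are hypothetical and say nothing about how Aethergraph's run manager is actually built.

```python
import asyncio


async def run_graph(slots: asyncio.Semaphore, body, *, count_slot: bool = True):
    """Toy scheduler entry: optionally hold a concurrency slot while body runs."""
    if not count_slot:
        return await body()
    async with slots:
        return await body()


async def parent_with_child(count_child_slot: bool) -> str:
    slots = asyncio.Semaphore(1)  # a limit of one concurrent run

    async def child() -> str:
        return "child done"

    async def parent() -> str:
        # The parent already holds the only slot while it waits for the child.
        return await run_graph(slots, child, count_slot=count_child_slot)

    try:
        return await asyncio.wait_for(run_graph(slots, parent), timeout=0.2)
    except asyncio.TimeoutError:
        # The child never obtained a slot because the parent was holding it.
        return "deadlock"
```

With `count_child_slot=True` the child waits for a slot the parent cannot release until the child finishes, so the call times out. With `count_child_slot=False` the child runs immediately, which is the behavior the note attributes to `count_slot=False`.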
Cancellation
Aethergraph also supports run cancellation, allowing you to terminate a run that is no longer needed or is taking too long.
cancel_run(run_id)
Request best-effort cancellation for a run id.
This method delegates to run_manager.cancel_run(...).
Examples:
Cancel a spawned run:
await runner().cancel_run(run_id)
Cancel based on condition:
if should_abort:
    await runner().cancel_run(run_id)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `run_id` | `str` | Run identifier to cancel. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `None` | `None` | Cancellation is requested asynchronously. |
Notes
Cancellation may not be immediate; scheduler termination is best-effort.
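One way to use cancellation is to give a spawned run a deadline and request cancellation if the deadline passes. The sketch below enforces the deadline with `asyncio.wait_for` around the wait call, so it makes no assumption about how `wait_run` itself reports a timeout. `StubRunner` and `run_with_deadline` are hypothetical stand-ins written for this example; the stub's `wait_run` returns the raw result where the real method returns a run record.

```python
import asyncio
from typing import Any


class StubRunner:
    """In-memory stand-in for the object returned by runner(); hypothetical."""

    def __init__(self) -> None:
        self._tasks = {}
        self.cancel_requested = set()

    async def spawn_run(self, graph_id: str, *, inputs: dict) -> str:
        run_id = f"run-{len(self._tasks) + 1}"
        # Fake work: sleep for the requested duration, then yield "done".
        self._tasks[run_id] = asyncio.create_task(
            asyncio.sleep(inputs["seconds"], result="done")
        )
        return run_id

    async def wait_run(self, run_id: str) -> Any:
        # shield() so that abandoning the wait does not itself stop the run.
        return await asyncio.shield(self._tasks[run_id])

    async def cancel_run(self, run_id: str) -> None:
        # Record the request and ask the background task to stop.
        self.cancel_requested.add(run_id)
        self._tasks[run_id].cancel()


async def run_with_deadline(
    runner: Any, graph_id: str, inputs: dict, deadline_s: float
) -> Any:
    """Spawn a run; if it misses the deadline, request cancellation."""
    run_id = await runner.spawn_run(graph_id, inputs=inputs)
    try:
        return await asyncio.wait_for(runner.wait_run(run_id), timeout=deadline_s)
    except asyncio.TimeoutError:
        await runner.cancel_run(run_id)  # best-effort; the run may stop later
        return None
```

Because cancellation is best-effort, code that depends on the run having actually stopped would still need to check the run's final state afterwards.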