Skip to content

Launching Agent Runs with the context Method

Aethergraph provides a flexible interface for managing agent runs through the NodeContext.runner() object. This approach goes beyond simple nested calls to graph_fn, allowing you to spawn, monitor, and control runs with fine-grained operations. The runner() method is designed for general use with any agent, and it supplies metadata that supports seamless integration with the AG web UI.


Async Run

You can initiate an agent run asynchronously using the spawn_run method. This "fire and forget" approach lets you start a run without waiting for its completion.

spawn_run(graph_id, *, inputs, session_id, tags, ...)

Submit a child run and return immediately with its run id.

This method calls run_manager.submit_run(...) and applies defaults from this facade when optional arguments are not provided.

Examples:

Spawn with default context identity/session:

run_id = await runner().spawn_run(
    "my-graph",
    inputs={"prompt": "hello"},
)

Spawn with explicit metadata overrides:

run_id = await runner().spawn_run(
    "my-graph",
    inputs={"payload": {"a": 1}},
    tags=["batch", "priority"],
    visibility=RunVisibility.inline,
    agent_id="agent-123",
)

Parameters:

Name Type Description Default
graph_id str

Registered graph identifier to execute.

required
inputs dict[str, Any]

Graph input payload for the child run.

required
session_id str | None

Optional session id override.

None
tags list[str] | None

Optional run tags.

None
visibility RunVisibility | None

Optional visibility override.

None
origin RunOrigin | None

Optional origin override.

None
importance RunImportance | None

Optional importance override.

None
agent_id str | None

Optional agent id override.

None
app_id str | None

Optional app id override.

None
run_id str | None

Optional explicit run id.

None

Returns:

Name Type Description
str str

Created child run id.

Notes

If origin is not provided, it defaults to agent when an effective agent id exists; otherwise app.

To monitor the progress or completion of an asynchronous run, use the wait_run method. This allows you to block until the run finishes or a timeout occurs.

context.wait_run(run_id, *, timeout_s)

Wait for a run to reach a terminal state.

This method delegates to run_manager.wait_run(...).

Examples:

Wait for a run record:

record = await runner().wait_run(run_id)

Wait and also collect outputs:

record, outputs = await runner().wait_run(
    run_id,
    timeout_s=30,
    return_outputs=True,
)

Parameters:

Name Type Description Default
run_id str

Run identifier to wait on.

required
timeout_s float | None

Optional timeout in seconds.

None
return_outputs bool

If true, return (record, outputs) tuple.

False

Returns:

Type Description
RunRecord | tuple[RunRecord, dict[str, Any] | None]

RunRecord | tuple[RunRecord, dict[str, Any] | None]: Final run record, or (record, outputs) when requested.

Notes

Output availability depends on in-process execution context.

Kick off a Run and Wait

If you need to start a run and wait for its result in a single step, the context.run_and_wait method provides a convenient solution. Note this method blocks the process and does not honor the concurrency limit.

run_and_wait(graph_id, *, inputs, session_id, tags, ...)

Run a child graph as a tracked run and wait for completion.

This method delegates to run_manager.run_and_wait(...) and returns the completed run id plus execution outputs and wait metadata.

Examples:

Wait for a child graph:

run_id, outputs, has_waits, continuations = await runner().run_and_wait(
    "my-graph",
    inputs={"x": 1},
)

Wait with explicit run metadata:

run_id, outputs, has_waits, continuations = await runner().run_and_wait(
    "my-graph",
    inputs={"x": 1},
    tags=["child"],
    run_id="run-custom-001",
)

Parameters:

Name Type Description Default
graph_id str

Registered graph identifier to execute.

required
inputs dict[str, Any]

Graph input payload for the child run.

required
session_id str | None

Optional session id override.

None
tags list[str] | None

Optional run tags.

None
visibility RunVisibility | None

Optional visibility override.

None
origin RunOrigin | None

Optional origin override.

None
importance RunImportance | None

Optional importance override.

None
agent_id str | None

Optional agent id override.

None
app_id str | None

Optional app id override.

None
run_id str | None

Optional explicit run id.

None

Returns:

Type Description
tuple[str, dict[str, Any] | None, bool, list[dict[str, Any]]]

tuple[str, dict[str, Any] | None, bool, list[dict[str, Any]]]: (run_id, outputs, has_waits, continuations) from the completed child run.

Notes

This method uses count_slot=False to avoid nested deadlock behavior in orchestration paths.

Cancellation

Aethergraph also supports run cancellation, allowing you to terminate a run that is no longer needed or is taking too long.

cancel_run(run_id)

Request best-effort cancellation for a run id.

This method delegates to run_manager.cancel_run(...).

Examples:

Cancel a spawned run:

await runner().cancel_run(run_id)

Cancel based on condition:

if should_abort:
    await runner().cancel_run(run_id)

Parameters:

Name Type Description Default
run_id str

Run identifier to cancel.

required

Returns:

Name Type Description
None None

Cancellation is requested asynchronously.

Notes

Cancellation may not be immediate; scheduler termination is best-effort.