Runner API – run_async & run

The runner serves as the unified entry point for executing:

  • a GraphFunction (created with @graph_fn), or
  • a static TaskGraph (constructed via graphify, builder, or storage).

Internally, it initializes a RuntimeEnv, configures services, and manages a ForwardScheduler with customizable retry and concurrency options.
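
To make the retry and concurrency options more concrete, here is a minimal sketch of the general idea: a semaphore caps how many tasks run at once, and each task is retried a bounded number of times. This is not the ForwardScheduler implementation; `run_with_retry`, `run_all`, and `make_flaky` are hypothetical names used only for illustration.

```python
import asyncio


async def run_with_retry(task, *, max_attempts=3, base_delay=0.01):
    """Await task() up to max_attempts times, backing off between failures."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await task()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def run_all(tasks, *, max_concurrency=2, max_attempts=3):
    """Run coroutine factories with at most max_concurrency in flight."""
    limit = asyncio.Semaphore(max_concurrency)

    async def guarded(task):
        async with limit:
            return await run_with_retry(task, max_attempts=max_attempts)

    # gather() returns results in the order the tasks were given.
    return await asyncio.gather(*(guarded(t) for t in tasks))


def make_flaky(value, failures):
    """Build a task that fails `failures` times, then returns `value`."""
    state = {"left": failures}

    async def task():
        if state["left"] > 0:
            state["left"] -= 1
            raise RuntimeError("transient failure")
        return value

    return task


results = asyncio.run(
    run_all([make_flaky(1, 0), make_flaky(2, 1), make_flaky(3, 2)])
)
```

In the runner itself, the corresponding knobs are the `retry` and `max_concurrency` runtime overrides described below.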


Runner APIs

Use these APIs to start new runs. For nested execution of graphs or agents, refer to the Context Run API.

run_async(target, inputs, identity, **rt_overrides)

Execute a TaskGraph or GraphFunction asynchronously with optional persistence and resumability.

This method handles environment setup, cold-resume from persisted state (if available), input validation, scheduling, and output resolution. It supports both fresh runs and resuming incomplete runs, automatically wiring up persistence observers and enforcing snapshot policies.

Examples:

Running a graph function:

result = await run_async(my_graph_fn, {"x": 1, "y": 2})

Running a TaskGraph with custom run ID and identity:

result = await run_async(
    my_task_graph,
    {"input": 42},
    run_id="custom-run-123",
    identity=my_identity,     # Only used with API requests. Ignored when running locally.
    max_concurrency=8
)

Parameters:

  • target (required): The TaskGraph, GraphFunction, or builder to execute.
  • inputs (dict[str, Any] | None, default None): Dictionary of input values for the graph.
  • identity (RequestIdentity | None, default None): Optional RequestIdentity for user/session context.
  • **rt_overrides (default {}): Optional runtime overrides for environment and execution. Recognized runtime overrides include:

    • run_id (str): Custom run identifier.
    • session_id (str): Session identifier for grouping runs.
    • agent_id (str): Agent identifier for provenance.
    • app_id (str): Application identifier for provenance.
    • retry (RetryPolicy): Custom retry policy.
    • max_concurrency (int): Maximum number of concurrent tasks.
    • Any additional container attributes supported by your environment.

Returns:

  • dict: The resolved outputs of the graph, or a status dict if waiting on continuations.

Raises:

  • GraphHasPendingWaits: If the graph is waiting on external events and outputs are not ready.
  • TypeError: If the target is not a valid TaskGraph or GraphFunction.
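
A caller may want to turn the two documented exceptions into an explicit status instead of letting them propagate. The sketch below shows one way to do that. So that it runs without the library installed, `GraphHasPendingWaits` and `run_async` are local stand-ins whose behaviour is assumed for the demo; in real code, import both from the library. `run_or_report`, `add`, and `waits_for_approval` are hypothetical helpers.

```python
import asyncio


class GraphHasPendingWaits(Exception):
    """Local placeholder for the library exception of the same name."""


async def run_async(target, inputs=None, identity=None, **rt_overrides):
    """Local placeholder: rejects non-callables, otherwise calls target."""
    if not callable(target):
        raise TypeError(f"unsupported target: {type(target).__name__}")
    return target(**(inputs or {}))


async def run_or_report(target, inputs):
    """Run target and map the documented exceptions to a status dict."""
    try:
        outputs = await run_async(target, inputs)
    except GraphHasPendingWaits:
        # Outputs are not ready yet; the run is waiting on an external event.
        return {"status": "waiting"}
    except TypeError as exc:
        return {"status": "invalid_target", "error": str(exc)}
    return {"status": "done", "outputs": outputs}


def add(x, y):
    return {"sum": x + y}


def waits_for_approval():
    raise GraphHasPendingWaits("waiting on an external event")


done = asyncio.run(run_or_report(add, {"x": 1, "y": 2}))
waiting = asyncio.run(run_or_report(waits_for_approval, {}))
invalid = asyncio.run(run_or_report("not a graph", {}))
```

Only the try/except shape carries over to real code; what to do in the waiting branch depends on how your application resumes runs.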

Notes
  • Specifically for GraphFunctions, you can call await graph_fn(**inputs) directly without going through run_async.
  • graph_fn is not resumable; use TaskGraphs for persistence and recovery features.
  • When using a graph for persistence or resumability, ensure its outputs are JSON-serializable, for example:

    • primitive types (str, int, float, bool, None)
    • lists/dicts of primitive types

  • A graph that can be resumed, because its outputs are JSON-serializable:

      @graphify(...)
      def my_graph(...):
          ...
          return {"result": 42, "data": [1, 2, 3], "info": None}  # valid JSON-serializable output

  • A graph that cannot be resumed, because its outputs are not JSON-serializable:

      @graphify(...)
      def my_graph(...):
          ...
          return {"checkpoint": torch.pt, "file": open("data.bin", "rb")}  # invalid outputs for resuming (but valid for fresh runs)

  • Despite this, you can still use such a graph without persistence features; just avoid resuming it.

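
One way to catch the JSON-serializability problem early is to test a graph's outputs with the standard `json` module before relying on resumability. The helper below is a minimal sketch, not part of the runner API; `non_serializable_keys` is a hypothetical name.

```python
import json
from typing import Any


def non_serializable_keys(outputs: dict[str, Any]) -> list[str]:
    """Return the output keys whose values json.dumps cannot encode."""
    bad = []
    for key, value in outputs.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            # TypeError: unsupported type; ValueError: circular reference.
            bad.append(key)
    return bad


ok = {"result": 42, "data": [1, 2, 3], "info": None}
not_ok = {"count": 1, "payload": b"raw bytes", "tags": {"a", "b"}}

ok_report = non_serializable_keys(ok)          # no offending keys
not_ok_report = non_serializable_keys(not_ok)  # bytes and set values fail
```

Checking each key separately, rather than the whole dict at once, makes it easy to report exactly which output blocks resuming.
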
run(target, inputs, identity, **rt_overrides)

Execute a target graph node from synchronous code with the provided inputs.

This function submits the execution of a target node (or graph) to the event loop and returns a future, giving callers a synchronous interface to asynchronous execution. Runtime configuration overrides can be supplied as keyword arguments.

Examples:

Running a graph node with default inputs:

future = run(my_node)
result = future.result()

Running with custom inputs and runtime overrides:

future = run(my_node, inputs={"x": 42}, timeout=10)
output = future.result()

Parameters:

  • target (required): The graph node or callable to execute.
  • inputs (dict[str, Any] | None, default None): Optional dictionary of input values to pass to the node.
  • identity (RequestIdentity | None, default None): Optional RequestIdentity for user/session context.
  • **rt_overrides (default {}): Additional keyword arguments to override runtime configuration.

Returns:

  • concurrent.futures.Future: A future representing the asynchronous execution of the node.

Notes
  • This function is suitable for use in synchronous contexts where asynchronous execution is desired.
  • It is recommended to use asynchronous execution directly when possible for better performance and responsiveness.
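
The pattern of handing work to an event loop and getting back a concurrent.futures.Future can be reproduced with the standard library alone. The sketch below shows that general pattern using `asyncio.run_coroutine_threadsafe`; it is an illustration under that assumption, not a description of how run is implemented. `start_background_loop`, `submit`, and `double` are hypothetical names.

```python
import asyncio
import threading
from concurrent.futures import Future


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Start an event loop on a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def double(x: int) -> dict:
    """Stand-in for an asynchronous graph execution."""
    await asyncio.sleep(0.01)
    return {"y": 2 * x}


def submit(loop: asyncio.AbstractEventLoop, coro) -> Future:
    """Schedule coro on the loop's thread; return a thread-safe future."""
    return asyncio.run_coroutine_threadsafe(coro, loop)


loop = start_background_loop()
future = submit(loop, double(21))
output = future.result(timeout=5)     # blocks the calling thread until done
loop.call_soon_threadsafe(loop.stop)  # stop the background loop when finished
```

Blocking on `future.result()` ties up the calling thread, which is one reason calling the asynchronous API directly is usually preferable when an event loop is already available.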