Skip to content

Trigger Facade

context.triggers() returns a TriggerFacade instance that enables scheduling graph runs via cron expressions, intervals, specific times, or external events. The trigger engine scans for new tasks every 5 seconds by default.

Note: The current trigger engine is optimized for simple tasks (up to a few hundred). For scheduled tasks in the AG UI, use the Schedule section without running a graph.

0. TriggerConfig

TriggerConfig is the base class for all trigger types and configurations. It's used in the create() method for general-purpose triggers. In most cases, you won't need to interact with this dataclass directly.

TriggerConfig

Configuration for trigger execution.

Attributes:

Name Type Description
kind TriggerKind

The type of trigger to execute.

cron_expr str | None

Cron expression for scheduled triggers. Defaults to None.

run_at datetime | None

Specific datetime to run the trigger. Defaults to None.

event_key str | None

Key identifier for event-based triggers. Defaults to None.

interval_seconds int | None

Interval in seconds for periodic triggers. Defaults to None.

memory_level ScopeLevel | None

Scope level for trigger memory/state management. Defaults to None.

tz str | None

Timezone for cron triggers (e.g., "America/New_York"). Defaults to UTC if not set.

1. Scheduling

create(*, graph_id, default_inputs, config, trigger_name)

Create a trigger from an explicit TriggerConfig.

This is the generic entrypoint used by convenience helpers (cron, at, after, interval, event) and delegates to trigger_service.create_from_scope(...) using the bound scope.

Examples:

Create a one-shot trigger:

trig = await context.triggers().create(
    graph_id="reminder-graph",
    default_inputs={"message": "Ping me"},
    config=TriggerConfig(kind="one_shot", run_at=run_at_utc),
    trigger_name="single-reminder",
)

Create an event trigger:

trig = await context.triggers().create(
    graph_id="billing-graph",
    default_inputs={"source": "billing"},
    config=TriggerConfig(kind="event", event_key="invoice.paid"),
)

Parameters:

Name Type Description Default
graph_id str

Graph to execute when the trigger fires.

required
default_inputs dict[str, Any]

Base inputs merged into submitted runs.

required
config TriggerConfig

Trigger configuration payload describing kind and timing.

required
trigger_name str | None

Optional human-readable trigger label.

None

Returns:

Name Type Description
TriggerRecord TriggerRecord

Persisted trigger record returned by the trigger service.

Notes

config.memory_level is currently accepted for API compatibility but is not consumed by this facade method.

cron(*, graph_id, default_inputs, cron_expr, tz, memory_level, trigger_name)

Create a cron-based recurring trigger.

This helper builds a TriggerConfig(kind="cron", ...) and forwards to create(...).

Examples:

Weekday 9AM local trigger:

trig = await context.triggers().cron(
    graph_id="daily-report",
    default_inputs={"team": "ops"},
    cron_expr="0 9 * * 1-5",
    tz="America/Los_Angeles",
)

Hourly trigger in UTC:

trig = await context.triggers().cron(
    graph_id="sync-graph",
    default_inputs={},
    cron_expr="0 * * * *",
    trigger_name="hourly-sync",
)

Parameters:

Name Type Description Default
graph_id str

Graph to execute when schedule fires.

required
default_inputs dict[str, Any]

Base run inputs for each fire.

required
cron_expr str

Cron expression interpreted by backend scheduler.

required
tz str | None

Optional timezone for cron interpretation; defaults to UTC when omitted by the service.

None
memory_level ScopeLevel | None

Reserved scope-level hint for future behavior.

None
trigger_name str | None

Optional human-readable trigger label.

None

Returns:

Name Type Description
TriggerRecord TriggerRecord

Persisted cron trigger record.

at(*, graph_id, default_inputs, run_at, tz, memory_level, trigger_name)

Create a one-shot trigger scheduled for an absolute datetime.

This helper builds TriggerConfig(kind="one_shot", run_at=...) and forwards to create(...).

Examples:

Schedule with timezone-aware UTC datetime:

trig = await context.triggers().at(
    graph_id="notify-graph",
    default_inputs={"message": "check deployment"},
    run_at=run_at_utc,
    trigger_name="deploy-reminder",
)

Schedule with explicit timezone label:

trig = await context.triggers().at(
    graph_id="reminder-graph",
    default_inputs={"message": "standup"},
    run_at=run_at_local,
    tz="America/New_York",
)

Parameters:

Name Type Description Default
graph_id str

Graph to execute at the scheduled instant.

required
default_inputs dict[str, Any]

Base run inputs.

required
run_at datetime

Absolute fire datetime.

required
tz str | None

Optional timezone hint for normalization by service logic.

None
memory_level ScopeLevel | None

Reserved scope-level hint for future behavior.

None
trigger_name str | None

Optional human-readable trigger label.

None

Returns:

Name Type Description
TriggerRecord TriggerRecord

Persisted one-shot trigger record.

Notes

For naive datetimes, timezone interpretation is delegated to service behavior and trigger timezone settings.

after(*, graph_id, default_inputs, delay_seconds, memory_level, trigger_name)

Schedule a one-shot trigger that fires after a delay.

This helper computes run_at in UTC as now + delay_seconds and delegates to at(...).

Examples:

Fire after 5 minutes:

trig = await context.triggers().after(
    graph_id="cleanup-graph",
    default_inputs={"job": "stale-cache"},
    delay_seconds=300,
)

Request immediate scheduling:

trig = await context.triggers().after(
    graph_id="notify-graph",
    default_inputs={"message": "run now"},
    delay_seconds=0,
)

Parameters:

Name Type Description Default
graph_id str

Graph to execute when delay elapses.

required
default_inputs dict[str, Any]

Base run inputs.

required
delay_seconds float

Delay in seconds before first fire.

required
memory_level ScopeLevel | None

Reserved scope-level hint for future behavior.

None
trigger_name str | None

Optional human-readable trigger label.

None

Returns:

Name Type Description
TriggerRecord TriggerRecord

Persisted one-shot trigger record with computed run_at.

Notes

Non-positive delay values are coerced to "as soon as possible" by using the current UTC timestamp.

interval(*, graph_id, default_inputs, interval_seconds, memory_level, trigger_name)

Create an interval trigger that repeats every N seconds.

This helper builds TriggerConfig(kind="interval", interval_seconds=...) and forwards to create(...).

Examples:

Run every minute:

trig = await context.triggers().interval(
    graph_id="heartbeat-graph",
    default_inputs={"source": "scheduler"},
    interval_seconds=60,
)

Run every 10 minutes with a name:

trig = await context.triggers().interval(
    graph_id="sync-graph",
    default_inputs={"full": False},
    interval_seconds=600,
    trigger_name="periodic-sync",
)

Parameters:

Name Type Description Default
graph_id str

Graph to execute on each interval tick.

required
default_inputs dict[str, Any]

Base run inputs.

required
interval_seconds int

Repeat interval in seconds.

required
memory_level ScopeLevel | None

Reserved scope-level hint for future behavior.

None
trigger_name str | None

Optional human-readable trigger label.

None

Returns:

Name Type Description
TriggerRecord TriggerRecord

Persisted interval trigger record.

Notes

Validation of minimum/maximum interval values is delegated to service or storage policies.

event(*, graph_id, default_inputs, event_key, memory_level, trigger_name)

Create an event-driven trigger for a specific event key.

This helper builds TriggerConfig(kind="event", event_key=...) and forwards to create(...). Event triggers fire only when fire_event(...) is invoked with the same key.

Examples:

Create a billing event trigger:

trig = await context.triggers().event(
    graph_id="billing-graph",
    default_inputs={"origin": "webhook"},
    event_key="invoice.paid",
)

Create with explicit label:

trig = await context.triggers().event(
    graph_id="audit-graph",
    default_inputs={},
    event_key="user.deleted",
    trigger_name="audit-user-delete",
)

Parameters:

Name Type Description Default
graph_id str

Graph to execute when matching events are fired.

required
default_inputs dict[str, Any]

Base run inputs merged with event payload.

required
event_key str

Event routing key used for matching.

required
memory_level ScopeLevel | None

Reserved scope-level hint for future behavior.

None
trigger_name str | None

Optional human-readable trigger label.

None

Returns:

Name Type Description
TriggerRecord TriggerRecord

Persisted event trigger record.

Notes

Event payload merge policy is implemented in the engine and currently nests payload under event.

2. Trigger and Cancel

fire_event(event_key, payload)

Fire active event triggers matching event_key in current scope.

This delegates to trigger_engine.fire_event(...), forwarding tenant fields (org_id, user_id, client_id) from self.scope.

Examples:

Fire event without payload:

await context.triggers().fire_event("invoice.paid")

Fire event with payload:

await context.triggers().fire_event(
    "invoice.paid",
    payload={"invoice_id": "inv_123", "amount": 1999},
)

Parameters:

Name Type Description Default
event_key str

Routing key used to select active event triggers.

required
payload dict[str, Any] | None

Optional event payload forwarded to trigger run inputs.

None

Returns:

Name Type Description
None None

This method returns no value.

Notes

Payload merge policy is currently engine-defined ({"event": payload}) when payload is provided.

cancel(trigger_id)

Soft-cancel and deactivate a trigger by id.

This delegates to trigger_service.cancel(...), which marks the trigger inactive and clears future scheduling metadata.

Examples:

Cancel a known trigger:

await context.triggers().cancel("trig-ab12cd34")

Cancel after fetching:

trig = await context.triggers().get("trig-ab12cd34")
if trig:
    await context.triggers().cancel(trig.trigger_id)

Parameters:

Name Type Description Default
trigger_id str

Trigger identifier to deactivate.

required

Returns:

Name Type Description
None None

This method returns no value.

Notes

Canceling a missing trigger is typically a no-op in the service.

3. Helper

get(trigger_id)

Fetch a trigger record by id.

This delegates directly to trigger_service.get(...).

Examples:

Read a trigger record:

trig = await context.triggers().get("trig-ab12cd34")

Branch if missing:

trig = await context.triggers().get("trig-missing")
if trig is None:
    ...

Parameters:

Name Type Description Default
trigger_id str

Trigger identifier to retrieve.

required

Returns:

Type Description
TriggerRecord | None

TriggerRecord | None: Persisted trigger record, or None when not found.

Notes

No additional scope filtering is applied in this facade method.