Trigger Facade¶
context.triggers() returns a TriggerFacade instance that enables scheduling graph runs via cron expressions, intervals, specific times, or external events. The trigger engine scans for new tasks every 5 seconds by default.
Note: The current trigger engine is optimized for simple tasks (up to a few hundred). For scheduled tasks in the AG UI, use the Schedule section without running a graph.
0. TriggerConfig¶
TriggerConfig is the base class for all trigger types and configurations. It's used in the create() method for general-purpose triggers. In most cases, you won't need to interact with this dataclass directly.
TriggerConfig
Configuration for trigger execution.
Attributes:
| Name | Type | Description |
|---|---|---|
kind |
TriggerKind
|
The type of trigger to execute. |
cron_expr |
str | None
|
Cron expression for scheduled triggers. Defaults to None. |
run_at |
datetime | None
|
Specific datetime to run the trigger. Defaults to None. |
event_key |
str | None
|
Key identifier for event-based triggers. Defaults to None. |
interval_seconds |
int | None
|
Interval in seconds for periodic triggers. Defaults to None. |
memory_level |
ScopeLevel | None
|
Scope level for trigger memory/state management. Defaults to None. |
tz |
str | None
|
Timezone for cron triggers (e.g., "America/New_York"). Defaults to UTC if not set. |
1. Scheduling¶
create(*, graph_id, default_inputs, config, trigger_name)
Create a trigger from an explicit TriggerConfig.
This is the generic entrypoint used by convenience helpers (cron, at,
after, interval, event) and delegates to
trigger_service.create_from_scope(...) using the bound scope.
Examples:
Create a one-shot trigger:
trig = await context.triggers().create(
graph_id="reminder-graph",
default_inputs={"message": "Ping me"},
config=TriggerConfig(kind="one_shot", run_at=run_at_utc),
trigger_name="single-reminder",
)
Create an event trigger:
trig = await context.triggers().create(
graph_id="billing-graph",
default_inputs={"source": "billing"},
config=TriggerConfig(kind="event", event_key="invoice.paid"),
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph_id
|
str
|
Graph to execute when the trigger fires. |
required |
default_inputs
|
dict[str, Any]
|
Base inputs merged into submitted runs. |
required |
config
|
TriggerConfig
|
Trigger configuration payload describing kind and timing. |
required |
trigger_name
|
str | None
|
Optional human-readable trigger label. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TriggerRecord |
TriggerRecord
|
Persisted trigger record returned by the trigger service. |
Notes
config.memory_level is currently accepted for API compatibility but
is not consumed by this facade method.
cron(*, graph_id, default_inputs, cron_expr, tz, memory_level, trigger_name)
Create a cron-based recurring trigger.
This helper builds a TriggerConfig(kind="cron", ...) and forwards to
create(...).
Examples:
Weekday 9AM local trigger:
trig = await context.triggers().cron(
graph_id="daily-report",
default_inputs={"team": "ops"},
cron_expr="0 9 * * 1-5",
tz="America/Los_Angeles",
)
Hourly trigger in UTC:
trig = await context.triggers().cron(
graph_id="sync-graph",
default_inputs={},
cron_expr="0 * * * *",
trigger_name="hourly-sync",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph_id
|
str
|
Graph to execute when schedule fires. |
required |
default_inputs
|
dict[str, Any]
|
Base run inputs for each fire. |
required |
cron_expr
|
str
|
Cron expression interpreted by backend scheduler. |
required |
tz
|
str | None
|
Optional timezone for cron interpretation; defaults to UTC when omitted by the service. |
None
|
memory_level
|
ScopeLevel | None
|
Reserved scope-level hint for future behavior. |
None
|
trigger_name
|
str | None
|
Optional human-readable trigger label. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TriggerRecord |
TriggerRecord
|
Persisted cron trigger record. |
at(*, graph_id, default_inputs, run_at, tz, memory_level, trigger_name)
Create a one-shot trigger scheduled for an absolute datetime.
This helper builds TriggerConfig(kind="one_shot", run_at=...) and
forwards to create(...).
Examples:
Schedule with timezone-aware UTC datetime:
trig = await context.triggers().at(
graph_id="notify-graph",
default_inputs={"message": "check deployment"},
run_at=run_at_utc,
trigger_name="deploy-reminder",
)
Schedule with explicit timezone label:
trig = await context.triggers().at(
graph_id="reminder-graph",
default_inputs={"message": "standup"},
run_at=run_at_local,
tz="America/New_York",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph_id
|
str
|
Graph to execute at the scheduled instant. |
required |
default_inputs
|
dict[str, Any]
|
Base run inputs. |
required |
run_at
|
datetime
|
Absolute fire datetime. |
required |
tz
|
str | None
|
Optional timezone hint for normalization by service logic. |
None
|
memory_level
|
ScopeLevel | None
|
Reserved scope-level hint for future behavior. |
None
|
trigger_name
|
str | None
|
Optional human-readable trigger label. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TriggerRecord |
TriggerRecord
|
Persisted one-shot trigger record. |
Notes
For naive datetimes, timezone interpretation is delegated to service behavior and trigger timezone settings.
after(*, graph_id, default_inputs, delay_seconds, memory_level, trigger_name)
Schedule a one-shot trigger that fires after a delay.
This helper computes run_at in UTC as now + delay_seconds and
delegates to at(...).
Examples:
Fire after 5 minutes:
trig = await context.triggers().after(
graph_id="cleanup-graph",
default_inputs={"job": "stale-cache"},
delay_seconds=300,
)
Request immediate scheduling:
trig = await context.triggers().after(
graph_id="notify-graph",
default_inputs={"message": "run now"},
delay_seconds=0,
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph_id
|
str
|
Graph to execute when delay elapses. |
required |
default_inputs
|
dict[str, Any]
|
Base run inputs. |
required |
delay_seconds
|
float
|
Delay in seconds before first fire. |
required |
memory_level
|
ScopeLevel | None
|
Reserved scope-level hint for future behavior. |
None
|
trigger_name
|
str | None
|
Optional human-readable trigger label. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TriggerRecord |
TriggerRecord
|
Persisted one-shot trigger record with computed
|
Notes
Non-positive delay values are coerced to "as soon as possible" by using the current UTC timestamp.
interval(*, graph_id, default_inputs, interval_seconds, memory_level, trigger_name)
Create an interval trigger that repeats every N seconds.
This helper builds TriggerConfig(kind="interval", interval_seconds=...)
and forwards to create(...).
Examples:
Run every minute:
trig = await context.triggers().interval(
graph_id="heartbeat-graph",
default_inputs={"source": "scheduler"},
interval_seconds=60,
)
Run every 10 minutes with a name:
trig = await context.triggers().interval(
graph_id="sync-graph",
default_inputs={"full": False},
interval_seconds=600,
trigger_name="periodic-sync",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph_id
|
str
|
Graph to execute on each interval tick. |
required |
default_inputs
|
dict[str, Any]
|
Base run inputs. |
required |
interval_seconds
|
int
|
Repeat interval in seconds. |
required |
memory_level
|
ScopeLevel | None
|
Reserved scope-level hint for future behavior. |
None
|
trigger_name
|
str | None
|
Optional human-readable trigger label. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TriggerRecord |
TriggerRecord
|
Persisted interval trigger record. |
Notes
Validation of minimum/maximum interval values is delegated to service or storage policies.
event(*, graph_id, default_inputs, event_key, memory_level, trigger_name)
Create an event-driven trigger for a specific event key.
This helper builds TriggerConfig(kind="event", event_key=...) and
forwards to create(...). Event triggers fire only when
fire_event(...) is invoked with the same key.
Examples:
Create a billing event trigger:
trig = await context.triggers().event(
graph_id="billing-graph",
default_inputs={"origin": "webhook"},
event_key="invoice.paid",
)
Create with explicit label:
trig = await context.triggers().event(
graph_id="audit-graph",
default_inputs={},
event_key="user.deleted",
trigger_name="audit-user-delete",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph_id
|
str
|
Graph to execute when matching events are fired. |
required |
default_inputs
|
dict[str, Any]
|
Base run inputs merged with event payload. |
required |
event_key
|
str
|
Event routing key used for matching. |
required |
memory_level
|
ScopeLevel | None
|
Reserved scope-level hint for future behavior. |
None
|
trigger_name
|
str | None
|
Optional human-readable trigger label. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TriggerRecord |
TriggerRecord
|
Persisted event trigger record. |
Notes
Event payload merge policy is implemented in the engine and currently
nests payload under event.
2. Trigger and Cancel¶
fire_event(event_key, payload)
Fire active event triggers matching event_key in current scope.
This delegates to trigger_engine.fire_event(...), forwarding tenant
fields (org_id, user_id, client_id) from self.scope.
Examples:
Fire event without payload:
await context.triggers().fire_event("invoice.paid")
Fire event with payload:
await context.triggers().fire_event(
"invoice.paid",
payload={"invoice_id": "inv_123", "amount": 1999},
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event_key
|
str
|
Routing key used to select active event triggers. |
required |
payload
|
dict[str, Any] | None
|
Optional event payload forwarded to trigger run inputs. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
None |
None
|
This method returns no value. |
Notes
Payload merge policy is currently engine-defined ({"event": payload})
when payload is provided.
cancel(trigger_id)
Soft-cancel and deactivate a trigger by id.
This delegates to trigger_service.cancel(...), which marks the trigger
inactive and clears future scheduling metadata.
Examples:
Cancel a known trigger:
await context.triggers().cancel("trig-ab12cd34")
Cancel after fetching:
trig = await context.triggers().get("trig-ab12cd34")
if trig:
await context.triggers().cancel(trig.trigger_id)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
trigger_id
|
str
|
Trigger identifier to deactivate. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
None |
None
|
This method returns no value. |
Notes
Canceling a missing trigger is typically a no-op in the service.
3. Helper¶
get(trigger_id)
Fetch a trigger record by id.
This delegates directly to trigger_service.get(...).
Examples:
Read a trigger record:
trig = await context.triggers().get("trig-ab12cd34")
Branch if missing:
trig = await context.triggers().get("trig-missing")
if trig is None:
...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
trigger_id
|
str
|
Trigger identifier to retrieve. |
required |
Returns:
| Type | Description |
|---|---|
TriggerRecord | None
|
TriggerRecord | None: Persisted trigger record, or |
Notes
No additional scope filtering is applied in this facade method.