Extending Context Services¶
AetherGraph lets you extend the runtime by adding your own context.<name> methods. These external context services live alongside built‑ins like channel, memory, and artifacts, and provide reusable, lifecycle‑aware helpers for clients, caches, orchestration, or domain APIs — without changing your agent code.
Key idea: keep agent logic pure‑Python; move integration glue and shared state into services that the runtime injects per node.
1. What is an External Context Service?¶
A context service is a registered Python object bound into every NodeContext. After registration, you can use it anywhere inside a graph or tool:
@graph_fn(name="demo")
async def demo(*, context):
    info = await context.trainer().inspect_job(job_id="abc123")
    return {"status": info["status"]}
Why/When to Use¶
- Reusable helpers — share clients (e.g., HPC, S3, DB, solver), connection pools, or token buckets.
- Shared state — memoize expensive lookups; coordinate across nodes within a run.
- Centralized config — keep API keys, timeouts, routing, or policies in one place.
- Per‑node awareness — access built-in services through self.ctx() for provenance or multi‑tenancy.
Use a service for long‑lived instances or cross‑node coordination. For tiny stateless helpers, plain imports are fine.
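As a rough illustration of the shared-state case, the sketch below memoizes an expensive async lookup so that concurrent callers share a single result. It is plain Python with no AetherGraph dependency: `LookupCache`, `get`, and `slow_lookup` are hypothetical names, and in a real service the class would presumably subclass Service and be registered under its own accessor name.

```python
import asyncio
from typing import Any, Awaitable, Callable


class LookupCache:
    """Hypothetical stand-in for a service that memoizes expensive lookups."""

    def __init__(self, loader: Callable[[str], Awaitable[Any]]):
        self._loader = loader
        self._values = {}            # key -> cached result
        self._lock = asyncio.Lock()  # guards _values across concurrent callers
        self.loads = 0               # how many times the loader actually ran

    async def get(self, key: str) -> Any:
        async with self._lock:
            if key not in self._values:
                self.loads += 1
                self._values[key] = await self._loader(key)
            return self._values[key]


async def slow_lookup(key: str) -> str:
    await asyncio.sleep(0.01)  # pretend this is a remote call
    return key.upper()


async def main():
    cache = LookupCache(slow_lookup)
    # Five concurrent requests for the same key trigger only one load.
    results = await asyncio.gather(*(cache.get("si") for _ in range(5)))
    return list(results), cache.loads


results, loads = asyncio.run(main())
print(results, loads)
```

Holding one lock around the loader keeps the sketch short but serializes loads for all keys; a per-key lock would be the natural refinement if lookups for different keys should run in parallel.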
2. Naming & Boundaries (Important)¶
Built‑ins (context.artifacts(), context.memory(), etc.) are not swappable in OSS. To extend the system, register new services with new names (e.g., context.trainer(), context.datasets(), context.lineage_store()).
- Keep agent code explicit about which storage or API it’s using.
- If mirroring/exporting, record links (artifact URIs, memory event IDs) inside your external system for provenance.
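One possible shape for such a provenance record is sketched below. The `MirrorRecord` class, its field names, and the sample values are all hypothetical; the only point is that the external system stores the core identifiers (artifact URIs, memory event IDs) rather than copies that lose their origin.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class MirrorRecord:
    """Hypothetical row written to an external store when mirroring a run."""

    run_id: str
    artifact_uris: List[str] = field(default_factory=list)
    memory_event_ids: List[str] = field(default_factory=list)

    def to_json(self) -> str:
        # Stable key order makes records easy to diff and deduplicate.
        return json.dumps(asdict(self), sort_keys=True)


# Sample values are made up for illustration.
record = MirrorRecord(
    run_id="run-001",
    artifact_uris=["file:///tmp/ckpt.pt"],
    memory_event_ids=["evt-42"],
)
payload = record.to_json()
print(payload)
```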
3. Minimal Service (Instance‑based)¶
Define a service and use self.ctx()¶
from aethergraph import Service


class Trainer(Service):
    async def submit(self, spec: dict) -> str:
        # Submit a training job to your HPC/cluster
        ...
        return job_id

    async def inspect_job(self, job_id: str) -> dict:
        # Inspect the job status
        ...
        return {"job_id": job_id, "status": status}
Register at startup (pass an instance)¶
from aethergraph import start_server
from aethergraph.runtime import register_context_service

# Register after the server has started
start_server()
register_context_service("trainer", Trainer())
After this, context.trainer() is available everywhere in the runtime.
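To make the name-to-accessor mapping concrete, here is a toy model of how a registered instance could surface as `context.<name>()`. This is an assumption-laden sketch, not AetherGraph's implementation: `ToyContext`, `register`, and `FakeTrainer` are hypothetical, and the real runtime also binds per-node information that this version ignores.

```python
class ToyContext:
    """Hypothetical stand-in for NodeContext; not AetherGraph's implementation."""

    def __init__(self, services: dict):
        self._services = services

    def __getattr__(self, name: str):
        # Invoked only when normal attribute lookup fails, e.g. context.trainer
        try:
            service = self._services[name]
        except KeyError:
            raise AttributeError(f"no context service named {name!r}") from None
        return lambda: service  # context.trainer() returns the shared instance


registry: dict = {}


def register(name: str, instance) -> None:
    """Toy analogue of register_context_service: store one shared instance."""
    registry[name] = instance


class FakeTrainer:
    def describe(self) -> str:
        return "trainer ready"


register("trainer", FakeTrainer())
context = ToyContext(registry)
print(context.trainer().describe())
print(context.trainer() is context.trainer())  # same instance on every call
```

Because one instance is registered and then shared, any mutable state on the service is visible to every node that calls the accessor.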
4. Usage Patterns¶
A) Submit training & link artifacts¶
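@graph_fn(name="train_model", outputs=["job_id", "ckpt_uri"])
async def train_model(spec: dict, *, context):
    # Submit to your cluster via the custom service
    job_id = await context.trainer().submit(spec)
    # Record the checkpoint as an artifact (e.g., via context.artifacts());
    # that call is elided here, and `ckpt` stands for the record it returns.
    ckpt = ...
    return {"job_id": job_id, "ckpt_uri": ckpt.uri}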
B) Inspect status in another node/tool¶
@tool(name="wait_for_training", outputs=["ready"])
async def wait_for_training(job_id: str, *, context) -> dict:
    # Inspect your job through your service
    info = await context.trainer().inspect_job(job_id)
    return {"ready": info["status"] == "COMPLETED"}
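The tool above checks the status once. If a node should block until the job finishes, a polling loop with backoff is one option. The sketch below runs against a hypothetical in-memory `FakeTrainer`; `poll_until_done` and its `interval`, `max_interval`, and `timeout` parameters are illustrative names, and only the `"COMPLETED"` status is taken from the example above (failure states are not handled).

```python
import asyncio


class FakeTrainer:
    """Hypothetical in-memory trainer: reports RUNNING twice, then COMPLETED."""

    def __init__(self):
        self.calls = 0

    async def inspect_job(self, job_id: str) -> dict:
        self.calls += 1
        status = "COMPLETED" if self.calls >= 3 else "RUNNING"
        return {"job_id": job_id, "status": status}


async def poll_until_done(trainer, job_id: str, *, interval: float = 0.01,
                          max_interval: float = 0.1, timeout: float = 5.0) -> dict:
    """Poll inspect_job with exponential backoff until COMPLETED or timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        info = await trainer.inspect_job(job_id)
        if info["status"] == "COMPLETED":
            return info
        if loop.time() + interval > deadline:
            raise TimeoutError(f"job {job_id} not finished after {timeout}s")
        await asyncio.sleep(interval)
        interval = min(interval * 2, max_interval)  # back off, but cap the wait


trainer = FakeTrainer()
info = asyncio.run(poll_until_done(trainer, "abc123"))
print(info["status"], trainer.calls)
```

Inside a graph or tool, the same loop would take `context.trainer()` in place of the fake.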
5. Concurrency & Lifecycle¶
If multiple agents may access your service concurrently, consider the following design points:
- Lifecycle hooks: start()/close() are optional; call them from your app/server bootstrap.
- Shared access: use self.critical() to protect mutable shared state. Design your own mutex when scaling up.
- Per‑node context: call self.ctx() whenever you need {run_id, graph_id, node_id}.
- Async native: expose async APIs; if integrating queues, consider asyncio.Queue.
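The lifecycle and shared-access points can be sketched with a plain `asyncio.Lock`, which corresponds to the "design your own mutex" option rather than to `self.critical()`, whose exact signature is not shown here. `QuotaService` and its methods are hypothetical, and the class deliberately avoids any AetherGraph import so it runs on its own.

```python
import asyncio


class QuotaService:
    """Hypothetical service with explicit lifecycle hooks and its own mutex."""

    def __init__(self, limit: int):
        self._limit = limit
        self._used = 0
        self._lock = None
        self.started = False

    async def start(self) -> None:
        # Create loop-bound resources here rather than in __init__.
        self._lock = asyncio.Lock()
        self.started = True

    async def close(self) -> None:
        self.started = False

    async def acquire(self) -> bool:
        """Reserve one unit of quota; returns False once the limit is reached."""
        if self._lock is None:
            raise RuntimeError("call start() before using the service")
        async with self._lock:
            if self._used >= self._limit:
                return False
            # A yield point between check and update: without the lock,
            # several callers could pass the check before any of them increments.
            await asyncio.sleep(0)
            self._used += 1
            return True


async def main() -> int:
    svc = QuotaService(limit=3)
    await svc.start()           # called from your app/server bootstrap
    try:
        granted = await asyncio.gather(*(svc.acquire() for _ in range(10)))
    finally:
        await svc.close()
    return sum(granted)


granted_count = asyncio.run(main())
print(granted_count)
```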
6. Common Service Patterns (Examples)¶
| Scenario | Suggested accessor | What it abstracts | Typical operations |
|---|---|---|---|
| HPC / Training orchestration | context.trainer() | Submit/track jobs on Slurm/K8s/Ray | submit(spec), inspect_job(id), cancel(id) |
| External object storage | context.storage() | S3/GCS/MinIO buckets & signed URLs | put(path), get(uri), sign(uri), list(prefix) |
| Vendor API client | context.apiclient() | Rate‑limited, retried HTTP SDK | get/put/post, batch(), retry/backoff |
| In‑house AI models | context.models() | Local inference endpoints | embed(texts), generate(prompt) |
| Materials DB / domain registry | context.materials() | Domain lookups & cached tables | get_index(name), search(filters) |
| Lineage export | context.lineage_store() | Mirror core provenance to BI/warehouse | export_run(run_id), push(events) |
Pick names that are explicit in your org (e.g., context.k8s_jobs(), context.minio()). Avoid names that shadow built‑ins.
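As one concrete reading of the "Vendor API client" row, the sketch below wraps an async request function with retry and exponential backoff. `RetryingClient`, `flaky_send`, and the `/v1/items` path are hypothetical; a real service would wrap your actual HTTP SDK and would likely retry on that SDK's own error types rather than on `ConnectionError`.

```python
import asyncio


class RetryingClient:
    """Hypothetical wrapper that retries a flaky async call with backoff."""

    def __init__(self, send, *, retries: int = 3, base_delay: float = 0.01):
        self._send = send            # async callable performing the real request
        self._retries = retries
        self._base_delay = base_delay

    async def get(self, path: str):
        for attempt in range(self._retries + 1):
            try:
                return await self._send(path)
            except ConnectionError:
                if attempt == self._retries:
                    raise            # out of retries: surface the error
                await asyncio.sleep(self._base_delay * 2 ** attempt)


attempts = 0


async def flaky_send(path: str) -> dict:
    """Stand-in transport that fails twice before succeeding."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return {"path": path, "ok": True}


response = asyncio.run(RetryingClient(flaky_send).get("/v1/items"))
print(response, attempts)
```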
Summary¶
- External services add named capabilities to context without changing agent code.
- Built‑ins remain stable; extend via new names (no in‑place swaps).
- Register instances, not factories; services run on the main event loop.
- Use self.ctx() to fetch per‑node provenance on demand; protect shared state with critical() or your own lock design.