Agent-to-Agent Communication
Why email as a message bus?
Most multi-agent systems communicate through shared memory, function calls, or a centralized broker. Ravi email gives you a different primitive: each agent has a real inbox with a real address.
This means:
- An orchestrator agent can delegate to a specialist and await a reply with a full audit trail
- Any agent can reach any other agent without routing tables or shared infrastructure
- Messages are asynchronous — the sender does not block, and the receiver processes on its own schedule
- Every exchange is logged as real email, permanently readable and attributable
The pattern scales from two agents to dozens with no central coordination layer.
Basic pattern
The simplest form: orchestrator sends a task, specialist receives it, replies with a result.
# Orchestrator: send a task to the research agent
ravi identity use orchestrator-agent
ravi email send \
  --to research@in.ravi.app \
  --subject "Task: summarize this URL" \
  --body "Please summarize https://example.com/report.pdf and reply with bullet points."
# Research agent: poll inbox for work
ravi identity use research-agent
for i in $(seq 1 30); do
  TASK=$(ravi inbox email --unread --json | jq -r '.[0] // empty')
  [ -n "$TASK" ] && break
  sleep 5
done
# Research agent: do the work, then reply
ravi email reply \
  --message-id "$(echo "$TASK" | jq -r '.id')" \
  --body "Summary: ..."
# Orchestrator: poll for the reply
ravi identity use orchestrator-agent
for i in $(seq 1 30); do
  REPLY=$(ravi inbox email --unread --json | jq -r '.[0] // empty')
  [ -n "$REPLY" ] && break
  sleep 5
done
Python example
A reusable wrapper for dispatching tasks and collecting replies:
import subprocess
import json
import time
from typing import Any, Optional

def ravi(identity: str, *args: str) -> Any:
    """Run a ravi CLI command as the given identity and return its parsed JSON output."""
    # Select the identity first. This changes global CLI state, so do not
    # call this helper from concurrent processes.
    subprocess.run(
        ["ravi", "identity", "use", identity],
        capture_output=True, check=True
    )
    out = subprocess.run(
        ["ravi", *args, "--json"],
        capture_output=True, text=True, check=True
    )
    return json.loads(out.stdout)
def send_task(from_identity: str, to_email: str, subject: str, body: str) -> None:
    ravi(from_identity, "email", "send",
         "--to", to_email,
         "--subject", subject,
         "--body", body)

def wait_for_reply(identity: str, retries: int = 30, delay: float = 5.0) -> Optional[dict]:
    """Poll inbox until an unread message arrives or timeout."""
    for _ in range(retries):
        messages = ravi(identity, "inbox", "email", "--unread")
        if messages:
            return messages[0]
        time.sleep(delay)
    return None
# Orchestrator sends a task
send_task(
    from_identity="orchestrator",
    to_email="research@in.ravi.app",
    subject="Task: competitive analysis",
    body="List the top 3 competitors of Stripe and their pricing. Reply in JSON.",
)
# Orchestrator waits for the result
reply = wait_for_reply("orchestrator")
if reply:
    result_body = reply["body"]
    print("Received:", result_body)
else:
    print("Timeout: no reply received")
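wait_for_reply returns the first unread message it sees, which is not necessarily the reply to the task that was just sent if the inbox also receives other mail. The following is a minimal sketch of a more selective wait, not part of the Ravi CLI. It assumes each message is a dict with a subject field and that a reply's subject still contains the original subject text. The names wait_for_matching, subject_contains, and the fetch callable are hypothetical; fetch stands in for the CLI call so the polling logic can be exercised offline.

```python
import time
from typing import Callable, Optional


def wait_for_matching(
    fetch: Callable[[], list],
    matches: Callable[[dict], bool],
    retries: int = 30,
    delay: float = 5.0,
) -> Optional[dict]:
    """Poll fetch() until some message satisfies matches(), or give up."""
    for attempt in range(retries):
        for message in fetch():
            if matches(message):
                return message
        # Sleep between attempts, but not after the final one.
        if attempt < retries - 1:
            time.sleep(delay)
    return None


def subject_contains(text: str) -> Callable[[dict], bool]:
    """Build a predicate over the (assumed) 'subject' field, case-insensitive."""
    needle = text.lower()
    return lambda message: needle in str(message.get("subject", "")).lower()
```

With the wrapper above, fetch could be something like `lambda: ravi("orchestrator", "inbox", "email", "--unread")` and the predicate `subject_contains("competitive analysis")`. Whether replies preserve the original subject is an assumption to verify against your own messages.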
TypeScript example
import { execSync } from "child_process";
function ravi(identity: string, ...args: string[]): unknown {
  // Select the identity, then run the command and parse its JSON output.
  // Note: args are joined into a single shell command string, so callers
  // must quote any value that contains spaces.
  execSync(`ravi identity use ${identity}`);
  const out = execSync(`ravi ${args.join(" ")} --json`, { encoding: "utf8" });
  return JSON.parse(out);
}
async function waitForReply(
  identity: string,
  retries = 30,
  delayMs = 5000
): Promise<Record<string, unknown> | null> {
  for (let i = 0; i < retries; i++) {
    const messages = ravi(identity, "inbox", "email", "--unread") as Array<
      Record<string, unknown>
    >;
    if (messages.length > 0) return messages[0];
    await new Promise((r) => setTimeout(r, delayMs));
  }
  return null;
}
// Dispatch a task from the orchestrator
ravi(
  "orchestrator",
  "email send",
  "--to research@in.ravi.app",
  '--subject "Task: research request"',
  '--body "Summarize the state of AI coding assistants in 2026."'
);
// Await the reply
const reply = await waitForReply("orchestrator");
if (reply) {
  console.log("Result:", reply.body);
} else {
  console.error("No reply received within timeout");
}
Parallel task fan-out
Send tasks to several specialists without waiting on each reply, then collect results as they arrive:
import subprocess, json, time
SPECIALISTS = {
    "research": "research@in.ravi.app",
    "finance": "invoices@in.ravi.app",
    "scheduler": "calendar@in.ravi.app",
}
def dispatch(to_email: str, subject: str, body: str) -> None:
    # Pass --identity on every call instead of relying on global CLI state.
    subprocess.run(
        ["ravi", "--identity", "orchestrator",
         "email", "send",
         "--to", to_email,
         "--subject", subject,
         "--body", body],
        check=True,
    )
# Fan out tasks to all specialists
dispatch(SPECIALISTS["research"], "Task: market report", "Summarize AI market in 2026.")
dispatch(SPECIALISTS["finance"], "Task: cost estimate", "Estimate AWS cost for 1M API calls/day.")
dispatch(SPECIALISTS["scheduler"], "Task: meeting slots", "Find 3 open slots next week for a 30-min call.")
# Collect replies as they arrive (poll orchestrator inbox)
collected = []
deadline = time.time() + 120 # 2-minute window
while len(collected) < len(SPECIALISTS) and time.time() < deadline:
    messages = json.loads(
        subprocess.check_output(["ravi", "--identity", "orchestrator",
                                 "inbox", "email", "--unread", "--json"])
    )
    for msg in messages:
        if msg not in collected:
            collected.append(msg)
            print(f"Got reply from {msg['from']}: {msg['subject']}")
    if len(collected) < len(SPECIALISTS):
        time.sleep(5)
print(f"Collected {len(collected)}/{len(SPECIALISTS)} replies")
Note: The --identity flag is the safe way to scope commands in parallel processes. Using ravi identity use (global state) across concurrent processes causes race conditions. See Multi-Agent Setup for details.
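The collection loop above counts every unread message toward the total, so an unrelated email could end the wait before all specialists have answered. One way to tighten this is to attribute each message to a specialist by its sender. The sketch below is illustrative only: group_replies and missing are hypothetical helper names, and it assumes the from field holds either a bare address or a "Name <address>" form.

```python
from email.utils import parseaddr
from typing import Dict, Iterable, List


def group_replies(messages: Iterable[dict], specialists: Dict[str, str]) -> Dict[str, dict]:
    """Map specialist name -> first message whose sender matches its address."""
    by_address = {addr.lower(): name for name, addr in specialists.items()}
    replies: Dict[str, dict] = {}
    for msg in messages:
        # parseaddr handles both "a@b" and "Name <a@b>" forms.
        _, addr = parseaddr(str(msg.get("from", "")))
        name = by_address.get(addr.lower())
        if name is not None and name not in replies:
            replies[name] = msg
    return replies


def missing(specialists: Dict[str, str], replies: Dict[str, dict]) -> List[str]:
    """Names of specialists that have not replied yet, sorted for stable output."""
    return sorted(set(specialists) - set(replies))
```

In the loop, the exit condition could then become "missing(SPECIALISTS, replies) is empty" rather than a raw message count. Matching on sender alone does not distinguish two replies from the same specialist, so a subject or thread check may still be needed.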
Identity addressing
Each agent’s email address follows the pattern <name>@in.ravi.app. You can look up an agent’s address programmatically:
# Retrieve the email for a named identity
ravi identity list --json | jq -r '.[] | select(.name == "research-agent") | .email'
Store these addresses in a shared config file (not in the vault) so all agents in the swarm can address each other without hardcoding.
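A shared address file can be as simple as a JSON object mapping agent names to addresses. The sketch below shows one possible loader. The file layout and the helper names load_addresses and address_of are assumptions for illustration, not a format defined by Ravi.

```python
import json
from pathlib import Path
from typing import Dict


def load_addresses(path: str) -> Dict[str, str]:
    """Read a {"agent-name": "address"} mapping from a JSON file and validate it."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("address file must contain a JSON object")
    for name, address in data.items():
        # Cheap sanity check only; this does not fully validate an email address.
        if not isinstance(address, str) or "@" not in address:
            raise ValueError(f"invalid address for {name!r}: {address!r}")
    return data


def address_of(addresses: Dict[str, str], name: str) -> str:
    """Look up an agent's address, failing loudly on an unknown name."""
    try:
        return addresses[name]
    except KeyError:
        raise KeyError(f"no address configured for agent {name!r}") from None
```

Each agent would call load_addresses once at startup and use address_of wherever it needs a recipient, so renaming or re-addressing an agent means editing a single file.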
Audit trail
Because communication happens over real email, every message is retrievable:
# List messages in the active identity's inbox
ravi inbox email --json | jq '.[] | {from, subject, timestamp}'
# Check what the orchestrator sent and received
ravi identity use orchestrator
ravi inbox email --all --json | jq '.[] | {from, to, subject, timestamp}'
This is useful for debugging swarm behavior, replaying failed tasks, or auditing decisions post-hoc.
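For debugging, it can help to turn the JSON output into a chronological timeline. The sketch below assumes the from, to, subject, and timestamp fields shown in the jq filters above, and additionally assumes that timestamps are ISO 8601 strings in a single timezone, so that sorting them as text matches chronological order. The function name timeline is hypothetical.

```python
from typing import Iterable, List


def timeline(messages: Iterable[dict]) -> List[str]:
    """Return one formatted line per message, oldest first."""
    # Assumption: timestamps are ISO 8601 strings in one timezone, so a
    # plain string sort is also a chronological sort.
    ordered = sorted(messages, key=lambda m: str(m.get("timestamp", "")))
    return [
        f"{m.get('timestamp', '?')}  {m.get('from', '?')} -> {m.get('to', '?')}  {m.get('subject', '')}"
        for m in ordered
    ]
```

Feeding it the parsed output of the inbox command for the orchestrator gives a readable log of who asked what and when. If the real timestamp format differs, parse it into datetime objects before sorting.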
When to use this pattern
| Scenario | Recommended approach |
|---|---|
| Orchestrator → specialist, async result | Agent-to-agent email (this guide) |
| Agent → human → agent approval gate | Human Approval |
| Parallel agents, isolated inboxes | Multi-Agent Setup |
| High-frequency, low-latency messages | Shared queue (Redis, SQS) — email adds ~1–5s latency |
Email is not the right transport for tight feedback loops. It is the right transport for durable, auditable, loosely-coupled task delegation.
Next steps
- Multi-Agent Setup — create and isolate identities for each agent
- Human Approval — add a human-in-the-loop gate using the same email pattern
- Identities — programmatic identity management