Agent-to-Agent Communication

Why email as a message bus?

Most multi-agent systems communicate through shared memory, function calls, or a centralized broker. Ravi email gives you a different primitive: each agent has a real inbox with a real address.

This means:

  • An orchestrator agent can delegate to a specialist and await a reply with a full audit trail
  • Any agent can reach any other agent without routing tables or shared infrastructure
  • Messages are asynchronous — the sender does not block, and the receiver processes on its own schedule
  • Every exchange is logged as real email, permanently readable and attributable

The pattern scales from two agents to dozens with no central coordination layer.

Basic pattern

The simplest form: orchestrator sends a task, specialist receives it, replies with a result.

# Orchestrator: send a task to the research agent
ravi identity use orchestrator-agent
ravi email send \
  --to research@in.ravi.app \
  --subject "Task: summarize this URL" \
  --body "Please summarize https://example.com/report.pdf and reply with bullet points."

# Research agent: poll inbox for work
ravi identity use research-agent
for i in $(seq 1 30); do
  TASK=$(ravi inbox email --unread --json | jq -r '.[0] // empty')
  [ -n "$TASK" ] && break
  sleep 5
done

# Research agent: do the work, then reply
ravi email reply \
  --message-id "$(echo "$TASK" | jq -r '.id')" \
  --body "Summary: ..."

# Orchestrator: poll for the reply
ravi identity use orchestrator-agent
for i in $(seq 1 30); do
  REPLY=$(ravi inbox email --unread --json | jq -r '.[0] // empty')
  [ -n "$REPLY" ] && break
  sleep 5
done

Python example

A reusable wrapper for dispatching tasks and collecting replies:

import subprocess
import json
import time
from typing import Optional

def ravi(identity: str, *args: str) -> dict:
    """Run a ravi CLI command as the given identity."""
    result = subprocess.run(
        ["ravi", "identity", "use", identity],
        capture_output=True, check=True
    )
    out = subprocess.run(
        ["ravi", *args, "--json"],
        capture_output=True, text=True, check=True
    )
    return json.loads(out.stdout)

def send_task(from_identity: str, to_email: str, subject: str, body: str) -> None:
    ravi(from_identity, "email", "send",
         "--to", to_email,
         "--subject", subject,
         "--body", body)

def wait_for_reply(identity: str, retries: int = 30, delay: float = 5.0) -> Optional[dict]:
    """Poll inbox until an unread message arrives or timeout."""
    for _ in range(retries):
        messages = ravi(identity, "inbox", "email", "--unread")
        if messages:
            return messages[0]
        time.sleep(delay)
    return None

# Orchestrator sends a task
send_task(
    from_identity="orchestrator",
    to_email="research@in.ravi.app",
    subject="Task: competitive analysis",
    body="List the top 3 competitors of Stripe and their pricing. Reply in JSON.",
)

# Orchestrator waits for the result
reply = wait_for_reply("orchestrator")
if reply:
    result_body = reply["body"]
    print("Received:", result_body)
else:
    print("Timeout — no reply received")

TypeScript example

import { execSync } from "child_process";

function ravi(identity: string, ...args: string[]): unknown {
  execSync(`ravi identity use ${identity}`);
  const out = execSync(`ravi ${args.join(" ")} --json`, { encoding: "utf8" });
  return JSON.parse(out);
}

async function waitForReply(
  identity: string,
  retries = 30,
  delayMs = 5000
): Promise<Record<string, unknown> | null> {
  for (let i = 0; i < retries; i++) {
    const messages = ravi(identity, "inbox", "email", "--unread") as Array<
      Record<string, unknown>
    >;
    if (messages.length > 0) return messages[0];
    await new Promise((r) => setTimeout(r, delayMs));
  }
  return null;
}

// Dispatch a task from the orchestrator
ravi(
  "orchestrator",
  "email send",
  "--to research@in.ravi.app",
  '--subject "Task: research request"',
  '--body "Summarize the state of AI coding assistants in 2026."'
);

// Await the reply
const reply = await waitForReply("orchestrator");
if (reply) {
  console.log("Result:", reply.body);
} else {
  console.error("No reply received within timeout");
}

Parallel task fan-out

Dispatch multiple tasks simultaneously and collect results as they arrive:

import subprocess, json, time, concurrent.futures

SPECIALISTS = {
    "research":  "research@in.ravi.app",
    "finance":   "invoices@in.ravi.app",
    "scheduler": "calendar@in.ravi.app",
}

def dispatch(to_email: str, subject: str, body: str):
    subprocess.run(
        ["ravi", "--identity", "orchestrator",
         "email", "send",
         "--to", to_email,
         "--subject", subject,
         "--body", body],
        check=True,
    )

# Fan out tasks to all specialists
dispatch(SPECIALISTS["research"],  "Task: market report",    "Summarize AI market in 2026.")
dispatch(SPECIALISTS["finance"],   "Task: cost estimate",    "Estimate AWS cost for 1M API calls/day.")
dispatch(SPECIALISTS["scheduler"], "Task: meeting slots",    "Find 3 open slots next week for a 30-min call.")

# Collect replies as they arrive (poll orchestrator inbox)
collected = []
deadline = time.time() + 120  # 2-minute window

while len(collected) < len(SPECIALISTS) and time.time() < deadline:
    messages = json.loads(
        subprocess.check_output(["ravi", "--identity", "orchestrator",
                                 "inbox", "email", "--unread", "--json"])
    )
    for msg in messages:
        if msg not in collected:
            collected.append(msg)
            print(f"Got reply from {msg['from']}: {msg['subject']}")
    if len(collected) < len(SPECIALISTS):
        time.sleep(5)

print(f"Collected {len(collected)}/{len(SPECIALISTS)} replies")

Note: The --identity flag is the safe way to scope commands in parallel processes. Using ravi identity use (global state) across concurrent processes causes race conditions. See Multi-Agent Setup for details.

Identity addressing

Each agent’s email address follows the pattern <name>@in.ravi.app. You can look up an agent’s address programmatically:

# Retrieve the email for a named identity
ravi identity list --json | jq -r '.[] | select(.name == "research-agent") | .email'

Store these addresses in a shared config file (not in the vault) so all agents in the swarm can address each other without hardcoding.

Audit trail

Because communication happens over real email, every message is retrievable:

# Read all messages in a thread
ravi inbox email --json | jq '.[] | {from, subject, timestamp}'

# Check what the orchestrator sent and received
ravi identity use orchestrator
ravi inbox email --all --json | jq '.[] | {from, to, subject, timestamp}'

This is useful for debugging swarm behavior, replaying failed tasks, or auditing decisions post-hoc.

When to use this pattern

ScenarioRecommended approach
Orchestrator → specialist, async resultAgent-to-agent email (this guide)
Agent → human → agent approval gateHuman Approval
Parallel agents, isolated inboxesMulti-Agent Setup
High-frequency, low-latency messagesShared queue (Redis, SQS) — email adds ~1–5s latency

Email is not the right transport for tight feedback loops. It is the right transport for durable, auditable, loosely-coupled task delegation.

Next steps