LobsterAI
Overview
LobsterAI is NetEase Youdao’s 24/7 all-scenario personal assistant built on the Claude Agent SDK. It uses an extensible skill system — including a skill-creator meta-skill that generates new capabilities at runtime — and integrates with Telegram, Discord, DingTalk, and Feishu. LobsterAI runs continuously as a persistent process, managing multiple agents across platforms simultaneously.
The moment those agents need a real identity (an email address for signups, a phone number for SMS verification, a secrets vault for API keys), they need Ravi. Ravi gives each LobsterAI skill a provisioned, persistent identity that survives container restarts, platform migrations, and agent upgrades.
The Problem LobsterAI Faces Without Ravi
LobsterAI agents operate continuously. They receive messages on Telegram, parse calendar events, retrieve documents from Notion, and reply on your behalf. All of this requires credentials: OAuth tokens, API keys, service passwords. Without a structured identity layer, those credentials are hardcoded into config.yml, scattered across .env files, or — worse — lost when the container restarts.
Ravi’s E2E-encrypted vault is the structured layer: credentials live in one place, accessible from any environment via CLI or REST API, scoped to a named identity that persists across deployments.
Integration Patterns
LobsterAI is SDK-native: skills are collections of tool definitions that the Claude Agent SDK calls at runtime. The cleanest Ravi integration is not subprocess.run(["ravi", ...]) on every call — it’s wrapping the Ravi CLI into reusable async tool functions that LobsterAI loads as a skill.
Pattern 1: Ravi skill as TypeScript tool definitions
// skills/ravi-identity/index.ts
import { execFileSync } from "child_process";

// Run the Ravi CLI and parse its JSON output. execFileSync passes arguments
// as an array (no shell), so values such as secret names cannot inject shell syntax.
function ravi(args: string[], identity?: string): unknown {
  const env = identity
    ? { ...process.env, RAVI_CONFIG_DIR: `/tmp/ravi-${identity}` }
    : process.env;
  const raw = execFileSync("ravi", [...args, "--json"], {
    encoding: "utf8",
    env,
  });
  return JSON.parse(raw);
}
export const raviTools = [
  {
    name: "ravi_get_email",
    description:
      "Get the agent's provisioned email address. Use this before signing up for a service or composing an email.",
    input_schema: { type: "object", properties: {} },
    handler: async () => ravi(["get", "email"]),
  },
  {
    name: "ravi_get_phone",
    description:
      "Get the agent's provisioned phone number (E.164 format). Use this before triggering SMS verification.",
    input_schema: { type: "object", properties: {} },
    handler: async () => ravi(["get", "phone"]),
  },
  {
    name: "ravi_poll_sms",
    description:
      "Poll for an incoming SMS OTP. Call this after the agent triggers an SMS verification. Returns the first unread message body, or null on timeout.",
    input_schema: {
      type: "object",
      properties: {
        timeout_seconds: {
          type: "number",
          description: "Max seconds to wait (default 30)",
        },
      },
    },
    handler: async ({ timeout_seconds = 30 }) => {
      const retries = Math.ceil(timeout_seconds / 2);
      for (let i = 0; i < retries; i++) {
        const msgs = ravi(["inbox", "sms", "--unread"]) as Array<{ preview: string }>;
        if (msgs.length > 0) return { body: msgs[0].preview };
        await new Promise((r) => setTimeout(r, 2000));
      }
      return { body: null, error: "SMS timeout" };
    },
  },
  {
    name: "ravi_poll_email",
    description:
      "Poll for an unread email — useful after triggering an email verification or approval request.",
    input_schema: {
      type: "object",
      properties: {
        subject_contains: {
          type: "string",
          description: "Filter by subject substring (optional)",
        },
        timeout_seconds: { type: "number" },
      },
    },
    handler: async ({ subject_contains = "", timeout_seconds = 30 }) => {
      const retries = Math.ceil(timeout_seconds / 2);
      for (let i = 0; i < retries; i++) {
        const threads = ravi(["inbox", "email", "--unread"]) as Array<{
          subject: string;
          preview: string;
        }>;
        const match = subject_contains
          ? threads.find((t) =>
              t.subject.toLowerCase().includes(subject_contains.toLowerCase())
            )
          : threads[0];
        if (match) return match;
        await new Promise((r) => setTimeout(r, 2000));
      }
      return { subject: null, error: "Email timeout" };
    },
  },
  {
    name: "ravi_get_secret",
    description: "Retrieve an API key or secret from the Ravi vault by key name.",
    input_schema: {
      type: "object",
      properties: {
        key: {
          type: "string",
          description: "Secret key name (e.g. TELEGRAM_BOT_TOKEN)",
        },
      },
      required: ["key"],
    },
    handler: async ({ key }: { key: string }) => ravi(["secrets", "get", key]),
  },
];
Pattern 2: Python tool definitions
# skills/ravi_identity.py
import subprocess, json, math, time, os
from typing import Any


def ravi(args: list[str], identity: str | None = None) -> Any:
    """Run the Ravi CLI with --json and return the parsed output."""
    env = os.environ.copy()
    if identity:
        env["RAVI_CONFIG_DIR"] = f"/tmp/ravi-{identity}"
    result = subprocess.run(
        ["ravi"] + args + ["--json"],
        capture_output=True, text=True, env=env, check=True,
    )
    return json.loads(result.stdout)


def ravi_get_email(identity: str | None = None) -> dict:
    """Return the agent's provisioned email address."""
    return ravi(["get", "email"], identity)


def ravi_get_phone(identity: str | None = None) -> dict:
    """Return the agent's provisioned phone number."""
    return ravi(["get", "phone"], identity)


def ravi_poll_sms(timeout_seconds: int = 30, identity: str | None = None) -> dict:
    """
    Poll for an incoming SMS OTP. Call after triggering a verification.
    Returns first unread SMS body, or raises TimeoutError.
    """
    # Round up (as the TypeScript version does) so a short timeout still polls once.
    retries = max(1, math.ceil(timeout_seconds / 2))
    for _ in range(retries):
        msgs = ravi(["inbox", "sms", "--unread"], identity)
        if msgs:
            return {"body": msgs[0]["preview"]}
        time.sleep(2)
    raise TimeoutError(f"No SMS received within {timeout_seconds}s")


def ravi_poll_email(
    subject_contains: str = "",
    timeout_seconds: int = 30,
    identity: str | None = None,
) -> dict:
    """Poll for an unread email, optionally filtering by subject substring."""
    retries = max(1, math.ceil(timeout_seconds / 2))
    for _ in range(retries):
        threads = ravi(["inbox", "email", "--unread"], identity)
        for t in threads:
            # An empty filter is a substring of every subject, so with no
            # filter the first unread thread is returned.
            if subject_contains.lower() in t.get("subject", "").lower():
                return t
        time.sleep(2)
    raise TimeoutError(f"No matching email received within {timeout_seconds}s")


def ravi_get_secret(key: str, identity: str | None = None) -> dict:
    """Retrieve an API key from the Ravi vault by key name."""
    return ravi(["secrets", "get", key], identity)
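The two polling functions above share the same retry loop. As a minimal sketch (not part of Ravi or LobsterAI), that loop could be factored into one helper that works against a deadline rather than a retry count. The name `poll_until` and its `sleep` and `clock` parameters are assumptions introduced here so the helper can be tested without waiting in real time.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], Optional[T]],
    timeout_seconds: float = 30.0,
    interval_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Call fetch() until it returns a non-None value or the deadline passes.

    Hypothetical helper: `fetch` would wrap a call such as
    ravi(["inbox", "sms", "--unread"]) and return None while the inbox is empty.
    """
    deadline = clock() + timeout_seconds
    while True:
        result = fetch()
        if result is not None:
            return result
        # Stop before sleeping if the next attempt would land past the deadline.
        if clock() + interval_seconds > deadline:
            raise TimeoutError(f"Nothing received within {timeout_seconds}s")
        sleep(interval_seconds)
```

With this in place, an SMS poll reduces to passing a small function that returns the first unread message or `None`, and the timeout behaviour lives in one spot instead of two.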
Startup: Loading Secrets at Agent Init
LobsterAI reads credentials at startup. Instead of placing secrets in config.yml, load them from the Ravi vault:
# lobsterai_startup.py
import subprocess, json, os


def load_agent_secrets() -> None:
    """Inject Ravi vault secrets into the process environment at startup."""
    keys = ["TELEGRAM_BOT_TOKEN", "OPENAI_API_KEY", "ELEVENLABS_API_KEY"]
    for key in keys:
        result = subprocess.run(
            ["ravi", "secrets", "get", key, "--json"],
            capture_output=True, text=True,
        )
        if result.returncode == 0:
            data = json.loads(result.stdout)
            os.environ[key] = data["value"]
        else:
            print(f"Warning: secret {key} not found in Ravi vault")


load_agent_secrets()
# Now os.environ["TELEGRAM_BOT_TOKEN"] is set — pass to LobsterAI config
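The loader above only warns when a secret is missing, so the agent may start and fail later on first use. One possible follow-up, sketched here under the assumption that all listed keys are mandatory, is to check the environment after loading and stop early. The helper names `missing_secrets` and `require_secrets` are hypothetical and not part of LobsterAI or Ravi.

```python
import os
from typing import Iterable, Mapping


def missing_secrets(
    required: Iterable[str],
    env: Mapping[str, str] = os.environ,
) -> list[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    return [key for key in required if not env.get(key)]


def require_secrets(
    required: Iterable[str],
    env: Mapping[str, str] = os.environ,
) -> None:
    """Raise RuntimeError naming every missing secret, so startup stops early."""
    missing = missing_secrets(required, env)
    if missing:
        raise RuntimeError(f"Missing required secrets: {', '.join(missing)}")
```

Calling `require_secrets(["TELEGRAM_BOT_TOKEN", "OPENAI_API_KEY"])` right after `load_agent_secrets()` would turn a silent gap into a single startup error that lists every absent key.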
Headless / Docker Deployment
LobsterAI running in Docker cannot do an interactive ravi auth login. Inject authentication via RAVI_ACCESS_TOKEN:
# Extract token from your local machine (once)
TOKEN=$(jq -r .access_token ~/.ravi/auth.json)

# Inject at container start — no browser flow needed inside
docker run \
  -e RAVI_ACCESS_TOKEN="$TOKEN" \
  -e RAVI_IDENTITY_NAME="lobsterai-prod" \
  lobsterai:latest
Or use the REST API directly (no CLI subprocess, no auth file):
# ravi_rest.py — pure REST, no CLI dependency
import os, httpx

BASE = "https://app.ravi.app"
TOKEN = os.environ["RAVI_ACCESS_TOKEN"]
IDENTITY = os.environ.get("RAVI_IDENTITY_NAME", "lobsterai-prod")

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "X-Ravi-Identity": IDENTITY,
}


def get_sms_inbox() -> list[dict]:
    resp = httpx.get(f"{BASE}/api/sms/", headers=headers)
    resp.raise_for_status()
    return resp.json()["results"]
See TypeScript REST API for the full REST client.
Idempotent Identity Bootstrap
LobsterAI restarts frequently (container deploys, OS reboots). Bootstrap the identity idempotently so restarts are safe:
# bootstrap.py
import subprocess, json


def get_or_create_identity(name: str) -> dict:
    """Return existing identity by name, or create it if it doesn't exist."""
    result = subprocess.run(
        ["ravi", "identity", "list", "--json"],
        capture_output=True, text=True, check=True,
    )
    identities = json.loads(result.stdout)
    for identity in identities:
        if identity["name"] == name:
            return identity
    result = subprocess.run(
        ["ravi", "identity", "create", "--name", name, "--json"],
        capture_output=True, text=True, check=True,
    )
    return json.loads(result.stdout)


# Call once at startup — safe to re-run on every restart
identity = get_or_create_identity("lobsterai-prod")
print(f"Agent email: {identity['inbox']} phone: {identity['phone']}")
The same persistent identity means:
- Signal contacts accumulated over time stay intact
- Telegram reputation (trust signals, group memberships) survives restarts
- Email threads remain coherent — replies arrive in the same inbox
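The get-or-create logic in bootstrap.py can be checked for idempotency without a live Ravi account by injecting the command runner. The sketch below reuses the CLI arguments and the `name`, `inbox`, and `phone` fields from bootstrap.py; the `Runner` type, the `FakeRavi` class, and its placeholder email and phone values are hypothetical test scaffolding, not Ravi behaviour.

```python
import json
from typing import Callable

# A runner takes CLI arguments and returns the command's stdout as text.
Runner = Callable[[list[str]], str]


def get_or_create(name: str, run: Runner) -> dict:
    """Return the identity called `name`, creating it only if it is absent."""
    identities = json.loads(run(["ravi", "identity", "list", "--json"]))
    for identity in identities:
        if identity["name"] == name:
            return identity
    created = run(["ravi", "identity", "create", "--name", name, "--json"])
    return json.loads(created)


class FakeRavi:
    """In-memory stand-in for the CLI (hypothetical, for testing only)."""

    def __init__(self) -> None:
        self.identities: list[dict] = []
        self.creates = 0

    def __call__(self, args: list[str]) -> str:
        if args[2] == "list":
            return json.dumps(self.identities)
        # Otherwise this is "create": the name follows the --name flag.
        name = args[args.index("--name") + 1]
        self.creates += 1
        record = {"name": name, "inbox": f"{name}@example.test", "phone": "+15550100"}
        self.identities.append(record)
        return json.dumps(record)
```

Running `get_or_create("lobsterai-prod", fake)` twice against the same `FakeRavi` instance returns the same record both times and increments `creates` only once, which is the property a restart-safe bootstrap needs.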
DingTalk and Feishu Setup
LobsterAI integrates with DingTalk and Feishu. Both require phone number verification during bot registration. Use the Ravi phone number:
# Get provisioned phone for DingTalk/Feishu bot registration
PHONE=$(ravi get phone --json | jq -r .phone_number)
echo "Register DingTalk bot with phone: $PHONE"

# After triggering OTP, poll for up to 30 seconds (15 attempts, 2 s apart)
for i in $(seq 1 15); do
  OTP=$(ravi inbox sms --unread --json | jq -r '.[0].preview // empty')
  [ -n "$OTP" ] && { echo "OTP: $OTP"; break; }
  sleep 2
done

# Use if/fi so the script exits 0 when an OTP was received
if [ -z "$OTP" ]; then
  echo "OTP timeout" >&2
  exit 1
fi
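The shell loop above prints the whole SMS preview, not the code itself. A minimal sketch for pulling the digits out is shown below, assuming the code is a standalone run of 4 to 8 digits. Actual DingTalk and Feishu message formats are not specified here, so both the pattern and the helper name `extract_otp` are assumptions to adjust against real messages.

```python
import re
from typing import Optional

# Assumption: the verification code is a standalone run of 4 to 8 digits.
# The lookarounds reject digits that are part of a longer number.
_OTP_PATTERN = re.compile(r"(?<!\d)\d{4,8}(?!\d)")


def extract_otp(preview: str) -> Optional[str]:
    """Return the first 4-8 digit code found in an SMS preview, or None."""
    match = _OTP_PATTERN.search(preview)
    return match.group(0) if match else None
```

Returning `None` rather than raising lets the caller decide whether to keep polling for a later message that does contain a code.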
Next Steps
- Production Patterns — ephemeral identity lifecycle, token refresh, parallel-safe scoping
- TypeScript REST API — REST client without CLI subprocess
- MCP Server — expose Ravi tools via MCP for Claude Agent SDK native tool calling
- Harness Integration — credential injection patterns for containerized agents