lemon (Elixir / BEAM)

lemon is a BEAM-native AI assistant runtime built as a Mix umbrella with 17 supervised OTP applications and a GenServer per conversation session. Its architecture — one process per session, supervised fault isolation, concurrent by default — maps naturally onto Ravi’s Identity model: one Identity per agent process, each scoped to its own config directory.

This page shows two integration paths for Elixir / BEAM runtimes:

  1. Subprocess via System.cmd/3 — shell out to the ravi CLI, pass RAVI_CONFIG_DIR per process
  2. Direct REST API via Req — call Ravi’s HTTP API from Elixir code, no CLI required

Both patterns work in supervised OTP trees. The subprocess approach is simpler to integrate incrementally; the REST approach removes the CLI binary dependency and is preferred for containerized or distributed deployments.

Prerequisites

  • Ravi CLI installed and authenticated on the host (for subprocess approach)
  • RAVI_ACCESS_TOKEN environment variable set, or ~/.ravi/auth.json present (for REST approach)
  • Elixir dependencies in mix.exs:
# mix.exs
# Dependencies used by the examples on this page.
defp deps do
  [
    {:req, "~> 0.5"},    # HTTP client (REST approach)
    {:jason, "~> 1.4"}   # JSON encoding and decoding (both approaches)
  ]
end

Subprocess approach

System.cmd/3 wraps the ravi CLI. Pass RAVI_CONFIG_DIR in the environment to scope each agent to its own identity — never call ravi identity use from concurrent processes, as it writes shared global state.

defmodule MyApp.RaviAgent do
  @moduledoc """
  Wraps the ravi CLI for a single agent process.

  Each agent gets its own config dir, which is passed to the CLI through
  `RAVI_CONFIG_DIR` on every call made via `cmd/3`, so agents can run in
  parallel under OTP supervisors.
  """

  # Cache of the identity fields returned by `provision/1`. It lives next to
  # config.json so that a restarted agent reuses its identity instead of
  # creating a new one. NOTE(review): assumes the CLI ignores unknown files in
  # its config dir — confirm.
  @identity_cache "identity.json"

  @type identity :: %{
          uuid: String.t() | nil,
          email: String.t() | nil,
          phone: String.t() | nil,
          config_dir: String.t()
        }

  @doc """
  Provisions a Ravi identity for `agent_name` and returns it with its config dir.

  The call is idempotent per agent name: if the config dir already holds a
  cached identity, that identity is returned and no new one is created. This
  keeps supervisor restarts from creating a fresh identity each time.

  Returns `{:ok, identity}` on success, or `{:error, {:ravi_exit, exit_code,
  output}}` when `ravi identity create` exits with a non-zero status.
  Raises if the CLI prints invalid JSON or the config files cannot be written.

  Call once at agent startup (e.g., in `GenServer.init/1`).
  """
  @spec provision(String.t()) ::
          {:ok, identity()} | {:error, {:ravi_exit, non_neg_integer(), String.t()}}
  def provision(agent_name) do
    config_dir = "/tmp/ravi-#{agent_name}"
    File.mkdir_p!(config_dir)

    with {:ok, identity} <- fetch_or_create_identity(agent_name, config_dir) do
      {:ok,
       %{
         uuid: identity["uuid"],
         email: identity["inbox"],
         phone: identity["phone"],
         config_dir: config_dir
       }}
    end
  end

  @doc """
  Runs any ravi CLI command scoped to this agent's config dir.

  `opts` are forwarded to `System.cmd/3`. `stderr_to_stdout` defaults to
  `true`; pass `stderr_to_stdout: false` when the output will be parsed as
  JSON. The `:env` option is always replaced with `RAVI_CONFIG_DIR`.

  Returns `{output, exit_code}`.
  """
  @spec cmd(String.t(), [String.t()], keyword()) :: {String.t(), non_neg_integer()}
  def cmd(config_dir, args, opts \\ []) do
    cmd_opts =
      [stderr_to_stdout: true]
      |> Keyword.merge(opts)
      |> Keyword.put(:env, [{"RAVI_CONFIG_DIR", config_dir}])

    System.cmd("ravi", args, cmd_opts)
  end

  @doc """
  Polls the SMS inbox for an OTP message, with retry.

  Returns the `"preview"` of the first unread message, or `nil` if none shows
  up. Makes up to `max_attempts` attempts, sleeping `interval_ms` between
  them. Raises if the CLI exits with a non-zero status or prints invalid JSON.
  """
  @spec poll_sms_otp(String.t(), integer(), non_neg_integer()) :: String.t() | nil
  def poll_sms_otp(config_dir, max_attempts \\ 15, interval_ms \\ 2000) do
    poll(config_dir, "sms", max_attempts, interval_ms, &first_preview/1)
  end

  @doc """
  Polls the email inbox for an unread message, with retry.

  Returns the first unread thread, or `nil` if none shows up. Makes up to
  `max_attempts` attempts, sleeping `interval_ms` between them. Raises if the
  CLI exits with a non-zero status or prints invalid JSON.
  """
  @spec poll_email(String.t(), integer(), non_neg_integer()) :: term() | nil
  def poll_email(config_dir, max_attempts \\ 15, interval_ms \\ 2000) do
    poll(config_dir, "email", max_attempts, interval_ms, &first_thread/1)
  end

  # Returns the cached identity for this config dir, or creates a new one.
  defp fetch_or_create_identity(agent_name, config_dir) do
    cache_path = Path.join(config_dir, @identity_cache)

    case read_cached_identity(cache_path) do
      {:ok, identity} -> {:ok, identity}
      :error -> create_identity(agent_name, config_dir, cache_path)
    end
  end

  # Reads the identity cache. A missing, unreadable or malformed cache is
  # treated as "not provisioned yet".
  defp read_cached_identity(cache_path) do
    with {:ok, contents} <- File.read(cache_path),
         {:ok, %{"uuid" => uuid} = identity} when is_binary(uuid) <- Jason.decode(contents) do
      {:ok, identity}
    else
      _ -> :error
    end
  end

  # Creates the identity through the CLI, then writes the per-agent config and
  # the identity cache.
  defp create_identity(agent_name, config_dir, cache_path) do
    args = ["identity", "create", "--name", agent_name, "--json"]

    # stderr is kept separate so warnings cannot corrupt the JSON on stdout.
    case System.cmd("ravi", args, stderr_to_stdout: false) do
      {output, 0} ->
        identity = Jason.decode!(output)

        # Write a per-agent config so all subsequent CLI calls use this identity.
        config_path = Path.join(config_dir, "config.json")
        File.write!(config_path, Jason.encode!(%{"identity_uuid" => identity["uuid"]}))

        # Cache only the fields provision/1 returns.
        cached = Map.take(identity, ["uuid", "inbox", "phone"])
        File.write!(cache_path, Jason.encode!(cached))

        {:ok, identity}

      {output, exit_code} ->
        {:error, {:ravi_exit, exit_code, output}}
    end
  end

  # Shared polling loop. `pick` turns the decoded inbox JSON into
  # `{:ok, value}` or `:none`. At least one attempt is always made.
  defp poll(config_dir, inbox, attempts_left, interval_ms, pick) do
    {output, 0} =
      cmd(config_dir, ["inbox", inbox, "--unread", "--json"], stderr_to_stdout: false)

    case pick.(Jason.decode!(output)) do
      {:ok, found} ->
        found

      :none when attempts_left > 1 ->
        Process.sleep(interval_ms)
        poll(config_dir, inbox, attempts_left - 1, interval_ms, pick)

      :none ->
        nil
    end
  end

  # Picks the preview of the first message when it is a non-empty string.
  defp first_preview([%{"preview" => preview} | _]) when is_binary(preview) and preview != "",
    do: {:ok, preview}

  defp first_preview(_), do: :none

  # Picks the first thread. Anything other than a non-empty list (including
  # non-list JSON) counts as "nothing yet".
  defp first_thread([thread | _]), do: {:ok, thread}
  defp first_thread(_), do: :none
end

GenServer integration

defmodule MyApp.AgentWorker do
  @moduledoc """
  One GenServer per agent, registered by name in `MyApp.Registry`.

  The process provisions its Ravi identity in `init/1` and keeps it in state
  for later signup flows.
  """

  use GenServer

  # MyApp.RaviAgent.poll_sms_otp/1 defaults to 15 attempts spaced 2000 ms
  # apart, so a signup can take around 30 s plus CLI time. That is well past
  # the 5000 ms default timeout of GenServer.call/2.
  @signup_timeout 60_000

  @doc """
  Starts a worker for `agent_name`, registered via `MyApp.Registry`.
  """
  def start_link(agent_name) do
    GenServer.start_link(__MODULE__, agent_name, name: via_tuple(agent_name))
  end

  @doc """
  Runs the signup flow on the worker registered for `agent_name`.

  Returns `{:ok, %{email: email, phone: phone, otp: otp}}`, where `otp` is
  `nil` if no SMS arrived during polling. `timeout` is the `GenServer.call/3`
  timeout in milliseconds; its default covers the full polling window.
  """
  def signup(agent_name, service_url, timeout \\ @signup_timeout) do
    GenServer.call(via_tuple(agent_name), {:signup, service_url}, timeout)
  end

  @impl true
  def init(agent_name) do
    # Provisioning is done synchronously so that a provisioning failure makes
    # start_link/1 fail instead of leaving a worker without an identity.
    {:ok, identity} = MyApp.RaviAgent.provision(agent_name)
    {:ok, %{agent_name: agent_name, identity: identity}}
  end

  @impl true
  def handle_call({:signup, _service_url}, _from, state) do
    email = state.identity.email
    phone = state.identity.phone
    config_dir = state.identity.config_dir

    # Use email/phone in your signup flow...
    # Then poll for OTP. This blocks the worker until polling finishes, so
    # callers must use a call timeout longer than the polling window.
    otp = MyApp.RaviAgent.poll_sms_otp(config_dir)

    {:reply, {:ok, %{email: email, phone: phone, otp: otp}}, state}
  end

  defp via_tuple(name), do: {:via, Registry, {MyApp.Registry, name}}
end

REST API approach

Call Ravi’s HTTP API directly using Req. This removes the CLI dependency and integrates cleanly into OTP supervision trees. Authentication uses a Bearer token loaded from the environment or ~/.ravi/auth.json.

Loading credentials

defmodule MyApp.RaviClient do
  @moduledoc """
  Minimal client for Ravi's HTTP API, built on Req.

  Every request is sent with `http_errors: :raise`, so a 4xx/5xx response
  raises instead of returning the error body as if it were data.
  """

  @base_url "https://api.ravi.app"

  @doc """
  Reads the Ravi access token from the `RAVI_ACCESS_TOKEN` env var, falling
  back to the `"access_token"` key of `~/.ravi/auth.json`.

  An unset or empty env var triggers the fallback. Raises if the file cannot
  be read, is not valid JSON, or has no `"access_token"` key.
  For container / headless deployments, set `RAVI_ACCESS_TOKEN` at launch time.
  """
  def load_token do
    case System.get_env("RAVI_ACCESS_TOKEN") do
      token when is_binary(token) and token != "" ->
        token

      _ ->
        auth_path = Path.expand("~/.ravi/auth.json")

        case File.read(auth_path) do
          {:ok, contents} ->
            contents |> Jason.decode!() |> Map.fetch!("access_token")

          {:error, _} ->
            raise "No RAVI_ACCESS_TOKEN set and ~/.ravi/auth.json not found. " <>
                    "Run `ravi auth login` or set RAVI_ACCESS_TOKEN."
        end
    end
  end

  @doc """
  Creates a new Ravi identity named `name` and returns the decoded response body.

  Raises on transport errors and on 4xx/5xx responses.
  """
  def create_identity(token, name) do
    Req.post!(
      "#{@base_url}/api/identities/",
      headers: [auth_header(token)],
      json: %{name: name},
      http_errors: :raise
    ).body
  end

  @doc """
  Lists SMS for an identity; pass `unread: true` to request only unread ones.

  The request carries the `X-Ravi-Identity` header for `identity_uuid`.
  Raises on transport errors and on 4xx/5xx responses.
  """
  def list_sms(token, identity_uuid, opts \\ []) do
    list_inbox("sms", token, identity_uuid, opts)
  end

  @doc """
  Lists email threads for an identity; pass `unread: true` to request only
  unread ones.

  Raises on transport errors and on 4xx/5xx responses.
  """
  def list_email(token, identity_uuid, opts \\ []) do
    list_inbox("email", token, identity_uuid, opts)
  end

  @doc """
  Polls the SMS inbox for an OTP message, with retry.

  Returns the `"preview"` of the first unread message, or `nil` if none shows
  up. Makes up to `max_attempts` attempts, sleeping `interval_ms` between
  them. An HTTP error raises immediately instead of being retried as an
  empty inbox.
  """
  def poll_sms_otp(token, identity_uuid, max_attempts \\ 15, interval_ms \\ 2000) do
    Enum.reduce_while(1..max_attempts, nil, fn attempt, _acc ->
      case list_sms(token, identity_uuid, unread: true) do
        [%{"preview" => preview} | _] when is_binary(preview) and preview != "" ->
          {:halt, preview}

        _ when attempt < max_attempts ->
          Process.sleep(interval_ms)
          {:cont, nil}

        _ ->
          {:halt, nil}
      end
    end)
  end

  @doc """
  Exchanges `refresh_token` for a new access token and returns it.

  Call this from a scheduled GenServer tick in long-running processes
  (tokens expire after ~1 hour). Raises on transport errors, on 4xx/5xx
  responses, and when the response has no `"access"` key, so a failed refresh
  never yields a `nil` token.
  """
  def refresh_token(refresh_token) do
    Req.post!(
      "#{@base_url}/api/auth/token/refresh/",
      json: %{refresh: refresh_token},
      http_errors: :raise
    ).body
    |> Map.fetch!("access")
  end

  # Shared GET for the per-identity inbox endpoints ("sms" or "email").
  defp list_inbox(kind, token, identity_uuid, opts) do
    params = if Keyword.get(opts, :unread, false), do: [unread: true], else: []

    Req.get!(
      "#{@base_url}/api/identities/#{identity_uuid}/inbox/#{kind}/",
      headers: [auth_header(token), {"X-Ravi-Identity", identity_uuid}],
      params: params,
      http_errors: :raise
    ).body
  end

  # Builds the Bearer authorization header for `token`.
  defp auth_header(token), do: {"Authorization", "Bearer #{token}"}
end

Parallel fleet with REST API

The X-Ravi-Identity header makes the REST approach inherently parallel-safe — each call is scoped to a specific identity UUID without any shared state.

defmodule MyApp.AgentFleet do
  @moduledoc """
  Helpers for provisioning and driving many Ravi identities concurrently.
  """

  # Task.async_stream/3 defaults to a 5000 ms per-task timeout, which is
  # shorter than a default OTP poll (15 attempts × 2000 ms). Use a timeout
  # that covers it.
  @default_timeout 60_000

  @doc """
  Provisions one identity per entry in `agent_names`, at most 10 at a time.

  Each identity is named `"<fleet_id>-<name>"`. Returns the identity maps in
  the same order as `agent_names`. A failed or timed-out task exits the caller.
  """
  def provision(fleet_id, agent_names) do
    token = MyApp.RaviClient.load_token()

    agent_names
    |> Task.async_stream(
      fn name -> MyApp.RaviClient.create_identity(token, "#{fleet_id}-#{name}") end,
      max_concurrency: 10,
      timeout: @default_timeout
    )
    |> Enum.map(fn {:ok, identity} -> identity end)
  end

  @doc """
  Fans `task_fn` out to every identity in the fleet and collects the results.

  `task_fn` is called as `task_fn.(token, identity["uuid"])`. Results are
  returned in the same order as `identities`; an empty list returns `[]`.

  ## Options

    * `:timeout` - per-task timeout in milliseconds (default `60_000`)
    * `:max_concurrency` - maximum number of tasks running at once
      (default: one per identity)
  """
  def broadcast_and_collect(token, identities, task_fn, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, @default_timeout)

    # :max_concurrency must be greater than zero, so clamp for an empty fleet.
    max_concurrency = Keyword.get(opts, :max_concurrency, max(length(identities), 1))

    identities
    |> Task.async_stream(
      fn identity -> task_fn.(token, identity["uuid"]) end,
      max_concurrency: max_concurrency,
      timeout: timeout
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

lemon-specific integration notes

lemon’s OTP supervision model maps cleanly onto this pattern:

  • One AgentWorker GenServer (wrapping the RaviAgent module) per lemon session — provision in init/1, clean up in terminate/2
  • Supervisor restarts are safe as long as provision/1 is idempotent — check for an existing config dir before creating a new identity
  • Use RAVI_ACCESS_TOKEN in lemon’s Mix config for containerized deployments rather than relying on ~/.ravi/auth.json at runtime
# config/runtime.exs — inject token for headless deployments
# Resolve the token first so a missing variable aborts boot with a clear message.
ravi_token =
  System.get_env("RAVI_ACCESS_TOKEN") ||
    raise("Missing RAVI_ACCESS_TOKEN — run `ravi auth login` locally or inject via env")

config :my_lemon_app, ravi_token: ravi_token

:::note
Token TTL: Ravi access tokens expire after approximately 1 hour. For long-running lemon sessions, schedule a periodic RaviClient.refresh_token/1 call from a supervised GenServer (e.g., every 50 minutes).
:::

Next steps