Deploying AI Agents to Production: A Comprehensive Guide

Andrius Putna · 4 min read
#ai #agents #production #deployment #monitoring #scaling #devops #tutorial

Deploying AI Agents to Production

Building an AI agent that works in development is one thing. Deploying it to handle real users, unpredictable inputs, and 24/7 uptime is another challenge entirely. This guide covers the essential considerations for taking your AI agents from prototype to production-ready systems.

What You’ll Learn

By the end of this guide, you’ll understand:

- How to scale agents horizontally with stateless design and queue-based processing
- Which metrics, logs, and traces to collect for production observability
- Error handling patterns: graceful degradation, circuit breakers, and request validation
- How to keep costs under control with token budgets, caching, and model selection

Prerequisites

This guide assumes you have:

- A working AI agent (the examples use LangGraph and LangChain)
- Comfort with Python and asynchronous task processing
- Basic familiarity with Docker, Kubernetes, and Redis


Scaling AI Agents

AI agents present unique scaling challenges. Unlike traditional web services, agent requests can take seconds or minutes and consume significant memory for context management.

Horizontal Scaling with Stateless Design

The first principle of scalable agents is statelessness. Each request should be independent, with state stored externally:

from redis import Redis
from langgraph.checkpoint.redis import RedisSaver

# External state storage
redis_client = Redis(host="redis-cluster.internal", port=6379)
checkpointer = RedisSaver(redis_client=redis_client)

# Create agent with external checkpointer
agent = create_agent().compile(checkpointer=checkpointer)

With state externalized, you can run multiple agent instances behind a load balancer. Any instance can handle any request because conversation history lives in Redis, not local memory.

Queue-Based Architecture

For long-running agent tasks, implement a queue-based architecture:

from celery import Celery
from langchain_core.messages import HumanMessage

# Result backend lets clients poll for task results
app = Celery(
    'agent_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@app.task(bind=True, max_retries=3)
def run_agent_task(self, thread_id: str, user_message: str):
    """Process agent task asynchronously."""
    try:
        result = agent.invoke(
            {"messages": [HumanMessage(content=user_message)]},
            {"configurable": {"thread_id": thread_id}}
        )
        return {"status": "success", "response": result["messages"][-1].content}
    except Exception as e:
        # Retry with exponential backoff: 1s, 2s, 4s
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

This pattern decouples request handling from agent execution. Your API responds immediately with a task ID, and clients poll for results (sketched below). Benefits include:

- No request timeouts: long-running agent work happens in workers, not inside the HTTP request
- Automatic retries on transient failures (the max_retries and exponential backoff above)
- Workers scale independently of the API layer
- Traffic spikes queue up instead of overwhelming agent instances
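
The API layer can stay thin. Here’s a minimal sketch of the submit-and-poll endpoints using FastAPI (route names are illustrative; it reuses the Celery app and run_agent_task defined above):

from celery.result import AsyncResult
from fastapi import FastAPI

api = FastAPI()

@api.post("/agent/tasks")
def submit_task(payload: dict):
    # Enqueue the agent task and return immediately with a task ID
    task = run_agent_task.delay(payload["thread_id"], payload["message"])
    return {"task_id": task.id}

@api.get("/agent/tasks/{task_id}")
def get_task_status(task_id: str):
    # Clients poll this endpoint until the task finishes
    result = AsyncResult(task_id, app=app)
    if not result.ready():
        return {"status": "pending"}
    if result.successful():
        return result.result
    return {"status": "error"}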

Container Resource Limits

AI agents can consume substantial memory, especially with large context windows. Set appropriate limits:

# kubernetes deployment
resources:
  requests:
    memory: "2Gi"
    cpu: "500m"
  limits:
    memory: "4Gi"
    cpu: "2000m"

Monitor memory usage carefully. Context windows of 100k+ tokens can require several gigabytes of RAM when processing complex documents.
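
One way to keep memory bounded is to limit how much history gets replayed into the context on each turn. A minimal sketch of such a helper (the 40-message window is purely illustrative; tune it for your workload):

from langchain_core.messages import SystemMessage

def bound_history(messages: list, max_messages: int = 40) -> list:
    """Keep the system prompt plus only the most recent messages."""
    system = [m for m in messages if isinstance(m, SystemMessage)]
    recent = [m for m in messages if not isinstance(m, SystemMessage)][-max_messages:]
    return system + recent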


Monitoring and Observability

You cannot fix what you cannot see. Production agents need comprehensive monitoring across three dimensions: metrics, logs, and traces.

Key Metrics to Track

Implement metrics collection for critical agent behaviors:

from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
agent_requests = Counter('agent_requests_total', 'Total agent requests', ['status'])
agent_latency = Histogram('agent_latency_seconds', 'Agent request latency')
tool_calls = Counter('agent_tool_calls_total', 'Tool invocations', ['tool_name'])
token_usage = Counter('agent_tokens_total', 'Token usage', ['type'])
active_sessions = Gauge('agent_active_sessions', 'Currently active sessions')

def monitored_agent_call(thread_id: str, message: str):
    """Wrap agent calls with metrics collection."""
    active_sessions.inc()
    start_time = time.time()

    try:
        result = agent.invoke(
            {"messages": [HumanMessage(content=message)]},
            {"configurable": {"thread_id": thread_id}}
        )
        agent_requests.labels(status="success").inc()
        return result
    except Exception as e:
        agent_requests.labels(status="error").inc()
        raise
    finally:
        agent_latency.observe(time.time() - start_time)
        active_sessions.dec()

Essential metrics to monitor:

- Request volume and error rate (agent_requests_total, by status)
- Latency distribution (agent_latency_seconds), including tail percentiles
- Tool invocation counts per tool (agent_tool_calls_total)
- Token consumption by type (agent_tokens_total), your main cost driver
- Concurrent sessions (agent_active_sessions) for capacity planning
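
The token_usage counter above is defined but never incremented in the wrapper. One way to wire it up and expose everything for scraping, assuming your model provider attaches usage_metadata to AI messages (the port is illustrative):

from prometheus_client import start_http_server

# Expose /metrics for Prometheus to scrape
start_http_server(9100)

def record_token_usage(result: dict):
    """Record provider-reported token counts from an agent result."""
    for message in result["messages"]:
        usage = getattr(message, "usage_metadata", None)
        if usage:
            token_usage.labels(type="input").inc(usage.get("input_tokens", 0))
            token_usage.labels(type="output").inc(usage.get("output_tokens", 0))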

Structured Logging

Log agent decisions and tool calls in a structured format for easier debugging:

import structlog
from langchain.callbacks.base import BaseCallbackHandler

logger = structlog.get_logger()

class ProductionCallbackHandler(BaseCallbackHandler):
    def __init__(self, thread_id: str):
        self.thread_id = thread_id

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
        logger.info(
            "tool_started",
            thread_id=self.thread_id,
            tool_name=serialized.get("name"),
            input_preview=input_str[:200]
        )

    def on_tool_end(self, output: str, **kwargs):
        logger.info(
            "tool_completed",
            thread_id=self.thread_id,
            output_length=len(output)
        )

    def on_llm_error(self, error: Exception, **kwargs):
        logger.error(
            "llm_error",
            thread_id=self.thread_id,
            error_type=type(error).__name__,
            error_message=str(error)
        )
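
To put the handler to work, pass it in the invocation config, and configure structlog to emit JSON so the logs are easy to ship to your aggregator. A minimal sketch (message and thread_id come from your request handling):

import structlog

# Emit logs as JSON lines with timestamps and levels
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)

result = agent.invoke(
    {"messages": [HumanMessage(content=message)]},
    {
        "configurable": {"thread_id": thread_id},
        "callbacks": [ProductionCallbackHandler(thread_id)],
    },
)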

Distributed Tracing

For complex agent workflows, implement distributed tracing to understand request flow:

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("ai-agent")

async def traced_agent_call(thread_id: str, message: str):
    with tracer.start_as_current_span("agent_request") as span:
        span.set_attribute("thread_id", thread_id)
        span.set_attribute("message_length", len(message))

        try:
            result = await agent.ainvoke(...)
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Tracing reveals where time is spent: LLM inference, tool execution, or network latency.
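
The snippet above assumes a tracer provider has already been configured. A minimal setup that exports spans to an OpenTelemetry collector (the endpoint is illustrative):

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Send spans to a collector in batches
provider = TracerProvider(resource=Resource.create({"service.name": "ai-agent"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
)
trace.set_tracer_provider(provider)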


Error Handling Patterns

Production agents encounter errors you never see in development. Build resilience from the start.

Graceful Degradation

When tools fail, agents should continue functioning with reduced capabilities:

from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search the product database."""
    try:
        results = db.search(query, timeout=5.0)
        return format_results(results)
    except TimeoutError:
        return "Database search is temporarily slow. I can still help with general questions."
    except ConnectionError:
        return "Database unavailable. Please try again in a few minutes."

Return informative messages rather than raising exceptions. This lets the agent explain the situation and potentially try alternative approaches.

Circuit Breakers

Prevent cascading failures when external services become unreliable:

import httpx
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
def call_external_api(endpoint: str, payload: dict):
    """External API call with circuit breaker protection."""
    response = httpx.post(endpoint, json=payload, timeout=10.0)
    response.raise_for_status()
    return response.json()

When a service fails repeatedly, the circuit opens and fails fast rather than waiting for timeouts. After the recovery period, it allows test requests through to check if the service recovered.
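
Combined with the graceful degradation pattern above, a tool can catch the open-circuit error and explain the outage instead of propagating it. A sketch (check_inventory and the inventory endpoint are hypothetical):

from circuitbreaker import CircuitBreakerError

@tool
def check_inventory(product_id: str) -> str:
    """Check stock levels for a product."""
    try:
        data = call_external_api(
            "https://inventory-api.internal/stock", {"product_id": product_id}
        )
        return f"{data['quantity']} units in stock"
    except CircuitBreakerError:
        # Circuit is open: fail fast with an explanation the agent can relay
        return "The inventory service is currently unavailable. Please try again shortly."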

Request Validation

Validate inputs before they reach your agent:

from pydantic import BaseModel, Field, field_validator

class AgentRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    thread_id: str = Field(..., pattern=r'^[a-zA-Z0-9-]{1,64}$')

    @field_validator('message')
    @classmethod
    def sanitize_message(cls, v: str) -> str:
        # Reject obvious prompt injection patterns before they reach the agent
        if any(phrase in v.lower() for phrase in ['ignore previous', 'system prompt']):
            raise ValueError("Invalid message content")
        return v.strip()
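
Reject invalid requests at the edge, before any tokens are spent. A sketch that reuses the queued run_agent_task from earlier:

from pydantic import ValidationError

def handle_request(payload: dict):
    try:
        request = AgentRequest(**payload)
    except ValidationError as e:
        # Bad input never reaches the agent or the LLM
        return {"status": "rejected", "errors": e.errors()}
    task = run_agent_task.delay(request.thread_id, request.message)
    return {"status": "accepted", "task_id": task.id}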

Cost Management

LLM API costs can escalate rapidly in production. Implement controls from day one.

Token Budget Enforcement

Set hard limits on token usage per request:

class TokenBudgetExceeded(Exception):
    pass

class BudgetedAgent:
    def __init__(self, agent, max_tokens: int = 10000):
        self.agent = agent
        self.max_tokens = max_tokens
        self.tokens_used = 0

    def invoke(self, input_data, config):
        if self.tokens_used >= self.max_tokens:
            raise TokenBudgetExceeded(
                f"Token budget of {self.max_tokens} exceeded"
            )

        result = self.agent.invoke(input_data, config)
        self.tokens_used += self._count_tokens(result)
        return result
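
The _count_tokens helper is referenced but not defined above. A minimal sketch you could drop into the class, assuming the model returns LangChain messages with usage_metadata attached:

    def _count_tokens(self, result: dict) -> int:
        """Sum provider-reported token usage across the returned messages."""
        return sum(
            message.usage_metadata.get("total_tokens", 0)
            for message in result["messages"]
            if getattr(message, "usage_metadata", None)
        )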

Caching Strategies

Cache expensive operations to reduce redundant API calls:

from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> list[float]:
    """Cache embeddings to avoid redundant API calls."""
    return embedding_model.embed(text)

def get_cache_key(messages: list) -> str:
    """Generate cache key for conversation state."""
    content = "".join(m.content for m in messages)
    return hashlib.sha256(content.encode()).hexdigest()

Consider caching at multiple levels:

- Embeddings for repeated or near-duplicate text (as above)
- Full responses for identical conversation states, keyed as in get_cache_key (a sketch follows)
- Results of expensive or rate-limited tool calls
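
For full response caching, get_cache_key can gate the agent call against Redis. A sketch reusing the redis_client from earlier (the one-hour TTL is arbitrary):

import json

def invoke_with_cache(messages: list, config: dict) -> str:
    """Return a cached response when this exact conversation state was seen before."""
    key = f"agent:response:{get_cache_key(messages)}"
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)

    result = agent.invoke({"messages": messages}, config)
    response = result["messages"][-1].content
    redis_client.setex(key, 3600, json.dumps(response))  # cache for one hour
    return response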

Model Selection by Task

Not every request needs your most capable model:

def select_model(task_complexity: str) -> str:
    """Choose appropriate model based on task."""
    model_map = {
        "simple": "gpt-4o-mini",      # Simple queries, low cost
        "standard": "gpt-4o",          # Standard reasoning tasks
        "complex": "gpt-4o",           # Complex multi-step tasks
    }
    return model_map.get(task_complexity, "gpt-4o-mini")

Route simple queries to cheaper models. Use complexity detection based on message length, tool requirements, or explicit user flags.
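
Complexity detection doesn’t need to be sophisticated to pay off. A simple heuristic sketch (the thresholds and keywords are illustrative; user_message is whatever raw input you receive):

def detect_complexity(message: str, requires_tools: bool = False) -> str:
    """Classify a request so it can be routed to an appropriately sized model."""
    if requires_tools or len(message) > 2000:
        return "complex"
    if len(message) > 500 or any(
        keyword in message.lower() for keyword in ("analyze", "compare", "plan")
    ):
        return "standard"
    return "simple"

model = select_model(detect_complexity(user_message))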


Deployment Checklist

Before going live, verify:

- State is externalized so any instance can serve any request
- Container resource requests and limits are set
- Metrics, structured logs, and traces are flowing to your observability stack
- Tools degrade gracefully and external calls sit behind circuit breakers
- Inputs are validated before they reach the agent
- Token budgets, caching, and model routing are in place to control cost


Key Takeaways

Production deployment is an ongoing process. Start with solid foundations, monitor closely, and iterate based on real-world behavior. The patterns in this guide apply whether you’re serving hundreds or millions of agent interactions.


Building your first agent? Start with our LangGraph tutorial or learn about custom tools to extend agent capabilities. For comprehensive production guidance, see our Building Production AI Agents pillar guide.
