Architecture

How the AI pipeline works

A two-graph LangGraph system that fetches account data, runs parallel LLM analysis, and produces consulting-grade reports with sales scripts.

Overview

The backend is a Python agent built on LangGraph and served via CopilotKit. It exposes tools that the chat UI can invoke: selecting accounts, finding opportunities, generating reports, and editing them conversationally.

All heavy lifting happens in two LangGraph StateGraph pipelines. The chat agent orchestrates them but never sends large data payloads through the chat channel. Data stays server-side; only summaries reach the user.

Key principle: the agent fetches data from the Next.js API, processes it through LLM pipelines, and writes results back to the database. The frontend reads from the DB via React Query. Chat is for coordination, not data transport.

The two LangGraph pipelines

Report Generation Graph

Handles the core workload: turning account data into full reports with sales scripts. The graph uses LangGraph's Send() primitive for fan-out parallelism, with concurrency-limited LLM calls and multi-step validation.

[Diagram: report generation graph. START → fan_out emits one Send() per account ID. Each account fork runs the same sequence: fetch data (parallel, ungated) → LLM report (semaphore, max 5) → validate sections + rubric → conditional LLM fix for missing sections → numeric audit (non-blocking) → LLM sales script → save to DB. Forks merge in collect_results via an operator.add reducer → END. Legend: LLM call (with retry), Pydantic validation, non-blocking check, conditional LLM call.]
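A minimal sketch of one fork's control flow, with hypothetical stand-ins (llm_report, llm_sales_script, sections_missing) for the real pipeline's components:

```python
import asyncio

# Hypothetical stand-ins for the real pipeline's LLM calls and validators.
async def llm_report(summary: dict) -> str:
    return f"# Report for {summary['name']}"

async def llm_sales_script(report: str) -> str:
    return report + "\n\n## Sales Script"

def sections_missing(report: str) -> list[str]:
    return []  # the real check verifies all 9 required sections

async def process_account(account_id: str, fetch, save) -> dict:
    """One fork of the fan-out: fetch -> report -> validate -> script -> save."""
    summary = await fetch(account_id)        # data fetch is ungated
    report = await llm_report(summary)       # gated by the LLM semaphore in the real graph
    if sections_missing(report):             # conditional re-prompt for missing sections
        report = await llm_report(summary)
    report = await llm_sales_script(report)  # Pass 2 uses Pass 1's output as context
    await save(account_id, report)
    return {"account_id": account_id, "status": "saved"}

async def demo() -> dict:
    async def fetch(aid: str) -> dict:
        return {"name": aid}
    async def save(aid: str, body: str) -> None:
        pass
    return await process_account("acct-1", fetch, save)

result = asyncio.run(demo())
```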

Opportunities Graph

A simpler pipeline. When the user clicks "Find Opportunities," this graph receives a batch of account IDs, fetches all summaries in one API call, and sends the full dataset to the LLM for cross-account triage.

[Diagram: opportunities graph. START → fetch summaries (shared HTTP client) → LLM classifies all accounts (upsell / negotiation / churn) → Pydantic validation of OpportunitiesResult → END. Note: a single LLM call receives all accounts because cross-portfolio comparison requires full context.]

Data flow in detail

1. Account data ingestion

Account data lives in PostgreSQL, managed by Drizzle ORM in the Next.js app. The schema uses a flexible account_usage_metrics table (key-value: metric name, current value, limit value, unit) instead of rigid per-metric columns. This means any metric the user tracks (seats, API calls, storage, automation runs) is captured without schema changes.
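For illustration, the key-value rows might look like the dicts below (field names assumed from the description); utilization can then be computed uniformly regardless of metric type:

```python
# Illustrative key-value rows; the real column names may differ slightly.
metric_rows = [
    {"metric": "seats", "current_value": 92, "limit_value": 100, "unit": "users"},
    {"metric": "api_calls", "current_value": 450_000, "limit_value": 500_000, "unit": "calls/mo"},
]

def utilization_percent(row: dict) -> float:
    """Percent of the limit consumed, uniform across any metric the user tracks."""
    return round(100 * row["current_value"] / row["limit_value"], 1)

usage = {row["metric"]: utilization_percent(row) for row in metric_rows}
```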

The Python agent fetches data through the Next.js REST API (/api/account_summaries), not directly from the database. Responses are cached in a file-based JSON cache with configurable TTL (1 day in dev, 10 minutes in prod).
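A minimal sketch of such a TTL file cache (the real cache's key scheme and on-disk layout may differ):

```python
import json
import tempfile
import time
from pathlib import Path

def cached_fetch(key: str, fetch, cache_dir: Path, ttl_seconds: float):
    """Return cached JSON if fresh; otherwise call fetch() and cache the result."""
    path = cache_dir / f"{key}.json"
    if path.exists() and time.time() - path.stat().st_mtime < ttl_seconds:
        return json.loads(path.read_text())
    data = fetch()
    cache_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data))
    return data

calls = {"n": 0}
def fake_api():
    calls["n"] += 1
    return {"accounts": ["a1", "a2"]}

with tempfile.TemporaryDirectory() as d:
    first = cached_fetch("summaries", fake_api, Path(d), ttl_seconds=600)
    second = cached_fetch("summaries", fake_api, Path(d), ttl_seconds=600)  # cache hit
```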

2. Multi-pass LLM analysis with validation

Each account goes through multiple sequential LLM calls within the process_account node:

  1. Pass 1: Analytical report. The LLM receives the account summary and 5-8 relevant historical deals (filtered by tier match and deal outcome). It produces a structured markdown report with situation analysis, key metrics, risk assessment, and next steps. The last line of output is a JSON metadata object validated by a Pydantic ReportMetadata model (enforcing valid enum values, 0-100 ranges, etc.).
  2. Validation: Section check. After Pass 1, the pipeline verifies all 9 required report sections are present (Executive Summary, Situation, Complication, Resolution, Key Metrics, Evidence, Risks, Next Steps, Key Question). If sections are missing, a focused re-prompt asks the LLM to add only the missing parts.
  3. Validation: Numeric consistency. A non-blocking check verifies that key input numbers (current_value, limit_value from each usage metric) appear in the report text. If >50% of input values are absent, a warning is logged. This does not block saving.
  4. Metadata fallback. If the JSON metadata line is missing or invalid, a with_structured_output(ReportMetadata) call extracts metadata from the report text as a fallback, rather than silently defaulting to "healthy" / 50%.
  5. Pass 2: Sales script. Using the report from Pass 1 as context, a second LLM call generates a tailored sales script with talk tracks, objection handlers, and a closing framework. The script is appended to the report body before saving.
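The numeric consistency check (step 3) can be sketched as a simple coverage ratio; the exact matching rules in the real audit are an assumption:

```python
import re

def numeric_coverage(report_text: str, metrics: list[dict]) -> float:
    """Fraction of input metric values that literally appear in the report."""
    values = [v for m in metrics for v in (m["current_value"], m["limit_value"])]
    found = sum(1 for v in values if re.search(rf"\b{re.escape(str(v))}\b", report_text))
    return found / len(values) if values else 1.0

metrics = [{"current_value": 92, "limit_value": 100}]
coverage = numeric_coverage("The account uses 92 of its 100 seats.", metrics)
# In the pipeline, coverage below 0.5 logs a warning but never blocks saving.
```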

All LLM calls use invoke_with_retry() with exponential backoff (3 retries, handles rate limits and timeouts). Each call is gated by an asyncio.Semaphore to limit concurrent LLM requests across all parallel account forks.

Why two passes? The analytical report and sales script serve different audiences and require different tones. Separating them lets each prompt focus on one task, producing better output than a single monolithic prompt. The second pass benefits from having the structured analysis as context.

3. Historical deal matching

Before calling the LLM, the pipeline filters the historical deals library to find the most relevant precedents. Deals are scored by relevance signals, primarily tier match and deal outcome (the same filters used when selecting deals for Pass 1).

The top 8 deals are passed to the LLM as evidence for its recommendations.
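A hedged sketch of the scoring-and-truncation step; the weights and field names are illustrative, not the real ones:

```python
def score_deal(deal: dict, account: dict) -> int:
    """Illustrative relevance score; the real weights are an assumption."""
    score = 0
    if deal.get("tier") == account.get("tier"):
        score += 2                      # tier match is treated as the stronger signal
    if deal.get("outcome") == "won":
        score += 1                      # prefer successful precedents
    return score

def top_deals(deals: list[dict], account: dict, k: int = 8) -> list[dict]:
    return sorted(deals, key=lambda d: score_deal(d, account), reverse=True)[:k]

account = {"tier": "enterprise"}
deals = [
    {"id": 1, "tier": "smb", "outcome": "won"},
    {"id": 2, "tier": "enterprise", "outcome": "lost"},
    {"id": 3, "tier": "enterprise", "outcome": "won"},
]
best = top_deals(deals, account, k=2)
```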

4. Report persistence

Reports are saved as raw markdown via POST /api/accounts/:id/account_reports. The database supports multiple reports per account (regeneration creates a new row, never overwrites). The frontend renders markdown using custom React components per section, with react-markdown as fallback.

Each report section is editable individually: users can edit by hand in the modal, or ask the AI via chat. The agent state tracks report_manually_edited and report_latest_content so it always fetches the latest version before applying changes.

Parallel processing

Fan-out with Send()

The report generation graph processes accounts in parallel using LangGraph's Send() API. The fan_out node returns a list of Send objects, each targeting the process_account node with a single account ID. LangGraph schedules all forks concurrently.

def fan_out(state: ReportGraphState) -> list[Send]:
    """Emit one Send per account ID for parallel processing."""
    return [
        Send("process_account", {"account_id": aid})
        for aid in state["account_ids"]
    ]

Results from each fork are merged back into the parent state using Annotated[list[dict], operator.add] reducers. This means every process_account return value is concatenated into a single results list without any explicit synchronization code.
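The reducer's behavior is plain list concatenation, which is easy to see without running a graph:

```python
import operator
from typing import Annotated, TypedDict

class ReportGraphState(TypedDict):
    account_ids: list[str]
    # Fork return values are concatenated by the reducer; no locking needed:
    results: Annotated[list[dict], operator.add]

# What LangGraph effectively does when three forks each return one result dict:
merged: list[dict] = []
for fork_update in ([{"account_id": "a1"}], [{"account_id": "a2"}], [{"account_id": "a3"}]):
    merged = operator.add(merged, fork_update)
```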

Scaling characteristics

Why not batch the LLM calls? Each account needs its own prompt with account-specific data and filtered historical deals. There is no shared context across accounts, so per-account parallelism is the natural decomposition. The fan-out pattern makes adding new per-account steps (e.g. enrichment, validation) trivial: just add nodes to the subgraph.

Opportunities analysis: a different strategy

The opportunities graph does not fan out. Instead, it sends all account summaries to the LLM in a single call. This is deliberate: opportunity identification requires cross-account comparison (which accounts stand out relative to the portfolio?), so the LLM needs to see the full picture at once. A single call with a compact JSON payload is more effective than N separate calls that each lack portfolio context.
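A sketch of how the compact payload might be assembled (the selected fields are an assumption):

```python
import json

def build_portfolio_payload(summaries: list[dict]) -> str:
    """One compact payload with every account, so the LLM can compare across the portfolio."""
    compact = [
        {
            "id": s["id"],
            "arr": s.get("arr"),
            "utilization": s.get("utilization"),
            "renewal_days": s.get("renewal_days"),
        }
        for s in summaries
    ]
    # separators=(",", ":") strips whitespace to keep the prompt small
    return json.dumps(compact, separators=(",", ":"))

payload = build_portfolio_payload([
    {"id": "a1", "arr": 120000, "utilization": 0.92, "renewal_days": 30},
    {"id": "a2", "arr": 40000, "utilization": 0.35, "renewal_days": 200},
])
```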

Landing page demo pipeline

The homepage "try it now" experience uses a separate tool (analyze_raw_data) that bypasses the database entirely. Users paste account data as JSON or free text; the tool parses what it can and passes the rest directly to the LLM prompt. The prompt is schema-agnostic, so it handles any format without lossy conversion steps.

Input format | How it's processed
JSON | Parsed and transformed to an AccountSummary shape. Handles both the new flexible usage_metrics array and the legacy paired-fields format.
Free text | Passed directly to the LLM prompt as raw_data. No intermediary parsing. The prompt instructs the LLM to scan whatever data is present.
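The dispatch logic reduces to a try-JSON-first branch, sketched here with illustrative output shapes:

```python
import json

def prepare_demo_input(raw: str) -> dict:
    """Try JSON first; otherwise hand the text to the LLM untouched as raw_data."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return {"raw_data": raw}            # free text: no intermediary parsing
    return {"account_summary": parsed}      # JSON: normalized shape is illustrative

structured = prepare_demo_input('{"name": "Acme", "usage_metrics": []}')
freeform = prepare_demo_input("Acme has 92 of 100 seats and renews in 30 days.")
```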

Agent tools

The CopilotKit agent exposes seven tools, each returning a LangGraph Command that updates the shared state:

Tool | Purpose
select_accounts | Mark account IDs as selected (pending report generation)
find_opportunities | Run the opportunities graph; pre-select the best candidates
generate_reports | Run the report generation graph for given account IDs
get_report_content | Fetch the latest report from the DB (respects manual edits)
update_report | Apply conversational edits to an existing report via LLM
get_account_reports | Read current selection state
analyze_raw_data | Generate a report from pasted data (landing page demo)

Production resilience

LLM output validation

All LLM metadata output is validated through a Pydantic ReportMetadata model that enforces proposition_type as a strict enum, success_percent as 0-100, priority_score as 1-10, and other fields. Previously, _parse_report_metadata() silently returned safe defaults on any parse failure, meaning a garbage LLM response would produce a saved report classified as "healthy" at 50% success with no indication of error.

On validation failure, a structured-output fallback call asks the LLM to extract metadata from the report text, with the response constrained to the Pydantic schema. This eliminates silent misclassification.
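The validate-then-fallback decision can be sketched with plain checks standing in for the Pydantic model (the enum values shown are assumptions):

```python
import json
from typing import Optional

VALID_TYPES = {"upsell", "negotiation", "churn_risk", "healthy"}  # assumed enum values

def parse_metadata_line(report: str) -> Optional[dict]:
    """Validate the report's last line as metadata; None triggers the fallback call."""
    last_line = report.strip().splitlines()[-1]
    try:
        meta = json.loads(last_line)
    except json.JSONDecodeError:
        return None
    if meta.get("proposition_type") not in VALID_TYPES:
        return None
    if not 0 <= meta.get("success_percent", -1) <= 100:
        return None
    return meta

good = parse_metadata_line('...body...\n{"proposition_type": "upsell", "success_percent": 70}')
bad = parse_metadata_line("...body...\nnot json at all")
# bad is None -> the pipeline invokes with_structured_output(ReportMetadata) instead
```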

Retry and backoff

invoke_with_retry() wraps all LLM calls with exponential backoff (base delay 1s, max 3 retries). It catches transient failures: openai.RateLimitError, openai.APIError, httpx.TimeoutException, and httpx.ConnectError. Each retry is logged via the structured tracing system. Non-retryable errors propagate immediately.
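A minimal asyncio sketch of this pattern, with stdlib exceptions standing in for the openai/httpx error types:

```python
import asyncio

RETRYABLE = (TimeoutError, ConnectionError)  # the real list covers openai/httpx errors

async def invoke_with_retry(call, max_retries: int = 3, base_delay: float = 1.0):
    """Retry transient failures with exponential backoff; re-raise once exhausted."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except RETRYABLE:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s by default

attempts = {"n": 0}
async def flaky_llm():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("transient")
    return "ok"

result = asyncio.run(invoke_with_retry(flaky_llm, base_delay=0.01))
```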

Concurrency control

The Send() fan-out dispatches all accounts simultaneously. Without limits, 50 accounts means 50 parallel forks each issuing LLM calls, which will quickly hit provider rate limits. A module-level asyncio.Semaphore (default 5, configurable via the MAX_CONCURRENT_LLM env var) wraps each model.ainvoke() call. Data fetching remains fully parallel; only LLM calls are gated.
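The gating pattern, demonstrated with a stand-in for model.ainvoke():

```python
import asyncio

async def gated_llm_call(sem: asyncio.Semaphore, peak: dict, account_id: str) -> str:
    async with sem:                      # only the LLM call is gated
        peak["now"] += 1
        peak["max"] = max(peak["max"], peak["now"])
        await asyncio.sleep(0.01)        # stand-in for model.ainvoke(...)
        peak["now"] -= 1
        return account_id

async def run_forks(n_accounts: int = 20, limit: int = 5):
    sem = asyncio.Semaphore(limit)       # MAX_CONCURRENT_LLM in the real pipeline
    peak = {"now": 0, "max": 0}
    results = await asyncio.gather(
        *(gated_llm_call(sem, peak, f"a{i}") for i in range(n_accounts))
    )
    return results, peak["max"]

results, max_in_flight = asyncio.run(run_forks())
```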

HTTP resilience

A shared httpx.AsyncClient with connection pooling and httpx.AsyncHTTPTransport(retries=2) replaces scattered per-request client instantiation. This reduces connection overhead and adds transport-level retries for transient network errors.

Health check and graceful degradation

GET /api/health checks two components: database (via SELECT 1 through Drizzle) and agent (HTTP ping to LangGraph URL). Returns 200 with "status":"ok" when both are healthy, 503 with "status":"degraded" and per-component status otherwise. Docker Compose uses this for container healthcheck.

The CopilotKit route handler is wrapped in try/catch, returning a 503 with a user-friendly error message when the agent is unreachable instead of an unhandled 500.

Evaluation and observability

Evaluation harness

15 contract cases in evaluation/dataset.json covering: classic upsell, overdue negotiation, churn risk, at-capacity, healthy, free-text input, narrow-but-deep usage, single-metric accounts, high-ARR enterprise with mixed signals, boundary cases (exactly 100%, exactly 85%), imminent renewal, and messy free-text data.

Each case is scored for classification accuracy (with equivalents for near-misses) and report quality: section completeness (9 required sections), metric table coverage (input numbers appearing in output), and ARR at Risk presence. A --mock flag uses pre-recorded LLM responses from evaluation/fixtures/ for deterministic CI testing.
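The section-completeness score reduces to a membership check; the H2 heading convention here is an assumption:

```python
REQUIRED_SECTIONS = [
    "Executive Summary", "Situation", "Complication", "Resolution",
    "Key Metrics", "Evidence", "Risks", "Next Steps", "Key Question",
]

def missing_sections(report_md: str) -> list[str]:
    """Required sections absent from the report (H2 heading convention assumed)."""
    return [s for s in REQUIRED_SECTIONS if f"## {s}" not in report_md]

partial_report = "## Executive Summary\n...\n## Key Metrics\n..."
gaps = missing_sections(partial_report)
```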

Metrics

In-memory counters in tracing.py track: reports_generated_total (by proposition type), report_generation_errors_total, llm_retries_total, and report_generation_avg_duration_ms. Exposed via get_metrics() and proxied through GET /api/metrics.

LangSmith

Optional LangSmith integration for LangGraph trace visibility. Set LANGSMITH_API_KEY in the environment to enable it. Provides end-to-end trace visualization of LangGraph runs, including tool calls, state transitions, and LLM inputs/outputs.

Langfuse

Optional Langfuse integration for LLM-specific observability. When LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set, all LLM calls are automatically traced via a Langfuse CallbackHandler passed to each model.ainvoke() call. This provides token/cost tracking, latency visualization, and prompt versioning without custom dashboards.

The evaluator node's quality scores (report_quality, metrics_accurate, classification_justified) are attached to Langfuse traces as evaluation metrics, creating an automatic quality-over-time dashboard. If Langfuse keys are not set, the existing structured JSON logging continues unchanged.

Report quality evaluator

After the initial LLM report generation, each report is scored by an LLM evaluator using with_structured_output(ReportEvaluation). The evaluator receives the account input data, historical deals, generated report, and metadata, then scores the report against a quality rubric, returning a pass, marginal, or fail verdict.

On fail, the pipeline re-analyzes the account with the evaluator's issues included as feedback in the prompt (max 1 retry). The retry is then re-evaluated. On marginal, the report is saved with a warning logged. On pass, the report proceeds to save. This implements a generate-then-verify pattern that catches hallucinations, fabricated deal references, and misclassifications before reports reach users.
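The generate-then-verify loop, sketched with stub generate/evaluate functions to show the control flow:

```python
def generate_then_verify(generate, evaluate, account: dict, max_retries: int = 1):
    """Generate a report, score it, and retry once with the evaluator's feedback."""
    report = generate(account, feedback=None)
    verdict, issues = evaluate(report)
    for _ in range(max_retries):
        if verdict != "fail":
            break
        report = generate(account, feedback=issues)   # re-analyze with issues in prompt
        verdict, issues = evaluate(report)
    if verdict == "marginal":
        print("warning: marginal report saved")       # non-fatal, logged
    return report, verdict

# Stub generate/evaluate to exercise the fail -> retry -> pass path:
calls = {"n": 0}
def generate(account, feedback):
    calls["n"] += 1
    return "better report" if feedback else "draft report"
def evaluate(report):
    return ("pass", []) if report == "better report" else ("fail", ["fabricated deal"])

report, verdict = generate_then_verify(generate, evaluate, {"id": "a1"})
```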