Wire observability recording into LLM client

Add fire-and-forget background task manager (background.py) and
LLM call recording (recording.py). Every LLMClient.call now records
trace_id, model, latency, tokens, cost, and errors via fire-and-forget.
drain() awaits pending tasks on shutdown. Currently logs only —
database persistence deferred until llm_calls table is wired.
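
A minimal sketch of the flow this commit sets up. The `record` coroutine is a hypothetical stand-in for `record_llm_call`; the imports reference the new module added below.

import asyncio

from codeflash_api.observability._background import drain, fire_and_forget


async def record(label: str) -> None:
    # Stand-in for record_llm_call: today it only logs; later it
    # will write to the llm_calls table.
    await asyncio.sleep(0.01)
    print(f"recorded {label}")


async def main() -> None:
    # Hot path: schedule recording and move on without awaiting it.
    fire_and_forget(record("llm-call-1"))
    fire_and_forget(record("llm-call-2"))
    # Shutdown path: the FastAPI lifespan awaits anything still pending.
    await drain()


asyncio.run(main())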
Kevin Turcios 2026-04-22 20:30:10 -05:00
parent a62f1ecd03
commit 1d70d65914
5 changed files with 169 additions and 3 deletions


@@ -13,6 +13,7 @@ from codeflash_api._config import settings
 from codeflash_api.db._engine import create_pool
 from codeflash_api.db._queries import Queries
 from codeflash_api.llm._client import LLMClient
+from codeflash_api.observability._background import drain

 if TYPE_CHECKING:
     from collections.abc import AsyncIterator
@@ -34,6 +35,7 @@ async def _lifespan(app: FastAPI) -> AsyncIterator[dict[str, object]]:
     state: dict[str, object] = {}
     yield state
+    await drain()
     await pool.close()
     log.info("Shutting down codeflash-api")


@@ -13,6 +13,8 @@ from codeflash_api.llm._retry import (
     ANTHROPIC_MAX_INPUT_TOKENS,
     CHARS_PER_TOKEN_ESTIMATE,
 )
+from codeflash_api.observability._background import fire_and_forget
+from codeflash_api.observability._recording import record_llm_call

 if TYPE_CHECKING:
     from anthropic import AsyncAnthropicBedrock
@@ -110,18 +112,25 @@ class LLMClient:
         max_tokens: int = 16384,
         call_type: str = "",
         trace_id: str = "",
+        user_id: str | None = None,
     ) -> LLMResponse:
         """
         Route to the correct provider and return an LLMResponse.
         """
         self._ensure_clients()
         start = time.monotonic()
+        result: LLMResponse | None = None
+        error: Exception | None = None
         try:
             if llm.model_type == "anthropic":
-                result = await self._call_anthropic(llm, messages, max_tokens)
+                result = await self._call_anthropic(
+                    llm, messages, max_tokens
+                )
             elif llm.model_type == "openai":
-                result = await self._call_openai(llm, messages, max_tokens)
+                result = await self._call_openai(
+                    llm, messages, max_tokens
+                )
             else:
                 msg = f"Unsupported model type: {llm.model_type}"
                 raise ValueError(msg)
@@ -133,7 +142,8 @@ class LLMClient:
                 cost=calculate_llm_cost(result.raw_response, llm),
             )
-        except Exception:
+        except Exception as exc:
+            error = exc
             latency_ms = int((time.monotonic() - start) * 1000)
             log.exception(
                 "LLM call failed: type=%s trace=%s latency=%dms",
@@ -143,6 +153,22 @@ class LLMClient:
             )
             raise
+        finally:
+            latency_ms = int((time.monotonic() - start) * 1000)
+            fire_and_forget(
+                record_llm_call(
+                    trace_id=trace_id,
+                    call_type=call_type,
+                    model_name=llm.name,
+                    messages=messages,
+                    latency_ms=latency_ms,
+                    user_id=user_id,
+                    result=result,
+                    error=error,
+                    llm_cost=result.cost if result else None,
+                )
+            )
         return result

     async def _call_anthropic(
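
The `finally` block is the key move: recording fires on both the success and the error path without adding an await to either. A self-contained sketch of the same pattern with hypothetical names (`asyncio.create_task` stands in for `fire_and_forget`):

import asyncio
import time


async def record(ok: bool, latency_ms: int) -> None:
    print(f"recorded ok={ok} latency={latency_ms}ms")


async def guarded(fail: bool) -> str | None:
    # Mirrors LLMClient.call: outcome captured in result/error,
    # recording scheduled in finally so both paths are covered.
    start = time.monotonic()
    result: str | None = None
    error: Exception | None = None
    try:
        if fail:
            raise ValueError("boom")
        result = "ok"
    except Exception as exc:
        error = exc
        raise
    finally:
        latency_ms = int((time.monotonic() - start) * 1000)
        # Stand-in for fire_and_forget: schedule without awaiting.
        asyncio.create_task(record(error is None, latency_ms))
    return result


async def main() -> None:
    await guarded(fail=False)
    try:
        await guarded(fail=True)
    except ValueError:
        pass
    await asyncio.sleep(0)  # let the scheduled record tasks run


asyncio.run(main())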


@@ -0,0 +1,41 @@
+"""Fire-and-forget background task management."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import Any
+
+log = logging.getLogger(__name__)
+
+_background_tasks: set[asyncio.Task[Any]] = set()
+
+
+def fire_and_forget(
+    coro: Any,
+) -> asyncio.Task[Any]:
+    """
+    Schedule *coro* as a background task without blocking.
+    """
+    task = asyncio.create_task(coro)
+    _background_tasks.add(task)
+
+    def _on_done(t: asyncio.Task[Any]) -> None:
+        _background_tasks.discard(t)
+        if t.cancelled():
+            return
+        if exc := t.exception():
+            log.warning("Background task failed: %s", exc)
+
+    task.add_done_callback(_on_done)
+    return task
+
+
+async def drain() -> None:
+    """
+    Await all pending background tasks.
+    """
+    while _background_tasks:
+        await asyncio.gather(
+            *_background_tasks, return_exceptions=True
+        )
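
Why `drain()` loops instead of gathering once: a background task can itself schedule more background tasks, and each `gather` pass only covers the tasks that existed when it started. A hypothetical demo:

import asyncio

from codeflash_api.observability._background import drain, fire_and_forget


async def child() -> None:
    print("child finished")


async def parent() -> None:
    await asyncio.sleep(0)
    fire_and_forget(child())  # scheduled while drain() is already running


async def main() -> None:
    fire_and_forget(parent())
    await drain()  # loops until the task set is empty, so child runs too
    print("fully drained")


asyncio.run(main())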


@@ -0,0 +1,93 @@
+"""Fire-and-forget LLM call and error recording."""
+
+from __future__ import annotations
+
+import logging
+import uuid
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from codeflash_api.llm._client import LLMResponse
+
+log = logging.getLogger(__name__)
+
+
+async def record_llm_call(
+    trace_id: str,
+    call_type: str,
+    model_name: str,
+    messages: list[dict[str, Any]],
+    latency_ms: int,
+    *,
+    user_id: str | None = None,
+    result: LLMResponse | None = None,
+    error: Exception | None = None,
+    llm_cost: float | None = None,
+) -> str:
+    """
+    Record an LLM call. Returns the generated ``llm_call_id``.
+
+    Currently logs only; database persistence will be added when
+    the ``llm_calls`` table is wired up.
+    """
+    llm_call_id = str(uuid.uuid4())
+    status = "success" if result else "failed"
+    input_tokens = result.usage.input_tokens if result else 0
+    output_tokens = result.usage.output_tokens if result else 0
+    log.info(
+        "LLM call recorded: id=%s trace=%s type=%s model=%s"
+        " status=%s tokens=%d/%d cost=%.6f latency=%dms",
+        llm_call_id,
+        trace_id,
+        call_type,
+        model_name,
+        status,
+        input_tokens,
+        output_tokens,
+        llm_cost or 0.0,
+        latency_ms,
+    )
+    if error:
+        log.info(
+            "LLM call error: id=%s error_type=%s error=%s",
+            llm_call_id,
+            type(error).__name__,
+            error,
+        )
+    return llm_call_id
+
+
+async def record_error(
+    trace_id: str,
+    error_type: str,
+    error_message: str,
+    *,
+    error_category: str = "llm_error",
+    severity: str = "error",
+    error_code: str | None = None,
+    context: dict[str, Any] | None = None,
+) -> str:
+    """
+    Record an error. Returns the generated ``error_id``.
+
+    Currently logs only; database persistence will be added when
+    the ``optimization_errors`` table is wired up.
+    """
+    error_id = str(uuid.uuid4())
+    log.warning(
+        "Error recorded: id=%s trace=%s type=%s category=%s"
+        " severity=%s message=%s",
+        error_id,
+        trace_id,
+        error_type,
+        error_category,
+        severity,
+        error_message,
+    )
+    return error_id
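
A hypothetical direct call to `record_error` from elsewhere in the pipeline; every argument value below is illustrative, not taken from the codebase.

import asyncio

from codeflash_api.observability._recording import record_error


async def main() -> None:
    error_id = await record_error(
        trace_id="trace-123",
        error_type="TimeoutError",
        error_message="candidate evaluation exceeded 120s",
        error_category="pipeline_error",
        severity="warning",
        context={"step": "benchmark", "attempt": 2},
    )
    print(error_id)  # UUID that also appears in the warning log line


asyncio.run(main())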


@@ -101,8 +101,12 @@ ignore = [
 ]
 "packages/codeflash-api/src/codeflash_api/llm/_client.py" = [
     "PLC0415",  # conditional imports for event loop safety (clients recreated on loop change)
+    "PLR0913",  # LLMClient.call needs many params for provider routing + observability
+    "TRY301",   # raise inside try is the intended pattern for cost-tracking on unsupported model type
 ]
+"packages/codeflash-api/src/codeflash_api/observability/_recording.py" = [
+    "PLR0913",  # recording functions faithfully match Django signatures
+]
 "packages/codeflash-api/src/codeflash_api/optimize/_context.py" = [
     "PLR2004",  # magic values in faithfully ported version parsing and humanize_ns
 ]