perf: fire-and-forget logging to reduce response latency 100-300ms (#2525)
## Summary - Move `safe_log_features()` and `update_optimization_cost()` out of blocking `TaskGroup`s into fire-and-forget background tasks across 4 optimization endpoints (optimizer, optimizer_line_profiler, jit_rewrite, adaptive_optimizer) - These DB writes are analytics-only and don't affect response bodies — waiting for them adds 100-300ms per request unnecessarily - Add `aiservice/background.py` with `fire_and_forget()` helper using the same `set` + `add_done_callback` pattern already used in `LLMClient` - `get_or_create_optimization_event()` remains awaited where the response needs `event.id` ## Test plan - [x] All 550 tests pass locally - [ ] Verify response latency improvement in production metrics after deploy - [ ] Confirm `safe_log_features` and `update_optimization_cost` still complete successfully in background (check DB records) --------- Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
de0f30ae15
commit
d0e97992d6
6 changed files with 164 additions and 122 deletions
34
django/aiservice/aiservice/background.py
Normal file
34
django/aiservice/aiservice/background.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any, Coroutine
|
||||
|
||||
logger = logging.getLogger(__name__)

# Strong references to in-flight tasks. asyncio's event loop only keeps weak
# references to tasks, so without this set a fire-and-forget task could be
# garbage-collected before it finishes running.
_background_tasks: set[asyncio.Task[Any]] = set()


def fire_and_forget(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
    """Schedule a coroutine as a background task without blocking the caller.

    Holds a strong reference so the task is not garbage-collected before
    completion. Failures are logged (not raised) since no caller awaits them.

    Args:
        coro: Coroutine to execute. Must be called from a running event loop.

    Returns:
        The created task, so a caller may still await or cancel it if needed.
    """
    task = asyncio.create_task(coro)
    _background_tasks.add(task)

    def _on_done(t: asyncio.Task[Any]) -> None:
        _background_tasks.discard(t)
        if t.cancelled():
            # exception() would raise CancelledError on a cancelled task.
            return
        if exc := t.exception():
            # Include the traceback via exc_info — logging only str(exc)
            # makes background failures very hard to diagnose.
            logger.warning("Background task failed: %s", exc, exc_info=exc)

    task.add_done_callback(_on_done)
    return task
|
||||
|
||||
|
||||
async def drain() -> None:
    """Await every pending background task until none remain.

    Intended for shutdown or test teardown. The loop re-checks the set after
    each pass so tasks scheduled while draining are also awaited. Exceptions
    are swallowed via ``return_exceptions=True``; failures are already logged
    by the done-callback installed in ``fire_and_forget``.
    """
    while _background_tasks:
        pending = tuple(_background_tasks)
        await asyncio.gather(*pending, return_exceptions=True)
|
||||
|
|
@ -11,6 +11,7 @@ from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUs
|
|||
from pydantic import ValidationError
|
||||
|
||||
from aiservice.analytics.posthog import ph
|
||||
from aiservice.background import fire_and_forget
|
||||
from aiservice.common_utils import validate_trace_id
|
||||
from aiservice.env_specific import debug_log_sensitive_data
|
||||
from aiservice.llm import llm_client
|
||||
|
|
@ -130,25 +131,24 @@ async def adaptive_optimize(
|
|||
if llm_cost is not None:
|
||||
total_llm_cost += llm_cost
|
||||
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.create_task(update_optimization_cost(trace_id=trace_id, cost=total_llm_cost, user_id=request.user))
|
||||
if hasattr(request, "should_log_features") and request.should_log_features:
|
||||
tg.create_task(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
optimizations_raw={
|
||||
adaptive_optimization_candidate.optimization_id: adaptive_optimization_candidate.source_code
|
||||
},
|
||||
explanations_raw={
|
||||
adaptive_optimization_candidate.optimization_id: adaptive_optimization_candidate.explanation
|
||||
},
|
||||
optimizations_origin={
|
||||
adaptive_optimization_candidate.optimization_id: {
|
||||
"source": OptimizedCandidateSource.ADAPTIVE,
|
||||
"parent": adaptive_optimization_candidate.parent_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
fire_and_forget(update_optimization_cost(trace_id=trace_id, cost=total_llm_cost, user_id=request.user))
|
||||
if hasattr(request, "should_log_features") and request.should_log_features:
|
||||
fire_and_forget(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
optimizations_raw={
|
||||
adaptive_optimization_candidate.optimization_id: adaptive_optimization_candidate.source_code
|
||||
},
|
||||
explanations_raw={
|
||||
adaptive_optimization_candidate.optimization_id: adaptive_optimization_candidate.explanation
|
||||
},
|
||||
optimizations_origin={
|
||||
adaptive_optimization_candidate.optimization_id: {
|
||||
"source": OptimizedCandidateSource.ADAPTIVE,
|
||||
"parent": adaptive_optimization_candidate.parent_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
)
|
||||
return 200, adaptive_optimization_candidate
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUs
|
|||
from pydantic import ValidationError
|
||||
|
||||
from aiservice.analytics.posthog import ph
|
||||
from aiservice.background import fire_and_forget
|
||||
from aiservice.common_utils import parse_python_version, validate_trace_id
|
||||
from aiservice.env_specific import debug_log_sensitive_data, debug_log_sensitive_data_from_callable
|
||||
from aiservice.llm import llm_client
|
||||
|
|
@ -248,46 +249,43 @@ async def jit_rewrite(
|
|||
"aiservice-jit-rewrite-optimizations-found",
|
||||
properties={"num_optimizations": len(jit_rewrite_response_items)},
|
||||
)
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
event_task = tg.create_task(
|
||||
get_or_create_optimization_event(
|
||||
event_type="no-pr",
|
||||
user_id=request.user,
|
||||
current_username=data.current_username,
|
||||
repo_owner=data.repo_owner,
|
||||
repo_name=data.repo_name,
|
||||
trace_id=data.trace_id,
|
||||
api_key_id=request.api_key_id,
|
||||
metadata={
|
||||
"codeflash_version": data.codeflash_version,
|
||||
"num_optimizations": len(jit_rewrite_response_items),
|
||||
"experiment_metadata": data.experiment_metadata,
|
||||
},
|
||||
llm_cost=llm_cost,
|
||||
)
|
||||
fire_and_forget(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
original_code=data.source_code,
|
||||
dependency_code=data.dependency_code,
|
||||
optimizations_raw={op_id: cei["code"] for op_id, cei in code_and_explanations.items()},
|
||||
optimizations_post={cei.optimization_id: cei.source_code for cei in jit_rewrite_response_items},
|
||||
explanations_raw={op_id: cei["explanation"] for op_id, cei in code_and_explanations.items()},
|
||||
explanations_post={cei.optimization_id: cei.explanation for cei in jit_rewrite_response_items},
|
||||
experiment_metadata=data.experiment_metadata or None,
|
||||
optimizations_origin={
|
||||
cei.optimization_id: {
|
||||
"source": OptimizedCandidateSource.JIT_REWRITE,
|
||||
"parent": None,
|
||||
"model": jit_rewrite_models.get(cei.optimization_id, "unknown"),
|
||||
}
|
||||
for cei in jit_rewrite_response_items
|
||||
},
|
||||
)
|
||||
tg.create_task(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
original_code=data.source_code,
|
||||
dependency_code=data.dependency_code,
|
||||
optimizations_raw={op_id: cei["code"] for op_id, cei in code_and_explanations.items()},
|
||||
optimizations_post={cei.optimization_id: cei.source_code for cei in jit_rewrite_response_items},
|
||||
explanations_raw={op_id: cei["explanation"] for op_id, cei in code_and_explanations.items()},
|
||||
explanations_post={cei.optimization_id: cei.explanation for cei in jit_rewrite_response_items},
|
||||
experiment_metadata=data.experiment_metadata or None,
|
||||
optimizations_origin={
|
||||
cei.optimization_id: {
|
||||
"source": OptimizedCandidateSource.JIT_REWRITE,
|
||||
"parent": None,
|
||||
"model": jit_rewrite_models.get(cei.optimization_id, "unknown"),
|
||||
}
|
||||
for cei in jit_rewrite_response_items
|
||||
},
|
||||
)
|
||||
)
|
||||
event, _created = event_task.result()
|
||||
)
|
||||
|
||||
event, _created = await get_or_create_optimization_event(
|
||||
event_type="no-pr",
|
||||
user_id=request.user,
|
||||
current_username=data.current_username,
|
||||
repo_owner=data.repo_owner,
|
||||
repo_name=data.repo_name,
|
||||
trace_id=data.trace_id,
|
||||
api_key_id=request.api_key_id,
|
||||
metadata={
|
||||
"codeflash_version": data.codeflash_version,
|
||||
"num_optimizations": len(jit_rewrite_response_items),
|
||||
"experiment_metadata": data.experiment_metadata,
|
||||
},
|
||||
llm_cost=llm_cost,
|
||||
)
|
||||
for item in jit_rewrite_response_items:
|
||||
item.optimization_event_id = str(event.id) if event else None
|
||||
response = OptimizeResponseSchema(optimizations=jit_rewrite_response_items)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUs
|
|||
from pydantic import ValidationError
|
||||
|
||||
from aiservice.analytics.posthog import ph
|
||||
from aiservice.background import fire_and_forget
|
||||
from aiservice.common_utils import parse_python_version, validate_trace_id
|
||||
from aiservice.env_specific import debug_log_sensitive_data, debug_log_sensitive_data_from_callable
|
||||
from aiservice.llm import llm_client
|
||||
|
|
@ -297,47 +298,43 @@ async def optimize_python(
|
|||
properties={"num_optimizations": len(optimization_response_items)},
|
||||
)
|
||||
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
event_task = tg.create_task(
|
||||
get_or_create_optimization_event(
|
||||
event_type="no-pr",
|
||||
user_id=request.user,
|
||||
current_username=data.current_username,
|
||||
repo_owner=data.repo_owner,
|
||||
repo_name=data.repo_name,
|
||||
trace_id=data.trace_id,
|
||||
api_key_id=request.api_key_id,
|
||||
metadata={
|
||||
"codeflash_version": data.codeflash_version,
|
||||
"num_optimizations": len(optimization_response_items),
|
||||
"experiment_metadata": data.experiment_metadata,
|
||||
},
|
||||
llm_cost=llm_cost,
|
||||
)
|
||||
)
|
||||
tg.create_task(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
original_code=data.source_code,
|
||||
dependency_code=data.dependency_code,
|
||||
optimizations_raw={op_id: cei["code"] for op_id, cei in code_and_explanations.items()},
|
||||
optimizations_post={cei.optimization_id: cei.source_code for cei in optimization_response_items},
|
||||
explanations_raw={op_id: cei["explanation"] for op_id, cei in code_and_explanations.items()},
|
||||
explanations_post={cei.optimization_id: cei.explanation for cei in optimization_response_items},
|
||||
experiment_metadata=data.experiment_metadata or None,
|
||||
optimizations_origin={
|
||||
cei.optimization_id: {
|
||||
"source": OptimizedCandidateSource.OPTIMIZE,
|
||||
"parent": None,
|
||||
"model": optimization_models.get(cei.optimization_id, "unknown"),
|
||||
}
|
||||
for cei in optimization_response_items
|
||||
},
|
||||
)
|
||||
fire_and_forget(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
original_code=data.source_code,
|
||||
dependency_code=data.dependency_code,
|
||||
optimizations_raw={op_id: cei["code"] for op_id, cei in code_and_explanations.items()},
|
||||
optimizations_post={cei.optimization_id: cei.source_code for cei in optimization_response_items},
|
||||
explanations_raw={op_id: cei["explanation"] for op_id, cei in code_and_explanations.items()},
|
||||
explanations_post={cei.optimization_id: cei.explanation for cei in optimization_response_items},
|
||||
experiment_metadata=data.experiment_metadata or None,
|
||||
optimizations_origin={
|
||||
cei.optimization_id: {
|
||||
"source": OptimizedCandidateSource.OPTIMIZE,
|
||||
"parent": None,
|
||||
"model": optimization_models.get(cei.optimization_id, "unknown"),
|
||||
}
|
||||
for cei in optimization_response_items
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
event, _created = event_task.result()
|
||||
event, _created = await get_or_create_optimization_event(
|
||||
event_type="no-pr",
|
||||
user_id=request.user,
|
||||
current_username=data.current_username,
|
||||
repo_owner=data.repo_owner,
|
||||
repo_name=data.repo_name,
|
||||
trace_id=data.trace_id,
|
||||
api_key_id=request.api_key_id,
|
||||
metadata={
|
||||
"codeflash_version": data.codeflash_version,
|
||||
"num_optimizations": len(optimization_response_items),
|
||||
"experiment_metadata": data.experiment_metadata,
|
||||
},
|
||||
llm_cost=llm_cost,
|
||||
)
|
||||
|
||||
for item in optimization_response_items:
|
||||
item.optimization_event_id = str(event.id) if event else None
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ from ninja import NinjaAPI
|
|||
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
|
||||
|
||||
from aiservice.analytics.posthog import ph
|
||||
from aiservice.background import fire_and_forget
|
||||
from aiservice.common.markdown_utils import split_markdown_code
|
||||
from aiservice.common_utils import parse_python_version, validate_trace_id
|
||||
from aiservice.env_specific import debug_log_sensitive_data, debug_log_sensitive_data_from_callable
|
||||
|
|
@ -380,31 +381,30 @@ async def optimize(
|
|||
properties={"num_optimizations": len(optimization_response_items), "language": language},
|
||||
)
|
||||
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.create_task(update_optimization_cost(trace_id=data.trace_id, cost=llm_cost, user_id=request.user))
|
||||
if hasattr(request, "should_log_features") and request.should_log_features:
|
||||
tg.create_task(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
original_code=data.source_code,
|
||||
dependency_code=data.dependency_code,
|
||||
line_profiler_results=data.line_profiler_results,
|
||||
optimizations_raw={op_id: cei["code"] for op_id, cei in code_and_explanations.items()},
|
||||
optimizations_post={cei.optimization_id: cei.source_code for cei in optimization_response_items},
|
||||
explanations_raw={op_id: cei["explanation"] for op_id, cei in code_and_explanations.items()},
|
||||
explanations_post={cei.optimization_id: cei.explanation for cei in optimization_response_items},
|
||||
experiment_metadata=data.experiment_metadata or None,
|
||||
optimizations_origin={
|
||||
cei.optimization_id: {
|
||||
"source": OptimizedCandidateSource.OPTIMIZE_LP,
|
||||
"parent": None,
|
||||
"model": optimization_models.get(cei.optimization_id, "unknown"),
|
||||
}
|
||||
for cei in optimization_response_items
|
||||
},
|
||||
)
|
||||
fire_and_forget(update_optimization_cost(trace_id=data.trace_id, cost=llm_cost, user_id=request.user))
|
||||
if hasattr(request, "should_log_features") and request.should_log_features:
|
||||
fire_and_forget(
|
||||
safe_log_features(
|
||||
trace_id=data.trace_id,
|
||||
user_id=request.user,
|
||||
original_code=data.source_code,
|
||||
dependency_code=data.dependency_code,
|
||||
line_profiler_results=data.line_profiler_results,
|
||||
optimizations_raw={op_id: cei["code"] for op_id, cei in code_and_explanations.items()},
|
||||
optimizations_post={cei.optimization_id: cei.source_code for cei in optimization_response_items},
|
||||
explanations_raw={op_id: cei["explanation"] for op_id, cei in code_and_explanations.items()},
|
||||
explanations_post={cei.optimization_id: cei.explanation for cei in optimization_response_items},
|
||||
experiment_metadata=data.experiment_metadata or None,
|
||||
optimizations_origin={
|
||||
cei.optimization_id: {
|
||||
"source": OptimizedCandidateSource.OPTIMIZE_LP,
|
||||
"parent": None,
|
||||
"model": optimization_models.get(cei.optimization_id, "unknown"),
|
||||
}
|
||||
for cei in optimization_response_items
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
response = OptimizeResponseSchema(optimizations=optimization_response_items)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,17 @@
|
|||
import ast
|
||||
from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
|
||||
from aiservice.background import _background_tasks
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _drain_background_tasks() -> Generator[None, None, None]:
    """Cancel and forget any fire-and-forget tasks left over after each test.

    Prevents background analytics/DB writes scheduled during one test from
    leaking into — or erroring after — a later test's event loop.
    """
    yield
    # NOTE(review): despite the name, this cancels rather than awaits the
    # pending tasks — any in-flight background work is dropped, not completed.
    # Snapshot with list() because cancellation callbacks mutate the set.
    for task in list(_background_tasks):
        task.cancel()
    _background_tasks.clear()
|
||||
|
||||
|
||||
def normalize_code(code: str | None) -> str | None:
|
||||
|
|
|
|||
Loading…
Reference in a new issue