codeflash-internal/django/aiservice/optimizer/optimizer_javascript.py
2026-01-14 22:15:27 -08:00

403 lines
15 KiB
Python

"""
JavaScript/TypeScript code optimizer module.
This module handles optimization requests for JavaScript and TypeScript code.
"""
from __future__ import annotations
import asyncio
import logging
import re
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any
import sentry_sdk
from ninja.errors import HttpError
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
from aiservice.analytics.posthog import ph
from aiservice.common_utils import validate_trace_id
from aiservice.env_specific import debug_log_sensitive_data, debug_log_sensitive_data_from_callable
from aiservice.llm import LLM, OPTIMIZE_MODEL, calculate_llm_cost, call_llm
from aiservice.validators.javascript_validator import validate_javascript_syntax, validate_typescript_syntax
from authapp.auth import AuthenticatedRequest
from authapp.user import get_user_by_id
from log_features.log_event import get_or_create_optimization_event
from log_features.log_features import log_features
from optimizer.config import MAX_OPTIMIZER_CALLS, get_model_distribution
from optimizer.context_utils.optimizer_context import (
OptimizeErrorResponseSchema,
OptimizeResponseItemSchema,
OptimizeResponseSchema,
)
from optimizer.models import OptimizedCandidateSource, OptimizeSchema
from optimizer.prompts import get_system_prompt, get_user_prompt
if TYPE_CHECKING:
from openai.types.chat import ChatCompletionMessageParam
# Pattern to extract a fenced code block from an LLM response.
#
# The fence must open with three backticks immediately followed by one of the
# lowercase tags `javascript`, `js`, `typescript` or `ts` (no IGNORECASE flag is
# set, so the match is case-sensitive; tags such as `jsx`/`tsx` do not match
# because only whitespace may follow the tag before the newline).
# An optional `:...` suffix on the fence line is tolerated and discarded
# (presumably a file-path annotation emitted by the model - TODO confirm against the prompts).
# Group 1 is the block body, matched lazily up to the next closing fence.
# re.DOTALL lets `.` span newlines; re.MULTILINE has no effect here because the
# pattern contains no `^`/`$` anchors.
JS_CODE_PATTERN = re.compile(
r"```(?:javascript|js|typescript|ts)(?::[^\n]*)?\s*\n(.*?)```",
re.MULTILINE | re.DOTALL,
)
def extract_code_and_explanation(content: str) -> tuple[str, str]:
    """
    Split an LLM reply into its first fenced code block and the prose before it.

    Args:
        content: The raw LLM response content

    Returns:
        Tuple of (code, explanation). When a fenced block is found, ``code`` is the
        stripped block body and ``explanation`` is the stripped text that precedes
        the fence. When no block is found, ``code`` is the empty string and
        ``explanation`` is ``content`` unchanged.
    """
    fenced = JS_CODE_PATTERN.search(content)
    if fenced is None:
        # No recognised fence: hand the whole reply back as the explanation.
        return "", content

    # Only text ahead of the opening fence is kept as explanation; anything
    # that follows the code block is not returned.
    preamble = content[: fenced.start()]
    return fenced.group(1).strip(), preamble.strip()
async def optimize_javascript_code_single(
    user_id: str,
    source_code: str,
    trace_id: str,
    dependency_code: str | None = None,
    optimize_model: LLM = OPTIMIZE_MODEL,
    language_version: str = "ES2022",
    is_async: bool = False,
    call_sequence: int | None = None,
    language: str = "javascript",
) -> tuple[OptimizeResponseItemSchema | None, float | None, str]:
    """
    Ask one LLM for one optimized candidate of JavaScript/TypeScript code.

    Args:
        user_id: The user ID making the request
        source_code: The source code to optimize
        trace_id: The trace ID for logging
        dependency_code: Optional dependency code for context
        optimize_model: The LLM model to use
        language_version: Target JS/TS version (e.g., "ES2022")
        is_async: Whether the code is async
        call_sequence: Call sequence number for tracking
        language: "javascript" (default) or "typescript". Selects the syntax
            validator applied to the generated code and the fence tag used in
            the prompt; any other value is treated as JavaScript. The prompt
            templates are always looked up under "javascript".

    Returns:
        Tuple of (optimization_result, llm_cost, model_name).
        ``optimization_result`` is None when the LLM call fails, when no code
        block is found, when the code fails syntax validation, or when it is
        identical to ``source_code`` after normalization. ``llm_cost`` is None
        only when the LLM call itself failed.
    """
    is_typescript = language == "typescript"
    fence_tag = "typescript" if is_typescript else "javascript"
    language_label = "TypeScript" if is_typescript else "JavaScript"
    # Validate candidates with the same validator the request validation uses
    # for this language, so TypeScript syntax is not rejected as bad JavaScript.
    validate_syntax = validate_typescript_syntax if is_typescript else validate_javascript_syntax

    logging.info("/optimize: Optimizing JavaScript code.")
    debug_log_sensitive_data(f"Optimizing JavaScript code for user {user_id}:\n{source_code}")

    # Get language-appropriate prompts
    prompt_language = "javascript"  # TypeScript uses same prompts
    system_prompt = get_system_prompt(prompt_language, is_async)
    user_prompt = get_user_prompt(prompt_language, is_async)

    # Format prompts
    system_prompt = system_prompt.format(language_version=language_version)
    user_prompt = user_prompt.format(source_code=f"```{fence_tag}\n{source_code}\n```")
    if dependency_code:
        # Prepended after .format() so braces in the dependency code are never
        # interpreted as format fields.
        user_prompt = f"Dependencies (read-only):\n```{fence_tag}\n{dependency_code}\n```\n\n{user_prompt}"

    obs_context: dict[str, Any] | None = {"call_sequence": call_sequence} if call_sequence is not None else None
    messages: list[ChatCompletionMessageParam] = [
        ChatCompletionSystemMessageParam(role="system", content=system_prompt),
        ChatCompletionUserMessageParam(role="user", content=user_prompt),
    ]

    try:
        output = await call_llm(
            llm=optimize_model,
            messages=messages,
            call_type="optimization",
            trace_id=trace_id,
            user_id=user_id,
            python_version=language_version,  # Reusing python_version field for language version
            context=obs_context,
        )
    except Exception as e:
        # Best-effort: a failed call yields "no candidate" instead of aborting sibling calls.
        logging.exception("LLM Code Generation error in JavaScript optimizer")
        sentry_sdk.capture_exception(e)
        debug_log_sensitive_data(f"Failed to generate code for source:\n{source_code}")
        return None, None, optimize_model.name

    llm_cost = calculate_llm_cost(output.raw_response, optimize_model)
    debug_log_sensitive_data(f"LLM optimization response:\n{output.raw_response.model_dump_json(indent=2)}")

    usage = output.raw_response.usage
    if usage is not None:
        ph(
            user_id,
            "aiservice-optimize-openai-usage",
            # model_dump_json() replaces the deprecated .json() and matches the
            # serializer already used for the raw response above.
            properties={"model": optimize_model.name, "usage": usage.model_dump_json(), "language": language},
        )

    # Extract code and explanation from response
    optimized_code, explanation = extract_code_and_explanation(output.content)
    if not optimized_code:
        sentry_sdk.capture_message("No code block found in JavaScript optimization response")
        debug_log_sensitive_data(f"No code found in response for source:\n{source_code}")
        return None, llm_cost, optimize_model.name

    # Validate the generated code
    is_valid, error = validate_syntax(optimized_code)
    if not is_valid:
        sentry_sdk.capture_message(f"Invalid {language_label} generated: {error}")
        debug_log_sensitive_data(f"Invalid code generated:\n{optimized_code}\nError: {error}")
        return None, llm_cost, optimize_model.name

    # Check that the code is actually different from the original
    if _normalize_code(optimized_code) == _normalize_code(source_code):
        debug_log_sensitive_data("Generated code identical to original")
        return None, llm_cost, optimize_model.name

    result = OptimizeResponseItemSchema(
        source_code=optimized_code,
        explanation=explanation,
        optimization_id=str(uuid.uuid4()),
    )
    return result, llm_cost, optimize_model.name
def _normalize_code(code: str) -> str:
"""
Normalize code for comparison (remove comments and whitespace).
"""
# Remove single-line comments
code = re.sub(r"//.*$", "", code, flags=re.MULTILINE)
# Remove multi-line comments
code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL)
# Normalize whitespace
code = " ".join(code.split())
return code
async def optimize_javascript_code(
    user_id: str,
    source_code: str,
    trace_id: str,
    dependency_code: str | None = None,
    language_version: str = "ES2022",
    is_async: bool = False,
    n_candidates: int = 0,
) -> tuple[list[OptimizeResponseItemSchema], float, dict[str, str]]:
    """
    Fan out optimization calls across the configured models and merge the results.

    Args:
        user_id: The user ID making the request
        source_code: The source code to optimize
        trace_id: The trace ID for logging
        dependency_code: Optional dependency code for context
        language_version: Target JS/TS version
        is_async: Whether the code is async
        n_candidates: Number of optimization candidates to generate

    Returns:
        Tuple of (optimization_results, total_cost, optimization_models), where
        ``optimization_models`` maps each kept optimization_id to the name of
        the model that produced it. Candidates whose normalized code repeats an
        earlier one are dropped; their cost is still counted.
    """
    # Nothing requested: skip the model distribution and the task group entirely.
    if n_candidates == 0:
        return [], 0.0, {}

    pending: list[asyncio.Task[tuple[OptimizeResponseItemSchema | None, float | None, str]]] = []
    async with asyncio.TaskGroup() as tg:
        for model, num_calls in get_model_distribution(n_candidates, MAX_OPTIMIZER_CALLS):
            for _ in range(num_calls):
                pending.append(
                    tg.create_task(
                        optimize_javascript_code_single(
                            user_id=user_id,
                            source_code=source_code,
                            trace_id=trace_id,
                            dependency_code=dependency_code,
                            optimize_model=model,
                            language_version=language_version,
                            is_async=is_async,
                            # 1-based position of this call across all models.
                            call_sequence=len(pending) + 1,
                        )
                    )
                )

    # Collect results in submission order, keeping the first of each distinct candidate.
    kept: list[OptimizeResponseItemSchema] = []
    model_by_optimization: dict[str, str] = {}
    fingerprints: set[str] = set()
    total_cost = 0.0
    for finished in pending:
        candidate, cost, model_name = finished.result()
        if cost:
            total_cost += cost
        if candidate is None:
            continue
        fingerprint = _normalize_code(candidate.source_code)
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        kept.append(candidate)
        model_by_optimization[candidate.optimization_id] = model_name

    return kept, total_cost, model_by_optimization
def validate_javascript_request_data(data: OptimizeSchema) -> None:
    """
    Reject a JavaScript/TypeScript optimization request that cannot be processed.

    Checks run in this order: non-empty source code, valid trace ID, then syntax
    validation with the TypeScript validator when ``data.language`` is
    "typescript" and the JavaScript validator otherwise.

    Args:
        data: The request data

    Raises:
        HttpError: With status 400 on the first check that fails
    """
    if not data.source_code:
        raise HttpError(400, "Source code cannot be empty.")
    if not validate_trace_id(data.trace_id):
        raise HttpError(400, "Invalid trace ID. Please provide a valid UUIDv4.")

    # Pick the validator and the label used in the error message together.
    if data.language == "typescript":
        check_syntax, lang_name = validate_typescript_syntax, "TypeScript"
    else:
        check_syntax, lang_name = validate_javascript_syntax, "JavaScript"

    is_valid, error = check_syntax(data.source_code)
    if is_valid:
        return
    raise HttpError(400, f"Invalid source code. It is not valid {lang_name}: {error}")
async def optimize_javascript(
    request: AuthenticatedRequest, data: OptimizeSchema
) -> tuple[int, OptimizeResponseSchema | OptimizeErrorResponseSchema]:
    """
    Main endpoint handler for JavaScript/TypeScript optimization.

    Validates the request, generates candidates (looking up the user
    concurrently when ``data.current_username`` is missing), records the
    optimization event and features, and returns the candidates tagged with
    the event id.

    Args:
        request: The authenticated request
        data: The optimization request data

    Returns:
        Tuple of (status_code, response): the validation error's own status on
        invalid input, 500 when generation raises or yields no candidates,
        200 with the optimizations otherwise.
    """
    language = data.language
    ph(request.user, "aiservice-optimize-called", properties={"language": language})

    try:
        validate_javascript_request_data(data)
    except HttpError as validation_error:
        validation_error.add_note(
            f"JavaScript optimizer request validation error: {validation_error.status_code} {validation_error.message}"
        )
        logging.error(
            f"JavaScript optimizer request validation error: {validation_error.message}. trace_id={data.trace_id}"
        )
        sentry_sdk.capture_exception(validation_error)
        return validation_error.status_code, OptimizeErrorResponseSchema(error=validation_error.message)

    try:
        async with asyncio.TaskGroup() as tg:
            optimize_task = tg.create_task(
                optimize_javascript_code(
                    user_id=request.user,
                    source_code=data.source_code,
                    trace_id=data.trace_id,
                    dependency_code=data.dependency_code,
                    language_version=data.language_version or "ES2022",
                    is_async=data.is_async or False,
                    n_candidates=data.n_candidates,
                )
            )
            # The user lookup only runs when the caller did not supply a username.
            user_task = tg.create_task(get_user_by_id(request.user)) if data.current_username is None else None
    except Exception as task_error:
        logging.exception(f"Error during JavaScript optimization task. trace_id={data.trace_id}")
        sentry_sdk.capture_exception(task_error)
        return 500, OptimizeErrorResponseSchema(error="Error generating optimizations. Internal server error.")

    candidates, llm_cost, optimization_models = optimize_task.result()

    if user_task is not None:
        # The task group has already exited, so this task is finished.
        user = await user_task
        if user and user.github_username:
            data.current_username = str(user.github_username)

    candidate_count = len(candidates)
    if candidate_count == 0:
        ph(request.user, "aiservice-optimize-no-optimizations-found", properties={"language": language})
        debug_log_sensitive_data(f"No JavaScript optimizations found for source:\n{data.source_code}")
        logging.error(f"Could not generate any JavaScript optimizations. trace_id={data.trace_id}")
        return 500, OptimizeErrorResponseSchema(error="Could not generate any optimizations. Please try again.")

    ph(
        request.user,
        "aiservice-optimize-optimizations-found",
        properties={"num_optimizations": candidate_count, "language": language},
    )

    # Per-candidate payloads for feature logging, keyed by optimization id.
    code_by_id = {opt.optimization_id: opt.source_code for opt in candidates}
    explanation_by_id = {opt.optimization_id: opt.explanation for opt in candidates}
    origin_by_id = {
        opt.optimization_id: {
            "source": OptimizedCandidateSource.OPTIMIZE,
            "parent": None,
            "model": optimization_models.get(opt.optimization_id, "unknown"),
            "language": language,
        }
        for opt in candidates
    }

    async with asyncio.TaskGroup() as tg:
        event_task = tg.create_task(
            get_or_create_optimization_event(
                event_type="no-pr",
                user_id=request.user,
                current_username=data.current_username,
                repo_owner=data.repo_owner,
                repo_name=data.repo_name,
                trace_id=data.trace_id,
                api_key_id=request.api_key_id,
                metadata={
                    "codeflash_version": data.codeflash_version,
                    "num_optimizations": candidate_count,
                    "experiment_metadata": data.experiment_metadata,
                    "language": language,
                },
                llm_cost=llm_cost,
            )
        )
        tg.create_task(
            log_features(
                trace_id=data.trace_id,
                user_id=request.user,
                original_code=data.source_code,
                dependency_code=data.dependency_code,
                optimizations_post=code_by_id,
                explanations_post=explanation_by_id,
                experiment_metadata=data.experiment_metadata or None,
                optimizations_origin=origin_by_id,
            )
        )

    event, _created = event_task.result()
    # Every candidate in this response is tagged with the same event id.
    event_id = str(event.id) if event else None
    for item in candidates:
        item.optimization_event_id = event_id

    response = OptimizeResponseSchema(optimizations=candidates)

    def log_response() -> None:
        debug_log_sensitive_data(f"JavaScript Response:\n{response.model_dump_json()}")
        for opt in response.optimizations:
            debug_log_sensitive_data(f"Optimized JavaScript source:\n{opt.source_code}")
            debug_log_sensitive_data(f"JavaScript optimization explanation:\n{opt.explanation}")

    # Passed as a callable rather than called here.
    debug_log_sensitive_data_from_callable(log_response)

    ph(request.user, "aiservice-optimize-successful", properties={"language": language})
    return 200, response