codeflash-internal/django/aiservice/optimizer/optimizer.py
Kevin Turcios b8827764cd formatting
2025-09-22 21:06:51 -07:00

228 lines
9.8 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import libcst as cst
from ninja import NinjaAPI, Schema
from openai import OpenAIError
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
from pydantic import ValidationError
from aiservice.analytics.posthog import ph
from aiservice.common_utils import parse_python_version, validate_trace_id
from aiservice.env_specific import (
create_openai_client,
debug_log_sensitive_data,
debug_log_sensitive_data_from_callable,
)
from aiservice.models.aimodels import OPTIMIZE_MODEL, calculate_llm_cost
from authapp.user import get_user_by_id
from log_features.log_event import get_repository, log_optimization_event
from log_features.log_features import log_features
from optimizer.context_utils.optimizer_context import (
BaseOptimizerContext,
OptimizeErrorResponseSchema,
OptimizeResponseSchema,
)
if TYPE_CHECKING:
from openai.types.chat import (
ChatCompletionAssistantMessageParam,
ChatCompletionFunctionMessageParam,
ChatCompletionToolMessageParam,
)
from aiservice.models.aimodels import LLM
from optimizer.context_utils.optimizer_context import OptimizeResponseItemSchema
optimize_api = NinjaAPI(urls_namespace="optimize")
# Get the directory of the current file
current_dir = Path(__file__).parent
SYSTEM_PROMPT = (current_dir / "system_prompt.md").read_text()
USER_PROMPT = (current_dir / "user_prompt.md").read_text()
ASYNC_SYSTEM_PROMPT = (current_dir / "async_system_prompt.md").read_text()
ASYNC_USER_PROMPT = (current_dir / "async_user_prompt.md").read_text()
async def optimize_python_code(
user_id: str,
ctx: BaseOptimizerContext,
dependency_code: str | None = None,
n: int = 1,
optimize_model: LLM = OPTIMIZE_MODEL,
python_version: tuple[int, int, int] = (3, 12, 9),
) -> tuple[list[OptimizeResponseItemSchema], float | None]:
"""Optimize the given python code for performance using OpenAI's GPT-4 model.
Parameters
----------
- source_code (str): The python code to optimize.
- n (int): Number of optimization variants to generate. Default is 1.
- python_version (tuple[int, int, int]): The python version to use. Default is (3,12,9).
Returns: - List[Tuple[Union[str, None], Union[str, None]]]: A list of tuples where the first element is the
optimized code and the second is the explanation.
"""
print("/optimize: Optimizing python code.")
debug_log_sensitive_data(f"Optimizing python code for user {user_id}:\n{ctx.source_code}")
# TODO: Experiment with iterative approaches to optimization. Take the learnings from the testing phase into the
# next optimization iteration
# TODO: Experiment with iterative chain-of-thought generation. ask what is the
# function doing and then ask it to describe how to speed it up and then generate optimization
python_version_str = ".".join(str(x) for x in python_version)
system_prompt = ctx.get_system_prompt(python_version_str)
user_prompt = ctx.get_user_prompt(dependency_code, None)
system_message = ChatCompletionSystemMessageParam(role="system", content=system_prompt)
user_message = ChatCompletionUserMessageParam(role="user", content=user_prompt)
messages: list[
ChatCompletionSystemMessageParam
| ChatCompletionUserMessageParam
| ChatCompletionAssistantMessageParam
| ChatCompletionToolMessageParam
| ChatCompletionFunctionMessageParam
] = [system_message, user_message]
async with create_openai_client() as openai_client:
# TODO: Verify if the context window length is within the model capability
try:
output = await openai_client.with_options(max_retries=3).chat.completions.create(
model=optimize_model.name, messages=messages, n=n
)
except OpenAIError as e:
print("OpenAI Code Generation error ...")
print(e)
debug_log_sensitive_data(f"Failed to generate code for source:\n{ctx.source_code}")
return []
llm_cost = calculate_llm_cost(output, optimize_model)
debug_log_sensitive_data(f"OpenAIClient optimization response:\n{output.model_dump_json(indent=2)}")
if output.usage is not None:
ph(
user_id,
"aiservice-optimize-openai-usage",
properties={"model": optimize_model.name, "n": n, "usage": output.usage.json()},
)
results = [content for op in output.choices if (content := op.message.content)]
optimization_response_items: list[OptimizeResponseItemSchema] = []
for result in results:
ctx.extract_code_and_explanation_from_llm_res(result)
try:
res = ctx.parse_and_generate_candidate_schema()
if res is not None and ctx.is_valid_code():
optimization_response_items.append(res)
ctx.extracted_code_and_expl = None
ctx.parsed_code_and_explanation = None
except (ValueError, ValidationError, cst.ParserSyntaxError) as e:
debug_log_sensitive_data(f"error for source:\n{ctx.source_code}")
debug_log_sensitive_data(f"Traceback: {e}")
continue
return optimization_response_items, llm_cost
class OptimizeSchema(Schema):
source_code: str
dependency_code: str | None
trace_id: str
python_version: str
experiment_metadata: dict[str, str] | None = None
codeflash_version: str | None = None
current_username: str | None = None
repo_owner: str | None = None
repo_name: str | None = None
is_async: bool | None = False
n_candidates: int | None = 5
@optimize_api.post(
"/", response={200: OptimizeResponseSchema, 400: OptimizeErrorResponseSchema, 500: OptimizeErrorResponseSchema}
)
async def optimize(request, data: OptimizeSchema) -> tuple[int, OptimizeResponseSchema | OptimizeErrorResponseSchema]:
system_prompt = ASYNC_SYSTEM_PROMPT if data.is_async else SYSTEM_PROMPT
user_prompt = ASYNC_USER_PROMPT if data.is_async else USER_PROMPT
ctx: BaseOptimizerContext = BaseOptimizerContext.get_dynamic_context(system_prompt, user_prompt, data.source_code)
ph(request.user, "aiservice-optimize-called")
try:
python_version: tuple[int, int, int] = parse_python_version(data.python_version)
except:
return 400, OptimizeErrorResponseSchema(
error="Invalid Python version, it should look like 3.x.x. We only support Python 3.9 and above."
)
try:
ctx.validate_and_parse_source_code(data.source_code, feature_version=python_version[:2])
except SyntaxError:
return 400, OptimizeErrorResponseSchema(
error="Invalid source code. It is not valid Python code. Please check syntax of your code."
)
if not validate_trace_id(data.trace_id):
return 400, OptimizeErrorResponseSchema(error="Invalid trace ID. Please provide a valid UUIDv4.")
optimization_response_items, llm_cost = await optimize_python_code(
request.user, ctx, data.dependency_code, n=min(data.n_candidates or 5, 5), python_version=python_version
)
if len(optimization_response_items) == 0:
ph(request.user, "aiservice-optimize-no-optimizations-found")
debug_log_sensitive_data(f"No optimizations found for source:\n{data.source_code}")
return 500, OptimizeErrorResponseSchema(error="Error generating optimizations. Internal server error.")
ph(
request.user,
"aiservice-optimize-optimizations-found",
properties={"num_optimizations": len(optimization_response_items)},
)
try:
repository = await get_repository(data.repo_owner, data.repo_name)
except Exception:
repository = None
if data.current_username is None:
user = await get_user_by_id(request.user)
data.current_username = user.github_username
event = await log_optimization_event(
event_type="no-pr",
user_id=request.user,
current_username=data.current_username,
repository_id=repository.id if repository else None,
trace_id=data.trace_id,
api_key_id=request.api_key_id,
metadata={
"codeflash_version": data.codeflash_version,
"num_optimizations": len(optimization_response_items),
"experiment_metadata": data.experiment_metadata,
},
llm_cost=llm_cost,
)
if hasattr(request, "should_log_features") and request.should_log_features:
await log_features(
trace_id=data.trace_id,
user_id=request.user,
original_code=data.source_code,
dependency_code=data.dependency_code,
optimizations_raw={
op_id: cei.code for op_id, cei in ctx.code_and_explanation_before_post_processing.items()
},
optimizations_post={cei.optimization_id: cei.source_code for cei in optimization_response_items},
explanations_raw={
op_id: cei.explanation for op_id, cei in ctx.code_and_explanation_before_post_processing.items()
},
explanations_post={cei.optimization_id: cei.explanation for cei in optimization_response_items},
experiment_metadata=data.experiment_metadata if data.experiment_metadata else None,
)
for item in optimization_response_items:
item.optimization_event_id = str(event.id) if event else None
response = OptimizeResponseSchema(optimizations=optimization_response_items)
def log_response() -> None:
debug_log_sensitive_data(f"Response:\n{response.json()}")
for opt in response.optimizations:
debug_log_sensitive_data(f"Optimized source:\n{opt.source_code}")
debug_log_sensitive_data(f"Optimization explanation:\n{opt.explanation}")
debug_log_sensitive_data_from_callable(log_response)
ph(request.user, "aiservice-optimize-successful")
return 200, response