Fix: Validate source_code with dependency_code context

CRITICAL BUG FIX: AI service validates TypeScript/JavaScript source code
without considering dependency_code (read-only context), causing false
'undefined variable' errors for constants and types.

Root Cause:
- optimizer.py:375-380 validates data.source_code alone
- Constants, types, and globals in data.dependency_code are invisible
- Validation fails with 'Undefined variable(s)' error

Impact: 120+ optimization failures across openclaw project
- Trace IDs: 037da636-6128-434a-925c-8b7ba329c2d8 and many others
- Error: Undefined variable(s): BLOCKED_TAR_ENTRY_TYPES, DEFAULT_MAX_*

Fix:
- Validate source_code + dependency_code together
- Both single-file and multi-file code paths fixed
- Ensures all context (constants/types/globals) visible to validator

Testing:
- Added test_validation_with_dependency_context() test
- Verifies source+dependency validates correctly
- All existing tests still pass
This commit is contained in:
Codeflash Bot 2026-04-02 09:53:56 +00:00
parent 0abc6bf1e3
commit 33a01d0dde
2 changed files with 705 additions and 0 deletions

View file

@@ -0,0 +1,511 @@
"""JavaScript/TypeScript code optimizer module.
This module handles optimization requests for JavaScript and TypeScript code.
"""
from __future__ import annotations
import asyncio
import logging
import re
import uuid
from typing import TYPE_CHECKING, Any
import sentry_sdk
from ninja.errors import HttpError
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
from aiservice.analytics.posthog import ph
from aiservice.common.markdown_utils import split_markdown_code
from aiservice.common_utils import validate_trace_id
from aiservice.env_specific import debug_log_sensitive_data, debug_log_sensitive_data_from_callable
from aiservice.llm import llm_client
from aiservice.llm_models import LLM, OPTIMIZE_MODEL
from aiservice.validators.javascript_validator import validate_javascript_syntax, validate_typescript_syntax
from authapp.auth import AuthenticatedRequest
from authapp.user import get_user_by_id
from core.languages.js_ts.context_helpers import is_multi_context_js, is_multi_context_ts
from core.languages.js_ts.prompts.optimizer import get_system_prompt, get_user_prompt
from core.log_features.log_event import get_or_create_optimization_event
from core.log_features.log_features import log_features
from core.shared.context_helpers import extract_code_and_explanation, group_code, normalize_c_style_code
from core.shared.optimizer_config import MAX_OPTIMIZER_CALLS, get_model_distribution
from core.shared.optimizer_models import OptimizedCandidateSource, OptimizeSchema
from core.shared.optimizer_schemas import (
OptimizeErrorResponseSchema,
OptimizeResponseItemSchema,
OptimizeResponseSchema,
)
if TYPE_CHECKING:
from openai.types.chat import ChatCompletionMessageParam
# Pattern to extract code blocks from LLM response (single file, no file path)
JS_CODE_PATTERN = re.compile(r"```(?:javascript|js|typescript|ts)\s*\n(.*?)```", re.MULTILINE | re.DOTALL)
# Pattern to extract code blocks with file paths (multi-file context)
JS_CODE_WITH_PATH_PATTERN = re.compile(
r"```(?:javascript|js|typescript|ts):([^\n]+)\n(.*?)```", re.MULTILINE | re.DOTALL
)
async def optimize_javascript_code_single(
    user_id: str,
    source_code: str,
    trace_id: str,
    dependency_code: str | None = None,
    optimize_model: LLM = OPTIMIZE_MODEL,
    language_version: str = "ES2022",
    is_async: bool = False,
    call_sequence: int | None = None,
    language: str = "javascript",
) -> tuple[OptimizeResponseItemSchema | None, float | None, str]:
    """Optimize JavaScript/TypeScript code using LLMs.

    Performs a single LLM call and post-processes the response: extracts the
    code block(s), validates the generated syntax, and rejects candidates
    that are identical to the original source.

    Args:
        user_id: The user ID making the request
        source_code: The source code to optimize (can be multi-file markdown format)
        trace_id: The trace ID for logging
        dependency_code: Optional dependency code for context
        optimize_model: The LLM model to use
        language_version: Target JS/TS version (e.g., "ES2022")
        is_async: Whether the code is async
        call_sequence: Call sequence number for tracking
        language: The programming language ("javascript" or "typescript")

    Returns:
        Tuple of (optimization_result, llm_cost, model_name). The result is
        None when the LLM call failed, no code block was found, the generated
        code was invalid, or the code was identical to the original.
    """
    lang_name = "TypeScript" if language == "typescript" else "JavaScript"
    code_block_tag = "typescript" if language == "typescript" else "javascript"
    logging.info("/optimize: Optimizing %s code.", lang_name)
    debug_log_sensitive_data(f"Optimizing {lang_name} code for user {user_id}:\n{source_code}")
    # Check if source code is multi-file format
    is_multi_file = is_multi_context_ts(source_code) if language == "typescript" else is_multi_context_js(source_code)
    original_file_to_code: dict[str, str] = {}
    if is_multi_file:
        original_file_to_code = split_markdown_code(source_code, language)
        logging.info(
            "Multi-file context detected with %d files: %s",
            len(original_file_to_code),
            list(original_file_to_code.keys()),
        )
    # Get language-appropriate prompts (TypeScript uses same prompts as JavaScript)
    system_prompt = get_system_prompt(is_async)
    user_prompt = get_user_prompt(is_async)
    # Format prompts with proper language tag
    system_prompt = system_prompt.format(language_version=language_version)
    if is_multi_file:
        # For multi-file, identify the first file as the target and others as helper context
        # The first file in the markdown is the target function's file
        file_paths = list(original_file_to_code.keys())
        target_file = file_paths[0] if file_paths else "main file"
        helper_files = file_paths[1:] if len(file_paths) > 1 else []
        # Build the multi-file instructions
        helper_notice = ""
        if helper_files:
            helper_list = ", ".join(f"`{f}`" for f in helper_files)
            helper_notice = f"""
HELPER FILES: {helper_list}
These files contain helper functions that the target function uses. You may optimize these as well if needed.
"""
        multi_file_instructions = f"""
The code is provided in a multi-file format. Each file is wrapped in a code block with its path.
TARGET FILE: `{target_file}`
{helper_notice}
Output the optimized code for each file that you modify. Wrap each file's code in:
```{code_block_tag}:<file_path>
<optimized code>
```
You MUST output the target file. You may also output helper files if you optimize them.
"""
        system_prompt = system_prompt + "\n" + multi_file_instructions
        user_prompt = user_prompt.format(source_code=source_code)
    else:
        user_prompt = user_prompt.format(source_code=f"```{code_block_tag}\n{source_code}\n```")
    if dependency_code:
        # Dependencies are read-only context; prepend them so the model can see
        # constants/types/globals referenced by the target code.
        user_prompt = f"Dependencies (read-only):\n```{code_block_tag}\n{dependency_code}\n```\n\n{user_prompt}"
    obs_context: dict[str, Any] | None = {"call_sequence": call_sequence} if call_sequence is not None else None
    messages: list[ChatCompletionMessageParam] = [
        ChatCompletionSystemMessageParam(role="system", content=system_prompt),
        ChatCompletionUserMessageParam(role="user", content=user_prompt),
    ]
    try:
        output = await llm_client.call(
            llm=optimize_model,
            messages=messages,
            call_type="optimization",
            trace_id=trace_id,
            user_id=user_id,
            python_version=language_version,  # Reusing python_version field for language version
            context=obs_context,
        )
    except Exception:
        # LLM call failures are non-fatal: the caller simply gets no candidate
        # (None result, None cost).
        debug_log_sensitive_data(f"Failed to generate code for source:\n{source_code}")
        return None, None, optimize_model.name
    llm_cost = output.cost
    debug_log_sensitive_data(f"LLM optimization response:\n{output.raw_response.model_dump_json(indent=2)}")
    if output.raw_response.usage is not None:
        ph(
            user_id,
            "aiservice-optimize-openai-usage",
            properties={"model": optimize_model.name, "usage": output.raw_response.usage.json(), "language": language},
        )
    # Extract code and explanation from response
    extracted_code, explanation = extract_code_and_explanation(
        output.content, JS_CODE_PATTERN, JS_CODE_WITH_PATH_PATTERN, is_multi_file
    )
    if not extracted_code:
        sentry_sdk.capture_message("No code block found in JavaScript optimization response")
        debug_log_sensitive_data(f"No code found in response for source:\n{source_code}")
        return None, llm_cost, optimize_model.name
    optimization_id = str(uuid.uuid4())
    if is_multi_file and isinstance(extracted_code, dict):
        # Handle multi-file response
        # LLM can optimize both target and helper files
        merged_file_to_code: dict[str, str] = {}
        has_changes = False
        for file_path, original_code in original_file_to_code.items():
            if file_path in extracted_code:
                new_code = extracted_code[file_path]
                # Validate the new code
                if language == "typescript":
                    is_valid, error = validate_typescript_syntax(new_code)
                else:
                    is_valid, error = validate_javascript_syntax(new_code)
                if not is_valid:
                    sentry_sdk.capture_message(f"Invalid {lang_name} generated for {file_path}: {error}")
                    debug_log_sensitive_data(f"Invalid code generated for {file_path}:\n{new_code}\nError: {error}")
                    # Keep original code for this file
                    merged_file_to_code[file_path] = original_code
                else:
                    merged_file_to_code[file_path] = new_code
                    # Only count a change when the normalized code actually differs.
                    if normalize_c_style_code(new_code) != normalize_c_style_code(original_code):
                        has_changes = True
            else:
                # File not in response, keep original
                merged_file_to_code[file_path] = original_code
        if not has_changes:
            debug_log_sensitive_data("Generated code identical to original (multi-file)")
            return None, llm_cost, optimize_model.name
        # Format as multi-file markdown
        wrapped_code = group_code(merged_file_to_code, language=code_block_tag)
        result = OptimizeResponseItemSchema(
            source_code=wrapped_code, explanation=explanation, optimization_id=optimization_id
        )
        return result, llm_cost, optimize_model.name
    # Single file handling
    optimized_code = extracted_code if isinstance(extracted_code, str) else ""
    if not optimized_code:
        return None, llm_cost, optimize_model.name
    # Validate the generated code using the appropriate validator
    if language == "typescript":
        is_valid, error = validate_typescript_syntax(optimized_code)
    else:
        is_valid, error = validate_javascript_syntax(optimized_code)
    if not is_valid:
        sentry_sdk.capture_message(f"Invalid {lang_name} generated: {error}")
        debug_log_sensitive_data(f"Invalid code generated:\n{optimized_code}\nError: {error}")
        return None, llm_cost, optimize_model.name
    # Check that the code is actually different from the original
    if normalize_c_style_code(optimized_code) == normalize_c_style_code(source_code):
        debug_log_sensitive_data("Generated code identical to original")
        return None, llm_cost, optimize_model.name
    # Wrap code in markdown format for CLI parsing
    # Format: ```typescript:filename.ts\ncode\n``` or ```javascript:filename.js\ncode\n```
    wrapped_code = (
        f"```{code_block_tag}\n{optimized_code}\n```"
        if not optimized_code.endswith("\n")
        else f"```{code_block_tag}\n{optimized_code}```"
    )
    result = OptimizeResponseItemSchema(
        source_code=wrapped_code, explanation=explanation, optimization_id=optimization_id
    )
    return result, llm_cost, optimize_model.name
async def optimize_javascript_code(
    user_id: str,
    source_code: str,
    trace_id: str,
    dependency_code: str | None = None,
    language_version: str = "ES2022",
    is_async: bool = False,
    n_candidates: int = 0,
    language: str = "javascript",
) -> tuple[list[OptimizeResponseItemSchema], float, dict[str, str]]:
    """Fan out parallel optimization calls across multiple models.

    Args:
        user_id: The user ID making the request
        source_code: The source code to optimize
        trace_id: The trace ID for logging
        dependency_code: Optional dependency code for context
        language_version: Target JS/TS version
        is_async: Whether the code is async
        n_candidates: Number of optimization candidates to generate
        language: The programming language ("javascript" or "typescript")

    Returns:
        Tuple of (optimization_results, total_cost, optimization_models),
        where optimization_models maps each optimization_id to the model name
        that produced it. Duplicate candidates (same normalized code) are
        dropped.
    """
    if n_candidates == 0:
        return [], 0.0, {}
    scheduled: list[asyncio.Task[tuple[OptimizeResponseItemSchema | None, float | None, str]]] = []
    sequence_number = 1
    async with asyncio.TaskGroup() as tg:
        # Spread n_candidates calls across models per the configured distribution.
        for model, num_calls in get_model_distribution(n_candidates, MAX_OPTIMIZER_CALLS):
            for _ in range(num_calls):
                scheduled.append(
                    tg.create_task(
                        optimize_javascript_code_single(
                            user_id=user_id,
                            source_code=source_code,
                            trace_id=trace_id,
                            dependency_code=dependency_code,
                            optimize_model=model,
                            language_version=language_version,
                            is_async=is_async,
                            call_sequence=sequence_number,
                            language=language,
                        )
                    )
                )
                sequence_number += 1
    # All tasks have completed once the TaskGroup exits; gather the results,
    # summing costs and de-duplicating candidates by their normalized code.
    results: list[OptimizeResponseItemSchema] = []
    cost_total = 0.0
    models_by_id: dict[str, str] = {}
    unique_normalized: set[str] = set()
    for finished in scheduled:
        candidate, cost, model_name = finished.result()
        if cost:
            cost_total += cost
        if candidate is None:
            continue
        fingerprint = normalize_c_style_code(candidate.source_code)
        if fingerprint in unique_normalized:
            continue
        unique_normalized.add(fingerprint)
        results.append(candidate)
        models_by_id[candidate.optimization_id] = model_name
    return results, cost_total, models_by_id
def _validate_syntax_with_context(code: str, dependency_context: str, language: str) -> tuple[bool, str | None]:
    """Validate *code* with the read-only dependency context prepended.

    Constants, types, and globals live in the dependency context; validating
    the source alone produces false "undefined variable" errors, so the two
    are combined before running the language-appropriate validator.

    Args:
        code: The source code to validate.
        dependency_context: Read-only dependency code, or "" when absent.
        language: "typescript" or "javascript".

    Returns:
        The (is_valid, error) tuple from the underlying validator.
    """
    code_with_context = f"{dependency_context}\n\n{code}" if dependency_context else code
    if language == "typescript":
        return validate_typescript_syntax(code_with_context)
    return validate_javascript_syntax(code_with_context)


def validate_javascript_request_data(data: OptimizeSchema) -> None:
    """Validate JavaScript/TypeScript optimization request data.

    Args:
        data: The request data.

    Raises:
        HttpError: If the source code is empty, the trace ID is not a valid
            UUIDv4, the multi-file markdown cannot be parsed, or any file
            fails syntax/semantic validation.
    """
    if not data.source_code:
        raise HttpError(400, "Source code cannot be empty.")
    if not validate_trace_id(data.trace_id):
        raise HttpError(400, "Invalid trace ID. Please provide a valid UUIDv4.")
    lang_name = "TypeScript" if data.language == "typescript" else "JavaScript"
    dependency_context = data.dependency_code or ""
    # Check if multi-file context
    is_multi_file = (
        is_multi_context_ts(data.source_code)
        if data.language == "typescript"
        else is_multi_context_js(data.source_code)
    )
    if is_multi_file:
        # Validate each file in the multi-file context, each with the shared
        # dependency context so cross-file identifiers resolve.
        file_to_code = split_markdown_code(data.source_code, data.language)
        if not file_to_code:
            raise HttpError(400, f"Invalid source code format. Expected multi-file {lang_name} markdown format.")
        for file_path, code in file_to_code.items():
            is_valid, error = _validate_syntax_with_context(code, dependency_context, data.language)
            if not is_valid:
                raise HttpError(400, f"Invalid source code in {file_path}. It is not valid {lang_name}: {error}")
    else:
        # Single file validation with the same dependency context applied.
        is_valid, error = _validate_syntax_with_context(data.source_code, dependency_context, data.language)
        if not is_valid:
            raise HttpError(400, f"Invalid source code. It is not valid {lang_name}: {error}")
async def optimize_javascript(
    request: AuthenticatedRequest, data: OptimizeSchema
) -> tuple[int, OptimizeResponseSchema | OptimizeErrorResponseSchema]:
    """Main endpoint handler for JavaScript/TypeScript optimization.

    Validates the request, fans out optimization calls, records analytics and
    optimization events, and builds the HTTP response.

    Args:
        request: The authenticated request
        data: The optimization request data

    Returns:
        Tuple of (status_code, response): 200 with optimizations on success,
        400 on validation failure, 500 on internal errors or when no
        optimizations could be generated.
    """
    language = data.language
    ph(request.user, "aiservice-optimize-called", properties={"language": language})
    try:
        validate_javascript_request_data(data)
    except HttpError as e:
        e.add_note(f"JavaScript optimizer request validation error: {e.status_code} {e.message}")
        logging.exception("JavaScript optimizer request validation error: %s. trace_id=%s", e.message, data.trace_id)
        sentry_sdk.capture_exception(e)
        return e.status_code, OptimizeErrorResponseSchema(error=e.message)
    try:
        async with asyncio.TaskGroup() as tg:
            optimize_task = tg.create_task(
                optimize_javascript_code(
                    user_id=request.user,
                    source_code=data.source_code,
                    trace_id=data.trace_id,
                    dependency_code=data.dependency_code,
                    language_version=data.language_version or "ES2022",
                    is_async=data.is_async or False,
                    n_candidates=data.n_candidates,
                    language=language,
                )
            )
            user_task = None
            if data.current_username is None:
                # Username lookup runs in parallel with the optimization calls.
                user_task = tg.create_task(get_user_by_id(request.user))
    except Exception as e:
        logging.exception("Error during JavaScript optimization task. trace_id=%s", data.trace_id)
        sentry_sdk.capture_exception(e)
        return 500, OptimizeErrorResponseSchema(error="Error generating optimizations. Internal server error.")
    optimization_response_items, llm_cost, optimization_models = optimize_task.result()
    if user_task:
        user = await user_task
        if user and user.github_username:
            data.current_username = str(user.github_username)
    if len(optimization_response_items) == 0:
        ph(request.user, "aiservice-optimize-no-optimizations-found", properties={"language": language})
        debug_log_sensitive_data(f"No JavaScript optimizations found for source:\n{data.source_code}")
        logging.error("Could not generate any JavaScript optimizations. trace_id=%s", data.trace_id)
        return 500, OptimizeErrorResponseSchema(error="Could not generate any optimizations. Please try again.")
    ph(
        request.user,
        "aiservice-optimize-optimizations-found",
        properties={"num_optimizations": len(optimization_response_items), "language": language},
    )
    # Persist the optimization event and feature logs concurrently.
    async with asyncio.TaskGroup() as tg:
        event_task = tg.create_task(
            get_or_create_optimization_event(
                event_type="no-pr",
                user_id=request.user,
                current_username=data.current_username,
                repo_owner=data.repo_owner,
                repo_name=data.repo_name,
                trace_id=data.trace_id,
                api_key_id=request.api_key_id,
                metadata={
                    "codeflash_version": data.codeflash_version,
                    "num_optimizations": len(optimization_response_items),
                    "experiment_metadata": data.experiment_metadata,
                    "language": language,
                },
                llm_cost=llm_cost,
            )
        )
        tg.create_task(
            log_features(
                trace_id=data.trace_id,
                user_id=request.user,
                original_code=data.source_code,
                dependency_code=data.dependency_code,
                optimizations_post={opt.optimization_id: opt.source_code for opt in optimization_response_items},
                explanations_post={opt.optimization_id: opt.explanation for opt in optimization_response_items},
                experiment_metadata=data.experiment_metadata or None,
                optimizations_origin={
                    opt.optimization_id: {
                        "source": OptimizedCandidateSource.OPTIMIZE,
                        "parent": None,
                        "model": optimization_models.get(opt.optimization_id, "unknown"),
                        "language": language,
                    }
                    for opt in optimization_response_items
                },
            )
        )
    event, _created = event_task.result()
    # Stamp each candidate with the persisted event id so callers can correlate.
    for item in optimization_response_items:
        item.optimization_event_id = str(event.id) if event else None
    response = OptimizeResponseSchema(optimizations=optimization_response_items)

    def log_response() -> None:
        # Passed as a callable — presumably so the (potentially large) dump is
        # only built when sensitive debug logging is enabled; confirm against
        # debug_log_sensitive_data_from_callable.
        debug_log_sensitive_data(f"JavaScript Response:\n{response.model_dump_json()}")
        for opt in response.optimizations:
            debug_log_sensitive_data(f"Optimized JavaScript source:\n{opt.source_code}")
            debug_log_sensitive_data(f"JavaScript optimization explanation:\n{opt.explanation}")

    debug_log_sensitive_data_from_callable(log_response)
    ph(request.user, "aiservice-optimize-successful", properties={"language": language})
    return 200, response

View file

@@ -0,0 +1,194 @@
"""Tests for JavaScript/TypeScript validator module."""
import pytest
from aiservice.validators.javascript_validator import (
validate_javascript_syntax,
validate_typescript_syntax,
)
class TestJavaScriptValidator:
    """Exercises the tree-sitter based JavaScript syntax validator."""

    def test_valid_simple_function(self) -> None:
        """A plain named function declaration validates cleanly."""
        snippet = """
function hello(name) {
    return 'Hello, ' + name;
}
"""
        is_valid, error = validate_javascript_syntax(snippet)
        assert is_valid
        assert error is None

    def test_invalid_syntax(self) -> None:
        """A function declaration missing its name is rejected with an error."""
        is_valid, error = validate_javascript_syntax("function { }")  # missing function name
        assert not is_valid
        assert error is not None

    def test_valid_arrow_function(self) -> None:
        """An arrow-function assignment validates cleanly."""
        is_valid, error = validate_javascript_syntax("const add = (a, b) => a + b;")
        assert is_valid
        assert error is None
class TestTypeScriptValidator:
    """Tests for tree-sitter based TypeScript validation."""

    def test_valid_typed_function(self) -> None:
        """Test a simple valid typed function."""
        code = """
function add(a: number, b: number): number {
    return a + b;
}
"""
        is_valid, error = validate_typescript_syntax(code)
        assert is_valid
        assert error is None

    def test_invalid_syntax(self) -> None:
        """Test invalid TypeScript syntax is detected."""
        code = "function test() { return; }} }"  # Extra closing braces
        is_valid, error = validate_typescript_syntax(code)
        assert not is_valid
        assert error is not None

    def test_undefined_variable_reference(self) -> None:
        """Test that code referencing undefined variables fails validation.

        The validator now performs basic semantic validation to catch undefined
        variable references.
        """
        code = """
const options = {
    value: UNDEFINED_CONSTANT, // This constant is never defined!
};
"""
        is_valid, error = validate_typescript_syntax(code)
        assert not is_valid, "Code with undefined variables should fail validation"
        assert error is not None
        # Error wording may vary; accept either the identifier or the generic prefix.
        assert "UNDEFINED_CONSTANT" in error or "Undefined" in error

    @pytest.mark.skip(reason="Type checking requires full TypeScript compiler - not implemented in basic validator")
    def test_nonexistent_type_reference(self) -> None:
        """Test that code referencing non-existent types should fail validation.

        NOTE: Type checking (vs variable checking) requires running the full
        TypeScript compiler, which is not implemented in the basic tree-sitter
        validator. This would require executing `tsc --noEmit` which is a
        heavier operation.

        For now, we focus on catching undefined variable references which are
        more common and can cause immediate runtime errors.
        """
        code = """
function processItem(item: NonExistentType): string {
    return item.toString();
}
"""
        is_valid, error = validate_typescript_syntax(code)
        # This currently passes because we don't do full type checking
        # Full type checking would require running tsc
        assert not is_valid, "Code with non-existent types should fail validation"
        assert error is not None
        assert "NonExistentType" in error or "not found" in error.lower()

    def test_real_world_buggy_optimization(self) -> None:
        """Test the real buggy optimization from trace c1b6ae01-0395-4c96-867c-cb287f8a7156.

        This is the actual buggy code that was generated and passed validation
        but failed at runtime. It references MARKDOWN_STYLE_MARKERS without
        defining it and uses RenderLinkInput type that doesn't exist.
        This test demonstrates that semantic validation now catches these errors.
        """
        code = """
import type { MarkdownTableMode } from "../config/types.base.js";
import { markdownToIRWithMeta } from "./ir.js";
import { renderMarkdownWithMarkers } from "./render.js";
const identityEscape = (text: string) => text;
function buildLink(link: RenderLinkInput, text: string) {
    const href = (link.href ?? "").trim();
    if (!href) {
        return null;
    }
    const start = link.start;
    const end = link.end;
    if (end - start === 0) {
        return null;
    }
    return { start, end, open: "[", close: `](${href})` };
}
const RENDER_OPTIONS = {
    styleMarkers: MARKDOWN_STYLE_MARKERS, // UNDEFINED!
    escapeText: identityEscape,
    buildLink,
};
export function convertMarkdownTables(markdown: string, mode: MarkdownTableMode): string {
    if (!markdown || mode === "off") {
        return markdown;
    }
    const effectiveMode = mode === "block" ? "code" : mode;
    const { ir, hasTables } = markdownToIRWithMeta(markdown, {
        linkify: false,
        autolink: false,
        headingStyle: "none",
        blockquotePrefix: "",
        tableMode: effectiveMode,
    });
    if (!hasTables) {
        return markdown;
    }
    return renderMarkdownWithMarkers(ir, RENDER_OPTIONS);
}
"""
        is_valid, error = validate_typescript_syntax(code)
        # With semantic validation, this now correctly fails
        assert not is_valid, "Code with undefined variables should fail validation"
        assert error is not None
        assert "MARKDOWN_STYLE_MARKERS" in error, f"Error should mention undefined variable, got: {error}"

    def test_validation_with_dependency_context(self) -> None:
        """Test that code should be validated with dependency_code context.

        This reproduces the bug where the optimizer validates source_code alone
        without considering dependency_code (read-only context), causing false
        "undefined variable" errors.

        Trace ID: 037da636-6128-434a-925c-8b7ba329c2d8 and 120+ others
        """
        # This is the function that the CLI extracts and wants to optimize
        source_code = """
export function createTarEntryPreflightChecker(params: {
    rootDir: string;
}): (entry: TarEntryInfo) => void {
    return (entry: TarEntryInfo) => {
        if (BLOCKED_TAR_ENTRY_TYPES.has(entry.type)) {
            throw new Error(`tar entry is a link`);
        }
    };
}
"""
        # This is the read-only context (constants, types, etc.) sent separately
        dependency_code = """
const BLOCKED_TAR_ENTRY_TYPES = new Set(["SymbolicLink", "Link"]);
type TarEntryInfo = { path: string; type: string; size: number };
"""
        # BUG: Validating source_code alone fails
        is_valid_source_only, error_source_only = validate_typescript_syntax(source_code)
        assert not is_valid_source_only, "This demonstrates the bug - validation fails without context"
        assert "BLOCKED_TAR_ENTRY_TYPES" in error_source_only
        # FIX: Validating source_code + dependency_code together should pass
        combined_code = dependency_code + "\n\n" + source_code
        is_valid_combined, error_combined = validate_typescript_syntax(combined_code)
        assert is_valid_combined, f"Validation should pass with full context, but got error: {error_combined}"
        assert error_combined is None