mirror of
https://github.com/codeflash-ai/codeflash-internal.git
synced 2026-05-04 18:25:18 +00:00
## Problem

The JS/TS language handler (`core/languages/js_ts/`) was importing models, schemas, config, prompts, and helpers directly from the Python language handler. This created a confusing architectural dependency and risked serving the wrong language-specific prompt content.

## What Changed

- Created `core/shared/` for genuinely language-agnostic code (optimizer schemas, models, config, testgen models, context helpers)
- Moved JS/TS-specific prompts and context helpers into `core/languages/js_ts/`
- Updated all consumers (20+ files) to import from the correct locations
- Removed backwards-compat re-exports from the Python module

## Result

- **Before:** 11 imports from `core.languages.python` in `core/languages/js_ts/`
- **After:** 0
577 lines
19 KiB
Python
577 lines
19 KiB
Python
from libcst import parse_module as parse_module_to_cst
|
|
|
|
from aiservice.models.functions_to_optimize import FunctionToOptimize
|
|
from core.languages.python.testgen.postprocessing.code_validator import validate_testgen_code
|
|
from core.languages.python.testgen.postprocessing.postprocess_pipeline import postprocessing_testgen_pipeline
|
|
from core.shared.context_helpers import group_code
|
|
|
|
|
|
def test_postprocessing_testgen_pipeline() -> None:
    """Check the post-processing pipeline on a generated test module.

    The generated module redefines the function under test, contains a helper
    that is listed for removal, and holds three tests that assert on the
    function's result. The expected output imports the function under test
    from ``test_validate_pipeline``, no longer contains either definition or
    the assertions, and routes every call through ``codeflash_output``.
    """
    # NOTE(review): the whitespace inside the string fixtures below (indentation,
    # blank lines, spacing before inline comments, placement of the loop inside
    # ``function_to_remove``) was restored from a flattened copy of this file.
    # Confirm it against the pipeline's real output; the final assertion is an
    # exact string comparison.
    code = r'''
import re
from typing import Any

# imports
import pytest # used for our unit tests

def function_to_remove():
    pass

    for i in range(2000):
        for j in range(3000):
            print(i, j)

def extract_input_variables(nodes: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Extracts input variables from the template and adds them to the input_variables field."""
    prompt_pattern = re.compile(r"\{(.*?)\}")

    for node in nodes:
        try:
            data_node = node["data"]["node"]
            template_info = data_node["template"]
            template_type = template_info["_type"]

            if "input_variables" in template_info:
                if template_type == "prompt":
                    value = template_info["template"]["value"]
                    variables = prompt_pattern.findall(value)
                elif template_type == "few_shot":
                    prefix = template_info["prefix"]["value"]
                    suffix = template_info["suffix"]["value"]
                    variables = prompt_pattern.findall(prefix + suffix)
                else:
                    variables = []

                template_info["input_variables"]["value"] = variables
        except (KeyError, TypeError):
            # Exception suppressed as in the original code
            pass

    return nodes


# unit tests

def test_single_prompt_variable():
    nodes = [{"data": {"node": {"template": {"_type": "prompt", "template": {"value": "{var1}"}, "input_variables": {}}}}}]
    result = extract_input_variables(nodes)
    assert result[0]["data"]["node"]["template"]["input_variables"]["value"] == ["var1"]

def test_multiple_prompt_variables():
    nodes = [{"data": {"node": {"template": {"_type": "prompt", "template": {"value": "{var1} and {var2}"}, "input_variables": {}}}}}]
    result = extract_input_variables(nodes)
    assert result[0]["data"]["node"]["template"]["input_variables"]["value"] == ["var1", "var2"]

def test_few_shot_variables():
    nodes = [{"data": {"node": {"template": {"_type": "few_shot", "prefix": {"value": "{var1}"}, "suffix": {"value": "{var2}"}, "input_variables": {}}}}}]
    result = extract_input_variables(nodes)
    assert result[0]["data"]["node"]["template"]["input_variables"]["value"] == ["var1", "var2"]
'''
    module = parse_module_to_cst(code)

    # Source code that defines the function being tested
    source_code_being_tested = '''
def extract_input_variables(nodes):
    """Extracts input variables from the template."""
    pass
'''

    function_to_optimize = FunctionToOptimize(
        function_name="extract_input_variables",
        file_path="testgen/postprocessing/tests/test_validate_pipeline.py",
        parents=[],
        starting_line=None,
        ending_line=None,
    )
    module_path = "test_validate_pipeline"
    result = postprocessing_testgen_pipeline(
        module, ["function_to_remove"], function_to_optimize, module_path, source_code_being_tested
    )

    # After consolidation, the function definition is removed by add_missing_imports_from_source
    # (which detects local redefinitions of public symbols) and replaced with an import.
    # The import is added at the top by AddImportsVisitor.
    expected = r"""from test_validate_pipeline import extract_input_variables
import re
from typing import Any

# imports
import pytest # used for our unit tests


# unit tests

def test_single_prompt_variable():
    nodes = [{"data": {"node": {"template": {"_type": "prompt", "template": {"value": "{var1}"}, "input_variables": {}}}}}]
    codeflash_output = extract_input_variables(nodes); result = codeflash_output

def test_multiple_prompt_variables():
    nodes = [{"data": {"node": {"template": {"_type": "prompt", "template": {"value": "{var1} and {var2}"}, "input_variables": {}}}}}]
    codeflash_output = extract_input_variables(nodes); result = codeflash_output

def test_few_shot_variables():
    nodes = [{"data": {"node": {"template": {"_type": "few_shot", "prefix": {"value": "{var1}"}, "suffix": {"value": "{var2}"}, "input_variables": {}}}}}]
    codeflash_output = extract_input_variables(nodes); result = codeflash_output
    # codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
"""

    assert result.code == expected
|
|
|
|
|
|
def test_postprocessing_pipeline_with_multi_context_imports() -> None:
    """Test the full testgen pipeline including all processing stages.

    This tests the complete flow that generated test code goes through:
    1. validate_testgen_code - validates and cleans raw LLM output
    2. postprocessing_testgen_pipeline - transforms the code (includes all stages)

    This also tests the fix for the bug where CST tree corruption could cause
    'NoneType' object has no attribute 'visit' errors.
    """
    # NOTE(review): the whitespace inside the string fixtures below (indentation,
    # blank lines, continuation-line alignment, placement of the trailing
    # codeflash comment) was restored from a flattened copy of this file.
    # Confirm it against the pipeline's real output; the final assertion is an
    # exact string comparison.

    # Raw test code (as if from LLM output)
    raw_test_code = """
import pytest
from unittest.mock import Mock, MagicMock, patch
from typing import Any, Optional
import numpy as np

from unstructured.documents.elements import Text, ListItem, PageBreak
from unstructured.partition.pdf import document_to_element_list


def test_document_to_element_list_empty_document():
    \"\"\"Test that an empty document returns an empty list of elements.\"\"\"
    mock_document = Mock()
    mock_document.pages = []
    result = document_to_element_list(mock_document)
    assert result == []
    assert isinstance(result, list)


def test_document_to_element_list_single_element():
    \"\"\"Test basic conversion with one element.\"\"\"
    mock_layout_element = Mock()
    mock_layout_element.text = "Hello World"
    mock_layout_element.type = "Text"
    mock_layout_element.bbox = Mock()
    mock_layout_element.bbox.x1 = np.nan
    mock_layout_element.parent = None

    mock_page = Mock()
    mock_page.elements_array = Mock()
    mock_page.elements_array.element_class_id_map = {}
    mock_page.elements_array.element_class_ids = np.array([])
    mock_page.elements_array.iter_elements = Mock(return_value=[mock_layout_element])

    mock_document = Mock()
    mock_document.pages = [mock_page]

    with patch('unstructured.partition.pdf.normalize_layout_element') as mock_normalize:
        text_element = Text(text="Hello World")
        mock_normalize.return_value = text_element
        result = document_to_element_list(mock_document)

    assert len(result) == 1
    assert isinstance(result[0], Text)
"""

    # Source code being tested (what the LLM saw)
    source_code = """
def document_to_element_list(document, sortable=False, include_page_breaks=False,
                             last_modification_date=None, detection_origin=None):
    \"\"\"Convert a document to a list of elements.\"\"\"
    elements = []
    for page in document.pages:
        for layout_element in page.elements_array.iter_elements():
            element = normalize_layout_element(layout_element)
            elements.append(element)
    return elements
"""

    # Source code blocks simulating multi-context
    source_code_blocks = {
        "unstructured/partition/pdf.py": source_code,
        "unstructured/documents/elements.py": """
class Element:
    def __init__(self, text=""):
        self.text = text

class Text(Element):
    pass

class ListItem(Element):
    pass

class PageBreak(Element):
    pass
""",
    }

    expected = """from unstructured.partition.pdf import document_to_element_list
from typing import Any, Optional
from unittest.mock import MagicMock, Mock, patch

import numpy as np
import pytest
from unstructured.documents.elements import ListItem, PageBreak, Text


def test_document_to_element_list_empty_document():
    \"\"\"Test that an empty document returns an empty list of elements.\"\"\"
    mock_document = Mock()
    mock_document.pages = []
    codeflash_output = document_to_element_list(mock_document); result = codeflash_output


def test_document_to_element_list_single_element():
    \"\"\"Test basic conversion with one element.\"\"\"
    mock_layout_element = Mock()
    mock_layout_element.text = "Hello World"
    mock_layout_element.type = "Text"
    mock_layout_element.bbox = Mock()
    mock_layout_element.bbox.x1 = np.nan
    mock_layout_element.parent = None

    mock_page = Mock()
    mock_page.elements_array = Mock()
    mock_page.elements_array.element_class_id_map = {}
    mock_page.elements_array.element_class_ids = np.array([])
    mock_page.elements_array.iter_elements = Mock(return_value=[mock_layout_element])

    mock_document = Mock()
    mock_document.pages = [mock_page]

    with patch('unstructured.partition.pdf.normalize_layout_element') as mock_normalize:
        text_element = Text(text="Hello World")
        mock_normalize.return_value = text_element
        codeflash_output = document_to_element_list(mock_document); result = codeflash_output
    # codeflash_output is used to check that the output of the original code is the same as that of the optimized code."""

    function_to_optimize = FunctionToOptimize(
        function_name="document_to_element_list",
        file_path="unstructured/partition/pdf.py",
        parents=[],
        starting_line=None,
        ending_line=None,
    )
    module_path = "unstructured.partition.pdf"
    python_version = (3, 11)

    # Step 1: Validate testgen code (simulates what happens after LLM response)
    validated_code = validate_testgen_code(raw_test_code, python_version)

    # Step 2: postprocessing_testgen_pipeline (includes add_missing_imports and replace_definition_with_import)
    source_code_being_tested = group_code(source_code_blocks)
    processed_module = postprocessing_testgen_pipeline(
        parse_module_to_cst(validated_code), [], function_to_optimize, module_path, source_code_being_tested
    )

    assert processed_module.code == expected
|
|
|
|
|
|
def test_postprocessing_pipeline_with_unstructured_test_code() -> None:
    """Test the full testgen pipeline with complex test code from unstructured codebase.

    This tests the complete flow with helper functions and multiple test cases:
    1. validate_testgen_code - validates and cleans raw LLM output
    2. postprocessing_testgen_pipeline - transforms the code (includes all stages)

    Uses a simplified version of actual generated test code that triggered
    'NoneType' object has no attribute 'visit' error.
    """
    # NOTE(review): the whitespace inside the string fixtures below (indentation,
    # blank lines, alignment of the wrapped import, placement of the trailing
    # codeflash comment) was restored from a flattened copy of this file.
    # Confirm it against the pipeline's real output; the final assertion is an
    # exact string comparison.

    # Raw test code with helper functions (as if from LLM output)
    raw_test_code = """
import math
from types import SimpleNamespace
import numpy as np
import pytest

from unstructured.partition.pdf import document_to_element_list
from unstructured.documents.elements import Text, ListItem, PageBreak, Title, Element
from unstructured.documents.elements import ElementType


def make_layout_element_dict_like(
    *,
    text: str = "",
    element_type: str | None = None,
    coordinates: tuple | None = None,
    prob: float | None = None,
    bbox_x1: float = 0.0,
    bbox_x2: float = 1.0,
    bbox_y1: float = 0.0,
    bbox_y2: float = 1.0,
    parent: object | None = None,
):
    bbox = SimpleNamespace(x1=bbox_x1, x2=bbox_x2, y1=bbox_y1, y2=bbox_y2)
    def to_dict():
        out = {"text": text}
        if element_type is not None:
            out["type"] = element_type
        if coordinates is not None:
            out["coordinates"] = coordinates
        if prob is not None:
            out["prob"] = prob
        return out
    le = SimpleNamespace(
        bbox=bbox,
        to_dict=to_dict,
        parent=parent,
    )
    return le


def make_page(elements, *, image_metadata=None, image=None):
    class ElementsArray:
        def __init__(self, elements):
            self._elements = elements
            self.element_class_id_map = {}
            self.element_class_ids = np.array([], dtype=int)

        def iter_elements(self):
            for el in self._elements:
                yield el

    elements_array = ElementsArray(elements)
    page = SimpleNamespace(
        elements_array=elements_array,
        image_metadata=image_metadata,
        image=image,
    )
    return page


def test_single_text_element_basic():
    coords = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
    layout_el = make_layout_element_dict_like(
        text="Hello World",
        element_type=ElementType.TEXT,
        coordinates=coords,
        bbox_x1=5.0,
    )
    image_metadata = {"format": "PNG", "width": 100, "height": 200}
    page = make_page([layout_el], image_metadata=image_metadata)
    document = SimpleNamespace(pages=[page])

    elements = document_to_element_list(
        document,
        sortable=False,
        include_page_breaks=False,
        last_modification_date="2022-01-01",
        detection_origin="detected-by-model",
    )

    assert len(elements) == 1
    el = elements[0]
    assert isinstance(el, Text)
    assert str(el) == "Hello World"
    assert el.metadata.page_number == 1


def test_list_items_are_inferred():
    list_text = "1. First item\\n2. Second item"
    layout_el = make_layout_element_dict_like(
        text=list_text,
        element_type=ElementType.LIST,
        coordinates=None,
        bbox_x1=5.0,
    )
    page = make_page([layout_el], image_metadata={"format": "JPEG", "width": 10, "height": 20})
    document = SimpleNamespace(pages=[page])

    elements = document_to_element_list(
        document,
        infer_list_items=True,
        last_modification_date="2023-07-07",
    )

    assert len(elements) == 2
    for item in elements:
        assert isinstance(item, ListItem)
"""

    # Source code being tested (multi-context format)
    source_code = """
from unstructured.documents.elements import Element, Text, ListItem, PageBreak
from unstructured.partition.common.common import normalize_layout_element

def document_to_element_list(document, sortable=False, include_page_breaks=False,
                             last_modification_date=None, detection_origin=None,
                             starting_page_number=1, infer_list_items=True, **kwargs):
    elements = []
    for page_idx, page in enumerate(document.pages):
        page_number = starting_page_number + page_idx
        for layout_element in page.elements_array.iter_elements():
            element = normalize_layout_element(layout_element)
            if isinstance(element, list):
                elements.extend(element)
            else:
                elements.append(element)
        if include_page_breaks and page_idx < len(document.pages):
            elements.append(PageBreak())
    return elements
"""

    # Source code blocks from unstructured codebase
    source_code_blocks = {
        "unstructured/partition/pdf.py": source_code,
        "unstructured/documents/elements.py": """
from typing import Any, Optional

class ElementMetadata:
    def __init__(self):
        self.page_number = None
        self.parent_id = None
        self.coordinates = None
        self.last_modified = None

class Element:
    def __init__(self, text=""):
        self.text = text
        self.metadata = ElementMetadata()
        self.id = id(self)

    def __str__(self):
        return self.text

class Text(Element):
    pass

class ListItem(Element):
    pass

class PageBreak(Element):
    pass

class Title(Element):
    pass

class ElementType:
    TEXT = "Text"
    LIST = "List"
    TITLE = "Title"
""",
    }

    expected = r"""from unstructured.partition.pdf import document_to_element_list
import math
from types import SimpleNamespace

import numpy as np
import pytest
from unstructured.documents.elements import (Element, ElementType, ListItem,
                                             PageBreak, Text, Title)


def make_layout_element_dict_like(
    *,
    text: str = "",
    element_type: str | None = None,
    coordinates: tuple | None = None,
    prob: float | None = None,
    bbox_x1: float = 0.0,
    bbox_x2: float = 1.0,
    bbox_y1: float = 0.0,
    bbox_y2: float = 1.0,
    parent: object | None = None,
):
    bbox = SimpleNamespace(x1=bbox_x1, x2=bbox_x2, y1=bbox_y1, y2=bbox_y2)
    def to_dict():
        out = {"text": text}
        if element_type is not None:
            out["type"] = element_type
        if coordinates is not None:
            out["coordinates"] = coordinates
        if prob is not None:
            out["prob"] = prob
        return out
    le = SimpleNamespace(
        bbox=bbox,
        to_dict=to_dict,
        parent=parent,
    )
    return le


def make_page(elements, *, image_metadata=None, image=None):
    class ElementsArray:
        def __init__(self, elements):
            self._elements = elements
            self.element_class_id_map = {}
            self.element_class_ids = np.array([], dtype=int)

        def iter_elements(self):
            for el in self._elements:
                yield el

    elements_array = ElementsArray(elements)
    page = SimpleNamespace(
        elements_array=elements_array,
        image_metadata=image_metadata,
        image=image,
    )
    return page


def test_single_text_element_basic():
    coords = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
    layout_el = make_layout_element_dict_like(
        text="Hello World",
        element_type=ElementType.TEXT,
        coordinates=coords,
        bbox_x1=5.0,
    )
    image_metadata = {"format": "PNG", "width": 100, "height": 200}
    page = make_page([layout_el], image_metadata=image_metadata)
    document = SimpleNamespace(pages=[page])

    codeflash_output = document_to_element_list(
        document,
        sortable=False,
        include_page_breaks=False,
        last_modification_date="2022-01-01",
        detection_origin="detected-by-model",
    ); elements = codeflash_output
    el = elements[0]


def test_list_items_are_inferred():
    list_text = "1. First item\n2. Second item"
    layout_el = make_layout_element_dict_like(
        text=list_text,
        element_type=ElementType.LIST,
        coordinates=None,
        bbox_x1=5.0,
    )
    page = make_page([layout_el], image_metadata={"format": "JPEG", "width": 10, "height": 20})
    document = SimpleNamespace(pages=[page])

    codeflash_output = document_to_element_list(
        document,
        infer_list_items=True,
        last_modification_date="2023-07-07",
    ); elements = codeflash_output
    for item in elements:
        pass
    # codeflash_output is used to check that the output of the original code is the same as that of the optimized code."""

    function_to_optimize = FunctionToOptimize(
        function_name="document_to_element_list",
        file_path="unstructured/partition/pdf.py",
        parents=[],
        starting_line=None,
        ending_line=None,
    )
    module_path = "unstructured.partition.pdf"
    python_version = (3, 11)

    # Step 1: Validate testgen code (simulates what happens after LLM response)
    validated_code = validate_testgen_code(raw_test_code, python_version)

    # Step 2: postprocessing_testgen_pipeline (includes add_missing_imports and replace_definition_with_import)
    source_code_being_tested = group_code(source_code_blocks)
    processed_module = postprocessing_testgen_pipeline(
        parse_module_to_cst(validated_code), [], function_to_optimize, module_path, source_code_being_tested
    )

    assert processed_module.code == expected
|