Use the JUnitXml parser instead of ElementTree to parse test result output.

This commit is contained in:
afik.cohen 2023-10-26 11:19:16 -07:00
parent 51e650b3de
commit 6b528a883a
10 changed files with 153 additions and 94 deletions

View file

@ -1,5 +1,10 @@
def sorter(arr):
    """Sort *arr* in ascending order in place and return it.

    Args:
        arr: A mutable sequence (list) of mutually comparable items.

    Returns:
        The same list object, sorted in ascending order.
    """
    # The original implementation ran a full O(n^2) bubble-sort pass
    # *after* arr.sort(); on an already-sorted list that pass can never
    # swap anything, so it was pure dead work. list.sort() alone gives
    # the identical result (in-place ascending sort) in O(n log n).
    arr.sort()
    return arr

View file

@ -6,31 +6,34 @@ from collections import defaultdict, namedtuple
import jedi
from codeflash.models import TestConfig
TestsInFile = namedtuple("TestsInFile", ["test_file", "test_function", "test_suite"])
def discover_unit_tests(test_directory, project_root_path, test_framework="pytest"):
if test_framework == "pytest":
return discover_tests_pytest(test_directory, project_root_path)
elif test_framework == "unittest":
return discover_tests_unittest(test_directory, project_root_path)
def discover_unit_tests(cfg: TestConfig):
if cfg.test_framework == "pytest":
return discover_tests_pytest(cfg)
elif cfg.test_framework == "unittest":
return discover_tests_unittest(cfg)
def discover_tests_pytest(test_directory, project_root_path):
def discover_tests_pytest(cfg: TestConfig):
test_root = cfg.test_root
pytest_result = subprocess.run(
["pytest", f"{test_directory}", "--co", "-q"],
["pytest", f"{test_root}", "--co", "-q"],
stdout=subprocess.PIPE,
cwd=test_directory,
cwd=test_root,
)
tests = parse_tests(pytest_result.stdout.decode("utf-8"))
file_to_test_map = defaultdict(list)
for test in tests:
test_file, function = test.split("::")
test_file_path = os.path.join(test_directory, test_file)
test_file_path = os.path.join(test_root, test_file)
if not os.path.exists(test_file_path):
# Seeing that in some circumstances pytest also returns the cwd as part of the test file path
one_level_up_test_directory = os.path.abspath(os.path.join(test_directory, ".."))
one_level_up_test_directory = os.path.abspath(os.path.join(test_root, ".."))
test_file_path = os.path.join(one_level_up_test_directory, test_file)
if not os.path.exists(test_file_path):
raise ValueError(
@ -38,12 +41,14 @@ def discover_tests_pytest(test_directory, project_root_path):
)
file_to_test_map[test_file_path].append({"test_function": function})
# Within these test files, find the project functions they are referring to and return their names/locations
return process_test_files(file_to_test_map, project_root_path, test_framework="pytest")
return process_test_files(file_to_test_map, cfg)
def discover_tests_unittest(test_directory, project_root_path):
def discover_tests_unittest(cfg: TestConfig):
test_root = cfg.test_root
project_root_path = cfg.project_root_path
loader = unittest.TestLoader()
tests = loader.discover(str(test_directory))
tests = loader.discover(str(test_root))
file_to_test_map = defaultdict(list)
for _test_suite in tests._tests:
for test_suite_2 in _test_suite._tests:
@ -58,16 +63,18 @@ def discover_tests_unittest(test_directory, project_root_path):
)
test_module_path = test_module.replace(".", os.sep)
test_module_path = os.path.join(str(test_directory), test_module_path) + ".py"
test_module_path = os.path.join(str(test_root), test_module_path) + ".py"
if not os.path.exists(test_module_path):
continue
file_to_test_map[test_module_path].append(
{"test_function": test_function, "test_suite_name": test_suite_name}
)
return process_test_files(file_to_test_map, project_root_path, test_framework="unittest")
return process_test_files(file_to_test_map, cfg)
def process_test_files(file_to_test_map, project_root_path, test_framework="pytest"):
def process_test_files(file_to_test_map, cfg: TestConfig):
project_root_path = cfg.project_root_path
test_framework = cfg.test_framework
function_to_test_map = defaultdict(list)
jedi_project = jedi.Project(path=project_root_path)
TestFunction = namedtuple("TestFunction", ["function_name", "test_suite_name"])

View file

@ -16,6 +16,7 @@ from codeflash.code_utils.config_parser import parse_config_file
from codeflash.discovery.discover_unit_tests import discover_unit_tests
from codeflash.discovery.functions_to_optimize import get_functions_to_optimize
from codeflash.instrumentation.instrument_existing_tests import inject_profiling_into_existing_test
from codeflash.models import TestConfig
from codeflash.optimization.function_context import get_function_context_len_constrained
from codeflash.optimization.optimizer import optimize_python_code
from codeflash.verification.equivalence import compare_results
@ -78,6 +79,10 @@ def parse_args():
return args
def main() -> None:
args = handle_arguments()
MAX_TEST_RUN_ITERATIONS = 5
INDIVIDUAL_TEST_TIMEOUT = 15
MAX_FUNCTION_TEST_SECONDS = 60
@ -85,7 +90,7 @@ N_CANDIDATES = 10
MIN_IMPROVEMENT_THRESHOLD = 0.05
def main() -> None:
def handle_arguments():
print("RUNNING THE OPTIMIZER")
args = parse_args()
modified_functions = get_functions_to_optimize(
@ -95,12 +100,16 @@ def main() -> None:
instrumented_unittests_created = set()
found_atleast_one_optimization = False
test_cfg = TestConfig(
test_root=args.test_root,
project_root_path=args.root,
test_framework=args.test_framework,
)
if os.path.exists("/tmp/pr_comment_temp.txt"):
os.remove("/tmp/pr_comment_temp.txt")
try:
functions_to_tests_map = discover_unit_tests(
args.test_root, args.root, test_framework=args.test_framework
)
functions_to_tests_map = discover_unit_tests(test_cfg)
for path in modified_functions:
if path.startswith(args.test_root + os.sep):
print("SKIPPING OPTIMIZING TEST FILE")
@ -164,6 +173,7 @@ def main() -> None:
dev_mode=args.dev_mode,
)
if new_tests is None:
print(f"/!\ NO TESTS GENERATED for {function_name}")
continue
generated_tests_path = get_test_file_path(args.test_root, function_name, 0)
test_files_created.add(generated_tests_path)
@ -196,19 +206,11 @@ def main() -> None:
existing_unittest_results_original.merge(
parse_test_xml(
result_file_path,
test_framework=args.test_framework,
test_type=TestType.EXISTING_UNIT_TEST,
test_config=test_cfg,
)
)
# existing_unittest_results_original = {
# **existing_unittest_results_original,
# **parse_test_xml(result_file_path),
# }
# timing_result = parse_test_timing(result_file_path)
# timing_result = filter_out_failed_test_timing(
# existing_unittest_results_original, timing_result
# )
timing = sum(
[
result.runtime
@ -216,7 +218,6 @@ def main() -> None:
if result.did_pass
]
)
# timing = sum(list(timing_result.values()))
instrumented_test_timing.append(timing)
test_env = os.environ.copy()
@ -230,6 +231,12 @@ def main() -> None:
pytest_timeout=INDIVIDUAL_TEST_TIMEOUT,
)
generated_tests_elapsed_time += time.time() - start_time
generated_tests_results_original = parse_test_xml(
result_file_path,
test_type=TestType.GENERATED_REGRESSION,
test_config=test_cfg,
)
# TODO: Implement the logic to disregard the timing info of the tests that ERRORed out. That is remove test cases that failed to run.
try:
original_results = parse_test_return_values_bin(
@ -319,15 +326,11 @@ def main() -> None:
existing_unittest_results_optimized.merge(
parse_test_xml(
result_file_path,
test_framework=args.test_framework,
test_type=TestType.EXISTING_UNIT_TEST,
test_config=test_cfg,
)
)
# timing_result = parse_test_xml(result_file_path)
# timing_result = filter_out_failed_test_timing(
# existing_unittest_results_optimized, timing_result
# )
timing = sum(
[
result.runtime
@ -335,7 +338,6 @@ def main() -> None:
if result.did_pass
]
)
# timing = sum(list(timing_result.values()))
instrumented_test_timing.append(timing)
if test_index == 0:
equal_results = True
@ -368,6 +370,11 @@ def main() -> None:
pytest_timeout=INDIVIDUAL_TEST_TIMEOUT,
)
generated_tests_elapsed_time += time.time() - start_time
generated_tests_results_optimized = parse_test_xml(
result_file_path,
test_type=TestType.GENERATED_REGRESSION,
test_config=test_cfg,
)
try:
test_results = parse_test_return_values_bin(
get_run_tmp_file(f"test_return_values_{j}.bin"),

8
codeflash/models.py Normal file
View file

@ -0,0 +1,8 @@
from pydantic.dataclasses import dataclass
@dataclass(frozen=True)
class TestConfig:
    """Immutable bundle of test-discovery settings passed between the
    optimizer entry point and the discovery/parsing helpers.

    Frozen so it is hashable and cannot be mutated mid-run.
    """

    # Root directory that contains the test files to discover/run.
    test_root: str
    # Root of the project under optimization; used to resolve module
    # names from test file paths.
    project_root_path: str
    # Which test runner is in use; "pytest" or "unittest".
    test_framework: str

View file

@ -0,0 +1,3 @@
# OpenAI model names used for each stage of regression-test generation.
# Previous value is kept as a trailing comment for easy rollback.
EXPLAIN_MODEL = "gpt-4"  # "gpt-3.5-turbo-16k"  # step 1: explain/plan in text
PLAN_MODEL = "gpt-4"  # "gpt-3.5-turbo-16k"  # steps 2 and 2b: test-case planning
EXECUTE_MODEL = "gpt-4"  # step 3: generate the test code

View file

@ -5,6 +5,8 @@ import os
import sys
from typing import List, Tuple
from codeflash.verification import EXPLAIN_MODEL, PLAN_MODEL, EXECUTE_MODEL
if sys.version_info < (3, 9, 0):
from astunparse import unparse as ast_unparse
@ -34,9 +36,9 @@ def regression_tests_from_function_with_inspiration(
unit_test_package: str = "pytest", # unit testing package; use the name as it appears in the import statement
approx_min_cases_to_cover: int = 7, # minimum number of test case categories to cover (approximate)
print_text: bool = False, # optionally prints text; helpful for understanding the function & debugging
explain_model: str = "gpt-3.5-turbo-16k", # model used to generate text plans in step 1
plan_model: str = "gpt-3.5-turbo-16k", # model used to generate text plans in steps 2 and 2b
execute_model: str = "gpt-3.5-turbo-16k", # model used to generate code in step 3
explain_model: str = EXPLAIN_MODEL, # model used to generate text plans in step 1
plan_model: str = PLAN_MODEL, # model used to generate text plans in steps 2 and 2b
execute_model: str = EXECUTE_MODEL, # model used to generate code in step 3
temperature: float = 0.4, # temperature = 0 can sometimes get stuck in repetitive loops, so we use 0.4
reruns_if_fail: int = 1, # if the output code cannot be parsed, this will re-run the function up to N times
) -> str:
@ -78,9 +80,9 @@ def regression_tests_from_function_with_inspiration(
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)
@ -172,9 +174,9 @@ import {unit_test_package} # used for our unit tests
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)
@ -212,9 +214,9 @@ import {unit_test_package} # used for our unit tests
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)

View file

@ -6,6 +6,7 @@ import os
import openai
from codeflash.code_utils.code_utils import ellipsis_in_ast
from codeflash.verification import EXPLAIN_MODEL, PLAN_MODEL, EXECUTE_MODEL
openai.api_key = os.environ["CODEFLASH_API_KEY"]
if "CODEFLASH_ORG_KEY" in os.environ:
@ -45,9 +46,9 @@ def regression_tests_from_function(
unit_test_package: str = "pytest", # unit testing package; use the name as it appears in the import statement
approx_min_cases_to_cover: int = 7, # minimum number of test case categories to cover (approximate)
print_text: bool = False, # optionally prints text; helpful for understanding the function & debugging
explain_model: str = "gpt-3.5-turbo-16k", # model used to generate text plans in step 1
plan_model: str = "gpt-3.5-turbo-16k", # model used to generate text plans in steps 2 and 2b
execute_model: str = "gpt-3.5-turbo-16k", # model used to generate code in step 3
explain_model: str = EXPLAIN_MODEL, # model used to generate text plans in step 1
plan_model: str = PLAN_MODEL, # model used to generate text plans in steps 2 and 2b
execute_model: str = EXECUTE_MODEL, # model used to generate code in step 3
temperature: float = 0.4, # temperature = 0 can sometimes get stuck in repetitive loops, so we use 0.4
reruns_if_fail: int = 1, # if the output code cannot be parsed, this will re-run the function up to N times
) -> str:
@ -89,9 +90,9 @@ def regression_tests_from_function(
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)
@ -143,9 +144,9 @@ To help unit test the function above, list diverse scenarios that the function s
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)
@ -194,9 +195,9 @@ To help unit test the function above, list diverse scenarios that the function s
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)
@ -266,9 +267,9 @@ import {unit_test_package} # used for our unit tests
unit_test_package=unit_test_package,
approx_min_cases_to_cover=approx_min_cases_to_cover,
print_text=print_text,
explain_model="gpt-3.5-turbo-16k",
plan_model="gpt-3.5-turbo-16k",
execute_model="gpt-3.5-turbo-16k",
explain_model=EXPLAIN_MODEL,
plan_model=PLAN_MODEL,
execute_model=EXECUTE_MODEL,
temperature=temperature,
reruns_if_fail=reruns_if_fail - 1, # decrement rerun counter when calling again
)

View file

@ -1,8 +1,11 @@
import os
import pickle
import re
import xml.etree.ElementTree as ET
from junitparser.xunit2 import JUnitXml
from codeflash.code_utils.code_utils import get_module_name_from_file
from codeflash.models import TestConfig
from codeflash.verification.test_results import TestResults, FunctionTestInvocation, TestType
@ -44,36 +47,58 @@ def parse_test_return_values_bin(
def parse_test_xml(
test_xml_file_path: str, test_framework: str, test_type: TestType
test_xml_file_path: str, test_type: TestType, test_config: TestConfig
) -> TestResults:
test_results = TestResults()
# Parse unittest output
tree = ET.parse(test_xml_file_path)
root = tree.getroot()
for testcase in root.iter("testcase"):
class_name = testcase.attrib["classname"]
# name = testcase.attrib["name"]
file_name = testcase.attrib["file"]
result = testcase.find("failure") is None # TODO: See for the cases of ERROR and SKIPPED
# Parse test timing
system_out_content = ""
for system_out in testcase.iter("system-out"):
system_out_content += system_out.text
m = re.findall(r"#####([^#]*?)#####([\d\.]*?)\^\^\^\^\^", system_out_content)
for func_name, time_taken in m:
time_taken = int(time_taken)
test_results.add(
FunctionTestInvocation(
id=func_name,
test_name=class_name,
file_name=file_name,
runtime=time_taken,
test_framework=test_framework,
did_pass=result,
test_type=test_type,
return_value=None,
# Parse unittest output
xml = JUnitXml.fromfile(test_xml_file_path)
for suite in xml:
for testcase in suite:
class_name = testcase.classname
file_name = suite._elem.attrib.get("file") # TODO: Convert this to absolute paths
result = testcase.result == [] # TODO: See for the cases of ERROR and SKIPPED
test_module_path = get_module_name_from_file(file_name, test_config.project_root_path)
test_class = None
if class_name.startswith(test_module_path):
test_class = class_name[
len(test_module_path) + 1 :
] # +1 for the dot, gets Unittest class name
test_id = (test_class + "." if test_class else "") + testcase.name
# Parse test timing
# system_out_content = ""
# for system_out in testcase.system_out:
# system_out_content += system_out.text
if testcase.system_out:
m = re.findall(r"#####([^#]*?)#####([\d\.]*?)\^\^\^\^\^", testcase.system_out)
for func_name, time_taken in m:
time_taken = int(time_taken)
test_results.add(
FunctionTestInvocation(
id=func_name,
test_name=test_class,
file_name=file_name,
runtime=time_taken,
test_framework=test_config.test_framework,
did_pass=result,
test_type=test_type,
return_value=None,
)
)
else:
# TODO: The id here is incorrect, fix it
test_results.add(
FunctionTestInvocation(
id=class_name,
test_name=test_class,
file_name=file_name,
runtime=-99999999, # Negative = Did not get the runtime, strange number which should catch attention quickly if anything goes wrong
test_framework=test_config.test_framework,
did_pass=result,
test_type=test_type,
return_value=None,
)
)
)
return test_results

View file

@ -10,10 +10,10 @@ class TestType(Enum):
EXISTING_UNIT_TEST = 3
@dataclass
@dataclass(frozen=True)
class FunctionTestInvocation:
id: str # The fully qualified name of the function invocation (id)
test_name: str # the fully qualified name of the tester function.
test_name: str # The name of the test_function. Does not include the components of the file_name
file_name: str # The file where the test is defined
did_pass: bool # Whether the test this function invocation was part of, passed or failed
runtime: int # Time in nanoseconds

View file

@ -23,6 +23,7 @@ astunparse-fixed = {version = "^1.7.0", optional = true, python = ">=3.8.0,<3.9"
tomli = {version = "^2.0.1", optional = true, python = "<3.11"}
unittest-xml-reporting = "^3.2.0"
pydantic = "^2.4.2"
junitparser = "^3.1.0"
[tool.poetry.group.dev.dependencies]
ipython = "^8.12.0"