codeflash/tests/test_instrument_all_and_run.py
Kevin Turcios f43ee06859 refactor: restructure codebase for locality and faster CLI startup
Move files closer to their consumers:
- function_context.py merged into code_context_extractor.py
- FunctionOptimizer base class to languages/function_optimizer.py
- test_runner, instrument_codeflash_capture, parse_line_profile to languages/python/
- oauth_handler.py to cli_cmds/

Split cmd_init.py (1993 lines) into focused modules:
- init_config.py: config types, validation, writing, shared UI
- init_auth.py: API key management + GitHub app installation
- github_workflow.py: GitHub Actions workflow generation
- cmd_init.py: init orchestrator + Python setup (639 lines)

Defer heavy imports (cmd_init, posthog, sentry) from module-level to
usage sites, reducing CLI startup from ~600ms to ~250ms. Replace
set_defaults(func=) with direct args.command dispatch in main().
2026-03-07 08:21:27 -05:00

840 lines
35 KiB
Python

from __future__ import annotations
import os
import sys
import tempfile
from argparse import Namespace
from pathlib import Path
from codeflash.code_utils.code_utils import get_run_tmp_file
from codeflash.code_utils.instrument_existing_tests import inject_profiling_into_existing_test
from codeflash.discovery.functions_to_optimize import FunctionToOptimize
from codeflash.models.models import CodePosition, FunctionParent, TestFile, TestFiles, TestingMode, TestType
from codeflash.optimization.optimizer import Optimizer
from codeflash.verification.equivalence import compare_test_results
from codeflash.languages.python.instrument_codeflash_capture import instrument_codeflash_capture
# Used by cli instrumentation
codeflash_wrap_string = """def codeflash_wrap(codeflash_wrapped, codeflash_test_module_name, codeflash_test_class_name, codeflash_test_name, codeflash_function_name, codeflash_line_id, codeflash_loop_index, codeflash_cur, codeflash_con, *args, **kwargs):
test_id = f'{{codeflash_test_module_name}}:{{codeflash_test_class_name}}:{{codeflash_test_name}}:{{codeflash_line_id}}:{{codeflash_loop_index}}'
if not hasattr(codeflash_wrap, 'index'):
codeflash_wrap.index = {{}}
if test_id in codeflash_wrap.index:
codeflash_wrap.index[test_id] += 1
else:
codeflash_wrap.index[test_id] = 0
codeflash_test_index = codeflash_wrap.index[test_id]
invocation_id = f'{{codeflash_line_id}}_{{codeflash_test_index}}'
test_stdout_tag = f"{{codeflash_test_module_name}}:{{(codeflash_test_class_name + '.' if codeflash_test_class_name else '')}}{{codeflash_test_name}}:{{codeflash_function_name}}:{{codeflash_loop_index}}:{{invocation_id}}"
print(f"!$######{{test_stdout_tag}}######$!")
exception = None
gc.disable()
try:
counter = time.perf_counter_ns()
return_value = codeflash_wrapped(*args, **kwargs)
codeflash_duration = time.perf_counter_ns() - counter
except Exception as e:
codeflash_duration = time.perf_counter_ns() - counter
exception = e
gc.enable()
print(f"!######{{test_stdout_tag}}######!")
pickled_return_value = pickle.dumps(exception) if exception else pickle.dumps(return_value)
codeflash_cur.execute('INSERT INTO test_results VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', (codeflash_test_module_name, codeflash_test_class_name, codeflash_test_name, codeflash_function_name, codeflash_loop_index, invocation_id, codeflash_duration, pickled_return_value, 'function_call'))
codeflash_con.commit()
if exception:
raise exception
return return_value
"""
def test_bubble_sort_behavior_results() -> None:
code = """from code_to_optimize.bubble_sort import sorter
def test_sort():
input = [5, 4, 3, 2, 1, 0]
output = sorter(input)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
output = sorter(input)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]"""
expected = (
"""import gc
import inspect
import os
import sqlite3
import time
import dill as pickle
from code_to_optimize.bubble_sort import sorter
"""
+ codeflash_wrap_string
+ """
def test_sort():
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)')
input = [5, 4, 3, 2, 1, 0]
_call__bound__arguments = inspect.signature(sorter).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(sorter, '{module_path}', None, 'test_sort', 'sorter', '1', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
_call__bound__arguments = inspect.signature(sorter).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(sorter, '{module_path}', None, 'test_sort', 'sorter', '4', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
codeflash_con.close()
"""
)
test_path = (
Path(__file__).parent.resolve()
/ "../code_to_optimize/tests/pytest/test_perfinjector_bubble_sort_results_temp.py"
).resolve()
test_path_perf = (
Path(__file__).parent.resolve()
/ "../code_to_optimize/tests/pytest/test_perfinjector_bubble_sort_results_perf_temp.py"
).resolve()
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort.py").resolve()
original_code = fto_path.read_text("utf-8")
try:
with test_path.open("w") as f:
f.write(code)
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
project_root_path = (Path(__file__).parent / "..").resolve()
original_cwd = Path.cwd()
run_cwd = Path(__file__).parent.parent.resolve()
func = FunctionToOptimize(function_name="sorter", parents=[], file_path=Path(fto_path))
os.chdir(run_cwd)
success, new_test = inject_profiling_into_existing_test(
test_path, [CodePosition(6, 13), CodePosition(10, 13)], func, project_root_path, mode=TestingMode.BEHAVIOR
)
os.chdir(original_cwd)
assert success
assert new_test is not None
assert new_test.replace('"', "'") == expected.format(
module_path="code_to_optimize.tests.pytest.test_perfinjector_bubble_sort_results_temp",
tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(),
).replace('"', "'")
with test_path.open("w") as f:
f.write(new_test)
# add codeflash capture
instrument_codeflash_capture(func, {}, tests_root)
opt = Optimizer(
Namespace(
project_root=project_root_path,
disable_telemetry=True,
tests_root=tests_root,
test_framework="pytest",
pytest_cmd="pytest",
experiment_id=None,
test_project_root=project_root_path,
)
)
test_env = os.environ.copy()
test_env["CODEFLASH_TEST_ITERATION"] = "0"
test_env["CODEFLASH_LOOP_INDEX"] = "1"
test_type = TestType.EXISTING_UNIT_TEST
func_optimizer = opt.create_function_optimizer(func)
func_optimizer.test_files = TestFiles(
test_files=[
TestFile(
instrumented_behavior_file_path=test_path,
test_type=test_type,
original_file_path=test_path,
benchmarking_file_path=test_path_perf,
)
]
)
test_results, coverage_data = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
out_str = """codeflash stdout: Sorting list
result: [0, 1, 2, 3, 4, 5]
"""
assert test_results[0].stdout == out_str
assert out_str == test_results[0].stdout
assert test_results[0].id.function_getting_tested == "sorter"
assert test_results[0].id.iteration_id == "1_0"
assert test_results[0].id.test_class_name is None
assert test_results[0].id.test_function_name == "test_sort"
assert (
test_results[0].id.test_module_path
== "code_to_optimize.tests.pytest.test_perfinjector_bubble_sort_results_temp"
)
assert test_results[0].runtime > 0
assert test_results[0].did_pass
assert test_results[0].return_value == ([0, 1, 2, 3, 4, 5],)
out_str = """codeflash stdout: Sorting list
result: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
"""
assert out_str == test_results[1].stdout
assert test_results[1].id.function_getting_tested == "sorter"
assert test_results[1].id.iteration_id == "4_0"
assert test_results[1].id.test_class_name is None
assert test_results[1].id.test_function_name == "test_sort"
assert (
test_results[1].id.test_module_path
== "code_to_optimize.tests.pytest.test_perfinjector_bubble_sort_results_temp"
)
assert test_results[1].runtime > 0
assert test_results[1].did_pass
out_str = """codeflash stdout: Sorting list
result: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
"""
assert test_results[1].stdout == out_str
results2, _ = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
out_str = """codeflash stdout: Sorting list
result: [0, 1, 2, 3, 4, 5]
"""
assert out_str == results2[0].stdout
match, _ = compare_test_results(test_results, results2)
assert match
finally:
fto_path.write_text(original_code, "utf-8")
test_path.unlink(missing_ok=True)
test_path_perf.unlink(missing_ok=True)
def test_method_full_instrumentation() -> None:
code = """from code_to_optimize.bubble_sort_method import BubbleSorter
def test_sort():
input = [5, 4, 3, 2, 1, 0]
sort_class = BubbleSorter()
output = sort_class.sorter(input)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
sort_class = BubbleSorter()
output = sort_class.sorter(input)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]"""
expected = (
"""import gc
import inspect
import os
import sqlite3
import time
import dill as pickle
from code_to_optimize.bubble_sort_method import BubbleSorter
"""
+ codeflash_wrap_string
+ """
def test_sort():
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)')
input = [5, 4, 3, 2, 1, 0]
sort_class = BubbleSorter()
_call__bound__arguments = inspect.signature(sort_class.sorter).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(sort_class.sorter, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter', '2', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
sort_class = BubbleSorter()
_call__bound__arguments = inspect.signature(sort_class.sorter).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(sort_class.sorter, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter', '6', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
codeflash_con.close()
"""
)
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort_method.py").resolve()
original_code = fto_path.read_text("utf-8")
fto = FunctionToOptimize(
function_name="sorter", parents=[FunctionParent(name="BubbleSorter", type="ClassDef")], file_path=Path(fto_path)
)
with tempfile.TemporaryDirectory() as tmpdirname:
tmp_test_path = Path(tmpdirname) / "test_class_method_behavior_results_temp.py"
tmp_test_path.write_text(code, encoding="utf-8")
success, new_test = inject_profiling_into_existing_test(
tmp_test_path, [CodePosition(7, 13), CodePosition(12, 13)], fto, tmp_test_path.parent
)
assert success
assert new_test.replace('"', "'") == expected.format(
module_path=tmp_test_path.stem, tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix()
).replace('"', "'")
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
test_path = tests_root / "test_class_method_behavior_results_temp.py"
test_path_perf = tests_root / "test_class_method_behavior_results_perf_temp.py"
project_root_path = (Path(__file__).parent / "..").resolve()
try:
new_test = expected.format(
module_path="code_to_optimize.tests.pytest.test_class_method_behavior_results_temp",
tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(),
)
with test_path.open("w") as f:
f.write(new_test)
# Add codeflash capture
instrument_codeflash_capture(fto, {}, tests_root)
opt = Optimizer(
Namespace(
project_root=project_root_path,
disable_telemetry=True,
tests_root=tests_root,
test_framework="pytest",
pytest_cmd="pytest",
experiment_id=None,
test_project_root=project_root_path,
)
)
test_env = os.environ.copy()
test_env["CODEFLASH_TEST_ITERATION"] = "0"
test_env["CODEFLASH_LOOP_INDEX"] = "1"
test_type = TestType.EXISTING_UNIT_TEST
func_optimizer = opt.create_function_optimizer(fto)
func_optimizer.test_files = TestFiles(
test_files=[
TestFile(
instrumented_behavior_file_path=test_path,
test_type=test_type,
original_file_path=test_path,
benchmarking_file_path=test_path_perf,
)
]
)
test_results, coverage_data = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
assert len(test_results) == 4
assert test_results[0].id.function_getting_tested == "BubbleSorter.__init__"
assert test_results[0].id.test_function_name == "test_sort"
assert test_results[0].did_pass
assert test_results[0].return_value[0] == {"x": 0}
assert test_results[1].id.function_getting_tested == "BubbleSorter.sorter"
assert test_results[1].id.iteration_id == "2_0"
assert test_results[1].id.test_class_name is None
assert test_results[1].id.test_function_name == "test_sort"
assert (
test_results[1].id.test_module_path
== "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp"
)
assert test_results[1].runtime > 0
assert test_results[1].did_pass
assert test_results[1].return_value == ([0, 1, 2, 3, 4, 5],)
out_str = """codeflash stdout : BubbleSorter.sorter() called\n"""
assert test_results[1].stdout == out_str
match, _ = compare_test_results(test_results, test_results)
assert match
assert test_results[2].id.function_getting_tested == "BubbleSorter.__init__"
assert test_results[2].id.test_function_name == "test_sort"
assert test_results[2].did_pass
assert test_results[2].return_value[0] == {"x": 0}
assert test_results[3].id.function_getting_tested == "BubbleSorter.sorter"
assert test_results[3].id.iteration_id == "6_0"
assert test_results[3].id.test_class_name is None
assert test_results[3].id.test_function_name == "test_sort"
assert (
test_results[3].id.test_module_path
== "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp"
)
assert test_results[3].runtime > 0
assert test_results[3].did_pass
assert test_results[3].stdout == """codeflash stdout : BubbleSorter.sorter() called\n"""
results2, _ = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
match, _ = compare_test_results(test_results, results2)
assert match
# Replace with optimized code that mutated instance attribute
optimized_code = """
class BubbleSorter:
def __init__(self, x=1):
self.x = x
def sorter(self, arr):
for i in range(len(arr)):
for j in range(len(arr) - 1):
if arr[j] > arr[j + 1]:
temp = arr[j]
arr[j] = arr[j + 1]
arr[j + 1] = temp
return arr
"""
fto_path.write_text(optimized_code, "utf-8")
# Force reload of module
import importlib
module_name = "code_to_optimize.bubble_sort_method"
if module_name not in sys.modules:
__import__(module_name)
importlib.reload(sys.modules[module_name])
# Add codeflash capture
instrument_codeflash_capture(fto, {}, tests_root)
opt = Optimizer(
Namespace(
project_root=project_root_path,
disable_telemetry=True,
tests_root=tests_root,
test_framework="pytest",
pytest_cmd="pytest",
experiment_id=None,
test_project_root=project_root_path,
)
)
func_optimizer = opt.create_function_optimizer(fto)
func_optimizer.test_files = TestFiles(
test_files=[
TestFile(
instrumented_behavior_file_path=test_path,
test_type=test_type,
original_file_path=test_path,
benchmarking_file_path=test_path_perf,
)
]
)
new_test_results, coverage_data = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
assert len(new_test_results) == 4
assert new_test_results[0].id.function_getting_tested == "BubbleSorter.__init__"
assert new_test_results[0].id.test_function_name == "test_sort"
assert new_test_results[0].did_pass
assert new_test_results[0].return_value[0] == {"x": 1}
assert new_test_results[1].id.function_getting_tested == "BubbleSorter.sorter"
assert new_test_results[1].id.iteration_id == "2_0"
assert new_test_results[1].id.test_class_name is None
assert new_test_results[1].id.test_function_name == "test_sort"
assert (
new_test_results[1].id.test_module_path
== "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp"
)
assert new_test_results[1].runtime > 0
assert new_test_results[1].did_pass
assert new_test_results[1].return_value == ([0, 1, 2, 3, 4, 5],)
assert new_test_results[2].id.function_getting_tested == "BubbleSorter.__init__"
assert new_test_results[2].id.test_function_name == "test_sort"
assert new_test_results[2].did_pass
assert new_test_results[2].return_value[0] == {"x": 1}
assert new_test_results[3].id.function_getting_tested == "BubbleSorter.sorter"
assert new_test_results[3].id.iteration_id == "6_0"
assert new_test_results[3].id.test_class_name is None
assert new_test_results[3].id.test_function_name == "test_sort"
assert (
new_test_results[3].id.test_module_path
== "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp"
)
assert new_test_results[3].runtime > 0
assert new_test_results[3].did_pass
match, _ = compare_test_results(test_results, new_test_results)
assert not match
finally:
fto_path.write_text(original_code, "utf-8")
test_path.unlink(missing_ok=True)
test_path_perf.unlink(missing_ok=True)
def test_classmethod_full_instrumentation() -> None:
code = """from code_to_optimize.bubble_sort_method import BubbleSorter
def test_sort():
input = [5, 4, 3, 2, 1, 0]
output = BubbleSorter.sorter_classmethod(input)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
output = BubbleSorter.sorter_classmethod(input)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]"""
expected = (
"""import gc
import inspect
import os
import sqlite3
import time
import dill as pickle
from code_to_optimize.bubble_sort_method import BubbleSorter
"""
+ codeflash_wrap_string
+ """
def test_sort():
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)')
input = [5, 4, 3, 2, 1, 0]
_call__bound__arguments = inspect.signature(BubbleSorter.sorter_classmethod).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(BubbleSorter.sorter_classmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_classmethod', '1', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
_call__bound__arguments = inspect.signature(BubbleSorter.sorter_classmethod).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(BubbleSorter.sorter_classmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_classmethod', '4', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
codeflash_con.close()
"""
)
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort_method.py").resolve()
original_code = fto_path.read_text("utf-8")
fto = FunctionToOptimize(
function_name="sorter_classmethod",
parents=[FunctionParent(name="BubbleSorter", type="ClassDef")],
file_path=Path(fto_path),
)
with tempfile.TemporaryDirectory() as tmpdirname:
tmp_test_path = Path(tmpdirname) / "test_classmethod_behavior_results_temp.py"
tmp_test_path.write_text(code, encoding="utf-8")
success, new_test = inject_profiling_into_existing_test(
tmp_test_path, [CodePosition(6, 13), CodePosition(10, 13)], fto, tmp_test_path.parent
)
assert success
assert new_test.replace('"', "'") == expected.format(
module_path=tmp_test_path.stem, tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix()
).replace('"', "'")
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
test_path = tests_root / "test_classmethod_behavior_results_temp.py"
test_path_perf = tests_root / "test_classmethod_behavior_results_perf_temp.py"
project_root_path = (Path(__file__).parent / "..").resolve()
try:
new_test = expected.format(
module_path="code_to_optimize.tests.pytest.test_classmethod_behavior_results_temp",
tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(),
)
with test_path.open("w") as f:
f.write(new_test)
# Add codeflash capture
instrument_codeflash_capture(fto, {}, tests_root)
opt = Optimizer(
Namespace(
project_root=project_root_path,
disable_telemetry=True,
tests_root=tests_root,
test_framework="pytest",
pytest_cmd="pytest",
experiment_id=None,
test_project_root=project_root_path,
)
)
test_env = os.environ.copy()
test_env["CODEFLASH_TEST_ITERATION"] = "0"
test_env["CODEFLASH_LOOP_INDEX"] = "1"
test_type = TestType.EXISTING_UNIT_TEST
func_optimizer = opt.create_function_optimizer(fto)
func_optimizer.test_files = TestFiles(
test_files=[
TestFile(
instrumented_behavior_file_path=test_path,
test_type=test_type,
original_file_path=test_path,
benchmarking_file_path=test_path_perf,
)
]
)
test_results, coverage_data = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
assert len(test_results) == 2
assert test_results[0].id.function_getting_tested == "BubbleSorter.sorter_classmethod"
assert test_results[0].id.iteration_id == "1_0"
assert test_results[0].id.test_class_name is None
assert test_results[0].id.test_function_name == "test_sort"
assert (
test_results[0].id.test_module_path
== "code_to_optimize.tests.pytest.test_classmethod_behavior_results_temp"
)
assert test_results[0].runtime > 0
assert test_results[0].did_pass
assert test_results[0].return_value == ([0, 1, 2, 3, 4, 5],)
out_str = """codeflash stdout : BubbleSorter.sorter_classmethod() called
"""
assert test_results[0].stdout == out_str
match, _ = compare_test_results(test_results, test_results)
assert match
assert test_results[1].id.function_getting_tested == "BubbleSorter.sorter_classmethod"
assert test_results[1].id.iteration_id == "4_0"
assert test_results[1].id.test_class_name is None
assert test_results[1].id.test_function_name == "test_sort"
assert (
test_results[1].id.test_module_path
== "code_to_optimize.tests.pytest.test_classmethod_behavior_results_temp"
)
assert test_results[1].runtime > 0
assert test_results[1].did_pass
assert (
test_results[1].stdout
== """codeflash stdout : BubbleSorter.sorter_classmethod() called
"""
)
results2, _ = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
match, _ = compare_test_results(test_results, results2)
assert match
finally:
fto_path.write_text(original_code, "utf-8")
test_path.unlink(missing_ok=True)
test_path_perf.unlink(missing_ok=True)
def test_staticmethod_full_instrumentation() -> None:
code = """from code_to_optimize.bubble_sort_method import BubbleSorter
def test_sort():
input = [5, 4, 3, 2, 1, 0]
output = BubbleSorter.sorter_staticmethod(input)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
output = BubbleSorter.sorter_staticmethod(input)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]"""
expected = (
"""import gc
import inspect
import os
import sqlite3
import time
import dill as pickle
from code_to_optimize.bubble_sort_method import BubbleSorter
"""
+ codeflash_wrap_string
+ """
def test_sort():
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)')
input = [5, 4, 3, 2, 1, 0]
_call__bound__arguments = inspect.signature(BubbleSorter.sorter_staticmethod).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(BubbleSorter.sorter_staticmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_staticmethod', '1', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0, 1, 2, 3, 4, 5]
input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
_call__bound__arguments = inspect.signature(BubbleSorter.sorter_staticmethod).bind(input)
_call__bound__arguments.apply_defaults()
output = codeflash_wrap(BubbleSorter.sorter_staticmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_staticmethod', '4', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs)
assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
codeflash_con.close()
"""
)
fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort_method.py").resolve()
original_code = fto_path.read_text("utf-8")
fto = FunctionToOptimize(
function_name="sorter_staticmethod",
parents=[FunctionParent(name="BubbleSorter", type="ClassDef")],
file_path=Path(fto_path),
)
with tempfile.TemporaryDirectory() as tmpdirname:
tmp_test_path = Path(tmpdirname) / "test_staticmethod_behavior_results_temp.py"
tmp_test_path.write_text(code, encoding="utf-8")
success, new_test = inject_profiling_into_existing_test(
tmp_test_path, [CodePosition(6, 13), CodePosition(10, 13)], fto, tmp_test_path.parent
)
assert success
assert new_test.replace('"', "'") == expected.format(
module_path=tmp_test_path.stem, tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix()
).replace('"', "'")
tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve()
test_path = tests_root / "test_staticmethod_behavior_results_temp.py"
test_path_perf = tests_root / "test_staticmethod_behavior_results_perf_temp.py"
project_root_path = (Path(__file__).parent / "..").resolve()
try:
new_test = expected.format(
module_path="code_to_optimize.tests.pytest.test_staticmethod_behavior_results_temp",
tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(),
)
with test_path.open("w") as f:
f.write(new_test)
# Add codeflash capture
instrument_codeflash_capture(fto, {}, tests_root)
opt = Optimizer(
Namespace(
project_root=project_root_path,
disable_telemetry=True,
tests_root=tests_root,
test_framework="pytest",
pytest_cmd="pytest",
experiment_id=None,
test_project_root=project_root_path,
)
)
test_env = os.environ.copy()
test_env["CODEFLASH_TEST_ITERATION"] = "0"
test_env["CODEFLASH_LOOP_INDEX"] = "1"
test_type = TestType.EXISTING_UNIT_TEST
func_optimizer = opt.create_function_optimizer(fto)
func_optimizer.test_files = TestFiles(
test_files=[
TestFile(
instrumented_behavior_file_path=test_path,
test_type=test_type,
original_file_path=test_path,
benchmarking_file_path=test_path_perf,
)
]
)
test_results, coverage_data = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
assert len(test_results) == 2
assert test_results[0].id.function_getting_tested == "BubbleSorter.sorter_staticmethod"
assert test_results[0].id.iteration_id == "1_0"
assert test_results[0].id.test_class_name is None
assert test_results[0].id.test_function_name == "test_sort"
assert (
test_results[0].id.test_module_path
== "code_to_optimize.tests.pytest.test_staticmethod_behavior_results_temp"
)
assert test_results[0].runtime > 0
assert test_results[0].did_pass
assert test_results[0].return_value == ([0, 1, 2, 3, 4, 5],)
out_str = """codeflash stdout : BubbleSorter.sorter_staticmethod() called
"""
assert test_results[0].stdout == out_str
match, _ = compare_test_results(test_results, test_results)
assert match
assert test_results[1].id.function_getting_tested == "BubbleSorter.sorter_staticmethod"
assert test_results[1].id.iteration_id == "4_0"
assert test_results[1].id.test_class_name is None
assert test_results[1].id.test_function_name == "test_sort"
assert (
test_results[1].id.test_module_path
== "code_to_optimize.tests.pytest.test_staticmethod_behavior_results_temp"
)
assert test_results[1].runtime > 0
assert test_results[1].did_pass
assert (
test_results[1].stdout
== """codeflash stdout : BubbleSorter.sorter_staticmethod() called
"""
)
results2, _ = func_optimizer.run_and_parse_tests(
testing_type=TestingMode.BEHAVIOR,
test_env=test_env,
test_files=func_optimizer.test_files,
optimization_iteration=0,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
match, _ = compare_test_results(test_results, results2)
assert match
finally:
fto_path.write_text(original_code, "utf-8")
test_path.unlink(missing_ok=True)
test_path_perf.unlink(missing_ok=True)