from __future__ import annotations import os import sys import tempfile from argparse import Namespace from pathlib import Path from codeflash.code_utils.code_utils import get_run_tmp_file from codeflash.code_utils.instrument_existing_tests import inject_profiling_into_existing_test from codeflash.discovery.functions_to_optimize import FunctionToOptimize from codeflash.models.models import CodePosition, FunctionParent, TestFile, TestFiles, TestingMode, TestType from codeflash.optimization.optimizer import Optimizer from codeflash.verification.equivalence import compare_test_results from codeflash.languages.python.instrument_codeflash_capture import instrument_codeflash_capture # Used by cli instrumentation codeflash_wrap_string = """def codeflash_wrap(codeflash_wrapped, codeflash_test_module_name, codeflash_test_class_name, codeflash_test_name, codeflash_function_name, codeflash_line_id, codeflash_loop_index, codeflash_cur, codeflash_con, *args, **kwargs): test_id = f'{{codeflash_test_module_name}}:{{codeflash_test_class_name}}:{{codeflash_test_name}}:{{codeflash_line_id}}:{{codeflash_loop_index}}' if not hasattr(codeflash_wrap, 'index'): codeflash_wrap.index = {{}} if test_id in codeflash_wrap.index: codeflash_wrap.index[test_id] += 1 else: codeflash_wrap.index[test_id] = 0 codeflash_test_index = codeflash_wrap.index[test_id] invocation_id = f'{{codeflash_line_id}}_{{codeflash_test_index}}' test_stdout_tag = f"{{codeflash_test_module_name}}:{{(codeflash_test_class_name + '.' if codeflash_test_class_name else '')}}{{codeflash_test_name}}:{{codeflash_function_name}}:{{codeflash_loop_index}}:{{invocation_id}}" print(f"!$######{{test_stdout_tag}}######$!") exception = None gc.disable() try: counter = time.perf_counter_ns() return_value = codeflash_wrapped(*args, **kwargs) codeflash_duration = time.perf_counter_ns() - counter except Exception as e: codeflash_duration = time.perf_counter_ns() - counter exception = e gc.enable() print(f"!######{{test_stdout_tag}}######!") pickled_return_value = pickle.dumps(exception) if exception else pickle.dumps(return_value) codeflash_cur.execute('INSERT INTO test_results VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', (codeflash_test_module_name, codeflash_test_class_name, codeflash_test_name, codeflash_function_name, codeflash_loop_index, invocation_id, codeflash_duration, pickled_return_value, 'function_call')) codeflash_con.commit() if exception: raise exception return return_value """ def test_bubble_sort_behavior_results() -> None: code = """from code_to_optimize.bubble_sort import sorter def test_sort(): input = [5, 4, 3, 2, 1, 0] output = sorter(input) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] output = sorter(input) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]""" expected = ( """import gc import inspect import os import sqlite3 import time import dill as pickle from code_to_optimize.bubble_sort import sorter """ + codeflash_wrap_string + """ def test_sort(): codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX']) codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION'] codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite') codeflash_cur = codeflash_con.cursor() codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)') input = [5, 4, 3, 2, 1, 0] _call__bound__arguments = inspect.signature(sorter).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(sorter, '{module_path}', None, 'test_sort', 'sorter', '1', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] _call__bound__arguments = inspect.signature(sorter).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(sorter, '{module_path}', None, 'test_sort', 'sorter', '4', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] codeflash_con.close() """ ) test_path = ( Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_perfinjector_bubble_sort_results_temp.py" ).resolve() test_path_perf = ( Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/test_perfinjector_bubble_sort_results_perf_temp.py" ).resolve() fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort.py").resolve() original_code = fto_path.read_text("utf-8") try: with test_path.open("w") as f: f.write(code) tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve() project_root_path = (Path(__file__).parent / "..").resolve() original_cwd = Path.cwd() run_cwd = Path(__file__).parent.parent.resolve() func = FunctionToOptimize(function_name="sorter", parents=[], file_path=Path(fto_path)) os.chdir(run_cwd) success, new_test = inject_profiling_into_existing_test( test_path, [CodePosition(6, 13), CodePosition(10, 13)], func, project_root_path, mode=TestingMode.BEHAVIOR ) os.chdir(original_cwd) assert success assert new_test is not None assert new_test.replace('"', "'") == expected.format( module_path="code_to_optimize.tests.pytest.test_perfinjector_bubble_sort_results_temp", tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(), ).replace('"', "'") with test_path.open("w") as f: f.write(new_test) # add codeflash capture instrument_codeflash_capture(func, {}, tests_root) opt = Optimizer( Namespace( project_root=project_root_path, disable_telemetry=True, tests_root=tests_root, test_framework="pytest", pytest_cmd="pytest", experiment_id=None, test_project_root=project_root_path, ) ) test_env = os.environ.copy() test_env["CODEFLASH_TEST_ITERATION"] = "0" test_env["CODEFLASH_LOOP_INDEX"] = "1" test_type = TestType.EXISTING_UNIT_TEST func_optimizer = opt.create_function_optimizer(func) func_optimizer.test_files = TestFiles( test_files=[ TestFile( instrumented_behavior_file_path=test_path, test_type=test_type, original_file_path=test_path, benchmarking_file_path=test_path_perf, ) ] ) test_results, coverage_data = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) out_str = """codeflash stdout: Sorting list result: [0, 1, 2, 3, 4, 5] """ assert test_results[0].stdout == out_str assert out_str == test_results[0].stdout assert test_results[0].id.function_getting_tested == "sorter" assert test_results[0].id.iteration_id == "1_0" assert test_results[0].id.test_class_name is None assert test_results[0].id.test_function_name == "test_sort" assert ( test_results[0].id.test_module_path == "code_to_optimize.tests.pytest.test_perfinjector_bubble_sort_results_temp" ) assert test_results[0].runtime > 0 assert test_results[0].did_pass assert test_results[0].return_value == ([0, 1, 2, 3, 4, 5],) out_str = """codeflash stdout: Sorting list result: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] """ assert out_str == test_results[1].stdout assert test_results[1].id.function_getting_tested == "sorter" assert test_results[1].id.iteration_id == "4_0" assert test_results[1].id.test_class_name is None assert test_results[1].id.test_function_name == "test_sort" assert ( test_results[1].id.test_module_path == "code_to_optimize.tests.pytest.test_perfinjector_bubble_sort_results_temp" ) assert test_results[1].runtime > 0 assert test_results[1].did_pass out_str = """codeflash stdout: Sorting list result: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] """ assert test_results[1].stdout == out_str results2, _ = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) out_str = """codeflash stdout: Sorting list result: [0, 1, 2, 3, 4, 5] """ assert out_str == results2[0].stdout match, _ = compare_test_results(test_results, results2) assert match finally: fto_path.write_text(original_code, "utf-8") test_path.unlink(missing_ok=True) test_path_perf.unlink(missing_ok=True) def test_method_full_instrumentation() -> None: code = """from code_to_optimize.bubble_sort_method import BubbleSorter def test_sort(): input = [5, 4, 3, 2, 1, 0] sort_class = BubbleSorter() output = sort_class.sorter(input) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] sort_class = BubbleSorter() output = sort_class.sorter(input) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]""" expected = ( """import gc import inspect import os import sqlite3 import time import dill as pickle from code_to_optimize.bubble_sort_method import BubbleSorter """ + codeflash_wrap_string + """ def test_sort(): codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX']) codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION'] codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite') codeflash_cur = codeflash_con.cursor() codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)') input = [5, 4, 3, 2, 1, 0] sort_class = BubbleSorter() _call__bound__arguments = inspect.signature(sort_class.sorter).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(sort_class.sorter, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter', '2', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] sort_class = BubbleSorter() _call__bound__arguments = inspect.signature(sort_class.sorter).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(sort_class.sorter, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter', '6', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] codeflash_con.close() """ ) fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort_method.py").resolve() original_code = fto_path.read_text("utf-8") fto = FunctionToOptimize( function_name="sorter", parents=[FunctionParent(name="BubbleSorter", type="ClassDef")], file_path=Path(fto_path) ) with tempfile.TemporaryDirectory() as tmpdirname: tmp_test_path = Path(tmpdirname) / "test_class_method_behavior_results_temp.py" tmp_test_path.write_text(code, encoding="utf-8") success, new_test = inject_profiling_into_existing_test( tmp_test_path, [CodePosition(7, 13), CodePosition(12, 13)], fto, tmp_test_path.parent ) assert success assert new_test.replace('"', "'") == expected.format( module_path=tmp_test_path.stem, tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix() ).replace('"', "'") tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve() test_path = tests_root / "test_class_method_behavior_results_temp.py" test_path_perf = tests_root / "test_class_method_behavior_results_perf_temp.py" project_root_path = (Path(__file__).parent / "..").resolve() try: new_test = expected.format( module_path="code_to_optimize.tests.pytest.test_class_method_behavior_results_temp", tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(), ) with test_path.open("w") as f: f.write(new_test) # Add codeflash capture instrument_codeflash_capture(fto, {}, tests_root) opt = Optimizer( Namespace( project_root=project_root_path, disable_telemetry=True, tests_root=tests_root, test_framework="pytest", pytest_cmd="pytest", experiment_id=None, test_project_root=project_root_path, ) ) test_env = os.environ.copy() test_env["CODEFLASH_TEST_ITERATION"] = "0" test_env["CODEFLASH_LOOP_INDEX"] = "1" test_type = TestType.EXISTING_UNIT_TEST func_optimizer = opt.create_function_optimizer(fto) func_optimizer.test_files = TestFiles( test_files=[ TestFile( instrumented_behavior_file_path=test_path, test_type=test_type, original_file_path=test_path, benchmarking_file_path=test_path_perf, ) ] ) test_results, coverage_data = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) assert len(test_results) == 4 assert test_results[0].id.function_getting_tested == "BubbleSorter.__init__" assert test_results[0].id.test_function_name == "test_sort" assert test_results[0].did_pass assert test_results[0].return_value[0] == {"x": 0} assert test_results[1].id.function_getting_tested == "BubbleSorter.sorter" assert test_results[1].id.iteration_id == "2_0" assert test_results[1].id.test_class_name is None assert test_results[1].id.test_function_name == "test_sort" assert ( test_results[1].id.test_module_path == "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp" ) assert test_results[1].runtime > 0 assert test_results[1].did_pass assert test_results[1].return_value == ([0, 1, 2, 3, 4, 5],) out_str = """codeflash stdout : BubbleSorter.sorter() called\n""" assert test_results[1].stdout == out_str match, _ = compare_test_results(test_results, test_results) assert match assert test_results[2].id.function_getting_tested == "BubbleSorter.__init__" assert test_results[2].id.test_function_name == "test_sort" assert test_results[2].did_pass assert test_results[2].return_value[0] == {"x": 0} assert test_results[3].id.function_getting_tested == "BubbleSorter.sorter" assert test_results[3].id.iteration_id == "6_0" assert test_results[3].id.test_class_name is None assert test_results[3].id.test_function_name == "test_sort" assert ( test_results[3].id.test_module_path == "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp" ) assert test_results[3].runtime > 0 assert test_results[3].did_pass assert test_results[3].stdout == """codeflash stdout : BubbleSorter.sorter() called\n""" results2, _ = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) match, _ = compare_test_results(test_results, results2) assert match # Replace with optimized code that mutated instance attribute optimized_code = """ class BubbleSorter: def __init__(self, x=1): self.x = x def sorter(self, arr): for i in range(len(arr)): for j in range(len(arr) - 1): if arr[j] > arr[j + 1]: temp = arr[j] arr[j] = arr[j + 1] arr[j + 1] = temp return arr """ fto_path.write_text(optimized_code, "utf-8") # Force reload of module import importlib module_name = "code_to_optimize.bubble_sort_method" if module_name not in sys.modules: __import__(module_name) importlib.reload(sys.modules[module_name]) # Add codeflash capture instrument_codeflash_capture(fto, {}, tests_root) opt = Optimizer( Namespace( project_root=project_root_path, disable_telemetry=True, tests_root=tests_root, test_framework="pytest", pytest_cmd="pytest", experiment_id=None, test_project_root=project_root_path, ) ) func_optimizer = opt.create_function_optimizer(fto) func_optimizer.test_files = TestFiles( test_files=[ TestFile( instrumented_behavior_file_path=test_path, test_type=test_type, original_file_path=test_path, benchmarking_file_path=test_path_perf, ) ] ) new_test_results, coverage_data = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) assert len(new_test_results) == 4 assert new_test_results[0].id.function_getting_tested == "BubbleSorter.__init__" assert new_test_results[0].id.test_function_name == "test_sort" assert new_test_results[0].did_pass assert new_test_results[0].return_value[0] == {"x": 1} assert new_test_results[1].id.function_getting_tested == "BubbleSorter.sorter" assert new_test_results[1].id.iteration_id == "2_0" assert new_test_results[1].id.test_class_name is None assert new_test_results[1].id.test_function_name == "test_sort" assert ( new_test_results[1].id.test_module_path == "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp" ) assert new_test_results[1].runtime > 0 assert new_test_results[1].did_pass assert new_test_results[1].return_value == ([0, 1, 2, 3, 4, 5],) assert new_test_results[2].id.function_getting_tested == "BubbleSorter.__init__" assert new_test_results[2].id.test_function_name == "test_sort" assert new_test_results[2].did_pass assert new_test_results[2].return_value[0] == {"x": 1} assert new_test_results[3].id.function_getting_tested == "BubbleSorter.sorter" assert new_test_results[3].id.iteration_id == "6_0" assert new_test_results[3].id.test_class_name is None assert new_test_results[3].id.test_function_name == "test_sort" assert ( new_test_results[3].id.test_module_path == "code_to_optimize.tests.pytest.test_class_method_behavior_results_temp" ) assert new_test_results[3].runtime > 0 assert new_test_results[3].did_pass match, _ = compare_test_results(test_results, new_test_results) assert not match finally: fto_path.write_text(original_code, "utf-8") test_path.unlink(missing_ok=True) test_path_perf.unlink(missing_ok=True) def test_classmethod_full_instrumentation() -> None: code = """from code_to_optimize.bubble_sort_method import BubbleSorter def test_sort(): input = [5, 4, 3, 2, 1, 0] output = BubbleSorter.sorter_classmethod(input) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] output = BubbleSorter.sorter_classmethod(input) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]""" expected = ( """import gc import inspect import os import sqlite3 import time import dill as pickle from code_to_optimize.bubble_sort_method import BubbleSorter """ + codeflash_wrap_string + """ def test_sort(): codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX']) codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION'] codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite') codeflash_cur = codeflash_con.cursor() codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)') input = [5, 4, 3, 2, 1, 0] _call__bound__arguments = inspect.signature(BubbleSorter.sorter_classmethod).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(BubbleSorter.sorter_classmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_classmethod', '1', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] _call__bound__arguments = inspect.signature(BubbleSorter.sorter_classmethod).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(BubbleSorter.sorter_classmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_classmethod', '4', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] codeflash_con.close() """ ) fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort_method.py").resolve() original_code = fto_path.read_text("utf-8") fto = FunctionToOptimize( function_name="sorter_classmethod", parents=[FunctionParent(name="BubbleSorter", type="ClassDef")], file_path=Path(fto_path), ) with tempfile.TemporaryDirectory() as tmpdirname: tmp_test_path = Path(tmpdirname) / "test_classmethod_behavior_results_temp.py" tmp_test_path.write_text(code, encoding="utf-8") success, new_test = inject_profiling_into_existing_test( tmp_test_path, [CodePosition(6, 13), CodePosition(10, 13)], fto, tmp_test_path.parent ) assert success assert new_test.replace('"', "'") == expected.format( module_path=tmp_test_path.stem, tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix() ).replace('"', "'") tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve() test_path = tests_root / "test_classmethod_behavior_results_temp.py" test_path_perf = tests_root / "test_classmethod_behavior_results_perf_temp.py" project_root_path = (Path(__file__).parent / "..").resolve() try: new_test = expected.format( module_path="code_to_optimize.tests.pytest.test_classmethod_behavior_results_temp", tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(), ) with test_path.open("w") as f: f.write(new_test) # Add codeflash capture instrument_codeflash_capture(fto, {}, tests_root) opt = Optimizer( Namespace( project_root=project_root_path, disable_telemetry=True, tests_root=tests_root, test_framework="pytest", pytest_cmd="pytest", experiment_id=None, test_project_root=project_root_path, ) ) test_env = os.environ.copy() test_env["CODEFLASH_TEST_ITERATION"] = "0" test_env["CODEFLASH_LOOP_INDEX"] = "1" test_type = TestType.EXISTING_UNIT_TEST func_optimizer = opt.create_function_optimizer(fto) func_optimizer.test_files = TestFiles( test_files=[ TestFile( instrumented_behavior_file_path=test_path, test_type=test_type, original_file_path=test_path, benchmarking_file_path=test_path_perf, ) ] ) test_results, coverage_data = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) assert len(test_results) == 2 assert test_results[0].id.function_getting_tested == "BubbleSorter.sorter_classmethod" assert test_results[0].id.iteration_id == "1_0" assert test_results[0].id.test_class_name is None assert test_results[0].id.test_function_name == "test_sort" assert ( test_results[0].id.test_module_path == "code_to_optimize.tests.pytest.test_classmethod_behavior_results_temp" ) assert test_results[0].runtime > 0 assert test_results[0].did_pass assert test_results[0].return_value == ([0, 1, 2, 3, 4, 5],) out_str = """codeflash stdout : BubbleSorter.sorter_classmethod() called """ assert test_results[0].stdout == out_str match, _ = compare_test_results(test_results, test_results) assert match assert test_results[1].id.function_getting_tested == "BubbleSorter.sorter_classmethod" assert test_results[1].id.iteration_id == "4_0" assert test_results[1].id.test_class_name is None assert test_results[1].id.test_function_name == "test_sort" assert ( test_results[1].id.test_module_path == "code_to_optimize.tests.pytest.test_classmethod_behavior_results_temp" ) assert test_results[1].runtime > 0 assert test_results[1].did_pass assert ( test_results[1].stdout == """codeflash stdout : BubbleSorter.sorter_classmethod() called """ ) results2, _ = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) match, _ = compare_test_results(test_results, results2) assert match finally: fto_path.write_text(original_code, "utf-8") test_path.unlink(missing_ok=True) test_path_perf.unlink(missing_ok=True) def test_staticmethod_full_instrumentation() -> None: code = """from code_to_optimize.bubble_sort_method import BubbleSorter def test_sort(): input = [5, 4, 3, 2, 1, 0] output = BubbleSorter.sorter_staticmethod(input) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] output = BubbleSorter.sorter_staticmethod(input) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]""" expected = ( """import gc import inspect import os import sqlite3 import time import dill as pickle from code_to_optimize.bubble_sort_method import BubbleSorter """ + codeflash_wrap_string + """ def test_sort(): codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX']) codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION'] codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite') codeflash_cur = codeflash_con.cursor() codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB, verification_type TEXT)') input = [5, 4, 3, 2, 1, 0] _call__bound__arguments = inspect.signature(BubbleSorter.sorter_staticmethod).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(BubbleSorter.sorter_staticmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_staticmethod', '1', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0, 1, 2, 3, 4, 5] input = [5.0, 4.0, 3.0, 2.0, 1.0, 0.0] _call__bound__arguments = inspect.signature(BubbleSorter.sorter_staticmethod).bind(input) _call__bound__arguments.apply_defaults() output = codeflash_wrap(BubbleSorter.sorter_staticmethod, '{module_path}', None, 'test_sort', 'BubbleSorter.sorter_staticmethod', '4', codeflash_loop_index, codeflash_cur, codeflash_con, *_call__bound__arguments.args, **_call__bound__arguments.kwargs) assert output == [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] codeflash_con.close() """ ) fto_path = (Path(__file__).parent.resolve() / "../code_to_optimize/bubble_sort_method.py").resolve() original_code = fto_path.read_text("utf-8") fto = FunctionToOptimize( function_name="sorter_staticmethod", parents=[FunctionParent(name="BubbleSorter", type="ClassDef")], file_path=Path(fto_path), ) with tempfile.TemporaryDirectory() as tmpdirname: tmp_test_path = Path(tmpdirname) / "test_staticmethod_behavior_results_temp.py" tmp_test_path.write_text(code, encoding="utf-8") success, new_test = inject_profiling_into_existing_test( tmp_test_path, [CodePosition(6, 13), CodePosition(10, 13)], fto, tmp_test_path.parent ) assert success assert new_test.replace('"', "'") == expected.format( module_path=tmp_test_path.stem, tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix() ).replace('"', "'") tests_root = (Path(__file__).parent.resolve() / "../code_to_optimize/tests/pytest/").resolve() test_path = tests_root / "test_staticmethod_behavior_results_temp.py" test_path_perf = tests_root / "test_staticmethod_behavior_results_perf_temp.py" project_root_path = (Path(__file__).parent / "..").resolve() try: new_test = expected.format( module_path="code_to_optimize.tests.pytest.test_staticmethod_behavior_results_temp", tmp_dir_path=get_run_tmp_file(Path("test_return_values")).as_posix(), ) with test_path.open("w") as f: f.write(new_test) # Add codeflash capture instrument_codeflash_capture(fto, {}, tests_root) opt = Optimizer( Namespace( project_root=project_root_path, disable_telemetry=True, tests_root=tests_root, test_framework="pytest", pytest_cmd="pytest", experiment_id=None, test_project_root=project_root_path, ) ) test_env = os.environ.copy() test_env["CODEFLASH_TEST_ITERATION"] = "0" test_env["CODEFLASH_LOOP_INDEX"] = "1" test_type = TestType.EXISTING_UNIT_TEST func_optimizer = opt.create_function_optimizer(fto) func_optimizer.test_files = TestFiles( test_files=[ TestFile( instrumented_behavior_file_path=test_path, test_type=test_type, original_file_path=test_path, benchmarking_file_path=test_path_perf, ) ] ) test_results, coverage_data = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) assert len(test_results) == 2 assert test_results[0].id.function_getting_tested == "BubbleSorter.sorter_staticmethod" assert test_results[0].id.iteration_id == "1_0" assert test_results[0].id.test_class_name is None assert test_results[0].id.test_function_name == "test_sort" assert ( test_results[0].id.test_module_path == "code_to_optimize.tests.pytest.test_staticmethod_behavior_results_temp" ) assert test_results[0].runtime > 0 assert test_results[0].did_pass assert test_results[0].return_value == ([0, 1, 2, 3, 4, 5],) out_str = """codeflash stdout : BubbleSorter.sorter_staticmethod() called """ assert test_results[0].stdout == out_str match, _ = compare_test_results(test_results, test_results) assert match assert test_results[1].id.function_getting_tested == "BubbleSorter.sorter_staticmethod" assert test_results[1].id.iteration_id == "4_0" assert test_results[1].id.test_class_name is None assert test_results[1].id.test_function_name == "test_sort" assert ( test_results[1].id.test_module_path == "code_to_optimize.tests.pytest.test_staticmethod_behavior_results_temp" ) assert test_results[1].runtime > 0 assert test_results[1].did_pass assert ( test_results[1].stdout == """codeflash stdout : BubbleSorter.sorter_staticmethod() called """ ) results2, _ = func_optimizer.run_and_parse_tests( testing_type=TestingMode.BEHAVIOR, test_env=test_env, test_files=func_optimizer.test_files, optimization_iteration=0, pytest_min_loops=1, pytest_max_loops=1, testing_time=0.1, ) match, _ = compare_test_results(test_results, results2) assert match finally: fto_path.write_text(original_code, "utf-8") test_path.unlink(missing_ok=True) test_path_perf.unlink(missing_ok=True)