Fix cli tests

This commit is contained in:
Saurabh Misra 2024-10-23 14:38:23 -07:00
parent ed03f7a7a6
commit 853ec2c934
3 changed files with 45 additions and 220 deletions

View file

@ -1,8 +1,10 @@
import math
import time
def sleepfunc_sequence(t) -> float:
output = t / 100
time.sleep(output)
return output
def accurate_sleepfunc(t) -> float:
"""T is in seconds"""
start_time = time.perf_counter_ns()
while True:
if (time.perf_counter_ns() - start_time) / 10e9 >= t:
break
return t

View file

@ -173,6 +173,11 @@ def parse_test_xml(
# This means that the test failed to load, so we don't want to crash on it
logger.info("Test failed to load, skipping it.")
if run_result is not None:
if isinstance(run_result.stdout, str) and isinstance(run_result.stderr, str):
logger.info(
f"Test log - STDOUT : {run_result.stdout} \n STDERR : {run_result.stderr}",
)
else:
logger.info(
f"Test log - STDOUT : {run_result.stdout.decode()} \n STDERR : {run_result.stderr.decode()}",
)

View file

@ -2220,27 +2220,18 @@ def test_code_replacement10() -> None:
def test_time_correction_instrumentation() -> None:
code = """from code_to_optimize.sleeptime import sleepfunc_sequence
code = """from code_to_optimize.sleeptime import accurate_sleepfunc
import pytest
@pytest.mark.parametrize("n, expected_total_sleep_time", [
(1, 0.010),
(2, 0.020),
(3, 0.030),
(4, 0.040),
(0.01, 0.010),
(0.02, 0.020),
(0.03, 0.030),
(0.04, 0.040),
])
def test_sleepfunc_sequence_short(n, expected_total_sleep_time):
output = sleepfunc_sequence(n)
output = accurate_sleepfunc(n)
assert output == expected_total_sleep_time
@pytest.mark.parametrize("n, expected_total_sleep_time", [
(10, 0.10),
(20, 0.20),
(30, 0.30),
(40, 0.40),
])
def test_sleepfunc_sequence_long(n, expected_total_sleep_time):
output = sleepfunc_sequence(n)
assert output == expected_total_sleep_time
"""
expected = """import gc
@ -2251,7 +2242,7 @@ import time
import dill as pickle
import pytest
from code_to_optimize.sleeptime import sleepfunc_sequence
from code_to_optimize.sleeptime import accurate_sleepfunc
def codeflash_wrap(wrapped, test_module_name, test_class_name, test_name, function_name, line_id, loop_index, codeflash_cur, codeflash_con, *args, **kwargs):
@ -2278,25 +2269,14 @@ def codeflash_wrap(wrapped, test_module_name, test_class_name, test_name, functi
codeflash_con.commit()
return return_value
@pytest.mark.parametrize('n, expected_total_sleep_time', [(1, 0.01), (2, 0.02), (3, 0.03), (4, 0.04)])
@pytest.mark.parametrize('n, expected_total_sleep_time', [(0.01, 0.01), (0.02, 0.02), (0.03, 0.03), (0.04, 0.04)])
def test_sleepfunc_sequence_short(n, expected_total_sleep_time):
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB)')
output = codeflash_wrap(sleepfunc_sequence, '{module_path}', None, 'test_sleepfunc_sequence_short', 'sleepfunc_sequence', '0', codeflash_loop_index, codeflash_cur, codeflash_con, n)
assert output == expected_total_sleep_time
codeflash_con.close()
@pytest.mark.parametrize('n, expected_total_sleep_time', [(10, 0.1), (20, 0.2), (30, 0.3), (40, 0.4)])
def test_sleepfunc_sequence_long(n, expected_total_sleep_time):
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB)')
output = codeflash_wrap(sleepfunc_sequence, '{module_path}', None, 'test_sleepfunc_sequence_long', 'sleepfunc_sequence', '0', codeflash_loop_index, codeflash_cur, codeflash_con, n)
output = codeflash_wrap(accurate_sleepfunc, '{module_path}', None, 'test_sleepfunc_sequence_short', 'accurate_sleepfunc', '0', codeflash_loop_index, codeflash_cur, codeflash_con, n)
assert output == expected_total_sleep_time
codeflash_con.close()
"""
@ -2314,14 +2294,14 @@ def test_sleepfunc_sequence_long(n, expected_total_sleep_time):
original_cwd = Path.cwd()
run_cwd = Path(__file__).parent.parent.resolve()
func = FunctionToOptimize(
function_name="sleepfunc_sequence",
function_name="accurate_sleepfunc",
parents=[],
file_path=Path("module.py"),
)
os.chdir(run_cwd)
success, new_test = inject_profiling_into_existing_test(
test_path,
[CodePosition(10, 13), CodePosition(20, 13)],
[CodePosition(10, 13)],
func,
project_root_path,
"pytest",
@ -2371,8 +2351,7 @@ def test_sleepfunc_sequence_long(n, expected_total_sleep_time):
testing_time=0.1,
)
# random validations for successful excution of the test
assert test_results[0].id.function_getting_tested == "sleepfunc_sequence"
assert test_results[0].id.function_getting_tested == "accurate_sleepfunc"
assert test_results[0].id.iteration_id == "0_0"
assert test_results[0].id.test_class_name is None
assert test_results[0].id.test_function_name == "test_sleepfunc_sequence_short"
@ -2381,161 +2360,29 @@ def test_sleepfunc_sequence_long(n, expected_total_sleep_time):
== "code_to_optimize.tests.pytest.test_time_correction_instrumentation_temp"
)
assert test_results[4].did_pass
total_passed_runtime: int = 0
# time validation with 10% for test suite 1 and 1% tolerance for test suite 2, i.e. ~0.001 to ~0.004 seconds
assert len(test_results) == 12
for i, test_result in enumerate(test_results):
total_passed_runtime += test_result.runtime or 0 # Defaults to 0 if runtime is None
expected_runtime = (i % 8 + 1) * 1e7 if (i % 8) < 4 else (i % 8 - 3) * 1e8
rel_tolerance = 1e-1 if i < 4 else 9e-2
is_close = math.isclose(test_result.runtime, expected_runtime, rel_tol=rel_tolerance)
assert is_close, f"Test {i} failed: runtime {test_result.runtime} not within tolerance of {expected_runtime} with rel_tol={rel_tolerance}"
# Validate total_passed_runtime
expected_total = (sum((i + 1) * 1e7 for i in range(4)) + sum((i - 3) * 1e8 for i in range(4, 8))) * 3
assert math.isclose(
total_passed_runtime,
expected_total,
rel_tol=1e-2,
), f"Total passed runtime: {total_passed_runtime} does not match the expected: {expected_total} sum."
assert test_result.did_pass
assert math.isclose(test_result.runtime, ((i % 4) + 1) * 100_000_000, rel_tol=0.01)
finally:
test_path.unlink(missing_ok=True)
def test_time_correction_only_replay_test() -> None:
code = """import dill as pickle
import pytest
from codeflash.tracing.replay_test import get_next_arg_and_return
from codeflash.validation.equivalence import compare_results
from code_to_optimize.sleeptime import sleepfunc_sequence
@pytest.mark.parametrize("n, expected_total_sleep_time", [
(1, 0.010),
(2, 0.020),
(3, 0.030),
(4, 0.040),
])
def test_sleepfunc_sequence_short(n, expected_total_sleep_time):
for arg_val_pkl, return_val_pkl in get_next_arg_and_return('/home/saurabh/packagename/traces/first.trace', 3):
args = pickle.loads(arg_val_pkl)
return_val_1= pickle.loads(return_val_pkl)
ret = sleepfunc_sequence(**args)
assert compare_results(return_val_1, ret)
"""
expected = """import gc
import os
import sqlite3
import time
import dill as pickle
import pytest
from code_to_optimize.sleeptime import sleepfunc_sequence
from codeflash.tracing.replay_test import get_next_arg_and_return
from codeflash.validation.equivalence import compare_results
def codeflash_wrap(wrapped, test_module_name, test_class_name, test_name, function_name, line_id, loop_index, codeflash_cur, codeflash_con, *args, **kwargs):
test_id = f'{{test_module_name}}:{{test_class_name}}:{{test_name}}:{{line_id}}:{{loop_index}}'
if not hasattr(codeflash_wrap, 'index'):
codeflash_wrap.index = {{}}
if test_id in codeflash_wrap.index:
codeflash_wrap.index[test_id] += 1
else:
codeflash_wrap.index[test_id] = 0
codeflash_test_index = codeflash_wrap.index[test_id]
invocation_id = f'{{line_id}}_{{codeflash_test_index}}'
"""
if sys.version_info < (3, 12):
expected += """print(f"!######{{test_module_name}}:{{(test_class_name + '.' if test_class_name else '')}}{{test_name}}:{{function_name}}:{{loop_index}}:{{invocation_id}}######!")"""
else:
expected += """print(f'!######{{test_module_name}}:{{(test_class_name + '.' if test_class_name else '')}}{{test_name}}:{{function_name}}:{{loop_index}}:{{invocation_id}}######!')"""
expected += """
gc.disable()
counter = time.perf_counter_ns()
return_value = wrapped(*args, **kwargs)
codeflash_duration = time.perf_counter_ns() - counter
gc.enable()
if loop_index == 1:
pickled_return_value = pickle.dumps(return_value)
else:
pickled_return_value = None
codeflash_cur.execute('INSERT INTO test_results VALUES (?, ?, ?, ?, ?, ?, ?, ?)', (test_module_name, test_class_name, test_name, function_name, loop_index, invocation_id, codeflash_duration, pickled_return_value))
codeflash_con.commit()
return return_value
@pytest.mark.parametrize('n, expected_total_sleep_time', [(1, 0.01), (2, 0.02), (3, 0.03), (4, 0.04)])
def test_sleepfunc_sequence_short(n, expected_total_sleep_time):
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB)')
"""
if sys.version_info < (3, 11):
expected += """ for (arg_val_pkl, return_val_pkl) in get_next_arg_and_return('/home/saurabh/packagename/traces/first.trace', 3):
"""
else:
expected += """ for arg_val_pkl, return_val_pkl in get_next_arg_and_return('/home/saurabh/packagename/traces/first.trace', 3):
"""
expected += """ args = pickle.loads(arg_val_pkl)
return_val_1 = pickle.loads(return_val_pkl)
ret = codeflash_wrap(sleepfunc_sequence, '{module_path}', None, 'test_sleepfunc_sequence_short', 'sleepfunc_sequence', '0_2', codeflash_loop_index, codeflash_cur, codeflash_con, **args)
assert compare_results(return_val_1, ret)
codeflash_con.close()
"""
with tempfile.NamedTemporaryFile(mode="w") as f:
f.write(code)
f.flush()
func = FunctionToOptimize(
function_name="sleepfunc_sequence",
parents=[],
file_path=Path("module.py"),
)
original_cwd = Path.cwd()
run_cwd = Path(__file__).parent.parent.resolve()
os.chdir(run_cwd)
success, new_test = inject_profiling_into_existing_test(
Path(f.name),
[CodePosition(16, 15)],
func,
Path(f.name).parent,
"pytest",
)
os.chdir(original_cwd)
assert success
assert new_test == expected.format(
module_path=Path(f.name).name,
tmp_dir_path=get_run_tmp_file(Path("test_return_values")),
)
def test_time_correction_instrumentation_unittest() -> None:
code = """import unittest
from parameterized import parameterized
from code_to_optimize.sleeptime import sleepfunc_sequence
from code_to_optimize.sleeptime import accurate_sleepfunc
class TestPigLatin(unittest.TestCase):
@parameterized.expand([
(1, 0.010),
(2, 0.020),
(3, 0.030),
(4, 0.040),
(0.01, 0.010),
(0.02, 0.020),
])
def test_sleepfunc_sequence_short(self, n, expected_total_sleep_time):
output = sleepfunc_sequence(n)
self.assertEqual(n, expected_total_sleep_time)
@parameterized.expand([
(10, 0.10),
(20, 0.20),
(30, 0.30),
(40, 0.40),
])
def test_sleepfunc_sequence_slightlong(self, n, expected_total_sleep_time):
output = sleepfunc_sequence(n)
self.assertEqual(n, expected_total_sleep_time)
output = accurate_sleepfunc(n)
"""
expected = """import gc
@ -2548,7 +2395,7 @@ import dill as pickle
import timeout_decorator
from parameterized import parameterized
from code_to_optimize.sleeptime import sleepfunc_sequence
from code_to_optimize.sleeptime import accurate_sleepfunc
def codeflash_wrap(wrapped, test_module_name, test_class_name, test_name, function_name, line_id, loop_index, codeflash_cur, codeflash_con, *args, **kwargs):
@ -2577,7 +2424,7 @@ def codeflash_wrap(wrapped, test_module_name, test_class_name, test_name, functi
class TestPigLatin(unittest.TestCase):
@parameterized.expand([(1, 0.01), (2, 0.02), (3, 0.03), (4, 0.04)])
@parameterized.expand([(0.01, 0.01), (0.02, 0.02)])
@timeout_decorator.timeout(15)
def test_sleepfunc_sequence_short(self, n, expected_total_sleep_time):
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
@ -2585,20 +2432,7 @@ class TestPigLatin(unittest.TestCase):
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB)')
output = codeflash_wrap(sleepfunc_sequence, '{module_path}', 'TestPigLatin', 'test_sleepfunc_sequence_short', 'sleepfunc_sequence', '0', codeflash_loop_index, codeflash_cur, codeflash_con, n)
self.assertEqual(n, expected_total_sleep_time)
codeflash_con.close()
@parameterized.expand([(10, 0.1), (20, 0.2), (30, 0.3), (40, 0.4)])
@timeout_decorator.timeout(15)
def test_sleepfunc_sequence_slightlong(self, n, expected_total_sleep_time):
codeflash_iteration = os.environ['CODEFLASH_TEST_ITERATION']
codeflash_loop_index = int(os.environ['CODEFLASH_LOOP_INDEX'])
codeflash_con = sqlite3.connect(f'{tmp_dir_path}_{{codeflash_iteration}}.sqlite')
codeflash_cur = codeflash_con.cursor()
codeflash_cur.execute('CREATE TABLE IF NOT EXISTS test_results (test_module_path TEXT, test_class_name TEXT, test_function_name TEXT, function_getting_tested TEXT, loop_index INTEGER, iteration_id TEXT, runtime INTEGER, return_value BLOB)')
output = codeflash_wrap(sleepfunc_sequence, '{module_path}', 'TestPigLatin', 'test_sleepfunc_sequence_slightlong', 'sleepfunc_sequence', '0', codeflash_loop_index, codeflash_cur, codeflash_con, n)
self.assertEqual(n, expected_total_sleep_time)
output = codeflash_wrap(accurate_sleepfunc, '{module_path}', 'TestPigLatin', 'test_sleepfunc_sequence_short', 'accurate_sleepfunc', '0', codeflash_loop_index, codeflash_cur, codeflash_con, n)
codeflash_con.close()
"""
@ -2615,14 +2449,14 @@ class TestPigLatin(unittest.TestCase):
original_cwd = Path.cwd()
run_cwd = Path(__file__).parent.parent.resolve()
func = FunctionToOptimize(
function_name="sleepfunc_sequence",
function_name="accurate_sleepfunc",
parents=[],
file_path=Path("module.py"),
)
os.chdir(run_cwd)
success, new_test = inject_profiling_into_existing_test(
test_path,
[CodePosition(14, 18), CodePosition(24, 18)],
[CodePosition(12, 17)],
func,
project_root_path,
"unittest",
@ -2667,13 +2501,10 @@ class TestPigLatin(unittest.TestCase):
test_files=test_files,
optimization_iteration=0,
test_functions=None,
pytest_min_loops=1,
pytest_max_loops=1,
testing_time=0.1,
)
# random validations for successful excution of the test
assert test_results[0].id.function_getting_tested == "sleepfunc_sequence"
assert test_results[0].id.function_getting_tested == "accurate_sleepfunc"
assert test_results[0].id.iteration_id == "0_0"
assert test_results[0].id.test_class_name == "TestPigLatin"
assert test_results[0].id.test_function_name == "test_sleepfunc_sequence_short"
@ -2681,24 +2512,11 @@ class TestPigLatin(unittest.TestCase):
test_results[0].id.test_module_path
== "code_to_optimize.tests.unittest.test_time_correction_instrumentation_unittest_temp"
)
assert test_results[4].did_pass
total_passed_runtime: int = 0
# time validation with 10% for test suite 1 and 1% tolerance for test suite 2, i.e. ~0.001 to ~0.004 seconds
assert len(test_results) == 2
for i, test_result in enumerate(test_results):
total_passed_runtime += test_result.runtime or 0 # Defaults to 0 if runtime is None
expected_runtime = (i % 8 + 1) * 1e7 if (i % 8) < 4 else (i % 8 - 3) * 1e8
rel_tolerance = 1e-1 if i < 4 else 9e-2
is_close = math.isclose(test_result.runtime, expected_runtime, rel_tol=rel_tolerance)
assert is_close, f"Test {i} failed: runtime {test_result.runtime} not within tolerance of {expected_runtime} with rel_tol={rel_tolerance}"
# Validate total_passed_runtime
expected_total = sum((i + 1) * 1e7 for i in range(4)) + sum((i - 3) * 1e8 for i in range(4, 8))
assert math.isclose(
total_passed_runtime,
expected_total,
rel_tol=1e-2,
), f"Total passed runtime: {total_passed_runtime} does not match the expected: {expected_total} sum."
assert test_result.did_pass
assert math.isclose(test_result.runtime, ((i % 2) + 1) * 100_000_000, rel_tol=0.01)
finally:
test_path.unlink(missing_ok=True)