codeflash-agent/packages/codeflash-python/tests/test_instrument_async_tests.py

772 lines
23 KiB
Python
Raw Normal View History

2026-04-03 22:36:50 +00:00
import sys
import tempfile
from pathlib import Path
import pytest
from codeflash_python._model import FunctionParent, TestingMode
from codeflash_python.analysis._discovery import FunctionToOptimize
from codeflash_python.test_discovery.models import CodePosition
from codeflash_python.testing._instrument_async import (
2026-04-03 22:36:50 +00:00
ASYNC_HELPER_FILENAME,
add_async_decorator_to_function,
get_decorator_name_for_mode,
)
from codeflash_python.testing._instrumentation import (
2026-04-03 22:36:50 +00:00
inject_profiling_into_existing_test,
)
@pytest.fixture
def temp_dir():
"""Create a temporary directory for test files."""
with tempfile.TemporaryDirectory() as temp:
yield Path(temp)
# @pytest.fixture
# def unique_test_iteration():
# """Provide a unique test iteration ID and clean up database after test."""
# # Generate unique iteration ID
# iteration_id = str(uuid.uuid4())[:8]
# # Store original environment variable
# original_iteration = os.environ.get("CODEFLASH_TEST_ITERATION")
# # Set unique iteration for this test
# os.environ["CODEFLASH_TEST_ITERATION"] = iteration_id
# try:
# yield iteration_id
# finally:
# # Cleanup: restore original environment and delete database file
# if original_iteration is not None:
# os.environ["CODEFLASH_TEST_ITERATION"] = original_iteration
# elif "CODEFLASH_TEST_ITERATION" in os.environ:
# del os.environ["CODEFLASH_TEST_ITERATION"]
# # Clean up database file
# try:
# from codeflash.code_utils.codeflash_wrap_decorator import get_run_tmp_file
# db_path = get_run_tmp_file(Path(f"test_return_values_{iteration_id}.sqlite"))
# if db_path.exists():
# db_path.unlink()
# except Exception:
# pass # Ignore cleanup errors
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_decorator_application_behavior_mode(temp_dir):
async_function_code = '''
import asyncio
async def async_function(x: int, y: int) -> int:
"""Simple async function for testing."""
await asyncio.sleep(0.01)
return x * y
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(async_function_code)
func = FunctionToOptimize(
function_name="async_function",
file_path=test_file,
parents=[],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.BEHAVIOR
)
assert decorator_added
modified_code = test_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.BEHAVIOR)
code_with_decorator = async_function_code.replace(
"async def async_function",
f"@{decorator_name}\nasync def async_function",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert modified_code.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_decorator_application_performance_mode(temp_dir):
async_function_code = '''
import asyncio
async def async_function(x: int, y: int) -> int:
"""Simple async function for testing."""
await asyncio.sleep(0.01)
return x * y
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(async_function_code)
func = FunctionToOptimize(
function_name="async_function",
file_path=test_file,
parents=[],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.PERFORMANCE
)
assert decorator_added
modified_code = test_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.PERFORMANCE)
code_with_decorator = async_function_code.replace(
"async def async_function",
f"@{decorator_name}\nasync def async_function",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert modified_code.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_decorator_application_concurrency_mode(temp_dir):
"""Test that CONCURRENCY mode applies the codeflash_concurrency_async decorator."""
async_function_code = '''
import asyncio
async def async_function(x: int, y: int) -> int:
"""Simple async function for testing."""
await asyncio.sleep(0.01)
return x * y
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(async_function_code)
func = FunctionToOptimize(
function_name="async_function",
file_path=test_file,
parents=[],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.CONCURRENCY
)
assert decorator_added
modified_code = test_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.CONCURRENCY)
code_with_decorator = async_function_code.replace(
"async def async_function",
f"@{decorator_name}\nasync def async_function",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert modified_code.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_class_method_decorator_application(temp_dir):
async_class_code = '''
import asyncio
class Calculator:
"""Test class with async methods."""
async def async_method(self, a: int, b: int) -> int:
"""Async method in class."""
await asyncio.sleep(0.005)
return a ** b
def sync_method(self, a: int, b: int) -> int:
"""Sync method in class."""
return a - b
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(async_class_code)
func = FunctionToOptimize(
function_name="async_method",
file_path=test_file,
parents=[FunctionParent(name="Calculator", type="ClassDef")],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.BEHAVIOR
)
assert decorator_added
modified_code = test_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.BEHAVIOR)
code_with_decorator = async_class_code.replace(
" async def async_method",
f" @{decorator_name}\n async def async_method",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert modified_code.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_decorator_no_duplicate_application(temp_dir):
# Case 1: Old-style import already present — injector should detect and skip
already_decorated_code = '''
from codeflash_python.runtime._codeflash_wrap_decorator import codeflash_behavior_async
import asyncio
@codeflash_behavior_async
async def async_function(x: int, y: int) -> int:
"""Already decorated async function."""
await asyncio.sleep(0.01)
return x * y
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(already_decorated_code)
func = FunctionToOptimize(
function_name="async_function",
file_path=test_file,
parents=[],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.BEHAVIOR
)
# Should not add duplicate decorator
assert not decorator_added
# Case 2: Inline definition already present — injector should detect and skip
already_inline_code = '''
import asyncio
def codeflash_behavior_async(func):
return func
@codeflash_behavior_async
async def async_function(x: int, y: int) -> int:
"""Already decorated async function."""
await asyncio.sleep(0.01)
return x * y
'''
test_file2 = temp_dir / "test_async2.py"
test_file2.write_text(already_inline_code)
func2 = FunctionToOptimize(
function_name="async_function",
file_path=test_file2,
parents=[],
is_async=True,
)
decorator_added2, _ = add_async_decorator_to_function(
test_file2, func2, TestingMode.BEHAVIOR
)
# Should not add duplicate decorator
assert not decorator_added2
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_inject_profiling_async_function_behavior_mode(temp_dir):
source_module_code = '''
import asyncio
async def async_function(x: int, y: int) -> int:
"""Simple async function for testing."""
await asyncio.sleep(0.01)
return x * y
'''
source_file = temp_dir / "my_module.py"
source_file.write_text(source_module_code)
async_test_code = '''
import asyncio
import pytest
from my_module import async_function
@pytest.mark.asyncio
async def test_async_function():
"""Test async function behavior."""
result = await async_function(5, 3)
assert result == 15
result2 = await async_function(2, 4)
assert result2 == 8
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(async_test_code)
func = FunctionToOptimize(
function_name="async_function",
parents=[],
file_path=Path("my_module.py"),
is_async=True,
)
# First instrument the source module
from codeflash_python.testing._instrument_async import (
2026-04-03 22:36:50 +00:00
add_async_decorator_to_function,
)
source_success, _ = add_async_decorator_to_function(
source_file, func, TestingMode.BEHAVIOR
)
assert source_success is True
# Verify the file was modified with exact expected output
instrumented_source = source_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.BEHAVIOR)
code_with_decorator = source_module_code.replace(
"async def async_function",
f"@{decorator_name}\nasync def async_function",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert instrumented_source.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
success, instrumented_test_code = inject_profiling_into_existing_test(
test_file,
[CodePosition(8, 18), CodePosition(11, 19)],
func,
temp_dir,
mode=TestingMode.BEHAVIOR,
)
# For async functions, once source is decorated, test injection should fail
# This is expected behavior - async instrumentation happens at the decorator level
assert success is False
assert instrumented_test_code is None
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_inject_profiling_async_function_performance_mode(temp_dir):
source_module_code = '''
import asyncio
async def async_function(x: int, y: int) -> int:
"""Simple async function for testing."""
await asyncio.sleep(0.01)
return x * y
'''
source_file = temp_dir / "my_module.py"
source_file.write_text(source_module_code)
# Create the test file
async_test_code = '''
import asyncio
import pytest
from my_module import async_function
@pytest.mark.asyncio
async def test_async_function():
"""Test async function performance."""
result = await async_function(5, 3)
assert result == 15
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(async_test_code)
func = FunctionToOptimize(
function_name="async_function",
parents=[],
file_path=Path("my_module.py"),
is_async=True,
)
# First instrument the source module
from codeflash_python.testing._instrument_async import (
2026-04-03 22:36:50 +00:00
add_async_decorator_to_function,
)
source_success, _ = add_async_decorator_to_function(
source_file, func, TestingMode.PERFORMANCE
)
assert source_success is True
# Verify the file was modified with exact expected output
instrumented_source = source_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.PERFORMANCE)
code_with_decorator = source_module_code.replace(
"async def async_function",
f"@{decorator_name}\nasync def async_function",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert instrumented_source.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
# Now test the full pipeline with source module path
success, instrumented_test_code = inject_profiling_into_existing_test(
test_file,
[CodePosition(8, 18)],
func,
temp_dir,
mode=TestingMode.PERFORMANCE,
)
# For async functions, once source is decorated, test injection should fail
# This is expected behavior - async instrumentation happens at the decorator level
assert success is False
assert instrumented_test_code is None
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_mixed_sync_async_instrumentation(temp_dir):
source_module_code = '''
import asyncio
def sync_function(x: int, y: int) -> int:
"""Regular sync function."""
return x * y
async def async_function(x: int, y: int) -> int:
"""Simple async function."""
await asyncio.sleep(0.01)
return x * y
'''
source_file = temp_dir / "my_module.py"
source_file.write_text(source_module_code)
mixed_test_code = '''
import asyncio
import pytest
from my_module import sync_function, async_function
@pytest.mark.asyncio
async def test_mixed_functions():
"""Test both sync and async functions."""
sync_result = sync_function(10, 5)
assert sync_result == 50
async_result = await async_function(3, 4)
assert async_result == 12
'''
test_file = temp_dir / "test_mixed.py"
test_file.write_text(mixed_test_code)
async_func = FunctionToOptimize(
function_name="async_function",
parents=[],
file_path=Path("my_module.py"),
is_async=True,
)
from codeflash_python.testing._instrument_async import (
2026-04-03 22:36:50 +00:00
add_async_decorator_to_function,
)
source_success, _ = add_async_decorator_to_function(
source_file, async_func, TestingMode.BEHAVIOR
)
assert source_success
# Verify the file was modified
instrumented_source = source_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.BEHAVIOR)
code_with_decorator = source_module_code.replace(
"async def async_function",
f"@{decorator_name}\nasync def async_function",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert instrumented_source.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
success, instrumented_test_code = inject_profiling_into_existing_test(
test_file,
[CodePosition(8, 18), CodePosition(11, 19)],
async_func,
temp_dir,
mode=TestingMode.BEHAVIOR,
)
# Async functions should not be instrumented at the test level
assert not success
assert instrumented_test_code is None
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_function_qualified_name_handling(temp_dir):
nested_async_code = '''
import asyncio
class OuterClass:
class InnerClass:
async def nested_async_method(self, x: int) -> int:
"""Nested async method."""
await asyncio.sleep(0.001)
return x * 2
'''
test_file = temp_dir / "test_nested.py"
test_file.write_text(nested_async_code)
func = FunctionToOptimize(
function_name="nested_async_method",
file_path=test_file,
parents=[
FunctionParent(name="OuterClass", type="ClassDef"),
FunctionParent(name="InnerClass", type="ClassDef"),
],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.BEHAVIOR
)
assert decorator_added
modified_code = test_file.read_text()
from codeflash_python.analysis._formatter import sort_imports
2026-04-03 22:36:50 +00:00
decorator_name = get_decorator_name_for_mode(TestingMode.BEHAVIOR)
code_with_decorator = nested_async_code.replace(
" async def nested_async_method",
f" @{decorator_name}\n async def nested_async_method",
)
code_with_import = f"from codeflash_async_wrapper import {decorator_name}\n{code_with_decorator}"
expected = sort_imports(code=code_with_import, float_to_top=True)
assert modified_code.strip() == expected.strip()
assert (temp_dir / ASYNC_HELPER_FILENAME).exists()
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_async_decorator_with_existing_decorators(temp_dir):
"""Test async decorator application when function already has other decorators."""
decorated_async_code = '''
import asyncio
from functools import wraps
def my_decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
return await func(*args, **kwargs)
return wrapper
@my_decorator
async def async_function(x: int, y: int) -> int:
"""Async function with existing decorator."""
await asyncio.sleep(0.01)
return x * y
'''
test_file = temp_dir / "test_async.py"
test_file.write_text(decorated_async_code)
func = FunctionToOptimize(
function_name="async_function",
file_path=test_file,
parents=[],
is_async=True,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, func, TestingMode.BEHAVIOR
)
assert decorator_added
modified_code = test_file.read_text()
# Should add codeflash decorator above existing decorators
assert "@codeflash_behavior_async" in modified_code
assert "@my_decorator" in modified_code
# Codeflash decorator should come first
codeflash_pos = modified_code.find("@codeflash_behavior_async")
my_decorator_pos = modified_code.find("@my_decorator")
assert codeflash_pos < my_decorator_pos
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_sync_function_not_affected_by_async_logic(temp_dir):
sync_function_code = '''
def sync_function(x: int, y: int) -> int:
"""Regular sync function."""
return x + y
'''
test_file = temp_dir / "test_sync.py"
test_file.write_text(sync_function_code)
sync_func = FunctionToOptimize(
function_name="sync_function",
file_path=test_file,
parents=[],
is_async=False,
)
decorator_added, _ = add_async_decorator_to_function(
test_file, sync_func, TestingMode.BEHAVIOR
)
assert not decorator_added
# File should not be modified for sync functions
modified_code = test_file.read_text()
assert modified_code == sync_function_code
@pytest.mark.skipif(
sys.platform == "win32", reason="pending support for asyncio on windows"
)
def test_inject_profiling_async_multiple_calls_same_test(temp_dir):
"""Test that multiple async function calls within the same test function get correctly numbered 0, 1, 2, etc."""
source_module_code = '''
import asyncio
async def async_sorter(items):
"""Simple async sorter for testing."""
await asyncio.sleep(0.001)
return sorted(items)
'''
source_file = temp_dir / "async_sorter.py"
source_file.write_text(source_module_code)
test_code_multiple_calls = """
import asyncio
import pytest
from async_sorter import async_sorter
@pytest.mark.asyncio
async def test_single_call():
result = await async_sorter([42])
assert result == [42]
@pytest.mark.asyncio
async def test_multiple_calls():
result1 = await async_sorter([3, 1, 2])
result2 = await async_sorter([5, 4])
result3 = await async_sorter([9, 8, 7, 6])
assert result1 == [1, 2, 3]
assert result2 == [4, 5]
assert result3 == [6, 7, 8, 9]
"""
test_file = temp_dir / "test_async_sorter.py"
test_file.write_text(test_code_multiple_calls)
func = FunctionToOptimize(
function_name="async_sorter",
parents=[],
file_path=Path("async_sorter.py"),
is_async=True,
)
# First instrument the source module with async decorators
from codeflash_python.testing._instrument_async import (
2026-04-03 22:36:50 +00:00
add_async_decorator_to_function,
)
source_success, _ = add_async_decorator_to_function(
source_file, func, TestingMode.BEHAVIOR
)
assert source_success
# Verify the file was modified
instrumented_source = source_file.read_text()
assert "@codeflash_behavior_async" in instrumented_source
import ast
tree = ast.parse(test_code_multiple_calls)
call_positions = []
for node in ast.walk(tree):
if isinstance(node, ast.Await) and isinstance(node.value, ast.Call):
if (
hasattr(node.value.func, "id")
and node.value.func.id == "async_sorter"
) or (
hasattr(node.value.func, "attr")
and node.value.func.attr == "async_sorter"
):
call_positions.append(
CodePosition(node.lineno, node.col_offset)
)
assert len(call_positions) == 4
success, instrumented_test_code = inject_profiling_into_existing_test(
test_file, call_positions, func, temp_dir, mode=TestingMode.BEHAVIOR
)
assert success
assert instrumented_test_code is not None
assert "_codeflash_call_site.set('8')" in instrumented_test_code
assert "_codeflash_call_site.set('13')" in instrumented_test_code
assert "_codeflash_call_site.set('14')" in instrumented_test_code
assert "_codeflash_call_site.set('15')" in instrumented_test_code
2026-04-03 22:36:50 +00:00
assert 1 == instrumented_test_code.count(
"_codeflash_call_site.set('8')"
2026-04-03 22:36:50 +00:00
)
assert 1 == instrumented_test_code.count(
"_codeflash_call_site.set('13')"
2026-04-03 22:36:50 +00:00
)
assert 1 == instrumented_test_code.count(
"_codeflash_call_site.set('14')"
2026-04-03 22:36:50 +00:00
)
assert 1 == instrumented_test_code.count(
"_codeflash_call_site.set('15')"
2026-04-03 22:36:50 +00:00
)