Modify printing of results and handle errors when collecting benchmarks

This commit is contained in:
Alvin Ryanputra 2025-03-19 15:04:47 -07:00
parent c29c8bffba
commit 54fe71f336
12 changed files with 164 additions and 128 deletions

View file

@ -84,9 +84,5 @@ class CodeflashTrace:
return result
return wrapper
# Create a singleton instance
codeflash_trace = CodeflashTrace()

View file

@ -4,13 +4,11 @@ from pathlib import Path
from codeflash.discovery.functions_to_optimize import FunctionToOptimize
def get_function_benchmark_timings(trace_path: Path) -> dict[str, dict[str, float]]:
def get_function_benchmark_timings(trace_path: Path) -> dict[str, dict[str, int]]:
"""Process the trace file and extract timing data for all functions.
Args:
trace_path: Path to the trace file
all_functions_to_optimize: List of FunctionToOptimize objects (not used directly,
but kept for backward compatibility)
Returns:
A nested dictionary where:
@ -30,8 +28,7 @@ def get_function_benchmark_timings(trace_path: Path) -> dict[str, dict[str, floa
# Query the function_calls table for all function calls
cursor.execute(
"SELECT module_name, class_name, function_name, "
"benchmark_file_name, benchmark_function_name, benchmark_line_number, "
"(time_ns - overhead_time_ns) as actual_time_ns "
"benchmark_file_name, benchmark_function_name, benchmark_line_number, time_ns "
"FROM function_calls"
)
@ -66,7 +63,7 @@ def get_function_benchmark_timings(trace_path: Path) -> dict[str, dict[str, floa
return result
def get_benchmark_timings(trace_path: Path) -> dict[str, float]:
def get_benchmark_timings(trace_path: Path) -> dict[str, int]:
"""Extract total benchmark timings from trace files.
Args:
@ -75,32 +72,47 @@ def get_benchmark_timings(trace_path: Path) -> dict[str, float]:
Returns:
A dictionary where:
- Keys are benchmark filename :: benchmark test function :: line number
- Values are total benchmark timing in milliseconds
- Values are total benchmark timing in nanoseconds (with tracing overhead subtracted)
"""
# Initialize the result dictionary
result = {}
overhead_by_benchmark = {}
# Connect to the SQLite database
connection = sqlite3.connect(trace_path)
cursor = connection.cursor()
try:
# Query the benchmark_timings table
# Query the function_calls table to get total overhead for each benchmark
cursor.execute(
"SELECT benchmark_file_name, benchmark_function_name, benchmark_line_number, SUM(overhead_time_ns) "
"FROM function_calls "
"GROUP BY benchmark_file_name, benchmark_function_name, benchmark_line_number"
)
# Process overhead information
for row in cursor.fetchall():
benchmark_file, benchmark_func, benchmark_line, total_overhead_ns = row
benchmark_key = f"{benchmark_file}::{benchmark_func}::{benchmark_line}"
overhead_by_benchmark[benchmark_key] = total_overhead_ns or 0 # Handle NULL sum case
# Query the benchmark_timings table for total times
cursor.execute(
"SELECT benchmark_file_name, benchmark_function_name, benchmark_line_number, time_ns "
"FROM benchmark_timings"
)
# Process each row
# Process each row and subtract overhead
for row in cursor.fetchall():
benchmark_file, benchmark_func, benchmark_line, time_ns = row
# Create the benchmark key (file::function::line)
benchmark_key = f"{benchmark_file}::{benchmark_func}::{benchmark_line}"
# Store the timing
result[benchmark_key] = time_ns
# Subtract overhead from total time
overhead = overhead_by_benchmark.get(benchmark_key, 0)
result[benchmark_key] = time_ns - overhead
finally:
# Close the connection

View file

@ -20,13 +20,14 @@ if __name__ == "__main__":
db.setup()
exitcode = pytest.main(
[benchmarks_root, "--codeflash-trace", "-p", "no:benchmark", "-s", "-o", "addopts="], plugins=[CodeFlashBenchmarkPlugin()]
)
) # Errors will be printed to stdout, not stderr
db.write_function_timings(codeflash_trace.function_calls_data)
db.write_benchmark_timings(CodeFlashBenchmarkPlugin.benchmark_timings)
db.print_function_timings()
db.print_benchmark_timings()
# db.print_function_timings()
# db.print_benchmark_timings()
db.close()
except Exception as e:
print(f"Failed to collect tests: {e!s}")
print(f"Failed to collect tests: {e!s}", file=sys.stderr)
exitcode = -1
sys.exit(exitcode)

View file

@ -142,8 +142,6 @@ trace_file_path = r"{trace_file}"
class_name = func.get("class_name")
file_name = func.get("file_name")
function_properties = func.get("function_properties")
print(f"Class: {class_name}, Function: {function_name}")
print(function_properties)
if not class_name:
alias = get_function_alias(module_name, function_name)
test_body = test_function_body.format(
@ -197,7 +195,7 @@ trace_file_path = r"{trace_file}"
return imports + "\n" + metadata + "\n" + test_template
def generate_replay_test(trace_file_path: Path, output_dir: Path, test_framework: str = "pytest", max_run_count: int = 100) -> None:
def generate_replay_test(trace_file_path: Path, output_dir: Path, test_framework: str = "pytest", max_run_count: int = 100) -> int:
"""Generate multiple replay tests from the traced function calls, grouping by benchmark name.
Args:
@ -211,6 +209,7 @@ def generate_replay_test(trace_file_path: Path, output_dir: Path, test_framework
Dictionary mapping benchmark names to generated test code
"""
count = 0
try:
# Connect to the database
conn = sqlite3.connect(trace_file_path.as_posix())
@ -253,7 +252,7 @@ def generate_replay_test(trace_file_path: Path, output_dir: Path, test_framework
})
if not functions_data:
print(f"No functions found for benchmark {benchmark_function_name} in {benchmark_file_name}")
logger.info(f"No functions found for benchmark {benchmark_function_name} in {benchmark_file_name}")
continue
# Generate the test code for this benchmark
@ -273,9 +272,11 @@ def generate_replay_test(trace_file_path: Path, output_dir: Path, test_framework
# Write test code to file, parents = true
output_dir.mkdir(parents=True, exist_ok=True)
output_file.write_text(test_code, "utf-8")
print(f"Replay test for benchmark `{benchmark_function_name}` in {benchmark_file_name} written to {output_file}")
count += 1
logger.info(f"Replay test for benchmark `{benchmark_function_name}` in {benchmark_file_name} written to {output_file}")
conn.close()
except Exception as e:
print(f"Error generating replay tests: {e}")
logger.info(f"Error generating replay tests: {e}")
return count

View file

@ -1,10 +1,15 @@
from __future__ import annotations
import re
from pytest import ExitCode
from codeflash.cli_cmds.console import logger
from codeflash.code_utils.compat import SAFE_SYS_EXECUTABLE
from pathlib import Path
import subprocess
def trace_benchmarks_pytest(benchmarks_root: Path, tests_root:Path, project_root: Path, trace_file: Path) -> None:
# set up .trace databases
result = subprocess.run(
[
SAFE_SYS_EXECUTABLE,
@ -19,5 +24,19 @@ def trace_benchmarks_pytest(benchmarks_root: Path, tests_root:Path, project_root
text=True,
env={"PYTHONPATH": str(project_root)},
)
print("stdout:", result.stdout)
print("stderr:", result.stderr)
if result.returncode != 0:
if "ERROR collecting" in result.stdout:
# Pattern matches "===== ERRORS =====" (any number of =) and captures everything after
error_pattern = r"={3,}\s*ERRORS\s*={3,}\n([\s\S]*?)(?:={3,}|$)"
match = re.search(error_pattern, result.stdout)
error_section = match.group(1) if match else result.stdout
elif "FAILURES" in result.stdout:
# Pattern matches "===== FAILURES =====" (any number of =) and captures everything after
error_pattern = r"={3,}\s*FAILURES\s*={3,}\n([\s\S]*?)(?:={3,}|$)"
match = re.search(error_pattern, result.stdout)
error_section = match.group(1) if match else result.stdout
else:
error_section = result.stdout
logger.warning(
f"Error collecting benchmarks - Pytest Exit code: {result.returncode}={ExitCode(result.returncode).name}\n {error_section}"
)

View file

@ -1,7 +1,12 @@
def print_benchmark_table(function_benchmark_timings, total_benchmark_timings):
def print_benchmark_table(function_benchmark_timings: dict[str,dict[str,int]], total_benchmark_timings: dict[str,int]):
# Define column widths
benchmark_col_width = 50
time_col_width = 15
# Print table header
print(f"{'Benchmark Test':<50} | {'Total Time (s)':<15} | {'Function Time (s)':<15} | {'Percentage (%)':<15}")
print("-" * 100)
header = f"{'Benchmark Test':{benchmark_col_width}} | {'Total Time (ms)':{time_col_width}} | {'Function Time (ms)':{time_col_width}} | {'Percentage (%)':{time_col_width}}"
print(header)
print("-" * len(header))
# Process each function's benchmark data
for func_path, test_times in function_benchmark_timings.items():
@ -14,13 +19,16 @@ def print_benchmark_table(function_benchmark_timings, total_benchmark_timings):
total_time = total_benchmark_timings.get(test_name, 0)
if total_time > 0:
percentage = (func_time / total_time) * 100
sorted_tests.append((test_name, total_time, func_time, percentage))
# Convert nanoseconds to milliseconds
func_time_ms = func_time / 1_000_000
total_time_ms = total_time / 1_000_000
sorted_tests.append((test_name, total_time_ms, func_time_ms, percentage))
sorted_tests.sort(key=lambda x: x[3], reverse=True)
# Print each test's data
for test_name, total_time, func_time, percentage in sorted_tests:
print(f"{test_name:<50} | {total_time:<15.3f} | {func_time:<15.3f} | {percentage:<15.2f}")
# Usage
benchmark_file, benchmark_func, benchmark_line = test_name.split("::")
benchmark_name = f"{benchmark_file}::{benchmark_func}"
print(f"{benchmark_name:{benchmark_col_width}} | {total_time:{time_col_width}.3f} | {func_time:{time_col_width}.3f} | {percentage:{time_col_width}.2f}")
print()

View file

@ -106,7 +106,7 @@ class FunctionWithReturnStatement(ast.NodeVisitor):
@dataclass(frozen=True, config={"arbitrary_types_allowed": True})
class FunctionToOptimize:
"""Represents a function that is a candidate for optimization.
"""Represent a function that is a candidate for optimization.
Attributes
----------
@ -121,6 +121,7 @@ class FunctionToOptimize:
method extends this with the module name from the project root.
"""
function_name: str
file_path: Path
parents: list[FunctionParent] # list[ClassDef | FunctionDef | AsyncFunctionDef]
@ -143,12 +144,6 @@ class FunctionToOptimize:
def qualified_name_with_modules_from_root(self, project_root_path: Path) -> str:
return f"{module_name_from_file_path(self.file_path, project_root_path)}.{self.qualified_name}"
#
# @property
# def qualified_name_with_file_name(self) -> str:
# class_name = self.parents[0].name if self.parents else None
# return f"{self.file_path}:{(class_name + ':' if class_name else '')}{self.function_name}"
def get_functions_to_optimize(
optimize_all: str | None,

View file

@ -77,6 +77,7 @@ class BestOptimization(BaseModel):
helper_functions: list[FunctionSource]
runtime: int
replay_runtime: int | None
replay_performance_gain: float | None
winning_behavioral_test_results: TestResults
winning_benchmarking_test_results: TestResults
winning_replay_benchmarking_test_results : TestResults | None = None

View file

@ -92,8 +92,8 @@ class FunctionOptimizer:
function_to_tests: dict[str, list[FunctionCalledInTest]] | None = None,
function_to_optimize_ast: ast.FunctionDef | None = None,
aiservice_client: AiServiceClient | None = None,
function_benchmark_timings: dict[str, dict[str, float]] | None = None,
total_benchmark_timings: dict[str, float] | None = None,
function_benchmark_timings: dict[str, dict[str, int]] | None = None,
total_benchmark_timings: dict[str, int] | None = None,
args: Namespace | None = None,
) -> None:
self.project_root = test_cfg.project_root_path
@ -276,30 +276,10 @@ class FunctionOptimizer:
best_runtime_ns=best_optimization.runtime,
function_name=function_to_optimize_qualified_name,
file_path=self.function_to_optimize.file_path,
replay_performance_gain=best_optimization.replay_performance_gain if self.args.benchmark else None,
fto_benchmark_timings = self.function_benchmark_timings[self.function_to_optimize.qualified_name_with_modules_from_root(self.project_root)] if self.args.benchmark else None,
total_benchmark_timings = self.total_benchmark_timings if self.args.benchmark else None,
)
speedup = explanation.speedup #
if self.args.benchmark:
original_replay_timing = original_code_baseline.benchmarking_test_results.total_replay_test_runtime()
fto_benchmark_timings = self.function_benchmark_timings[self.function_to_optimize.qualified_name_with_modules_from_root(self.project_root)]
for benchmark_key, og_benchmark_timing in fto_benchmark_timings.items():
# benchmark key is benchmark filename :: benchmark test function :: line number
try:
benchmark_file_name, benchmark_test_function, line_number = benchmark_key.split("::")
except ValueError:
print(f"Benchmark key {benchmark_key} is not in the expected format.")
continue
print(f"Calculating speedup for benchmark {benchmark_key}")
total_benchmark_timing = self.total_benchmark_timings[benchmark_key]
# find out expected new benchmark timing, then calculate how much total benchmark was sped up. print out intermediate values
print(f"Original benchmark timing: {total_benchmark_timing}")
replay_speedup = original_replay_timing / best_optimization.replay_runtime - 1
print(f"Replay speedup: {replay_speedup}")
expected_new_benchmark_timing = total_benchmark_timing - og_benchmark_timing + 1 / (replay_speedup + 1) * og_benchmark_timing
print(f"Expected new benchmark timing: {expected_new_benchmark_timing}")
benchmark_speedup_ratio = total_benchmark_timing / expected_new_benchmark_timing
benchmark_speedup_percent = (benchmark_speedup_ratio - 1) * 100
print(f"Benchmark speedup: {benchmark_speedup_percent:.2f}%")
self.log_successful_optimization(explanation, generated_tests)
@ -455,21 +435,21 @@ class FunctionOptimizer:
original_runtime_ns=original_code_replay_runtime,
optimized_runtime_ns=candidate_replay_runtime,
)
tree.add("Replay Benchmarking ")
tree.add(f"Original summed runtime: {humanize_runtime(original_code_replay_runtime)}")
tree.add(f"Original benchmark replay runtime: {humanize_runtime(original_code_replay_runtime)}")
tree.add(
f"Best summed runtime: {humanize_runtime(candidate_replay_runtime)} "
f"Best benchmark replay runtime: {humanize_runtime(candidate_replay_runtime)} "
f"(measured over {candidate_result.max_loop_count} "
f"loop{'s' if candidate_result.max_loop_count > 1 else ''})"
)
tree.add(f"Speedup percentage: {replay_perf_gain * 100:.1f}%")
tree.add(f"Speedup ratio: {replay_perf_gain + 1:.1f}X")
tree.add(f"Speedup percentage for benchmark replay test: {replay_perf_gain * 100:.1f}%")
tree.add(f"Speedup ratio for benchmark replay test: {replay_perf_gain + 1:.1f}X")
best_optimization = BestOptimization(
candidate=candidate,
helper_functions=code_context.helper_functions,
runtime=best_test_runtime,
replay_runtime=candidate_replay_runtime if self.args.benchmark else None,
winning_behavioral_test_results=candidate_result.behavior_test_results,
replay_performance_gain=replay_perf_gain if self.args.benchmark else None,
winning_benchmarking_test_results=candidate_result.benchmarking_test_results,
winning_replay_benchmarking_test_results=candidate_result.benchmarking_test_results,
)
@ -525,6 +505,7 @@ class FunctionOptimizer:
)
console.print(Group(explanation_panel, tests_panel))
else:
console.print(explanation_panel)
ph(
@ -682,7 +663,6 @@ class FunctionOptimizer:
existing_test_files_count += 1
elif test_type == TestType.REPLAY_TEST:
replay_test_files_count += 1
print("Replay test found")
elif test_type == TestType.CONCOLIC_COVERAGE_TEST:
concolic_coverage_test_files_count += 1
else:
@ -1157,7 +1137,6 @@ class FunctionOptimizer:
f"stdout: {run_result.stdout}\n"
f"stderr: {run_result.stderr}\n"
)
# print(test_files)
results, coverage_results = parse_test_results(
test_xml_path=result_file_path,
test_files=test_files,

View file

@ -11,8 +11,9 @@ from codeflash.api.aiservice import AiServiceClient, LocalAiServiceClient
from codeflash.benchmarking.replay_test import generate_replay_test
from codeflash.benchmarking.trace_benchmarks import trace_benchmarks_pytest
from codeflash.benchmarking.utils import print_benchmark_table
from codeflash.cli_cmds.console import console, logger
from codeflash.cli_cmds.console import console, logger, progress_bar
from codeflash.code_utils import env_utils
from codeflash.code_utils.code_extractor import add_needed_imports_from_module
from codeflash.code_utils.code_replacer import normalize_code, normalize_node
from codeflash.code_utils.code_utils import get_run_tmp_file
from codeflash.code_utils.static_analysis import analyze_imported_modules, get_first_top_level_function_or_method_ast
@ -96,9 +97,13 @@ class Optimizer:
project_root=self.args.project_root,
module_root=self.args.module_root,
)
all_functions_to_optimize = [
fto for functions_to_optimize in file_to_funcs_to_optimize.values() for fto in functions_to_optimize]
function_benchmark_timings = None
total_benchmark_timings = None
if self.args.benchmark:
with progress_bar(
f"Running benchmarks in {self.args.benchmarks_root}",
transient=True,
):
# Insert decorator
file_path_to_source_code = defaultdict(str)
for file in file_to_funcs_to_optimize:
@ -109,12 +114,16 @@ class Optimizer:
for fto in functions_to_optimize:
instrument_codeflash_trace_decorator(fto)
trace_file = Path(self.args.benchmarks_root) / "benchmarks.trace"
replay_tests_dir = Path(self.args.tests_root) / "codeflash_replay_tests"
trace_benchmarks_pytest(self.args.benchmarks_root, self.args.tests_root, self.args.project_root, trace_file) # Simply run all tests that use pytest-benchmark
generate_replay_test(trace_file, Path(self.args.tests_root) / "codeflash_replay_tests" )
replay_count = generate_replay_test(trace_file, replay_tests_dir)
if replay_count == 0:
logger.info(f"No valid benchmarks found in {self.args.benchmarks_root} for functions to optimize, continuing optimization")
else:
function_benchmark_timings = get_function_benchmark_timings(trace_file)
total_benchmark_timings = get_benchmark_timings(trace_file)
print(function_benchmark_timings)
print(total_benchmark_timings)
print_benchmark_table(function_benchmark_timings, total_benchmark_timings)
logger.info("Finished tracing existing benchmarks")
except Exception as e:
logger.info(f"Error while tracing existing benchmarks: {e}")
@ -125,13 +134,6 @@ class Optimizer:
with file.open("w", encoding="utf8") as f:
f.write(file_path_to_source_code[file])
# trace_dir = Path(self.args.benchmarks_root) / ".codeflash_trace"
# function_benchmark_timings = get_function_benchmark_timings(trace_dir, all_functions_to_optimize)
# total_benchmark_timings = get_benchmark_timings(trace_dir)
# print_benchmark_table(function_benchmark_timings, total_benchmark_timings)
# return
optimizations_found: int = 0
function_iterator_count: int = 0
if self.args.test_framework == "pytest":
@ -210,15 +212,10 @@ class Optimizer:
f"Skipping optimization."
)
continue
if self.args.benchmark:
if self.args.benchmark and function_benchmark_timings and total_benchmark_timings:
function_optimizer = self.create_function_optimizer(
function_to_optimize, function_to_optimize_ast, function_to_tests, validated_original_code[original_module_path].source_code, function_benchmark_timings, total_benchmark_timings
)
# function_optimizer = self.create_function_optimizer(
# function_to_optimize, function_to_optimize_ast, function_to_tests,
# validated_original_code[original_module_path].source_code
# )
else:
function_optimizer = self.create_function_optimizer(
function_to_optimize, function_to_optimize_ast, function_to_tests,
@ -250,9 +247,9 @@ class Optimizer:
if function_optimizer.test_cfg.concolic_test_root_dir:
shutil.rmtree(function_optimizer.test_cfg.concolic_test_root_dir, ignore_errors=True)
if self.args.benchmark:
trace_dir = Path(self.args.benchmarks_root) / "codeflash_replay_tests"
if trace_dir.exists():
shutil.rmtree(trace_dir, ignore_errors=True)
if replay_tests_dir.exists():
shutil.rmtree(replay_tests_dir, ignore_errors=True)
trace_file.unlink(missing_ok=True)
if hasattr(get_run_tmp_file, "tmpdir"):
get_run_tmp_file.tmpdir.cleanup()

View file

@ -15,6 +15,9 @@ class Explanation:
best_runtime_ns: int
function_name: str
file_path: Path
replay_performance_gain: float | None
fto_benchmark_timings: dict[str, int] | None
total_benchmark_timings: dict[str, int] | None
@property
def perf_improvement_line(self) -> str:
@ -37,12 +40,34 @@ class Explanation:
# TODO: Sometimes the explanation says something similar to "This is the code that was optimized", remove such parts
original_runtime_human = humanize_runtime(self.original_runtime_ns)
best_runtime_human = humanize_runtime(self.best_runtime_ns)
benchmark_info = ""
if self.replay_performance_gain:
benchmark_info += "Benchmark Performance Details:\n"
for benchmark_key, og_benchmark_timing in self.fto_benchmark_timings.items():
# benchmark key is benchmark filename :: benchmark test function :: line number
try:
benchmark_file_name, benchmark_test_function, line_number = benchmark_key.split("::")
except ValueError:
benchmark_info += f"Benchmark key {benchmark_key} is not in the expected format.\n"
continue
total_benchmark_timing = self.total_benchmark_timings[benchmark_key]
# Compute the expected new benchmark timing and the resulting overall benchmark speedup
benchmark_info += f"Original timing for {benchmark_file_name}::{benchmark_test_function}: {humanize_runtime(total_benchmark_timing)}\n"
replay_speedup = self.replay_performance_gain
expected_new_benchmark_timing = total_benchmark_timing - og_benchmark_timing + 1 / (
replay_speedup + 1) * og_benchmark_timing
benchmark_info += f"Expected new timing for {benchmark_file_name}::{benchmark_test_function}: {humanize_runtime(expected_new_benchmark_timing)}\n"
benchmark_speedup_ratio = total_benchmark_timing / expected_new_benchmark_timing
benchmark_speedup_percent = (benchmark_speedup_ratio - 1) * 100
benchmark_info += f"Benchmark speedup for {benchmark_file_name}::{benchmark_test_function}: {benchmark_speedup_percent:.2f}%\n\n"
return (
f"Optimized {self.function_name} in {self.file_path}\n"
f"{self.perf_improvement_line}\n"
f"Runtime went down from {original_runtime_human} to {best_runtime_human} \n\n"
+ "Explanation:\n"
+ (benchmark_info if benchmark_info else "")
+ self.raw_explanation_message
+ " \n\n"
+ "The new optimized code was tested for correctness. The results are listed below.\n"

View file

@ -59,6 +59,7 @@ def run_behavioral_tests(
)
test_files = list(set(test_files)) # remove multiple calls in the same test function
common_pytest_args = [
"--benchmark-skip",
"--capture=tee-sys",
f"--timeout={pytest_timeout}",
"-q",
@ -240,6 +241,7 @@ def run_benchmarking_tests(
test_files.append(str(file.benchmarking_file_path))
test_files = list(set(test_files)) # remove multiple calls in the same test function
pytest_args = [
"--benchmark-skip",
"--capture=tee-sys",
f"--timeout={pytest_timeout}",
"-q",