Update parse_test_output.py

Update end-to-end-test-coverage.yaml

fix tests

Revert "print blocklisted functions"
add is_in_CI utility
print blocklisted functions
add coverage collection to E2E tests
escape correctly
Update end_to_end_test_bubblesort_pytest.py
revert: GHA is noisier with coverage, let's see

change name?

feedback loop
This commit is contained in:
Kevin Turcios 2024-11-19 04:22:39 -04:00
parent c548c3405b
commit 7eee64ab0b
9 changed files with 143 additions and 68 deletions

View file

@ -56,3 +56,8 @@ def ensure_pr_number() -> bool:
"Codeflash can comment on the right PR"
)
return True
@lru_cache(maxsize=1)
def is_in_CI() -> bool:
    """Report whether the current process looks like it is running in CI.

    Returns:
        True when ``get_pr_number()`` yields a truthy value, or when either the
        ``CI`` or the ``GITHUB_ACTIONS`` environment variable is set to a
        non-empty string; False otherwise.

    The answer is memoized by ``lru_cache``, so the environment is only
    inspected on the first call.
    """
    if get_pr_number():
        return True
    ci_flag = os.environ.get("CI")
    github_actions_flag = os.environ.get("GITHUB_ACTIONS")
    return bool(ci_flag) or bool(github_actions_flag)

View file

@ -5,7 +5,7 @@ import re
from pathlib import Path
from typing import Any, Collection, Optional, Union
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
from pydantic.dataclasses import dataclass
from codeflash.cli_cmds.console import console, logger
@ -19,7 +19,7 @@ from codeflash.models.models import CodeOptimizationContext
from codeflash.verification.test_results import TestResults
@dataclass(config=dict(arbitrary_types_allowed=True))
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class CoverageData:
"""Represents the coverage data for a specific function in a source file, using one or more test files."""

View file

@ -41,7 +41,7 @@ from codeflash.code_utils.static_analysis import analyze_imported_modules
from codeflash.code_utils.time_utils import humanize_runtime
from codeflash.discovery.discover_unit_tests import discover_unit_tests
from codeflash.discovery.functions_to_optimize import FunctionToOptimize, get_functions_to_optimize
from codeflash.models.Coverage import CoverageData, OriginalCodeBaseline
from codeflash.models.coverage import CoverageData, OriginalCodeBaseline
from codeflash.models.ExperimentMetadata import ExperimentMetadata
from codeflash.models.models import (
BestOptimization,
@ -879,7 +879,7 @@ class Optimizer:
optimization_iteration=0,
test_functions=only_run_these_test_functions_for_test_files,
testing_time=TOTAL_LOOPING_TIME,
enable_coverage=True,
enable_coverage=False,
function_name=function_name,
source_file=function_file_path,
code_context=code_context,
@ -939,7 +939,9 @@ class Optimizer:
)
console.rule()
logger.debug(f"Total original code runtime (ns): {total_timing}")
in_github_actions_mode = os.getenv("GITHUB_ACTIONS") == "true"
# in_github_actions_mode = bool(env_utils.is_in_CI())
in_github_actions_mode = bool(env_utils.get_pr_number())
logger.info(f"{in_github_actions_mode=}: {env_utils.get_pr_number()=}")
if in_github_actions_mode:
console.print(coverage_results)
return Success(

View file

@ -20,7 +20,7 @@ from codeflash.code_utils.code_utils import (
module_name_from_file_path,
)
from codeflash.discovery.discover_unit_tests import discover_parameters_unittest
from codeflash.models.Coverage import CoverageData
from codeflash.models.coverage import CoverageData
from codeflash.models.models import TestFiles
from codeflash.verification.test_results import FunctionTestInvocation, InvocationId, TestResults

View file

@ -3,6 +3,8 @@ import pathlib
import re
import subprocess
import pytest
def main():
module_root = (pathlib.Path(__file__).parent.parent.parent / "code_to_optimize").resolve()
@ -53,6 +55,33 @@ def main():
num_unit_tests = int(unit_test_search.group(1))
assert num_unit_tests > 0, "Could not find existing unit tests"
pattern = r"""
main_func_coverage=FunctionCoverage\(
.*?coverage=(?P<coverage>[\d.]+),
\s*executed_lines=\[(?P<executed_lines>[\d,\s]*)\]
"""
match = re.search(pattern, stdout, re.VERBOSE)
if match:
coverage = float(match.group("coverage"))
executed_lines = list(map(int, match.group("executed_lines").split(", ")))
assert coverage == 100.0, f"Coverage was {coverage} instead of 100.0"
assert executed_lines == [
2,
3,
4,
5,
6,
7,
8,
], f"Executed lines were {executed_lines} instead of [2, 3, 4, 6, 9]"
else:
pytest.fail("Failed to find coverage data in stdout")
if __name__ == "__main__":
main()

View file

@ -4,50 +4,6 @@ import re
import subprocess
def futurehouse_coverage() -> None:
    """Run codeflash on the futurehouse_structure sample and verify its coverage report.

    Launches ``codeflash/main.py`` against ``src/aviary/common_tags.py`` with
    ``--no-pr`` from inside the sample project directory, echoing the combined
    stdout/stderr as it arrives while also capturing it. It then asserts that
    the command exited with code 0, that a ``CoverageData(`` dump was printed,
    and that ``find_common_tags`` is reported with 100% coverage and the
    expected executed lines.

    Raises:
        AssertionError: if any of those checks fails, including when the
            coverage block for ``find_common_tags`` is missing from the output.
    """
    cwd = (
        pathlib.Path(__file__).parent.parent.parent / "code_to_optimize" / "code_directories" / "futurehouse_structure"
    ).resolve()
    command = ["python", "../../../codeflash/main.py", "--file", "src/aviary/common_tags.py", "--no-pr"]

    output = []
    # The context manager closes the stdout pipe and reaps the child even if
    # reading the output raises part-way through.
    with subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=str(cwd), env=os.environ.copy()
    ) as process:
        for line in process.stdout:
            print(line, end="")  # Print each line in real-time
            output.append(line)  # Store each line in the output variable
        return_code = process.wait()

    stdout = "".join(output)
    assert return_code == 0, f"The codeflash command returned exit code {return_code} instead of 0"
    assert "CoverageData(" in stdout, "Failed to find CoverageData in stdout"

    coverage_search = re.search(
        r"main_func_coverage=FunctionCoverage\(\n\s+name='find_common_tags',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
        stdout,
    )
    # re.search returns None on no match; fail with a readable message instead
    # of an AttributeError on .group().
    assert coverage_search is not None, "Failed to find coverage data for find_common_tags in stdout"

    coverage = float(coverage_search.group(1))
    assert coverage == 100.0, f"Coverage was {coverage} instead of 100.0"

    executed_lines = list(map(int, coverage_search.group(2).split(", ")))
    assert executed_lines == [
        5,
        6,
        7,
        8,
        9,
        11,
        12,
        13,
        14,
    ], f"Executed lines were {executed_lines} instead of [5, 6, 7, 8, 9, 11, 12, 13, 14]"

    print(f"Coverage was {coverage} and executed lines were {executed_lines}, as expected")
def mybestrepocoverage() -> None:
cwd = (
pathlib.Path(__file__).parent.parent.parent / "code_to_optimize" / "code_directories" / "my-best-repo"
@ -79,36 +35,48 @@ def mybestrepocoverage() -> None:
assert executed_lines == [2], f"Executed lines were {executed_lines} instead of [2]"
add_one_level_depth_coverage_search = re.search(
r"dependent_func_coverage=FunctionCoverage\(\n\s+name='add',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
stdout,
)
coverage = float(add_one_level_depth_coverage_search.group(1))
assert coverage == 100.0, f"Coverage was {coverage} instead of 100.0"
executed_lines = list(map(int, add_one_level_depth_coverage_search.group(2).split(", ")))
assert executed_lines == [48], f"Executed lines were {executed_lines} instead of [48]"
print(f"Coverage was {coverage} and executed lines were {executed_lines}, as expected")
dependent_func_coverage_search = re.search(
add_one_level_depth_coverage_search = re.search(
r"main_func_coverage=FunctionCoverage\(\n\s+name='add_one_level_depth',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
stdout,
)
add_one_level_depth_coverage = float(add_one_level_depth_coverage_search.group(1))
assert add_one_level_depth_coverage == 100.0, f"Coverage was {add_one_level_depth_coverage} instead of 100.0"
add_one_level_depth_executed_lines = list(map(int, add_one_level_depth_coverage_search.group(2).split(", ")))
assert add_one_level_depth_executed_lines == [
41
], f"Executed lines were {add_one_level_depth_executed_lines} instead of [41]"
print(
f"Coverage was {add_one_level_depth_coverage} and executed lines were {add_one_level_depth_executed_lines}, as expected"
)
add_one_level_depth_dependent_coverage_search = re.search(
r"dependent_func_coverage=FunctionCoverage\(\n\s+name='add',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
stdout,
)
coverage = float(dependent_func_coverage_search.group(1))
add_one_level_depth_dependent_coverage = float(add_one_level_depth_dependent_coverage_search.group(1))
assert coverage == 100.0, f"Coverage was {coverage} instead of 100.0"
assert (
add_one_level_depth_dependent_coverage == 100.0
), f"Coverage was {add_one_level_depth_dependent_coverage} instead of 100.0"
executed_lines = list(map(int, dependent_func_coverage_search.group(2).split(", ")))
add_one_level_depth_dependent_executed_lines = list(
map(int, add_one_level_depth_dependent_coverage_search.group(2).split(", "))
)
assert executed_lines == [48], f"Executed lines were {executed_lines} instead of [48]"
assert add_one_level_depth_dependent_executed_lines == [
44
], f"Executed lines were {add_one_level_depth_dependent_executed_lines} instead of [44]"
def main() -> None:
futurehouse_coverage()
mybestrepocoverage()

View file

@ -35,6 +35,29 @@ def main():
num_unit_tests = int(unit_test_search.group(1))
assert num_unit_tests == 2, "Could not find existing unit tests"
assert "CoverageData(" in stdout, "Failed to find CoverageData in stdout"
coverage_search = re.search(
r"main_func_coverage=FunctionCoverage\(\n\s+name='find_common_tags',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
stdout,
)
coverage = float(coverage_search.group(1))
assert coverage == 100.0, f"Coverage was {coverage} instead of 100.0"
executed_lines = list(map(int, coverage_search.group(2).split(", ")))
assert executed_lines == [
5,
6,
7,
8,
9,
11,
12,
13,
14,
], f"Executed lines were {executed_lines} instead of [5, 6, 7, 8, 9, 11, 12, 13, 14]"
if __name__ == "__main__":
main()

View file

@ -53,6 +53,35 @@ def main():
num_unit_tests = int(unit_test_search.group(1))
assert num_unit_tests > 0, "Could not find existing unit tests"
# main_func_coverage=FunctionCoverage(
# name='Graph.topologicalSort',
# coverage=100.0,
# executed_lines=[22, 23, 25, 26, 27, 29],
# unexecuted_lines=[],
# executed_branches=[[25, 26], [25, 29], [26, 25], [26, 27]],
# unexecuted_branches=[]
# ),
topological_sort_coverage_search = re.search(
r"main_func_coverage=FunctionCoverage\(\n\s+name='Graph.topologicalSort',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
stdout,
)
coverage = float(topological_sort_coverage_search.group(1))
assert coverage == 100.0, f"Coverage was {coverage} instead of 100.0"
topological_sort_executed_lines = list(map(int, topological_sort_coverage_search.group(2).split(", ")))
assert topological_sort_executed_lines == [
22,
23,
25,
26,
27,
29,
], f"Executed lines were {topological_sort_executed_lines} instead of [22, 23, 25, 26, 27, 29]"
if __name__ == "__main__":
main()

View file

@ -61,6 +61,25 @@ def main():
assert passed > 0, f"Expected >0 passed replay tests, found {passed}"
assert failed == 0, f"Expected 0 failed replay tests, found {failed}"
funca_coverage_search = re.search(
r"main_func_coverage=FunctionCoverage\(\n\s+name='funcA',\n\s+coverage=([\d.]+),\n\s+executed_lines=\[(.+)\],",
stdout,
)
funca_coverage = float(funca_coverage_search.group(1))
assert funca_coverage == 100.0, f"Coverage was {funca_coverage} instead of 100.0"
funca_executed_lines = list(map(int, funca_coverage_search.group(2).split(", ")))
assert funca_executed_lines == [
2,
3,
4,
6,
9,
], f"Executed lines were {funca_executed_lines} instead of [2, 3, 4, 6, 9]"
if __name__ == "__main__":
main()