Mirror of https://github.com/codeflash-ai/codeflash-internal.git, synced 2026-05-04 18:25:18 +00:00
Frequentist comparison analysis of time series

commit a90afc26d0 (parent 753c4c8b34)
4 changed files with 79 additions and 9 deletions
@@ -1,6 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="PydanticConfigService">
    <option name="mypyInitTyped" value="true" />
    <option name="mypyWarnUntypedFields" value="true" />
    <option name="warnUntypedFields" value="true" />
  </component>
</project>
cli/codeflash/verification/statistical_analysis.py (new file, 41 lines)
@@ -0,0 +1,41 @@
from __future__ import annotations

import math
from typing import TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    import numpy.typing as npt

TWO_SIGMA = 2


def bootstrap_minima(series: list[int], bootstrap_size: int) -> npt.NDArray[np.int64]:
    rng = np.random.default_rng()
    return np.array([np.min(rng.choice(series, len(series), replace=True)) for _ in range(bootstrap_size)])


def bootstrap_noise_floor(series: list[int], bootstrap_size: int) -> np.float64:
    return np.std(bootstrap_minima(series, bootstrap_size))


def combined_series_noise_floor(series1: list[int], series2: list[int], bootstrap_size: int) -> float:
    noise_floor1 = bootstrap_noise_floor(series1, bootstrap_size)
    noise_floor2 = bootstrap_noise_floor(series2, bootstrap_size)
    return math.sqrt(noise_floor1 * noise_floor1 + noise_floor2 * noise_floor2)


def series2_faster_95_confidence(
    series1: list[int], series2: list[int], bootstrap_size: int
) -> tuple[float, float] | None:
    min1 = min(series1)
    min_diff = min1 - min(series2)
    if min_diff <= 0:
        return None
    combined_noise_floor = combined_series_noise_floor(series1, series2, bootstrap_size)
    percent_diff = 100 * min_diff / min1
    uncertainty = TWO_SIGMA * combined_noise_floor / min1
    if combined_noise_floor == 0 or min_diff / combined_noise_floor > TWO_SIGMA:
        return percent_diff, uncertainty
    return None
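The statistical recipe in this file: estimate each series' noise floor as the standard deviation of bootstrap-resampled minima, combine the two floors in quadrature (sqrt(nf1^2 + nf2^2), the usual propagation for independent uncertainties), and report series2 as faster only when the gap between the observed minima exceeds TWO_SIGMA combined noise floors (2 sigma covers roughly 95% of a normal distribution, matching the function's name). A minimal usage sketch follows; the nanosecond timings are invented for illustration and are not from the commit:

    from codeflash.verification.statistical_analysis import series2_faster_95_confidence

    # Hypothetical timings in nanoseconds; the second series is ~20% faster.
    original = [2_000 + i % 7 for i in range(200)]
    optimized = [1_600 + i % 5 for i in range(200)]

    result = series2_faster_95_confidence(original, optimized, 1000)
    if result is None:
        print("no speedup at 95% confidence")
    else:
        percent_diff, uncertainty = result
        # percent_diff is in percent; uncertainty is 2*sigma as a fraction of
        # min(original), so multiply by 100 to put it in percentage points.
        print(f"~{percent_diff:.1f}% faster, +/- {100 * uncertainty:.2f} points at 2 sigma")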
@@ -1,9 +1,10 @@
 from __future__ import annotations
 
 import sys
+from collections.abc import Iterator
 from enum import Enum
 from pathlib import Path
-from typing import Iterator, Optional, cast
+from typing import Optional, cast
 
 from pydantic import BaseModel
 from pydantic.dataclasses import dataclass
@@ -151,22 +152,30 @@ class TestResults(BaseModel):
         )
         return tree
 
+    def usable_runtime_data_by_test_case(self) -> dict[InvocationId, list[int]]:
+        for result in self.test_results:
+            if result.did_pass and not result.runtime:
+                logger.debug(
+                    f"Ignoring test case that passed but had no runtime -> {result.id}, Loop # {result.loop_index}"
+                )
+        usable_runtimes = [
+            (result.id, result.runtime) for result in self.test_results if result.did_pass and result.runtime
+        ]
+        return {
+            usable_id: [runtime[1] for runtime in usable_runtimes if runtime[0] == usable_id]
+            for usable_id in {runtime[0] for runtime in usable_runtimes}
+        }
+
     def total_passed_runtime(self) -> int:
         """Calculate the sum of runtimes of all test cases that passed, where a testcase runtime
         is the minimum value of all looped execution runtimes.
 
         :return: The runtime in nanoseconds.
         """
-        for result in self.test_results:
-            if result.did_pass and not result.runtime:
-                logger.debug(
-                    f"Ignoring test case that passed but had no runtime -> {result.id}, Loop # {result.loop_index}"
-                )
-        usable_results = [result for result in self.test_results if result.did_pass and result.runtime]
         return sum(
             [
-                min([result.runtime for result in usable_results if result.id == invocation_id])
-                for invocation_id in {result.id for result in usable_results}
+                min(usable_runtime_data)
+                for invocation_id, usable_runtime_data in self.usable_runtime_data_by_test_case().items()
             ]
         )
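The refactor moves the pass/has-runtime filtering into the new usable_runtime_data_by_test_case method, which groups runtimes by invocation id, so total_passed_runtime reduces to summing per-invocation minima. A standalone sketch of that grouping, using plain tuples in place of the repo's result objects (the data below is invented):

    # (invocation_id, did_pass, runtime_ns) stand-ins for the repo's test result entries
    results = [
        ("test_a", True, 120),
        ("test_a", True, 101),   # three loops of the same test case
        ("test_a", True, 99),
        ("test_b", True, 250),
        ("test_b", False, 240),  # failed, so excluded
        ("test_c", True, 0),     # passed but no runtime recorded, so excluded
    ]

    usable = [(rid, rt) for rid, passed, rt in results if passed and rt]
    by_id = {rid: [rt for r, rt in usable if r == rid] for rid in {r for r, _ in usable}}
    # by_id == {"test_a": [120, 101, 99], "test_b": [250]}

    total = sum(min(runtimes) for runtimes in by_id.values())
    assert total == 99 + 250  # sum of per-test-case minima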
cli/tests/test_statistical_analysis.py (new file, 18 lines)
@@ -0,0 +1,18 @@
import numpy as np
from codeflash.verification.statistical_analysis import series2_faster_95_confidence


def create_timing_series(size: int, mean: int, std_dev: int) -> list[int]:
    mu = np.log(mean**2 / np.sqrt(std_dev**2 + mean**2))
    sigma = np.sqrt(np.log(1 + (std_dev**2 / mean**2)))
    rng = np.random.default_rng()
    return np.round(rng.lognormal(mu, sigma, size)).astype(int).tolist()


def test_compare_timing_series() -> None:
    original_timing_series = create_timing_series(50000, 2000, 100)
    optimized_timing_series = create_timing_series(50000, 1700, 85)
    result = series2_faster_95_confidence(original_timing_series, optimized_timing_series, 5000)
    assert result is not None
    assert 12 < result[0] < 18
    assert 0.01 < result[1] < 0.1
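create_timing_series inverts the lognormal moment formulas: if X ~ LogNormal(mu, sigma), then E[X] = exp(mu + sigma^2/2) and Var[X] = (exp(sigma^2) - 1) * E[X]^2, which solve to mu = ln(m^2 / sqrt(s^2 + m^2)) and sigma = sqrt(ln(1 + s^2/m^2)) for a target mean m and standard deviation s. A quick self-check of that inversion (an illustrative snippet, not part of the commit):

    import numpy as np

    mean, std_dev = 2000, 100
    mu = np.log(mean**2 / np.sqrt(std_dev**2 + mean**2))
    sigma = np.sqrt(np.log(1 + (std_dev**2 / mean**2)))

    samples = np.random.default_rng(0).lognormal(mu, sigma, 1_000_000)
    # Sample moments should land close to the requested mean and std dev.
    assert abs(samples.mean() - mean) < 2
    assert abs(samples.std() - std_dev) < 2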