import os from typing import Any, Dict, Optional import pandas as pd from pandas import DataFrame from scipy.stats import hmean from sqlalchemy import create_engine def load_data( experiment_id: str, database_uri: str = os.environ.get("DATABASE_URL") ) -> DataFrame: engine = create_engine(database_uri) with engine.connect() as connection: query = """ SELECT * FROM optimization_features WHERE (trace_id LIKE %s OR trace_id LIKE %s) AND experiment_metadata->>'id' = %s """ return pd.read_sql_query( query, connection, params=("%EXP0", "%EXP1", experiment_id) ) def remove_orphans(df: DataFrame) -> list[int]: """Cleans up the given column by filling in missing values from the other column in the EXP0 / EXP1 pair. :param df: :param column_name: :return: """ grouped = df.groupby(df["trace_id"].str[:-4]) non_orphan_ids = [] exp1 = 0 for _, group in grouped: if len(group) == 2: non_orphan_ids.append(int(group.index[0])) non_orphan_ids.append(int(group.index[1])) else: assert len(group) == 1 # assert group.iloc[0].trace_id.endswith("EXP0") if group.iloc[0].trace_id.endswith("EXP1"): exp1 += 1 print(f"removed {df.shape[0] - len(non_orphan_ids)} ids") print(f"local exists but deployed does not: {exp1}") return non_orphan_ids def process_column_pairs(df: DataFrame, column_name: str) -> DataFrame: """Cleans up the given column by filling in missing values from the other column in the EXP0 / EXP1 pair. :param df: :param column_name: :return: """ new_df = df.copy() grouped = new_df.groupby(new_df["trace_id"].str[:-4]) for _, group in grouped: if len(group) == 2: exp0_row = group[group["trace_id"].str.endswith("EXP0")] exp1_row = group[group["trace_id"].str.endswith("EXP1")] if pd.isnull(exp0_row.iloc[0][column_name]) and not pd.isnull( exp1_row.iloc[0][column_name] ): new_df.at[exp0_row.index[0], column_name] = exp1_row.iloc[0][ column_name ] elif pd.isnull(exp1_row.iloc[0][column_name]) and not pd.isnull( exp0_row.iloc[0][column_name] ): new_df.at[exp1_row.index[0], column_name] = exp0_row.iloc[0][ column_name ] return new_df def calculate_validity(df: DataFrame, perf_threshold: float = 0.05) -> Dict[str, Any]: # Calculate the percentage of valid PRs given that the original function run succeeded successful_runs = df[(~df["original_runtime"].isna())] successful_runs_above_thres = successful_runs[ successful_runs["best_correct_speedup_ratio"] >= perf_threshold ] valid_prs = len(successful_runs_above_thres) percent_valid_pr = (valid_prs / len(df)) * 100 # Calculate the percentage of valid candidates generated given that original function run succeeded valid_candidates = successful_runs[ successful_runs["is_correct"].apply(lambda x: any(x.values())) ] percent_valid_candidates = ( len(valid_candidates) / len(successful_runs) * 100 if len(successful_runs) > 0 else 0 ) return { "percent_valid_pr": percent_valid_pr, "percent_valid_candidates": percent_valid_candidates, } def calculate_performance( df: DataFrame, perf_threshold: float = 0.05 ) -> Dict[str, Any]: # Filter out the rows where a valid candidate above the perf threshold was found valid_candidates_above_thres = df[ (df["best_correct_speedup_ratio"] >= perf_threshold) ] # (1) Calculate the average speedup ratio of the PR average_percentage_gain_pr = ( valid_candidates_above_thres["best_correct_speedup_ratio"].mean() * 100 ) # (1a) Calculate the harmonic mean of the PR speedup ratio with ref=1x harmonic_mean_gain_pr = hmean( valid_candidates_above_thres["best_correct_speedup_ratio"] + 1 ) # (2) Calculate the mean average speedup ratio of all the valid candidates mean_average_percentage_gain_all = df["best_correct_speedup_ratio"].mean() * 100 # (2a) Calculate the harmonic mean of all candidates speedup ratio with ref=1x harmonic_mean_gain_all = hmean(df["best_correct_speedup_ratio"].dropna() + 1) def calculate_time_saved_for_row(row: pd.Series): if row["optimized_runtime"] is not None and row["is_correct"] is not None: correct_runtimes = [ runtime for opt_id, runtime in row["optimized_runtime"].items() if row["is_correct"].get(opt_id) ] else: correct_runtimes = [] if correct_runtimes: return row["original_runtime"] - min(correct_runtimes) return None # (3) The average time saved in a PR given that a valid candidate was found above the perf threshold. pr_time_saved = ( valid_candidates_above_thres.apply( lambda row: calculate_time_saved_for_row(row), axis=1 ) .dropna() .mean() ) # (4) Calculate the mean average time saved for all the valid candidates all_candidates_time_saved = ( df.apply(lambda row: calculate_time_saved_for_row(row), axis=1).dropna().mean() ) return { "average_percentage_gain_pr": average_percentage_gain_pr, "harmonic_mean_gain_pr": harmonic_mean_gain_pr, "mean_average_percentage_gain_all": mean_average_percentage_gain_all, "harmonic_mean_gain_all": harmonic_mean_gain_all, "average_time_saved_pr": pr_time_saved, "mean_average_time_saved_all": all_candidates_time_saved, } def calculate_coverage(df: DataFrame) -> Dict[str, Any]: successful_runs = df[~df["original_runtime"].isna()] def calculate_percent_optimization_successful_runs( opt_runs: Dict[str, Optional[float]], ) -> float: if opt_runs is None: return 0.0 total_runs = len(opt_runs) successful_optimization_runs = sum( 1 for runtime in opt_runs.values() if runtime is not None ) return ( successful_optimization_runs / total_runs * 100 if total_runs > 0 else 0.0 ) df["percent_successful_optimization_runs"] = df["optimized_runtime"].apply( calculate_percent_optimization_successful_runs ) total_optimizations = sum( len(runs) for runs in successful_runs["optimized_runtime"] if runs is not None ) successful_optimizations = sum( len([runtime for runtime in runs.values() if runtime is not None]) for runs in successful_runs["optimized_runtime"] if runs is not None ) average_percent_successful_optimization_runs = df[ "percent_successful_optimization_runs" ].mean() percent_successful_optimizations = ( successful_optimizations / total_optimizations * 100 if total_optimizations > 0 else 0 ) percent_successful_original_runs = len(successful_runs) / len(df) * 100 return { "average_percent_successful_optimization_runs": average_percent_successful_optimization_runs, "percent_successful_optimizations": percent_successful_optimizations, "percent_successful_original_runs": percent_successful_original_runs, } def paired_comparison_coverage( df: DataFrame, model_a_suffix: str = "EXP0", model_b_suffix: str = "EXP1" ) -> Dict[str, Any]: paired_coverage_results = { "model_a_more_successful": 0, "equal_successful": 0, "model_b_more_successful": 0, } grouped = df.groupby(df["trace_id"].str[:-4]) for _, group in grouped: if len(group) == 2: model_a_row = group[group["trace_id"].str.endswith(model_a_suffix)] model_b_row = group[group["trace_id"].str.endswith(model_b_suffix)] if model_a_row["optimized_runtime"].values[0] is None: model_a_success_count = 0 else: model_a_success_count = sum( 1 for runtime in model_a_row["optimized_runtime"].values[0].values() if runtime is not None ) if model_b_row["optimized_runtime"].values[0] is None: model_b_success_count = 0 else: model_b_success_count = sum( 1 for runtime in model_b_row["optimized_runtime"].values[0].values() if runtime is not None ) if model_a_success_count > model_b_success_count: paired_coverage_results["model_a_more_successful"] += 1 elif model_a_success_count < model_b_success_count: paired_coverage_results["model_b_more_successful"] += 1 else: paired_coverage_results["equal_successful"] += 1 return paired_coverage_results # Implement coverage calculations here def paired_comparison_validity( df: DataFrame, model_a_suffix: str = "EXP0", model_b_suffix: str = "EXP1" ) -> Dict[str, Any]: # Paired - Calculate the percentage of runs where model A generated more, equal, or less valid candidates than model B paired_validity_results = { "model_a_more_valid": 0, "equal_valid": 0, "model_b_more_valid": 0, } grouped = df.groupby(df["trace_id"].str[:-4]) for _, group in grouped: if len(group) == 2: model_a_row = group[group["trace_id"].str.endswith(model_a_suffix)] model_b_row = group[group["trace_id"].str.endswith(model_b_suffix)] if model_a_row["is_correct"].values[0] is None: model_a_valid_count = 0 else: model_a_valid_count = sum(model_a_row["is_correct"].values[0].values()) if model_b_row["is_correct"].values[0] is None: model_b_valid_count = 0 else: model_b_valid_count = sum(model_b_row["is_correct"].values[0].values()) if model_a_valid_count > model_b_valid_count: paired_validity_results["model_a_more_valid"] += 1 elif model_a_valid_count < model_b_valid_count: paired_validity_results["model_b_more_valid"] += 1 else: paired_validity_results["equal_valid"] += 1 return paired_validity_results def paired_comparison_performance( df: DataFrame, model_a_suffix: str = "EXP0", model_b_suffix: str = "EXP1" ) -> Dict[str, Any]: paired_results = {"model_a_better": 0, "equal": 0, "model_b_better": 0} # Group by the trace_id without the suffix grouped = df.groupby(df["trace_id"].str[:-4]) for _, group in grouped: if len(group) == 2: model_a_row = group[group["trace_id"].str.endswith(model_a_suffix)] model_b_row = group[group["trace_id"].str.endswith(model_b_suffix)] model_a_speedup = model_a_row["best_correct_speedup_ratio"].values[0] model_b_speedup = model_b_row["best_correct_speedup_ratio"].values[0] if pd.isna(model_a_speedup) and not pd.isna(model_b_speedup): paired_results["model_b_better"] += 1 elif not pd.isna(model_a_speedup) and pd.isna(model_b_speedup): paired_results["model_a_better"] += 1 elif pd.isna(model_a_speedup) and pd.isna(model_b_speedup): # If both are NaN, do nothing pass elif model_a_speedup > model_b_speedup: paired_results["model_a_better"] += 1 elif model_a_speedup < model_b_speedup: paired_results["model_b_better"] += 1 else: paired_results["equal"] += 1 return paired_results def augment_with_best_correct_speedup_ratio(df: DataFrame) -> DataFrame: # Extract the best speedup ratio from the speedup_ratio dictionary, accounting for empty dictionaries def get_best_correct_speedup_ratio( speedup_ratios: Dict[str, float], is_correct: Dict[str, bool] ) -> Optional[float]: correct_speedup_ratios = ( { uuid: ratio for uuid, ratio in speedup_ratios.items() if is_correct.get(uuid) } if speedup_ratios is not None else {} ) if correct_speedup_ratios: return max(correct_speedup_ratios.values()) return None df["best_correct_speedup_ratio"] = df.apply( lambda row: get_best_correct_speedup_ratio( row["speedup_ratio"], row["is_correct"] ) if row["speedup_ratio"] is not None else None, axis=1, ) return df def main() -> None: df = load_data("test_gemini_aseem_apr16") non_orphan_ids = remove_orphans(df) df = df.iloc[non_orphan_ids] # df = process_column_pairs(df, "metadata") # df = process_column_pairs(df, "test_framework") # df = process_column_pairs(df, "generated_test") df = augment_with_best_correct_speedup_ratio(df) exp0_df = df[df["trace_id"].str.endswith("EXP0")] exp1_df = df[df["trace_id"].str.endswith("EXP1")] # Calculate metrics for each experiment exp0_performance_metrics = calculate_performance(exp0_df) exp1_performance_metrics = calculate_performance(exp1_df) exp0_validity_metrics = calculate_validity(exp0_df) exp1_validity_metrics = calculate_validity(exp1_df) exp0_coverage_metrics = calculate_coverage(exp0_df) exp1_coverage_metrics = calculate_coverage(exp1_df) paired_performance_metrics = paired_comparison_performance(df) paired_validity_metrics = paired_comparison_validity(df) paired_coverage_metrics = paired_comparison_coverage(df) exp0_pr_metrics = {"total_prs": exp0_df["pull_request"].count()} exp1_pr_metrics = {"total_prs": exp1_df["pull_request"].count()} exp0_speedup_metrics = { "speedup5+": exp0_df[(exp0_df["best_correct_speedup_ratio"] >= 0.05)].shape[0] } exp1_speedup_metrics = { "speedup5+": exp1_df[(exp1_df["best_correct_speedup_ratio"] >= 0.05)].shape[0] } exp0_or_metrics = {"total_or": exp0_df["original_runtime"].count()} exp1_or_metrics = {"total_or": exp1_df["original_runtime"].count()} pd.options.display.float_format = "{:.2f}".format # Combine metrics into a DataFrame metrics_df = pd.DataFrame( { "EXP0": { **exp0_performance_metrics, **exp0_validity_metrics, **exp0_coverage_metrics, **exp0_pr_metrics, **exp0_speedup_metrics, **exp0_or_metrics, }, "EXP1": { **exp1_performance_metrics, **exp1_validity_metrics, **exp1_coverage_metrics, **exp1_pr_metrics, **exp1_speedup_metrics, **exp1_or_metrics, }, } ) # Transpose to have experiments as rows and metrics as columns # Output the combined metrics DataFrame print(metrics_df) print(paired_performance_metrics) print(paired_validity_metrics) print(paired_coverage_metrics) if __name__ == "__main__": main()