# Unified CPU + Memory + GC profiling script for the primary optimizer.
# This is the MANDATORY first step — gives the cross-domain view that
# single-domain agents lack.
#
# Usage:
#   $RUNNER /tmp/deep_profile.py <source_root> -- <command> [args...]
#
# Examples:
#   $RUNNER /tmp/deep_profile.py src -- pytest tests/test_pipeline.py -x
#   $RUNNER /tmp/deep_profile.py mypackage -- python -c "from mypackage import run; run()"
#   $RUNNER /tmp/deep_profile.py . -- python scripts/benchmark.py
#
# <source_root> is the directory containing project source code. Only
# functions under this path appear in the CPU report. Read this from
# .codeflash/setup.md ("Project root" + the package directory), or pass
# "." to include everything.
#
# Everything after "--" is the command to profile. The script runs it as
# a subprocess under cProfile + tracemalloc + GC tracking.

import cProfile
import gc
import os
import pstats
import subprocess
import sys
import tempfile
import time
import tracemalloc

BASELINE_PATH = "/tmp/deep_baseline_total"


def _usage_exit():
    """Print the command-line usage to stderr and exit with status 1."""
    print(
        "Usage: python deep_profile.py <source_root> -- <command> [args...]",
        file=sys.stderr,
    )
    print(
        "Example: python deep_profile.py src -- pytest tests/ -x",
        file=sys.stderr,
    )
    sys.exit(1)


def parse_args(args):
    """Parse ``<source_root> -- <command> [args...]`` from argv.

    Args:
        args: Command-line arguments without the script name.

    Returns:
        A ``(source_root, cmd)`` tuple: the absolute path of the argument
        immediately before ``--`` and the list of arguments after it.

    Raises:
        SystemExit: With status 1 when ``--`` is missing, nothing precedes
            it, no command follows it, or the source root is not a directory.
    """
    if "--" not in args:
        _usage_exit()

    sep = args.index("--")
    cmd = args[sep + 1 :]
    # With "--" in first position, args[sep - 1] would wrap around and pick
    # the LAST argument as the source root; an empty cmd cannot be executed.
    if sep < 1 or not cmd:
        _usage_exit()

    source_root = os.path.abspath(args[sep - 1])
    if not os.path.isdir(source_root):  # noqa: PTH112
        print(
            f"Error: source root '{source_root}' is not a directory",
            file=sys.stderr,
        )
        sys.exit(1)

    return source_root, cmd


def profile_command(cmd):
    """Run cmd under cProfile + tracemalloc + GC tracking.

    NOTE(review): cProfile, tracemalloc and the gc callback instrument THIS
    interpreter, while ``cmd`` runs in a child process started by
    ``subprocess.run`` — confirm the child's own activity is meant to be
    covered by these measurements.

    Args:
        cmd: Command and arguments, as a list, passed to ``subprocess.run``.

    Returns:
        A ``(result, mem_snapshot, gc_times, prof_path)`` tuple: the
        ``CompletedProcess``, the tracemalloc snapshot, the list of GC pause
        durations in seconds, and the path of the dumped cProfile stats file
        (created with ``delete=False``, so it is left on disk).
    """
    gc_times = []
    gc_started_at = None

    def gc_callback(phase, _info):
        nonlocal gc_started_at
        if phase == "start":
            gc_started_at = time.perf_counter()
        elif phase == "stop" and gc_started_at is not None:
            gc_times.append(time.perf_counter() - gc_started_at)
            gc_started_at = None

    profiler = cProfile.Profile()
    # Only stop tracemalloc afterwards if this function is what started it.
    started_tracing = not tracemalloc.is_tracing()

    gc.callbacks.append(gc_callback)
    if started_tracing:
        tracemalloc.start()
    try:
        profiler.enable()
        try:
            result = subprocess.run(cmd, check=False)  # noqa: S603
        finally:
            profiler.disable()
        mem_snapshot = tracemalloc.take_snapshot()
    finally:
        # Undo the global instrumentation even if the command cannot be
        # launched, so later code is neither timed nor traced.
        if started_tracing:
            tracemalloc.stop()
        gc.callbacks.remove(gc_callback)

    with tempfile.NamedTemporaryFile(
        suffix=".prof", prefix="deep_cpu_", delete=False
    ) as prof_file:
        prof_path = prof_file.name
    profiler.dump_stats(prof_path)

    return result, mem_snapshot, gc_times, prof_path


def _is_under(path, root):
    """Return True when path is root itself or lies inside directory root."""
    # Compare against "root/" so that "/p/src" does not match "/p/src_other".
    prefix = root.rstrip(os.sep) + os.sep
    return path == root or path.startswith(prefix)


def _report_baseline(total):
    """Record total as the baseline, or print the delta against a stored one."""
    baseline = None
    if os.path.exists(BASELINE_PATH):
        with open(BASELINE_PATH, encoding="utf-8") as f:
            try:
                baseline = float(f.read().strip())
            except ValueError:
                # Unreadable baseline file: fall through and re-record it.
                baseline = None

    if baseline is None:
        with open(BASELINE_PATH, "w", encoding="utf-8") as f:
            f.write(str(total))
        print(f"  (baseline recorded: {total:.3f}s)")
        return

    if baseline == 0:
        # A relative delta against a zero baseline is undefined.
        print(f"  (baseline: {baseline:.3f}s, current: {total:.3f}s, delta: n/a)")
        return

    delta = (total - baseline) / baseline * 100
    print(
        f"  (baseline: {baseline:.3f}s, current: {total:.3f}s, delta: {delta:+.1f}%)"
    )


def report_results(source_root, mem_snapshot, gc_times, prof_path):
    """Print unified profile report.

    Prints the top 15 memory allocators, the GC collection count and total
    pause time, and the top 15 profiled functions whose file lies under
    ``source_root``, ranked by cumulative time. The largest cumulative time
    is compared against (or recorded as) the baseline in ``BASELINE_PATH``.

    Args:
        source_root: Directory used to filter the CPU report.
        mem_snapshot: tracemalloc snapshot to summarise.
        gc_times: GC pause durations in seconds.
        prof_path: Path of a cProfile stats dump.
    """
    print("\n" + "=" * 60)
    print("UNIFIED PROFILE RESULTS")
    print("=" * 60)

    # Memory top allocators
    print("\n=== MEMORY: Top allocators ===")
    for stat in mem_snapshot.statistics("lineno")[:15]:
        print(stat)

    # GC impact
    total_gc = sum(gc_times)
    print(f"\n=== GC: {len(gc_times)} collections, {total_gc:.3f}s total ===")

    # CPU top functions (project-only)
    print(f"\n=== CPU: Top project functions (source root: {source_root}) ===")
    root = os.path.abspath(source_root)
    stats = pstats.Stats(prof_path)
    project_funcs = sorted(
        (
            (ct, tt, nc, name, file, line)
            for (file, line, name), (_cc, nc, tt, ct, _callers) in stats.stats.items()
            if _is_under(os.path.abspath(file), root)
        ),
        reverse=True,
    )

    if not project_funcs:
        # Nothing to rank; recording a made-up total would poison the baseline.
        print("  (no profiled functions under source root; baseline not updated)")
        return

    total = project_funcs[0][0]

    # Baseline delta tracking
    _report_baseline(total)

    for ct, tt, nc, name, file, line in project_funcs[:15]:
        # The top cumulative time can be 0.0 when every call was too fast
        # to measure.
        pct = ct / total * 100 if total else 0.0
        relpath = os.path.relpath(file)
        print(
            f"  {name:30s} {pct:5.1f}% cumtime  {tt:.3f}s self  {nc:>6d} calls  {relpath}:{line}"
        )


def main():
    """Profile the command given on the command line and exit with its status."""
    source_root, cmd = parse_args(sys.argv[1:])
    result, mem_snapshot, gc_times, prof_path = profile_command(cmd)
    report_results(source_root, mem_snapshot, gc_times, prof_path)
    sys.exit(result.returncode)


if __name__ == "__main__":
    main()