# Source: codeflash-agent/.codeflash/netflix/metaflow/infra/cloud-init.yaml
# Commit: Kevin Turcios 3b59d97647 "squash" (2026-04-13 14:12:17 -05:00)
# 560 lines, 19 KiB, YAML
# NOTE(review): the lines above are scraped repository metadata, not file
# content. cloud-init only recognizes user-data whose FIRST line is
# "#cloud-config" -- remove this header block before passing to --custom-data.

#cloud-config
#
# Benchmark VM provisioning for Netflix/metaflow
#
# Pure Python workflow framework -- targets: content_addressed_store (gzip, SHA1),
# multicore_utils (sleep polling). Python 3.12, pip editable install.
#
# Usage:
# az vm create ... --custom-data infra/cloud-init.yaml
#
# VM: Azure Standard_D2s_v5 (2 vCPU, 8 GB RAM, non-burstable)
# Non-burstable ensures consistent CPU -- no thermal throttling or turbo variability.
# Refresh the apt index before installing (required for fresh images).
package_update: true
packages:
- git              # clone metaflow fork + switch benchmark branches
- build-essential  # compilers for any pip packages with C extensions (lz4, xxhash)
- curl             # general-purpose downloads
- wget             # fetches the hyperfine .deb in runcmd
- jq               # ad-hoc inspection of the JSON benchmark results
write_files:
# --- Benchmark: content_addressed_store (gzip + SHA1) ---
- path: /home/azureuser/bench/bench_cas.py
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env python3
    """
    Benchmark content_addressed_store hot paths: gzip compression/decompression
    and SHA1 hashing at various blob sizes.
    Outputs JSON results to ~/results/cas/.
    """
    import gzip
    import hashlib
    import json
    import os
    import time
    from io import BytesIO

    OUTDIR = os.path.expanduser("~/results/cas")
    os.makedirs(OUTDIR, exist_ok=True)

    # Simulate realistic artifact sizes: small (1KB pickled scalar),
    # medium (100KB pickled array), large (10MB pickled dataframe)
    BLOB_SIZES = {
        "1KB": 1_000,
        "10KB": 10_000,
        "100KB": 100_000,
        "1MB": 1_000_000,
        "10MB": 10_000_000,
    }
    # Fewer iterations for larger blobs so every size finishes in similar wall time.
    ITERATIONS = {
        "1KB": 5000,
        "10KB": 2000,
        "100KB": 500,
        "1MB": 50,
        "10MB": 10,
    }

    def make_blob(size):
        """Pseudo-random blob (compressible, like pickled Python objects)."""
        # NOTE(review): `random` is seeded but never used -- the blob is a
        # deterministic repeating 0..255 byte pattern truncated to `size`.
        import random
        random.seed(42)
        # Mix of structured + random bytes to mimic pickled data
        pattern = bytes(range(256)) * (size // 256 + 1)
        return pattern[:size]

    def bench_sha1(blob, iterations):
        # Mean seconds per SHA1 hexdigest of `blob`.
        start = time.perf_counter()
        for _ in range(iterations):
            hashlib.sha1(blob).hexdigest()
        elapsed = time.perf_counter() - start
        return elapsed / iterations

    def bench_gzip_compress(blob, iterations, level=3):
        # Mean seconds per gzip compress into an in-memory buffer plus one
        # full read-back of the compressed bytes.
        start = time.perf_counter()
        for _ in range(iterations):
            buf = BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb", compresslevel=level) as f:
                f.write(blob)
            buf.seek(0)
            _ = buf.read()
        elapsed = time.perf_counter() - start
        return elapsed / iterations

    def bench_gzip_decompress(compressed, iterations):
        # Mean seconds per full gzip decompression of `compressed`.
        start = time.perf_counter()
        for _ in range(iterations):
            with gzip.GzipFile(fileobj=BytesIO(compressed), mode="rb") as f:
                f.read()
        elapsed = time.perf_counter() - start
        return elapsed / iterations

    def bench_zlib_compress(blob, iterations, level=3):
        # zlib one-shot API: same DEFLATE stream, no gzip header/CRC framing.
        import zlib
        start = time.perf_counter()
        for _ in range(iterations):
            zlib.compress(blob, level)
        elapsed = time.perf_counter() - start
        return elapsed / iterations

    def bench_zlib_decompress(compressed, iterations):
        import zlib
        start = time.perf_counter()
        for _ in range(iterations):
            zlib.decompress(compressed)
        elapsed = time.perf_counter() - start
        return elapsed / iterations

    def main():
        results = {}
        for label, size in BLOB_SIZES.items():
            iters = ITERATIONS[label]
            blob = make_blob(size)
            print(f"\n=== {label} blob ({len(blob)} bytes, {iters} iterations) ===")
            # SHA1
            sha1_time = bench_sha1(blob, iters)
            print(f" SHA1: {sha1_time*1000:.3f} ms/op")
            # Gzip compress (current: level 3)
            gzip_c_time = bench_gzip_compress(blob, iters, level=3)
            print(f" gzip compress L3: {gzip_c_time*1000:.3f} ms/op")
            # Gzip compress level 1 (fastest)
            gzip_c1_time = bench_gzip_compress(blob, iters, level=1)
            print(f" gzip compress L1: {gzip_c1_time*1000:.3f} ms/op")
            # Prepare compressed blob for decompression bench
            buf = BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb", compresslevel=3) as f:
                f.write(blob)
            compressed = buf.getvalue()
            ratio = len(compressed) / len(blob)
            print(f" compression ratio: {ratio:.3f} ({len(compressed)} bytes)")
            # Gzip decompress
            gzip_d_time = bench_gzip_decompress(compressed, iters)
            print(f" gzip decompress: {gzip_d_time*1000:.3f} ms/op")
            # zlib compress (no gzip header overhead)
            import zlib
            zlib_c_time = bench_zlib_compress(blob, iters, level=3)
            print(f" zlib compress L3: {zlib_c_time*1000:.3f} ms/op")
            zlib_compressed = zlib.compress(blob, 3)
            zlib_d_time = bench_zlib_decompress(zlib_compressed, iters)
            print(f" zlib decompress: {zlib_d_time*1000:.3f} ms/op")
            results[label] = {
                "blob_bytes": len(blob),
                "iterations": iters,
                "sha1_ms": round(sha1_time * 1000, 4),
                "gzip_compress_L3_ms": round(gzip_c_time * 1000, 4),
                "gzip_compress_L1_ms": round(gzip_c1_time * 1000, 4),
                "gzip_decompress_ms": round(gzip_d_time * 1000, 4),
                "gzip_compressed_bytes": len(compressed),
                "gzip_ratio": round(ratio, 4),
                "zlib_compress_L3_ms": round(zlib_c_time * 1000, 4),
                "zlib_decompress_ms": round(zlib_d_time * 1000, 4),
            }
        # Try optional fast alternatives if available
        try:
            import xxhash
            print("\n=== xxhash available ===")
            for label, size in BLOB_SIZES.items():
                iters = ITERATIONS[label]
                blob = make_blob(size)
                start = time.perf_counter()
                for _ in range(iters):
                    xxhash.xxh64(blob).hexdigest()
                elapsed = time.perf_counter() - start
                xxh_time = elapsed / iters
                results[label]["xxh64_ms"] = round(xxh_time * 1000, 4)
                sha1_ms = results[label]["sha1_ms"]
                # FIX: sha1_ms is in milliseconds but xxh_time is in seconds;
                # the previous expression (sha1_ms/xxh_time/1000*1000) reduced
                # to sha1_ms/xxh_time and over-reported the speedup by 1000x.
                print(f" {label}: xxh64={xxh_time*1000:.3f} ms vs sha1={sha1_ms:.3f} ms ({sha1_ms/(xxh_time*1000):.1f}x faster)")
        except ImportError:
            print("\n xxhash not installed, skipping")
        try:
            import lz4.frame
            print("\n=== lz4 available ===")
            for label, size in BLOB_SIZES.items():
                iters = ITERATIONS[label]
                blob = make_blob(size)
                start = time.perf_counter()
                for _ in range(iters):
                    lz4.frame.compress(blob)
                elapsed = time.perf_counter() - start
                lz4_c_time = elapsed / iters
                lz4_compressed = lz4.frame.compress(blob)
                start = time.perf_counter()
                for _ in range(iters):
                    lz4.frame.decompress(lz4_compressed)
                elapsed = time.perf_counter() - start
                lz4_d_time = elapsed / iters
                lz4_ratio = len(lz4_compressed) / len(blob)
                results[label]["lz4_compress_ms"] = round(lz4_c_time * 1000, 4)
                results[label]["lz4_decompress_ms"] = round(lz4_d_time * 1000, 4)
                results[label]["lz4_ratio"] = round(lz4_ratio, 4)
                gzip_ms = results[label]["gzip_compress_L3_ms"]
                print(f" {label}: lz4={lz4_c_time*1000:.3f} ms vs gzip={gzip_ms:.3f} ms (ratio: lz4={lz4_ratio:.3f} vs gzip={results[label]['gzip_ratio']:.3f})")
        except ImportError:
            print("\n lz4 not installed, skipping")
        with open(os.path.join(OUTDIR, "baseline.json"), "w") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {OUTDIR}/baseline.json")

    if __name__ == "__main__":
        main()
# --- Benchmark: multicore_utils (polling overhead) ---
- path: /home/azureuser/bench/bench_multicore.py
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env python3
    """
    Benchmark multicore_utils polling overhead.
    Measures wall-clock time for parallel_map with trivial vs real workloads
    to isolate the polling/sleep overhead from actual work.
    """
    import json
    import os
    import sys
    import time

    # Import metaflow from the editable checkout, not any site-packages copy.
    sys.path.insert(0, os.path.expanduser("~/metaflow"))
    # NOTE(review): parallel_imap_unordered is imported but never exercised below.
    from metaflow.multicore_utils import parallel_map, parallel_imap_unordered

    OUTDIR = os.path.expanduser("~/results/multicore")
    os.makedirs(OUTDIR, exist_ok=True)

    def noop(x):
        # Zero-work task: any measured time is framework overhead.
        return x

    def sleep_10ms(x):
        # Pure-wait task: exposes the gap between poll intervals.
        time.sleep(0.01)
        return x

    def cpu_work(x):
        """~5ms of CPU work."""
        total = 0
        for i in range(100_000):
            total += i * i
        return total

    def main():
        results = {}
        # Trivial workload -- exposes polling overhead
        for n_items in [4, 16, 64]:
            items = list(range(n_items))
            # noop: all overhead is fork + polling
            start = time.perf_counter()
            parallel_map(noop, items, max_parallel=2)
            noop_time = time.perf_counter() - start
            # cpu_work: real work dominates
            start = time.perf_counter()
            parallel_map(cpu_work, items, max_parallel=2)
            cpu_time = time.perf_counter() - start
            overhead_pct = (noop_time / cpu_time) * 100 if cpu_time > 0 else 0
            print(f"n={n_items}: noop={noop_time:.3f}s, cpu_work={cpu_time:.3f}s, overhead={overhead_pct:.1f}%")
            results[f"n{n_items}"] = {
                "items": n_items,
                "noop_s": round(noop_time, 4),
                "cpu_work_s": round(cpu_time, 4),
                "overhead_pct": round(overhead_pct, 2),
            }
        # Sleep workload -- isolates polling gap
        for n_items in [4, 16]:
            items = list(range(n_items))
            start = time.perf_counter()
            parallel_map(sleep_10ms, items, max_parallel=2)
            sleep_time = time.perf_counter() - start
            ideal = (n_items / 2) * 0.01  # perfect parallelism
            gap = sleep_time - ideal
            print(f"n={n_items} sleep_10ms: actual={sleep_time:.3f}s, ideal={ideal:.3f}s, gap={gap:.3f}s")
            results[f"n{n_items}_sleep"] = {
                "items": n_items,
                "actual_s": round(sleep_time, 4),
                "ideal_s": round(ideal, 4),
                "gap_s": round(gap, 4),
            }
        with open(os.path.join(OUTDIR, "baseline.json"), "w") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {OUTDIR}/baseline.json")

    if __name__ == "__main__":
        main()
# --- Benchmark: end-to-end save/load via CAS ---
- path: /home/azureuser/bench/bench_cas_e2e.py
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env python3
    """
    End-to-end benchmark of ContentAddressedStore save_blobs / load_blobs
    using the local filesystem storage backend.
    This tests the full pipeline: SHA1 hash -> dedup check -> gzip -> write -> read -> gunzip.
    """
    import json
    import os
    import shutil
    import sys
    import tempfile
    import time

    # Import metaflow from the editable checkout, not any site-packages copy.
    sys.path.insert(0, os.path.expanduser("~/metaflow"))
    from metaflow.datastore.content_addressed_store import ContentAddressedStore
    from metaflow.plugins.datastores.local_storage import LocalStorage

    OUTDIR = os.path.expanduser("~/results/cas_e2e")
    os.makedirs(OUTDIR, exist_ok=True)

    BLOB_SIZES = {
        "1KB": 1_000,
        "100KB": 100_000,
        "1MB": 1_000_000,
        "10MB": 10_000_000,
    }
    # Fewer iterations for larger blobs so every size finishes in similar wall time.
    ITERATIONS = {
        "1KB": 200,
        "100KB": 100,
        "1MB": 20,
        "10MB": 5,
    }

    def make_blob(size):
        # Deterministic repeating byte pattern truncated to `size`
        # (compressible, similar to pickled Python objects).
        import random
        random.seed(42)
        pattern = bytes(range(256)) * (size // 256 + 1)
        return pattern[:size]

    def main():
        results = {}
        for label, size in BLOB_SIZES.items():
            iters = ITERATIONS[label]
            blob = make_blob(size)
            print(f"\n=== {label} blob ({len(blob)} bytes, {iters} iterations) ===")
            save_times = []
            load_times = []
            for i in range(iters):
                # Fresh store per iteration so on-disk state never accumulates.
                tmpdir = tempfile.mkdtemp(prefix="cas_bench_")
                try:
                    storage = LocalStorage(tmpdir)
                    cas = ContentAddressedStore("cas", storage)
                    # Use unique blobs to avoid dedup short-circuit
                    unique_blob = blob + i.to_bytes(4, "big")
                    # Save
                    start = time.perf_counter()
                    result = cas.save_blobs(iter([unique_blob]))
                    save_elapsed = time.perf_counter() - start
                    save_times.append(save_elapsed)
                    key = result[0].key
                    # Load
                    start = time.perf_counter()
                    loaded = list(cas.load_blobs([key]))
                    load_elapsed = time.perf_counter() - start
                    load_times.append(load_elapsed)
                    # Verify correctness
                    assert loaded[0][1] == unique_blob, "Data mismatch!"
                finally:
                    shutil.rmtree(tmpdir, ignore_errors=True)
            avg_save = sum(save_times) / len(save_times)
            avg_load = sum(load_times) / len(load_times)
            print(f" save: {avg_save*1000:.3f} ms/op")
            print(f" load: {avg_load*1000:.3f} ms/op")
            print(f" total: {(avg_save+avg_load)*1000:.3f} ms/op")
            results[label] = {
                "blob_bytes": len(blob),
                "iterations": iters,
                "save_ms": round(avg_save * 1000, 4),
                "load_ms": round(avg_load * 1000, 4),
                "total_ms": round((avg_save + avg_load) * 1000, 4),
            }
        with open(os.path.join(OUTDIR, "baseline.json"), "w") as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {OUTDIR}/baseline.json")

    if __name__ == "__main__":
        main()
# --- Benchmark: run all baselines ---
- path: /home/azureuser/bench/bench_baseline.sh
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env bash
    set -euo pipefail
    # Use the venv interpreter so the editable metaflow install is importable.
    PYTHON=~/metaflow/.venv/bin/python
    echo "=== Running all baseline benchmarks ==="
    echo ""
    echo "--- CAS microbenchmarks (gzip, SHA1, alternatives) ---"
    $PYTHON ~/bench/bench_cas.py
    echo ""
    echo "--- CAS end-to-end (save_blobs / load_blobs) ---"
    $PYTHON ~/bench/bench_cas_e2e.py
    echo ""
    echo "--- Multicore utils (polling overhead) ---"
    $PYTHON ~/bench/bench_multicore.py
    echo ""
    echo "=== All baselines complete ==="
    echo "Results in ~/results/"
    find ~/results/ -name "*.json" -exec echo {} \;
# --- Benchmark: A/B branch comparison ---
- path: /home/azureuser/bench/bench_compare.sh
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env bash
    set -euo pipefail
    BRANCH="${1:?Usage: bench_compare.sh <branch-or-commit>}"
    TS=$(date +%Y%m%d-%H%M%S)
    # Slashes in branch names would create nested dirs -- flatten to dashes.
    OUTDIR="$HOME/results/${BRANCH//\//-}-${TS}"
    mkdir -p "$OUTDIR"
    PYTHON=~/metaflow/.venv/bin/python
    cd ~/metaflow
    git fetch origin
    git checkout "$BRANCH"
    # Reinstall after switching branches
    $PYTHON -m pip install -e . -q
    echo "=== Benchmarking branch: $BRANCH ==="
    # Each bench script overwrites its own baseline.json; copy the results
    # into the per-branch/per-timestamp directory immediately after each run.
    $PYTHON ~/bench/bench_cas.py
    cp ~/results/cas/baseline.json "$OUTDIR/cas.json"
    $PYTHON ~/bench/bench_cas_e2e.py
    cp ~/results/cas_e2e/baseline.json "$OUTDIR/cas_e2e.json"
    $PYTHON ~/bench/bench_multicore.py
    cp ~/results/multicore/baseline.json "$OUTDIR/multicore.json"
    echo ""
    echo "Results saved to $OUTDIR/"
    ls -la "$OUTDIR/"
# --- Benchmark: side-by-side two branches ---
- path: /home/azureuser/bench/bench_ab.sh
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env bash
    set -euo pipefail
    BASE="${1:?Usage: bench_ab.sh <base-branch> <opt-branch>}"
    OPT="${2:?Usage: bench_ab.sh <base-branch> <opt-branch>}"
    echo "=== A/B comparison: $BASE vs $OPT ==="
    # Runs sequentially on the same VM so both branches see identical hardware.
    bash ~/bench/bench_compare.sh "$BASE"
    bash ~/bench/bench_compare.sh "$OPT"
    echo ""
    echo "Compare results in ~/results/"
    ls ~/results/
# --- Unit test runner ---
- path: /home/azureuser/bench/run_tests.sh
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env bash
    set -euo pipefail
    PYTHON=~/metaflow/.venv/bin/python
    cd ~/metaflow
    echo "=== Running metaflow unit tests ==="
    # Extra pytest args (e.g. -k pattern) pass through via "$@".
    $PYTHON -m pytest test/unit/ -v --tb=short --timeout=120 -m "not docker" "$@"
# --- Setup script (runs once via runcmd) ---
- path: /home/azureuser/setup.sh
  owner: azureuser:azureuser
  permissions: "0755"
  defer: true
  content: |
    #!/usr/bin/env bash
    set -euo pipefail
    echo "=== Cloning metaflow ==="
    git clone https://github.com/KRRT7/metaflow.git ~/metaflow
    cd ~/metaflow
    git remote add upstream https://github.com/Netflix/metaflow.git
    echo "=== Installing Python 3.12 venv ==="
    sudo apt-get install -y python3.12-venv python3-pip
    # NOTE(review): assumes the image's default python3 is 3.12 (true on
    # Ubuntu 24.04) -- confirm, or invoke python3.12 explicitly.
    python3 -m venv .venv
    .venv/bin/pip install --upgrade pip
    echo "=== Installing metaflow (editable) ==="
    # Fall back to the plain editable install if the [dev] extra is absent.
    .venv/bin/pip install -e ".[dev]" || .venv/bin/pip install -e .
    .venv/bin/pip install pytest pytest-timeout
    echo "=== Installing benchmark dependencies ==="
    .venv/bin/pip install xxhash lz4
    echo "=== Creating results directory ==="
    mkdir -p ~/results
    echo "=== Verifying installation ==="
    .venv/bin/python -c 'import metaflow; print("metaflow OK:", metaflow.__version__)'
    echo "=== Running baseline benchmarks ==="
    bash ~/bench/bench_baseline.sh
    echo "=== Done ==="
# runcmd runs once, as root, at the end of first boot (after write_files).
runcmd:
# Install hyperfine (CLI benchmarking tool) from the pinned release .deb.
- wget -q https://github.com/sharkdp/hyperfine/releases/download/v1.19.0/hyperfine_1.19.0_amd64.deb -O /tmp/hyperfine.deb
- dpkg -i /tmp/hyperfine.deb
# Clone, build the venv, and run baselines as the login user (not root).
- su - azureuser -c 'bash /home/azureuser/setup.sh'