"""FastAPI webhook server — thin dispatcher for CI mode. Receives GitHub webhooks, clones the repo, writes event metadata to ``.codeflash/ci-context.json``, and invokes the agent. The agent handles all GitHub interactions via ``gh`` CLI. """ from __future__ import annotations import asyncio import json import logging from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any import httpx import uvicorn from cachetools import TTLCache from fastapi import FastAPI, Header, HTTPException, Request from .agents import run_agent from .auth import get_installation_token, verify_signature from .config import Config from .git import clone_repo if TYPE_CHECKING: from collections.abc import AsyncIterator, Callable, Coroutine logging.basicConfig( level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s", ) log = logging.getLogger(__name__) _seen_deliveries: TTLCache[str, bool] = TTLCache(maxsize=4096, ttl=3600) @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """Initialize shared state on startup, clean up on shutdown.""" cfg = Config() running_tasks: set[asyncio.Task[None]] = set() cfg.workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o700) async with httpx.AsyncClient( headers={"Accept": "application/vnd.github+json"}, timeout=30.0, ) as http_client: app.state.config = cfg app.state.http_client = http_client app.state.running_tasks = running_tasks yield if running_tasks: log.info( "Draining %d background tasks...", len(running_tasks), ) await asyncio.gather( *running_tasks, return_exceptions=True, ) app = FastAPI(title="codeflash-service", lifespan=lifespan) def _write_ci_context(repo_dir: str, context: dict[str, Any]) -> None: """Write ``.codeflash/ci-context.json`` into the cloned repo.""" from pathlib import Path ci_dir = Path(repo_dir) / ".codeflash" ci_dir.mkdir(parents=True, exist_ok=True) (ci_dir / "ci-context.json").write_text( json.dumps(context, indent=2), ) @app.post("/webhook") async def webhook( request: Request, x_github_event: str = Header(..., alias="X-GitHub-Event"), x_hub_signature_256: str = Header( ..., alias="X-Hub-Signature-256", ), x_github_delivery: str = Header( ..., alias="X-GitHub-Delivery", ), ) -> dict[str, str]: """Receive and dispatch GitHub webhook events.""" body = await request.body() cfg: Config = request.app.state.config if not verify_signature(body, x_hub_signature_256, cfg.webhook_secret): raise HTTPException(status_code=401, detail="Invalid signature") if x_github_delivery in _seen_deliveries: log.info("Duplicate delivery %s, skipping", x_github_delivery) return {"status": "duplicate", "delivery": x_github_delivery} _seen_deliveries[x_github_delivery] = True payload = await request.json() log.info( "Event %s delivery=%s action=%s", x_github_event, x_github_delivery, payload.get("action"), ) handler = EVENT_HANDLERS.get(x_github_event) if handler is None: return {"status": "ignored", "event": x_github_event} http_client: httpx.AsyncClient = request.app.state.http_client running_tasks: set[asyncio.Task[None]] = request.app.state.running_tasks task = asyncio.create_task( safe_handle( handler, payload, config=cfg, http_client=http_client, ), ) running_tasks.add(task) task.add_done_callback(running_tasks.discard) return {"status": "accepted", "event": x_github_event} async def safe_handle( handler: Callable[..., Coroutine[Any, Any, None]], payload: dict[str, Any], **kwargs: object, ) -> None: """Run a handler, catching and logging any exceptions.""" try: await handler(payload, **kwargs) except Exception: log.exception("Handler failed for event") async def dispatch_issues( payload: dict[str, Any], *, config: Config, http_client: httpx.AsyncClient, **_: object, ) -> None: """Handle issues events — write context and invoke agent.""" action = payload.get("action") if action not in {"opened", "labeled"}: return repo_info = payload["repository"] owner = repo_info["owner"]["login"] repo = repo_info["name"] installation_id = payload["installation"]["id"] token = await get_installation_token( config, installation_id, client=http_client, ) repo_dir = await clone_repo( owner, repo, repo_info["default_branch"], token, config.workspace_dir, ) _write_ci_context( str(repo_dir), { "event_type": "issues", "action": action, "owner": owner, "repo": repo, "number": payload["issue"]["number"], }, ) await run_agent(config, repo_dir, token) log.info("Agent handled issue %s/%s#%d", owner, repo, payload["issue"]["number"]) async def dispatch_pr( payload: dict[str, Any], *, config: Config, http_client: httpx.AsyncClient, **_: object, ) -> None: """Handle pull_request events — write context and invoke agent.""" action = payload.get("action") if action not in {"opened", "synchronize"}: return pr = payload["pull_request"] repo_info = payload["repository"] owner = repo_info["owner"]["login"] repo = repo_info["name"] installation_id = payload["installation"]["id"] token = await get_installation_token( config, installation_id, client=http_client, ) repo_dir = await clone_repo( owner, repo, pr["head"]["ref"], token, config.workspace_dir, ) _write_ci_context( str(repo_dir), { "event_type": "pull_request", "action": action, "owner": owner, "repo": repo, "number": pr["number"], "base_ref": pr["base"]["ref"], "head_ref": pr["head"]["ref"], }, ) ci_prompt = ( "AUTONOMOUS MODE: Do NOT ask the user any questions — work fully " "autonomously. Make all decisions yourself: generate a run tag from " "today's date, identify benchmark tiers from available tests, choose " "optimization targets from profiler output. If something is ambiguous, " "pick the reasonable default and document your choice in HANDOFF.md.\n\n" "Optimize the Python code in this repository. This is a CI run " f"triggered by PR #{pr['number']} " f"({pr['head']['ref']} → {pr['base']['ref']}).\n\n" "After optimization is complete, commit your changes and push to the " f"PR branch: git push origin HEAD:{pr['head']['ref']}\n\n" "Follow the full pipeline: setup, unified profiling, experiment loop " "with benchmarks, verification, pre-submit review, and adversarial " "review. Do not skip steps." ) await run_agent(config, repo_dir, token, agent="codeflash-deep", prompt=ci_prompt) log.info("Agent handled PR %s/%s#%d", owner, repo, pr["number"]) async def dispatch_push( payload: dict[str, Any], *, config: Config, http_client: httpx.AsyncClient, **_: object, ) -> None: """Handle push events — write context and invoke agent.""" repo_info = payload["repository"] owner = repo_info["owner"]["login"] repo = repo_info["name"] ref = payload.get("ref", "") default_branch = repo_info.get("default_branch", "main") installation_id = payload["installation"]["id"] # Only process pushes to the default branch. if ref != f"refs/heads/{default_branch}": return token = await get_installation_token( config, installation_id, client=http_client, ) repo_dir = await clone_repo( owner, repo, default_branch, token, config.workspace_dir, ) _write_ci_context( str(repo_dir), { "event_type": "push", "action": None, "owner": owner, "repo": repo, "head_sha": payload.get("after", ""), "ref": ref, }, ) await run_agent(config, repo_dir, token) log.info("Agent handled push to %s/%s ref=%s", owner, repo, ref) EVENT_HANDLERS: dict[ str, Callable[..., Coroutine[Any, Any, None]], ] = { "issues": dispatch_issues, "pull_request": dispatch_pr, "push": dispatch_push, } @app.get("/health") async def health() -> dict[str, str]: """Health check endpoint.""" return {"status": "ok"} def main() -> None: """Entry point for the codeflash-service server.""" startup_cfg = Config() uvicorn.run( "github_app.app:app", host=startup_cfg.host, port=startup_cfg.port, log_level="info", ) if __name__ == "__main__": main()