codeflash-agent/packages/github-app/github_app/app.py
Kevin Turcios 2caaf6af7c
Fix CI: mypy errors, ruff formatting, switch to prek (#22)
* Fix mypy errors and apply ruff formatting across packages

Fix ast.FunctionDef calls missing type_params for Python 3.12+,
correct type: ignore error codes in _comparator and _plugin, and
run ruff format on all package source and test files.

* Switch CI to prek for lint/typecheck checks

Use j178/prek-action for consistent lint+typecheck (ruff check,
ruff format, interrogate, mypy) matching local pre-commit config.
Keep test as a separate parallel job for test-env support.
2026-04-15 02:52:47 -05:00

330 lines
9 KiB
Python

"""FastAPI webhook server — thin dispatcher for CI mode.
Receives GitHub webhooks, clones the repo, writes event metadata to
``.codeflash/ci-context.json``, and invokes the agent. The agent
handles all GitHub interactions via ``gh`` CLI.
"""
from __future__ import annotations
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any
import httpx
import uvicorn
from cachetools import TTLCache
from fastapi import FastAPI, Header, HTTPException, Request
from .agents import run_agent
from .auth import get_installation_token, verify_signature
from .config import Config
from .git import clone_repo
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Callable, Coroutine
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)
_seen_deliveries: TTLCache[str, bool] = TTLCache(maxsize=4096, ttl=3600)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Initialize shared state on startup, clean up on shutdown."""
cfg = Config()
running_tasks: set[asyncio.Task[None]] = set()
cfg.workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
async with httpx.AsyncClient(
headers={"Accept": "application/vnd.github+json"},
timeout=30.0,
) as http_client:
app.state.config = cfg
app.state.http_client = http_client
app.state.running_tasks = running_tasks
yield
if running_tasks:
log.info(
"Draining %d background tasks...",
len(running_tasks),
)
await asyncio.gather(
*running_tasks,
return_exceptions=True,
)
app = FastAPI(title="codeflash-service", lifespan=lifespan)
def _write_ci_context(repo_dir: str, context: dict[str, Any]) -> None:
"""Write ``.codeflash/ci-context.json`` into the cloned repo."""
from pathlib import Path
ci_dir = Path(repo_dir) / ".codeflash"
ci_dir.mkdir(parents=True, exist_ok=True)
(ci_dir / "ci-context.json").write_text(
json.dumps(context, indent=2),
)
@app.post("/webhook")
async def webhook(
request: Request,
x_github_event: str = Header(..., alias="X-GitHub-Event"),
x_hub_signature_256: str = Header(
...,
alias="X-Hub-Signature-256",
),
x_github_delivery: str = Header(
...,
alias="X-GitHub-Delivery",
),
) -> dict[str, str]:
"""Receive and dispatch GitHub webhook events."""
body = await request.body()
cfg: Config = request.app.state.config
if not verify_signature(body, x_hub_signature_256, cfg.webhook_secret):
raise HTTPException(status_code=401, detail="Invalid signature")
if x_github_delivery in _seen_deliveries:
log.info("Duplicate delivery %s, skipping", x_github_delivery)
return {"status": "duplicate", "delivery": x_github_delivery}
_seen_deliveries[x_github_delivery] = True
payload = await request.json()
log.info(
"Event %s delivery=%s action=%s",
x_github_event,
x_github_delivery,
payload.get("action"),
)
handler = EVENT_HANDLERS.get(x_github_event)
if handler is None:
return {"status": "ignored", "event": x_github_event}
http_client: httpx.AsyncClient = request.app.state.http_client
running_tasks: set[asyncio.Task[None]] = request.app.state.running_tasks
task = asyncio.create_task(
safe_handle(
handler,
payload,
config=cfg,
http_client=http_client,
),
)
running_tasks.add(task)
task.add_done_callback(running_tasks.discard)
return {"status": "accepted", "event": x_github_event}
async def safe_handle(
handler: Callable[..., Coroutine[Any, Any, None]],
payload: dict[str, Any],
**kwargs: object,
) -> None:
"""Run a handler, catching and logging any exceptions."""
try:
await handler(payload, **kwargs)
except Exception:
log.exception("Handler failed for event")
async def dispatch_issues(
payload: dict[str, Any],
*,
config: Config,
http_client: httpx.AsyncClient,
**_: object,
) -> None:
"""Handle issues events — write context and invoke agent."""
action = payload.get("action")
if action not in {"opened", "labeled"}:
return
repo_info = payload["repository"]
owner = repo_info["owner"]["login"]
repo = repo_info["name"]
installation_id = payload["installation"]["id"]
token = await get_installation_token(
config,
installation_id,
client=http_client,
)
repo_dir = await clone_repo(
owner,
repo,
repo_info["default_branch"],
token,
config.workspace_dir,
)
_write_ci_context(
str(repo_dir),
{
"event_type": "issues",
"action": action,
"owner": owner,
"repo": repo,
"number": payload["issue"]["number"],
},
)
await run_agent(config, repo_dir, token)
log.info("Agent handled issue %s/%s#%d", owner, repo, payload["issue"]["number"])
async def dispatch_pr(
payload: dict[str, Any],
*,
config: Config,
http_client: httpx.AsyncClient,
**_: object,
) -> None:
"""Handle pull_request events — write context and invoke agent."""
action = payload.get("action")
if action not in {"opened", "synchronize"}:
return
pr = payload["pull_request"]
repo_info = payload["repository"]
owner = repo_info["owner"]["login"]
repo = repo_info["name"]
installation_id = payload["installation"]["id"]
token = await get_installation_token(
config,
installation_id,
client=http_client,
)
repo_dir = await clone_repo(
owner,
repo,
pr["head"]["ref"],
token,
config.workspace_dir,
)
_write_ci_context(
str(repo_dir),
{
"event_type": "pull_request",
"action": action,
"owner": owner,
"repo": repo,
"number": pr["number"],
"base_ref": pr["base"]["ref"],
"head_ref": pr["head"]["ref"],
},
)
ci_prompt = (
"AUTONOMOUS MODE: Do NOT ask the user any questions — work fully "
"autonomously. Make all decisions yourself: generate a run tag from "
"today's date, identify benchmark tiers from available tests, choose "
"optimization targets from profiler output. If something is ambiguous, "
"pick the reasonable default and document your choice in HANDOFF.md.\n\n"
"Optimize the Python code in this repository. This is a CI run "
f"triggered by PR #{pr['number']} "
f"({pr['head']['ref']}{pr['base']['ref']}).\n\n"
"After optimization is complete, commit your changes and push to the "
f"PR branch: git push origin HEAD:{pr['head']['ref']}\n\n"
"Follow the full pipeline: setup, unified profiling, experiment loop "
"with benchmarks, verification, pre-submit review, and adversarial "
"review. Do not skip steps."
)
await run_agent(config, repo_dir, token, agent="codeflash-deep", prompt=ci_prompt)
log.info("Agent handled PR %s/%s#%d", owner, repo, pr["number"])
async def dispatch_push(
payload: dict[str, Any],
*,
config: Config,
http_client: httpx.AsyncClient,
**_: object,
) -> None:
"""Handle push events — write context and invoke agent."""
repo_info = payload["repository"]
owner = repo_info["owner"]["login"]
repo = repo_info["name"]
ref = payload.get("ref", "")
default_branch = repo_info.get("default_branch", "main")
installation_id = payload["installation"]["id"]
# Only process pushes to the default branch.
if ref != f"refs/heads/{default_branch}":
return
token = await get_installation_token(
config,
installation_id,
client=http_client,
)
repo_dir = await clone_repo(
owner,
repo,
default_branch,
token,
config.workspace_dir,
)
_write_ci_context(
str(repo_dir),
{
"event_type": "push",
"action": None,
"owner": owner,
"repo": repo,
"head_sha": payload.get("after", ""),
"ref": ref,
},
)
await run_agent(config, repo_dir, token)
log.info("Agent handled push to %s/%s ref=%s", owner, repo, ref)
EVENT_HANDLERS: dict[
str,
Callable[..., Coroutine[Any, Any, None]],
] = {
"issues": dispatch_issues,
"pull_request": dispatch_pr,
"push": dispatch_push,
}
@app.get("/health")
async def health() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "ok"}
def main() -> None:
"""Entry point for the codeflash-service server."""
startup_cfg = Config()
uvicorn.run(
"github_app.app:app",
host=startup_cfg.host,
port=startup_cfg.port,
log_level="info",
)
if __name__ == "__main__":
main()