mirror of
https://github.com/codeflash-ai/codeflash-agent.git
synced 2026-05-04 18:25:19 +00:00
* Fix mypy errors and apply ruff formatting across packages Fix ast.FunctionDef calls missing type_params for Python 3.12+, correct type: ignore error codes in _comparator and _plugin, and run ruff format on all package source and test files. * Switch CI to prek for lint/typecheck checks Use j178/prek-action for consistent lint+typecheck (ruff check, ruff format, interrogate, mypy) matching local pre-commit config. Keep test as a separate parallel job for test-env support.
330 lines
9 KiB
Python
330 lines
9 KiB
Python
"""FastAPI webhook server — thin dispatcher for CI mode.

Receives GitHub webhooks, clones the repo, writes event metadata to
``.codeflash/ci-context.json``, and invokes the agent. The agent
handles all GitHub interactions via ``gh`` CLI.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from contextlib import asynccontextmanager
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import httpx
|
|
import uvicorn
|
|
from cachetools import TTLCache
|
|
from fastapi import FastAPI, Header, HTTPException, Request
|
|
|
|
from .agents import run_agent
|
|
from .auth import get_installation_token, verify_signature
|
|
from .config import Config
|
|
from .git import clone_repo
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import AsyncIterator, Callable, Coroutine
|
|
|
|
# Configure the root logger once, at import time: INFO and above, with a
# "timestamp logger-name level message" layout.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# Delivery IDs the webhook endpoint has already accepted, used to drop repeats.
# Bounded to 4096 entries, each expiring after ttl=3600.
_seen_deliveries: TTLCache[str, bool] = TTLCache(ttl=3600, maxsize=4096)
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Set up the state shared by all requests and tear it down on exit.

    On startup this builds a ``Config``, creates its workspace directory,
    opens one ``httpx.AsyncClient`` and publishes all of them, plus the set
    of in-flight background tasks, on ``app.state``.  On shutdown any tasks
    still in that set are awaited before the HTTP client is closed.
    """
    settings = Config()
    pending: set[asyncio.Task[None]] = set()

    # The workspace directory is created with mode 0o700.
    settings.workspace_dir.mkdir(mode=0o700, exist_ok=True, parents=True)

    client = httpx.AsyncClient(
        timeout=30.0,
        headers={"Accept": "application/vnd.github+json"},
    )
    async with client as http_client:
        app.state.config = settings
        app.state.http_client = http_client
        app.state.running_tasks = pending

        yield

        # Drain while the client is still open; handler failures are
        # collected by gather instead of aborting shutdown.
        if pending:
            log.info("Draining %d background tasks...", len(pending))
            await asyncio.gather(*pending, return_exceptions=True)
|
|
|
|
|
|
# ASGI application object; startup and shutdown are delegated to ``lifespan``.
app = FastAPI(lifespan=lifespan, title="codeflash-service")
|
|
|
|
|
|
def _write_ci_context(repo_dir: str, context: dict[str, Any]) -> None:
    """Write ``context`` as JSON to ``<repo_dir>/.codeflash/ci-context.json``.

    The ``.codeflash`` directory is created when missing and an existing
    context file is overwritten.

    Args:
        repo_dir: Path of the repository working tree to write into.
        context: JSON-serialisable event metadata.

    Raises:
        TypeError: If ``context`` holds a value ``json.dumps`` cannot encode.
        OSError: If the directory or the file cannot be written.
    """
    from pathlib import Path

    # Serialise before touching the filesystem so an unencodable context
    # fails without leaving a stray ``.codeflash`` directory behind.
    document = json.dumps(context, indent=2)

    ci_dir = Path(repo_dir) / ".codeflash"
    ci_dir.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the bytes on disk must not depend on the host locale.
    (ci_dir / "ci-context.json").write_text(document, encoding="utf-8")
|
|
|
|
|
|
@app.post("/webhook")
async def webhook(
    request: Request,
    x_github_event: str = Header(..., alias="X-GitHub-Event"),
    x_hub_signature_256: str = Header(
        ...,
        alias="X-Hub-Signature-256",
    ),
    x_github_delivery: str = Header(
        ...,
        alias="X-GitHub-Delivery",
    ),
) -> dict[str, str]:
    """Receive and dispatch GitHub webhook events.

    The raw body is checked against ``X-Hub-Signature-256`` first.  A delivery
    ID that was already accepted is acknowledged without further work.  Events
    with a registered handler are run as a background task so the request
    returns immediately.

    Returns:
        ``{"status": "duplicate", ...}`` for a repeated delivery,
        ``{"status": "ignored", ...}`` when no handler exists for the event,
        ``{"status": "accepted", ...}`` once a handler task was scheduled.

    Raises:
        HTTPException: 401 when the signature does not match; 400 when the
            body is not a JSON object.
    """
    body = await request.body()
    cfg: Config = request.app.state.config

    if not verify_signature(body, x_hub_signature_256, cfg.webhook_secret):
        raise HTTPException(status_code=401, detail="Invalid signature")

    if x_github_delivery in _seen_deliveries:
        log.info("Duplicate delivery %s, skipping", x_github_delivery)
        return {"status": "duplicate", "delivery": x_github_delivery}
    _seen_deliveries[x_github_delivery] = True

    # Parse the bytes that were just verified.  A signed body that is not JSON
    # (for example a form-encoded delivery) is a client error, not a crash.
    try:
        payload = json.loads(body)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail="Payload is not valid JSON",
        ) from exc
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=400,
            detail="Payload must be a JSON object",
        )

    log.info(
        "Event %s delivery=%s action=%s",
        x_github_event,
        x_github_delivery,
        payload.get("action"),
    )

    handler = EVENT_HANDLERS.get(x_github_event)
    if handler is None:
        return {"status": "ignored", "event": x_github_event}

    http_client: httpx.AsyncClient = request.app.state.http_client
    running_tasks: set[asyncio.Task[None]] = request.app.state.running_tasks

    task = asyncio.create_task(
        safe_handle(
            handler,
            payload,
            config=cfg,
            http_client=http_client,
        ),
    )
    # Hold a strong reference until the task finishes so it is not garbage
    # collected mid-flight and so shutdown can wait for it.
    running_tasks.add(task)
    task.add_done_callback(running_tasks.discard)

    return {"status": "accepted", "event": x_github_event}
|
|
|
|
|
|
async def safe_handle(
    handler: Callable[..., Coroutine[Any, Any, None]],
    payload: dict[str, Any],
    **kwargs: object,
) -> None:
    """Await ``handler(payload, **kwargs)`` without letting errors escape.

    Any ``Exception`` raised while calling or awaiting the handler is logged
    with its traceback and then dropped, so a failing handler never surfaces
    as an unhandled task exception.
    """
    try:
        pending = handler(payload, **kwargs)
        await pending
    except Exception:
        log.exception("Handler failed for event")
|
|
|
|
|
|
async def dispatch_issues(
    payload: dict[str, Any],
    *,
    config: Config,
    http_client: httpx.AsyncClient,
    **_: object,
) -> None:
    """React to an ``issues`` event by running the agent on the default branch.

    Only the ``opened`` and ``labeled`` actions are processed; every other
    action returns immediately.  For a processed event the repository's
    default branch is cloned, the event metadata is written to the CI context
    file and the agent is invoked.
    """
    handled_actions = {"opened", "labeled"}
    action = payload.get("action")
    if action not in handled_actions:
        return

    repository = payload["repository"]
    owner = repository["owner"]["login"]
    repo = repository["name"]

    token = await get_installation_token(
        config,
        payload["installation"]["id"],
        client=http_client,
    )
    checkout = await clone_repo(
        owner,
        repo,
        repository["default_branch"],
        token,
        config.workspace_dir,
    )

    checkout_path = str(checkout)
    context = {
        "event_type": "issues",
        "action": action,
        "owner": owner,
        "repo": repo,
        "number": payload["issue"]["number"],
    }
    _write_ci_context(checkout_path, context)

    await run_agent(config, checkout, token)
    log.info("Agent handled issue %s/%s#%d", owner, repo, context["number"])
|
|
|
|
|
|
async def dispatch_pr(
    payload: dict[str, Any],
    *,
    config: Config,
    http_client: httpx.AsyncClient,
    **_: object,
) -> None:
    """Handle pull_request events — write context and invoke agent.

    Only the ``opened`` and ``synchronize`` actions are processed.  Pull
    requests whose head branch lives in a different repository (forks) are
    skipped: the clone below targets the base repository, where that branch
    either does not exist or is an unrelated branch of the same name that the
    agent would then be told to push to.

    Raises:
        KeyError: If a processed payload lacks an expected field.
    """
    action = payload.get("action")
    if action not in {"opened", "synchronize"}:
        return

    pr = payload["pull_request"]
    repo_info = payload["repository"]
    owner = repo_info["owner"]["login"]
    repo = repo_info["name"]

    # ``head.repo`` may be null (deleted fork) or absent; in that case nothing
    # can be compared and the event is processed as before.
    base_full_name = repo_info.get("full_name") or f"{owner}/{repo}"
    head_full_name = (pr["head"].get("repo") or {}).get("full_name")
    if (
        head_full_name is not None
        and str(head_full_name).casefold() != str(base_full_name).casefold()
    ):
        log.info(
            "Skipping PR %s/%s#%d: head branch belongs to %s",
            owner,
            repo,
            pr["number"],
            head_full_name,
        )
        return

    installation_id = payload["installation"]["id"]

    token = await get_installation_token(
        config,
        installation_id,
        client=http_client,
    )
    repo_dir = await clone_repo(
        owner,
        repo,
        pr["head"]["ref"],
        token,
        config.workspace_dir,
    )

    _write_ci_context(
        str(repo_dir),
        {
            "event_type": "pull_request",
            "action": action,
            "owner": owner,
            "repo": repo,
            "number": pr["number"],
            "base_ref": pr["base"]["ref"],
            "head_ref": pr["head"]["ref"],
        },
    )

    ci_prompt = (
        "AUTONOMOUS MODE: Do NOT ask the user any questions — work fully "
        "autonomously. Make all decisions yourself: generate a run tag from "
        "today's date, identify benchmark tiers from available tests, choose "
        "optimization targets from profiler output. If something is ambiguous, "
        "pick the reasonable default and document your choice in HANDOFF.md.\n\n"
        "Optimize the Python code in this repository. This is a CI run "
        f"triggered by PR #{pr['number']} "
        f"({pr['head']['ref']} → {pr['base']['ref']}).\n\n"
        "After optimization is complete, commit your changes and push to the "
        f"PR branch: git push origin HEAD:{pr['head']['ref']}\n\n"
        "Follow the full pipeline: setup, unified profiling, experiment loop "
        "with benchmarks, verification, pre-submit review, and adversarial "
        "review. Do not skip steps."
    )

    await run_agent(config, repo_dir, token, agent="codeflash-deep", prompt=ci_prompt)
    log.info("Agent handled PR %s/%s#%d", owner, repo, pr["number"])
|
|
|
|
|
|
async def dispatch_push(
    payload: dict[str, Any],
    *,
    config: Config,
    http_client: httpx.AsyncClient,
    **_: object,
) -> None:
    """Handle push events — write context and invoke agent.

    Pushes to anything other than the repository's default branch (other
    branches, tags) return immediately.  For a default-branch push the branch
    is cloned, the event metadata is written to the CI context file and the
    agent is invoked.

    Raises:
        KeyError: If a default-branch push payload lacks an expected field.
    """
    repo_info = payload["repository"]
    ref = payload.get("ref", "")
    default_branch = repo_info.get("default_branch", "main")

    # Only process pushes to the default branch.  Filter before reading any
    # other field so ignored pushes never fail on a partial payload.
    if ref != f"refs/heads/{default_branch}":
        return

    owner = repo_info["owner"]["login"]
    repo = repo_info["name"]
    installation_id = payload["installation"]["id"]

    token = await get_installation_token(
        config,
        installation_id,
        client=http_client,
    )
    repo_dir = await clone_repo(
        owner,
        repo,
        default_branch,
        token,
        config.workspace_dir,
    )

    _write_ci_context(
        str(repo_dir),
        {
            "event_type": "push",
            "action": None,
            "owner": owner,
            "repo": repo,
            "head_sha": payload.get("after", ""),
            "ref": ref,
        },
    )

    await run_agent(config, repo_dir, token)
    log.info("Agent handled push to %s/%s ref=%s", owner, repo, ref)
|
|
|
|
|
|
# Routing table for the webhook endpoint: ``X-GitHub-Event`` value -> handler
# coroutine.  Events with no entry here are answered with "ignored".
EVENT_HANDLERS: dict[str, Callable[..., Coroutine[Any, Any, None]]] = {
    "issues": dispatch_issues,
    "pull_request": dispatch_pr,
    "push": dispatch_push,
}
|
|
|
|
|
|
@app.get("/health")
async def health() -> dict[str, str]:
    """Answer liveness probes with a constant ``{"status": "ok"}`` body."""
    status_body = {"status": "ok"}
    return status_body
|
|
|
|
|
|
def main() -> None:
    """Start uvicorn serving ``github_app.app:app`` on the configured address.

    Host and port are read from a fresh ``Config``.
    """
    settings = Config()
    bind_host = settings.host
    bind_port = settings.port
    uvicorn.run(
        "github_app.app:app",
        host=bind_host,
        port=bind_port,
        log_level="info",
    )
|
|
|
|
|
|
# Run the server only when this module is executed as a script.
if __name__ == "__main__":
    main()
|