"""Pull GitHub PR data and standup notes into data.json for the Dash app."""

from __future__ import annotations

import functools
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests

REPOS = [
    "codeflash",
    "codeflash-internal",
    "codeflash-agent",
    "github-workflows",
]
ORG = "codeflash-ai"
NOTES_DIR = Path(__file__).parent / "notes"
DATA_FILE = Path(__file__).parent / "data.json"
REPO_ROOT = Path(__file__).parents[2]
CI_AUDIT_FILE = REPO_ROOT / "reports" / "codeflash-ci-audit" / "data.json"


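# Resolve a GitHub API token: prefer GITHUB_TOKEN / GH_TOKEN from the
# environment, then fall back to the `gh` CLI. lru_cache means the
# subprocess runs at most once per process.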
@functools.lru_cache(maxsize=1)
def gh_token() -> str:
    token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN", "")
    if not token:
        import subprocess

        result = subprocess.run(
            ["gh", "auth", "token"],
            capture_output=True,
            text=True,
            check=False,
        )
        token = result.stdout.strip()
    return token


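# Auth headers for the GitHub REST API.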
def gh_headers() -> dict[str, str]:
    return {
        "Authorization": f"token {gh_token()}",
        "Accept": "application/vnd.github+json",
    }


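# Fetch the 30 most recently updated PRs for a single repo. A non-200
# response is treated as "no data" so one bad repo cannot break the whole
# refresh; `since` drops PRs whose last update predates the cutoff.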
def _fetch_repo_prs(
    repo: str, state: str, headers: dict, since: datetime | None = None
) -> list[dict]:
    url = f"https://api.github.com/repos/{ORG}/{repo}/pulls"
    params: dict[str, str] = {
        "state": state,
        "per_page": "30",
        "sort": "updated",
        "direction": "desc",
    }
    resp = requests.get(url, headers=headers, params=params, timeout=15)
    if resp.status_code != 200:
        return []
    prs = []
    for pr in resp.json():
        updated = datetime.fromisoformat(
            pr["updated_at"].replace("Z", "+00:00")
        )
        if since and updated < since:
            continue
        prs.append(
            {
                "repo": repo,
                "number": pr["number"],
                "title": pr["title"],
                "state": pr["state"],
                "author": pr["user"]["login"],
                "url": pr["html_url"],
                "created_at": pr["created_at"],
                "updated_at": pr["updated_at"],
                "merged_at": pr.get("merged_at"),
                "draft": pr.get("draft", False),
            }
        )
    return prs


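# Fan the per-repo fetches out across a thread pool, one worker per repo.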
def fetch_prs(state: str, since: datetime | None = None) -> list[dict]:
    headers = gh_headers()
    with ThreadPoolExecutor(max_workers=len(REPOS)) as pool:
        futures = [
            pool.submit(_fetch_repo_prs, repo, state, headers, since)
            for repo in REPOS
        ]
        prs = []
        for f in futures:
            prs.extend(f.result())
    return prs


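# Parse a standup note written in Markdown: the first `#` heading is the
# title, each `##` heading opens a section (lowercased key), bullets and
# loose lines become entries, and fenced code blocks are captured as single
# "code:..." entries.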
def parse_note(path: Path) -> dict:
    text = path.read_text(encoding="utf-8")
    sections: dict[str, list[str]] = {}
    current = None
    title = None
    in_code = False
    code_lines: list[str] = []
    for line in text.splitlines():
        if line.strip().startswith("```"):
            if in_code:
                if current:
                    sections[current].append("code:" + "\n".join(code_lines))
                code_lines = []
                in_code = False
            else:
                in_code = True
            continue
        if in_code:
            code_lines.append(line)
            continue
        h1 = re.match(r"^#\s+(.+)", line)
        if h1 and not title:
            title = h1.group(1).strip()
            continue
        heading = re.match(r"^##\s+(.+)", line)
        if heading:
            current = heading.group(1).strip().lower()
            sections[current] = []
        elif current and line.strip().startswith("- "):
            sections[current].append(line.strip().removeprefix("- "))
        elif current and line.strip():
            sections[current].append(line.strip())
    return {
        "date": path.stem,
        "title": title or path.stem,
        "sections": sections,
    }


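# Newest notes first; assumes date-stamped filenames (the stem doubles as the
# note's date), so a reverse lexical sort gives reverse-chronological order.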
def load_notes() -> list[dict]:
    if not NOTES_DIR.exists():
        return []
    notes = []
    for f in sorted(NOTES_DIR.glob("*.md"), reverse=True):
        notes.append(parse_note(f))
    return notes


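# Build data.json: open PRs, PRs merged in the last 7 days, standup notes,
# and the CI-audit report when its data file is present.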
def main():
    now = datetime.now(timezone.utc)
    week_ago = now - timedelta(days=7)

    open_prs = fetch_prs("open")
    closed_prs = fetch_prs("closed", since=week_ago)
    merged_prs = [pr for pr in closed_prs if pr["merged_at"]]
    notes = load_notes()

    ci_audit = None
    resolved = CI_AUDIT_FILE.resolve()
    if resolved.exists():
        ci_audit = json.loads(resolved.read_text(encoding="utf-8"))

    data = {
        "generated_at": now.isoformat(),
        "org": ORG,
        "repos": REPOS,
        "open_prs": open_prs,
        "merged_prs": merged_prs,
        "notes": notes,
        "summary": {
            "total_open": len(open_prs),
            "total_merged_7d": len(merged_prs),
            "draft_count": sum(1 for pr in open_prs if pr["draft"]),
            "repos_with_open_prs": len({pr["repo"] for pr in open_prs}),
        },
        "ci_audit": ci_audit,
    }

    DATA_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8")
    print(
        f"Wrote {DATA_FILE} ({len(open_prs)} open, {len(merged_prs)} merged)"
    )


if __name__ == "__main__":
    main()