codeflash-agent/packages/github-app/github_app/auth.py
Kevin Turcios 2caaf6af7c
Fix CI: mypy errors, ruff formatting, switch to prek (#22)
* Fix mypy errors and apply ruff formatting across packages

Fix ast.FunctionDef calls missing type_params for Python 3.12+,
correct type: ignore error codes in _comparator and _plugin, and
run ruff format on all package source and test files.

* Switch CI to prek for lint/typecheck checks

Use j178/prek-action for consistent lint+typecheck (ruff check,
ruff format, interrogate, mypy) matching local pre-commit config.
Keep test as a separate parallel job for test-env support.
2026-04-15 02:52:47 -05:00

86 lines
2.1 KiB
Python

"""GitHub App authentication: JWT generation, token exchange, signature verification."""
from __future__ import annotations
import hashlib
import hmac
import time
from typing import TYPE_CHECKING
import jwt
import stamina
from cachetools import TTLCache
from .retry import is_retryable
if TYPE_CHECKING:
import httpx
from .config import Config
GITHUB_API = "https://api.github.com"
# Cache installation tokens for 50 min (tokens last 1 hour).
# Keyed by (app_id, installation_id) to prevent cross-app leakage.
token_cache: TTLCache[tuple[str | int, int], str] = TTLCache(
maxsize=64,
ttl=3000,
)
def generate_jwt(cfg: Config) -> str:
"""Generate a short-lived JWT for the GitHub App."""
now = int(time.time())
payload = {
"iat": now - 60,
"exp": now + 600,
"iss": str(cfg.app_id),
}
return jwt.encode(payload, cfg.private_key, algorithm="RS256")
@stamina.retry(on=is_retryable, attempts=3)
async def get_installation_token(
cfg: Config,
installation_id: int,
*,
client: httpx.AsyncClient,
) -> str:
"""Exchange the JWT for an installation access token.
Results are cached per installation_id for 50 minutes.
"""
cache_key = (cfg.app_id, installation_id)
cached = token_cache.get(cache_key)
if cached is not None:
return cached
token = generate_jwt(cfg)
resp = await client.post(
f"{GITHUB_API}/app/installations/{installation_id}/access_tokens",
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
},
)
resp.raise_for_status()
result = resp.json()["token"]
token_cache[cache_key] = result
return result
def verify_signature(
payload: bytes,
signature: str,
secret: str,
) -> bool:
"""Verify the X-Hub-Signature-256 header."""
if not signature.startswith("sha256="):
return False
expected = hmac.new(
secret.encode(),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)