"""GitHub App authentication: JWT generation, token exchange, signature verification.""" from __future__ import annotations import hashlib import hmac import time from typing import TYPE_CHECKING import jwt import stamina from cachetools import TTLCache from .retry import is_retryable if TYPE_CHECKING: import httpx from .config import Config GITHUB_API = "https://api.github.com" # Cache installation tokens for 50 min (tokens last 1 hour). # Keyed by (app_id, installation_id) to prevent cross-app leakage. token_cache: TTLCache[tuple[str | int, int], str] = TTLCache( maxsize=64, ttl=3000, ) def generate_jwt(cfg: Config) -> str: """Generate a short-lived JWT for the GitHub App.""" now = int(time.time()) payload = { "iat": now - 60, "exp": now + 600, "iss": str(cfg.app_id), } return jwt.encode(payload, cfg.private_key, algorithm="RS256") @stamina.retry(on=is_retryable, attempts=3) async def get_installation_token( cfg: Config, installation_id: int, *, client: httpx.AsyncClient, ) -> str: """Exchange the JWT for an installation access token. Results are cached per installation_id for 50 minutes. """ cache_key = (cfg.app_id, installation_id) cached = token_cache.get(cache_key) if cached is not None: return cached token = generate_jwt(cfg) resp = await client.post( f"{GITHUB_API}/app/installations/{installation_id}/access_tokens", headers={ "Authorization": f"Bearer {token}", "Accept": "application/vnd.github+json", "X-GitHub-Api-Version": "2022-11-28", }, ) resp.raise_for_status() result = resp.json()["token"] token_cache[cache_key] = result return result def verify_signature( payload: bytes, signature: str, secret: str, ) -> bool: """Verify the X-Hub-Signature-256 header.""" if not signature.startswith("sha256="): return False expected = hmac.new( secret.encode(), payload, hashlib.sha256, ).hexdigest() return hmac.compare_digest(f"sha256={expected}", signature)