Move aiservice to django/aiservice

This commit is contained in:
afik.cohen 2024-01-02 16:55:24 -08:00
parent bc50e0d8cb
commit 677a299f6e
17 changed files with 5 additions and 217 deletions

0
codeflash/__init__.py Normal file
View file

View file

View file

View file

@ -1,94 +0,0 @@
import http.client
import json
import os
import sys
import time
import urllib.parse
from typing import Dict, Any
# Constants
CLIENT_ID = "Iv1.e30ce7aaafaf6412" # Replace with your GitHub App's client ID
# Function to parse API responses
def parse_response(response: http.client.HTTPResponse) -> Dict[str, Any]:
if response.status in [200, 201]:
return json.loads(response.read().decode())
else:
print(response.read().decode())
sys.exit(1)
# Function to request a device code
def request_device_code() -> Dict[str, Any]:
conn = http.client.HTTPSConnection("github.com")
payload = urllib.parse.urlencode({"client_id": CLIENT_ID})
headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'}
conn.request("POST", "/login/device/code", payload, headers)
return parse_response(conn.getresponse())
# Function to request a token
def request_token(device_code: str) -> Dict[str, Any]:
conn = http.client.HTTPSConnection("github.com")
payload = urllib.parse.urlencode({
"client_id": CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code"
})
headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'}
conn.request("POST", "/login/oauth/access_token", payload, headers)
return parse_response(conn.getresponse())
# Function to poll for a token
def poll_for_token(device_code: str, interval: int):
while True:
response = request_token(device_code)
error, access_token = response.get("error"), response.get("access_token")
if error:
if error == "authorization_pending":
time.sleep(interval)
continue
elif error == "slow_down":
time.sleep(interval + 5)
continue
elif error == "expired_token":
print("The device code has expired. Please run `login` again.")
sys.exit(1)
elif error == "access_denied":
print("Login cancelled by user.")
sys.exit(1)
else:
print(response)
sys.exit(1)
with open(".token", "w") as token_file:
token_file.write(access_token)
os.chmod(".token", 0o600)
break
# Login function
def login():
response = request_device_code()
verification_uri, user_code, device_code, interval = response.values_at("verification_uri", "user_code",
"device_code", "interval")
print(f"Please visit: {verification_uri} and enter code: {user_code}")
poll_for_token(device_code, interval)
print("Successfully authenticated!")
# Main function
def main():
if len(sys.argv) < 2 or sys.argv[1] == "help":
print("usage: codeflash <login | help>")
elif sys.argv[1] == "login":
login()
else:
print(f"Unknown command {sys.argv[1]}")
if __name__ == "__main__":
main()

View file

@ -1,120 +0,0 @@
import sys
import time
import uuid
from typing import NoReturn
from typing import Optional
from typing import Tuple
import click
def make_login_url() -> Tuple[uuid.UUID, str]:
env = get_state().env
session_id = uuid.uuid4()
return (
session_id,
f"{env.semgrep_url}/login?cli-token={session_id}&docker={env.in_docker}&gha={env.in_gh_action}",
)
@click.command()
@handle_command_errors
def login() -> NoReturn:
"""
Obtain and save credentials for semgrep.dev
Looks for an semgrep.dev API token in the environment variable SEMGREP_APP_TOKEN.
If not defined and running in a TTY, prompts interactively.
Once token is found, saves it to global settings file
"""
state = get_state()
saved_login_token = auth._read_token_from_settings_file()
if saved_login_token:
click.echo(
f"API token already exists in {state.settings.path}. To login with a different token logout use `semgrep logout`"
)
sys.exit(FATAL_EXIT_CODE)
# If the token is provided as an environment variable, save it to the settings file.
if state.env.app_token is not None and len(state.env.app_token) > 0:
if not save_token(state.env.app_token, echo_token=False):
sys.exit(FATAL_EXIT_CODE)
sys.exit(0)
# If token doesn't already exist in the settings file or as an environment variable,
# interactively prompt the user to supply it (if we are in a TTY).
if not auth.is_a_tty():
click.echo(
f"Error: semgrep login is an interactive command: run in an interactive terminal (or define SEMGREP_APP_TOKEN)",
err=True,
)
sys.exit(FATAL_EXIT_CODE)
session_id, url = make_login_url()
click.echo(
"Login enables additional proprietary Semgrep Registry rules and running custom policies from Semgrep Cloud Platform."
)
click.echo(f"Opening login at: {url}")
click.launch(url)
click.echo(
"\nOnce you've logged in, return here and you'll be ready to start using new Semgrep rules."
)
WAIT_BETWEEN_RETRY_IN_SEC = 6 # So every 10 retries is a minute
MAX_RETRIES = 30 # Give users 3 minutes to log in / open link
for _ in range(MAX_RETRIES):
r = state.app_session.post(
f"{state.env.semgrep_url}/api/agent/tokens/requests",
headers={
"User-Agent": str(state.app_session.user_agent),
"X-Semgrep-Client-Id": str(
state.settings.get("anonymous_user_id") or ""
),
},
json={"token_request_key": str(session_id)},
)
if r.status_code == 200:
as_json = r.json()
if save_token(as_json.get("token"), echo_token=True):
sys.exit(0)
else:
sys.exit(FATAL_EXIT_CODE)
elif r.status_code != 404:
click.echo(
f"Unexpected failure from {state.env.semgrep_url}: status code {r.status_code}; please contact support@semgrep.com if this persists",
err=True,
)
time.sleep(WAIT_BETWEEN_RETRY_IN_SEC)
click.echo(
f"Failed to login: please check your internet connection or contact support@semgrep.com",
err=True,
)
sys.exit(FATAL_EXIT_CODE)
def save_token(login_token: Optional[str], echo_token: bool) -> bool:
state = get_state()
if login_token is not None and auth.get_deployment_from_token(login_token):
auth.set_token(login_token)
click.echo(
f"Saved login token\n\n\t{login_token if echo_token else '<redacted>'}\n\nin {state.settings.path}."
)
click.echo(
f"Note: You can always generate more tokens at {state.env.semgrep_url}/orgs/-/settings/tokens"
)
return True
else:
click.echo("Login token is not valid. Please try again.", err=True)
return False
@click.command()
@handle_command_errors
def logout() -> None:
"""
Remove locally stored credentials to semgrep.dev
"""
auth.delete_token()
click.echo("Logged out (log back in with `semgrep login`)")

View file

View file

View file

View file

View file

View file

View file

View file

@ -90,7 +90,6 @@ def comparator(orig: Any, new: Any) -> bool:
if isinstance(orig, (datetime.datetime, datetime.date, datetime.timedelta)): if isinstance(orig, (datetime.datetime, datetime.date, datetime.timedelta)):
return orig == new return orig == new
# If the object passed has a user defined __eq__ method, use that # If the object passed has a user defined __eq__ method, use that
# This could fail if the user defined __eq__ is defined with cython # This could fail if the user defined __eq__ is defined with cython
try: try:

0
tests/__init__.py Normal file
View file

View file

View file

@ -1,6 +1,7 @@
import datetime import datetime
import pytest import pytest
from codeflash.verification.comparator import comparator from codeflash.verification.comparator import comparator
from codeflash.verification.equivalence import compare_results from codeflash.verification.equivalence import compare_results
from codeflash.verification.test_results import ( from codeflash.verification.test_results import (
@ -196,7 +197,6 @@ def test_custom_object():
assert comparator(b, c) assert comparator(b, c)
def test_compare_results_fn(): def test_compare_results_fn():
original_results = TestResults( original_results = TestResults(
[ [

View file

@ -19,7 +19,10 @@ def test_simple_dependencies():
str(file_path.parent.resolve()), str(file_path.parent.resolve()),
) )
assert len(dependent_functions) == 1 assert len(dependent_functions) == 1
assert dependent_functions[0].definition.full_name == "test_function_dependencies.calculate_something" assert (
dependent_functions[0].definition.full_name
== "test_function_dependencies.calculate_something"
)
def global_dependency_1(num): def global_dependency_1(num):