Commit c552981: ruff edits
turbomam committed Jul 1, 2024 (1 parent: 1375ede)

Showing 5 changed files with 73 additions and 68 deletions.

1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.idea/
*.sqlite


###
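
The new *.sqlite pattern presumably keeps the requests-cache database (llm-github.sqlite, created in execute.py below) out of version control.
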
56 changes: 28 additions & 28 deletions llm_github/core.py
@@ -1,14 +1,14 @@
import json
import time
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

import requests
from requests_cache import CachedSession

REQUESTS_TIMEOUT = 10 # Timeout in seconds for requests

# Default fields to be dropped from responses
DEFAULT_DROPPED_FIELDS = [
DEFAULT_DROPPED_FIELDS: List[str] = [
"_links",
"base",
"comments_url",
@@ -33,7 +33,7 @@
class EnvironmentVariableError(Exception):
"""Exception raised for errors in the environment variables."""

def __init__(self, variable, message="is not set in the environment."):
def __init__(self, variable: str, message: str = "is not set in the environment.") -> None:
self.variable = variable
self.message = message
super().__init__(f"{variable} {message}")
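
As a usage sketch (this mirrors the token guard added in execute.py later in this commit):

    import os

    token = os.getenv("GITHUB_TOKEN", "")
    if not token:
        # message renders as "GITHUB_TOKEN is not set in the environment."
        raise EnvironmentVariableError("GITHUB_TOKEN")
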
@@ -59,12 +59,12 @@ def wait_for_rate_limit_reset(reset_time: int) -> None:
time.sleep(wait_time)


def remove_keys_from_dict(data: Dict, keys_to_remove: List[str]) -> Dict:
def remove_keys_from_dict(data: Dict[str, Any], keys_to_remove: List[str]) -> Dict[str, Any]:
"""Remove specified keys from a dictionary."""
return {key: value for key, value in data.items() if key not in keys_to_remove}


def write_json_to_file(json_object: List[Dict], filename: str) -> None:
def write_json_to_file(json_object: List[Dict[str, Any]], filename: str) -> None:
"""Save data to a JSON file."""
with open(filename, "w", encoding="utf-8") as f:
json.dump(json_object, f, ensure_ascii=False, indent=4)
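
A quick illustration of the two helpers above, using a hypothetical record:

    record = {"id": 1, "base": "main", "comments_url": "https://api.github.com/..."}
    slim = remove_keys_from_dict(record, DEFAULT_DROPPED_FIELDS)
    # slim == {"id": 1}: "base" and "comments_url" are both in the drop list
    write_json_to_file([slim], "records.json")  # UTF-8, 4-space indentation
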
@@ -84,7 +84,7 @@ def handle_response_errors(response: requests.Response) -> None:
print("Error message:", response.text)


def github_token_check(token: str, session: CachedSession) -> Optional[Dict]:
def github_token_check(token: str, session: CachedSession) -> Optional[Dict[str, Any]]:
"""Validate the GitHub token by fetching user profile."""
headers = {"Authorization": f"token {token}"}
response = session.get("https://api.github.com/user", headers=headers, timeout=REQUESTS_TIMEOUT)
@@ -95,7 +95,7 @@ def github_token_check(token: str, session: CachedSession) -> Optional[Dict]:
return None


def list_user_orgs(token: str, session: CachedSession) -> Optional[List[Dict]]:
def list_user_orgs(token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
"""List all organizations the user is a member of."""
rate_limit = get_rate_limit(token, session)
if rate_limit["remaining"] == 0:
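
Wiring this up end to end follows the setup in execute.py; a minimal sketch (note that in requests-cache, per-session expiry is normally configured with expire_after; the timeout argument below is passed through to the underlying SQLite connection):

    import os
    from requests_cache import CachedSession
    from requests_cache.backends.sqlite import SQLiteCache

    token = os.getenv("GITHUB_TOKEN", "")
    session = CachedSession(
        cache_name="llm-github-cache",
        backend=SQLiteCache("llm-github.sqlite", timeout=86400),
    )
    profile = github_token_check(token, session=session)
    if profile:
        print("Authenticated as", profile.get("login"))
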
@@ -109,12 +109,12 @@ def list_user_orgs(token: str, session: CachedSession) -> Optional[List[Dict]]:
return None


def get_repos(org: str, token: str, session: CachedSession) -> Optional[List[Dict]]:
def get_repos(org: str, token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
"""Fetch all repositories for a given organization."""
rate_limit = get_rate_limit(token, session)
if rate_limit["remaining"] == 0:
wait_for_rate_limit_reset(rate_limit["reset"])
repos = []
repos: List[Dict[str, Any]] = []
url = f"https://api.github.com/orgs/{org}/repos"
headers = {"Authorization": f"token {token}"}
while url:
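
The body of the while loop is collapsed in this view; presumably it follows GitHub's Link headers for pagination. A sketch under that assumption:

    while url:
        response = session.get(url, headers=headers, timeout=REQUESTS_TIMEOUT)
        if response.status_code != 200:
            handle_response_errors(response)
            return None
        repos.extend(response.json())
        # requests parses the Link header into response.links;
        # no "next" link means the last page was reached
        url = response.links.get("next", {}).get("url")
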
@@ -128,9 +128,9 @@ def get_repos(org: str, token: str, session: CachedSession) -> Optional[List[Dic
return repos


def fetch_issues(org: str, token: str, session: CachedSession) -> Optional[List[Dict]]:
def fetch_issues(org: str, token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
"""Fetch all issues from all repositories in an organization, handling pagination and rate limits."""
issues = []
issues: List[Dict[str, Any]] = []
repos = get_repos(org, token, session)
if not repos:
print("No repositories found or failed to fetch repositories.")
@@ -156,7 +156,7 @@ def fetch_issues(org: str, token: str, session: CachedSession) -> Optional[List[
return issues


def sanitize_user_data(data: Dict) -> Dict:
def sanitize_user_data(data: Any) -> Any:
"""Recursively sanitize user data to keep only the user 'login'."""
if isinstance(data, dict):
if "login" in data and set(data.keys()) - {"login"}:
@@ -168,7 +168,7 @@ def sanitize_user_data(data: Dict) -> Dict:
return data


def remove_empty_values(data: Dict) -> Dict:
def remove_empty_values(data: Any) -> Any:
"""Recursively remove keys with empty values from a dictionary or list."""
if isinstance(data, dict):
return {k: remove_empty_values(v) for k, v in data.items() if v or isinstance(v, bool)}
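
The `v or isinstance(v, bool)` guard is the subtle part: it keeps False (meaningful on fields like "locked") while dropping empty strings, lists, and None; note that a numeric 0 would also be dropped. For example:

    remove_empty_values({"locked": False, "body": "", "labels": [], "milestone": None})
    # -> {"locked": False}
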
@@ -177,9 +177,9 @@ def remove_empty_values(data: Dict) -> Dict:
return data


def process_issues(issues: List[Dict], keys_to_remove: List[str]) -> List[Dict]:
def process_issues(issues: List[Dict[str, Any]], keys_to_remove: List[str]) -> List[Dict[str, Any]]:
"""Process a list of issues to sanitize user information and remove empty values."""
processed_issues = []
processed_issues: List[Dict[str, Any]] = []
for issue in issues:
sanitized_issue = sanitize_user_data(issue)
cleaned_issue = remove_empty_values(sanitized_issue)
@@ -188,9 +188,9 @@ def process_issues(issues: List[Dict], keys_to_remove: List[str]) -> List[Dict]:
return processed_issues


def fetch_pull_requests(org: str, token: str, session: CachedSession) -> Optional[List[Dict]]:
def fetch_pull_requests(org: str, token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
"""Fetch all pull requests from all repositories in an organization, handling pagination and rate limits."""
pull_requests = []
pull_requests: List[Dict[str, Any]] = []
repos = get_repos(org, token, session)
if not repos:
print("No repositories found or failed to fetch repositories.")
@@ -215,9 +215,9 @@ def fetch_pull_requests(org: str, token: str, session: CachedSession) -> Optiona
return pull_requests


def process_pull_requests(pull_requests: List[Dict], keys_to_remove: List[str]) -> List[Dict]:
def process_pull_requests(pull_requests: List[Dict[str, Any]], keys_to_remove: List[str]) -> List[Dict[str, Any]]:
"""Process a list of pull requests to sanitize user information and remove empty values."""
processed_pull_requests = []
processed_pull_requests: List[Dict[str, Any]] = []
for pr in pull_requests:
sanitized_pr = sanitize_user_data(pr)
cleaned_pr = remove_empty_values(sanitized_pr)
@@ -226,10 +226,10 @@ def process_pull_requests(pull_requests: List[Dict], keys_to_remove: List[str])
return processed_pull_requests


def fetch_all_comments(org: str, token: str, session: CachedSession) -> Optional[List[Dict]]:
def fetch_all_comments(org: str, token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
"""Fetch all comments from all repositories in an organization,
distinguishing between issue and PR comments, while handling pagination and rate limits."""
all_comments = []
all_comments: List[Dict[str, Any]] = []
repos = get_repos(org, token, session)
if not repos:
print("No repositories found or failed to fetch repositories.")
@@ -261,9 +261,9 @@ def fetch_all_comments(org: str, token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
return all_comments


def process_comments(comments: List[Dict], keys_to_remove: List[str]) -> List[Dict]:
def process_comments(comments: List[Dict[str, Any]], keys_to_remove: List[str]) -> List[Dict[str, Any]]:
"""Process a list of comments to sanitize user information and remove empty values."""
processed_comments = []
processed_comments: List[Dict[str, Any]] = []
for comment in comments:
sanitized_comment = sanitize_user_data(comment)
cleaned_comment = remove_empty_values(sanitized_comment)
@@ -272,9 +272,9 @@ def process_comments(comments: List[Dict], keys_to_remove: List[str]) -> List[Di
return processed_comments


def fetch_all_discussions(org: str, token: str, session: CachedSession) -> Optional[List[Dict]]:
def fetch_all_discussions(org: str, token: str, session: CachedSession) -> Optional[List[Dict[str, Any]]]:
"""Fetch discussions from all repositories in the specified organization."""
all_discussions = []
all_discussions: List[Dict[str, Any]] = []
repos = get_repos(org, token, session)
if repos:
for repo in repos:
@@ -288,7 +288,7 @@ def fetch_all_discussions(org: str, token: str, session: CachedSession) -> Optio
return all_discussions


def fetch_discussions_graphql(org: str, repo: str, token: str) -> Optional[List[Dict]]:
def fetch_discussions_graphql(org: str, repo: str, token: str) -> Optional[List[Dict[str, Any]]]:
"""Fetch discussions using GitHub's GraphQL API."""
url = "https://api.github.com/graphql"
headers = {"Authorization": f"Bearer {token}"}
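
The query string itself is collapsed in this view; a minimal sketch of pulling discussion nodes over GraphQL (the exact field selection is an assumption):

    query = """
    query($org: String!, $repo: String!) {
      repository(owner: $org, name: $repo) {
        discussions(first: 100) {
          nodes { number title body author { login } }
        }
      }
    }"""
    payload = {"query": query, "variables": {"org": org, "repo": repo}}
    response = requests.post(url, json=payload, headers=headers, timeout=REQUESTS_TIMEOUT)
    if response.status_code == 200:
        return response.json()["data"]["repository"]["discussions"]["nodes"]
    return None
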
@@ -330,9 +330,9 @@ def fetch_discussions_graphql(org: str, repo: str, token: str) -> Optional[List[Dict[str, Any]]]:
return None


def process_discussions(discussions: List[Dict], keys_to_remove: List[str]) -> List[Dict]:
def process_discussions(discussions: List[Dict[str, Any]], keys_to_remove: List[str]) -> List[Dict[str, Any]]:
"""Process a list of discussions to sanitize user information, remove empty values, and remove specified keys."""
processed_discussions = []
processed_discussions: List[Dict[str, Any]] = []
for discussion in discussions:
sanitized_discussion = sanitize_user_data(discussion)
cleaned_discussion = remove_empty_values(sanitized_discussion)
67 changes: 28 additions & 39 deletions llm_github/execute.py
@@ -1,8 +1,9 @@
import os
from typing import Dict, List, Optional

# Fixing import conflicts by adjusting namespace and avoiding re-importing CachedSession
from core import (
DEFAULT_DROPPED_FIELDS,
CachedSession,
EnvironmentVariableError,
fetch_all_comments,
fetch_all_discussions,
@@ -18,67 +19,55 @@
write_json_to_file,
)
from dotenv import load_dotenv
from requests_cache import CachedSession
from requests_cache.backends.sqlite import SQLiteCache

# Load environment variables from .env file
load_dotenv(dotenv_path="local/.env", verbose=True)

# Global access token for GitHub API
global_token = os.environ["GITHUB_TOKEN"]
global_token: str = os.getenv("GITHUB_TOKEN", "")
if not global_token:
raise EnvironmentVariableError("GITHUB_TOKEN")
print("Token loaded successfully.")

# Set up cache with SQLite backend
session = CachedSession(
session: CachedSession = CachedSession(
cache_name="llm-github-cache",
backend=SQLiteCache("llm-github.sqlite", timeout=86400), # Cache expires after 24 hours
)

user_data = github_token_check(global_token, session=session)
orgs = list_user_orgs(global_token, session=session)
user_data: Optional[Dict] = github_token_check(global_token, session=session)
orgs: Optional[List[Dict]] = list_user_orgs(global_token, session=session)

# turbomam: Resource not found. This could be due to incorrect organization name or insufficient access permissions.
# Error message:
# {
# "message": "Not Found",
# "documentation_url": "https://docs.github.com/rest/repos/repos#list-organization-repositories",
# "status": "404"
# }

# microbiomedata: Access forbidden. Check if your token has the required scopes or if there's a rate limit issue.
# Error message:
# {
# "message": "`microbiomedata` forbids access via a personal access token (classic). Please use a GitHub App, OAuth App, or a personal access token with fine-grained permissions.",
# "documentation_url": "https://docs.github.com/rest/repos/repos#list-organization-repositories",
# "status": "403"
# }

# works: berkeleybop

org_name = "microbiomedata"
org_name: str = "microbiomedata"

print("FETCHING REPOS")
repos = get_repos(org_name, global_token, session=session)
write_json_to_file(repos, f"{org_name}_repos.json")
repos: Optional[List[Dict]] = get_repos(org_name, global_token, session=session)
if repos:
write_json_to_file(repos, f"{org_name}_repos.json")

print("FETCHING ISSUES")
org_issues = fetch_issues(org_name, global_token, session=session)
sanitized_issues = process_issues(org_issues, DEFAULT_DROPPED_FIELDS)
write_json_to_file(sanitized_issues, f"{org_name}_issues.json")
org_issues: Optional[List[Dict]] = fetch_issues(org_name, global_token, session=session)
if org_issues:
sanitized_issues: List[Dict] = process_issues(org_issues, DEFAULT_DROPPED_FIELDS)
write_json_to_file(sanitized_issues, f"{org_name}_issues.json")

print("FETCHING PRs")
pull_requests = fetch_pull_requests(org_name, global_token, session=session)
processed_pull_requests = process_pull_requests(pull_requests, DEFAULT_DROPPED_FIELDS)
write_json_to_file(processed_pull_requests, f"{org_name}_prs.json")
pull_requests: Optional[List[Dict]] = fetch_pull_requests(org_name, global_token, session=session)
if pull_requests:
processed_pull_requests: List[Dict] = process_pull_requests(pull_requests, DEFAULT_DROPPED_FIELDS)
write_json_to_file(processed_pull_requests, f"{org_name}_prs.json")

print("FETCHING COMMENTS")
comments = fetch_all_comments(org_name, global_token, session=session)
processed_comments = process_comments(comments, DEFAULT_DROPPED_FIELDS)
write_json_to_file(processed_comments, f"{org_name}_comments.json")
comments: Optional[List[Dict]] = fetch_all_comments(org_name, global_token, session=session)
if comments:
processed_comments: List[Dict] = process_comments(comments, DEFAULT_DROPPED_FIELDS)
write_json_to_file(processed_comments, f"{org_name}_comments.json")

print("FETCHING DISCUSSIONS")
all_discussions = fetch_all_discussions(org_name, global_token, session=session)
processed_discussions = process_discussions(all_discussions, DEFAULT_DROPPED_FIELDS)
print(f"Total discussions fetched from all repositories: {len(processed_discussions)}")
write_json_to_file(processed_discussions, f"{org_name}_discussions.json")
all_discussions: Optional[List[Dict]] = fetch_all_discussions(org_name, global_token, session=session)
if all_discussions:
processed_discussions: List[Dict] = process_discussions(all_discussions, DEFAULT_DROPPED_FIELDS)
print(f"Total discussions fetched from all repositories: {len(processed_discussions)}")
write_json_to_file(processed_discussions, f"{org_name}_discussions.json")
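
The fetch, guard, process, write sequence now repeats for repos, issues, PRs, comments, and discussions; it could be folded into one helper. A sketch (the helper is hypothetical, not part of this commit):

    from typing import Callable

    def fetch_and_write(
        label: str,
        fetch: Callable[[], Optional[List[Dict]]],
        process: Callable[[List[Dict]], List[Dict]],
        outfile: str,
    ) -> None:
        print(f"FETCHING {label}")
        results = fetch()
        if results:  # None (error) and [] (nothing found) both skip the write
            write_json_to_file(process(results), outfile)

    fetch_and_write(
        "ISSUES",
        lambda: fetch_issues(org_name, global_token, session=session),
        lambda data: process_issues(data, DEFAULT_DROPPED_FIELDS),
        f"{org_name}_issues.json",
    )
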
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ packages = [
python = ">=3.8,<4.0"
python-dotenv = "^1.0.1"
requests-cache = "^1.2.1"
types-requests = "^2.32.0.20240622"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.0"
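
types-requests ships PEP 561 type stubs for the requests library, which lets a static type checker such as mypy verify the annotations added in core.py. It lands in the main dependency group here; stub packages are often placed in the dev group instead, since they are not needed at runtime.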
