From 00c959c99f3151f5f576b591ae4e072547aa880c Mon Sep 17 00:00:00 2001 From: Alden Hilton Date: Thu, 1 Feb 2024 14:57:04 -0800 Subject: [PATCH] Convert API reference to enums --- scubagoggles/api_reference.py | 23 +++++++++++++++++++++++ scubagoggles/provider.py | 30 ++++++++++++++++-------------- scubagoggles/reporter/reporter.py | 30 +++--------------------------- 3 files changed, 42 insertions(+), 41 deletions(-) create mode 100644 scubagoggles/api_reference.py diff --git a/scubagoggles/api_reference.py b/scubagoggles/api_reference.py new file mode 100644 index 00000000..63968441 --- /dev/null +++ b/scubagoggles/api_reference.py @@ -0,0 +1,23 @@ +from enum import Enum + +BASE_URL = "https://developers.google.com/admin-sdk" + +class ApiReference(Enum): + LIST_USERS = "directory/v1/users/list" + LIST_OUS = "directory/v1/orgunits/list" + LIST_DOMAINS = "directory/v1/domains/list" + LIST_GROUPS = "directory/v1/groups/list" + LIST_ACTIVITIES = "reports/v1/activities/list" + GET_GROUP = "groups-settings/v1/groups/get" + +class ApiUrl(Enum): + LIST_USERS = f"{BASE_URL}/directory/reference/rest/v1/users/list" + LIST_OUS = f"{BASE_URL}/directory/reference/rest/v1/orgunits/list" + LIST_DOMAINS = f"{BASE_URL}/directory/reference/rest/v1/domains/list" + LIST_GROUPS = f"{BASE_URL}/directory/reference/rest/v1/groups/list" + LIST_ACTIVITIES = f"{BASE_URL}/reports/reference/rest/v1/activities/list" + GET_GROUP = f"{BASE_URL}/groups-settings/v1/reference/" + +API_LINKS = { + api.value: f'{api.value}' for api in ApiReference +} \ No newline at end of file diff --git a/scubagoggles/provider.py b/scubagoggles/provider.py index d8523dd6..5dd4e05f 100644 --- a/scubagoggles/provider.py +++ b/scubagoggles/provider.py @@ -6,6 +6,7 @@ from tqdm import tqdm from scubagoggles.utils import create_subset_inverted_dict, create_key_to_list, merge_dicts +from scubagoggles.api_reference import ApiReference from scubagoggles.robust_dns import RobustDNSClient EVENTS = { @@ -66,6 +67,7 @@ 'all': [None] } + SELECTORS = ["google", "selector1", "selector2"] # For DKIM. # Unfortunately, hard-coded. Ideally, we'd be able to use an API to get @@ -112,11 +114,11 @@ def list_domains(self) -> list: try: self.domains = self.services['directory'].domains().list(customer=self.customer_id)\ .execute()['domains'] - self.successful_calls.add("directory/v1/domains/list") + self.successful_calls.add(ApiReference.LIST_DOMAINS.value) except Exception as exc: self.domains = [] warnings.warn(f"An exception was thrown by list_domains: {exc}", RuntimeWarning) - self.unsuccessful_calls.add("directory/v1/domains/list") + self.unsuccessful_calls.add(ApiReference.LIST_DOMAINS.value) return self.domains def get_spf_records(self, domains: list) -> list: @@ -265,14 +267,14 @@ def get_super_admins(self) -> dict: org_unit = org_unit[1:] if org_unit.startswith('/') else org_unit email = user['primaryEmail'] admins.append({'primaryEmail': email, 'orgUnitPath': org_unit}) - self.successful_calls.add("directory/v1/users/list") + self.successful_calls.add(ApiReference.LIST_USERS.value) return {'super_admins': admins} except Exception as exc: warnings.warn( f"Exception thrown while getting super admins; outputs will be incorrect: {exc}", RuntimeWarning ) - self.unsuccessful_calls.add("directory/v1/users/list") + self.unsuccessful_calls.add(ApiReference.LIST_USERS.value) return {'super_admins': []} def get_ous(self) -> dict: @@ -283,7 +285,7 @@ def get_ous(self) -> dict: try: response = self.services['directory'].orgunits().list(customerId=self.customer_id)\ .execute() - self.successful_calls.add("directory/v1/orgunits/list") + self.successful_calls.add(ApiReference.LIST_OUS.value) if 'organizationUnits' not in response: return {} return response @@ -292,7 +294,7 @@ def get_ous(self) -> dict: f"Exception thrown while getting top level OU: {exc}", RuntimeWarning ) - self.unsuccessful_calls.add("directory/v1/orgunits/list") + self.unsuccessful_calls.add(ApiReference.LIST_OUS.value) return {} def get_toplevel_ou(self) -> str: @@ -320,14 +322,14 @@ def get_toplevel_ou(self) -> str: response = self.services['directory'].orgunits()\ .get(customerId=self.customer_id, orgUnitPath=parent_ou).execute() ou_name = response['name'] - self.successful_calls.add("directory/v1/orgunits/list") + self.successful_calls.add(ApiReference.LIST_OUS.value) return ou_name except Exception as exc: warnings.warn( f"Exception thrown while getting top level OU: {exc}", RuntimeWarning ) - self.unsuccessful_calls.add("directory/v1/orgunits/list") + self.unsuccessful_calls.add(ApiReference.LIST_OUS.value) return "Error Retrieving" @@ -435,16 +437,16 @@ def get_group_settings(self) -> dict: for group in response.get('groups'): email = group.get('email') group_settings.append(group_service.groups().get(groupUniqueId=email).execute()) - self.successful_calls.add("directory/v1/groups/list") - self.successful_calls.add("groups-settings/v1/groups/get") + self.successful_calls.add(ApiReference.LIST_GROUPS.value) + self.successful_calls.add(ApiReference.GET_GROUP.value) return {'group_settings': group_settings} except Exception as exc: warnings.warn( f"Exception thrown while getting group settings; outputs will be incorrect: {exc}", RuntimeWarning ) - self.unsuccessful_calls.add("directory/v1/groups/list") - self.unsuccessful_calls.add("groups-settings/v1/groups/get") + self.unsuccessful_calls.add(ApiReference.LIST_GROUPS.value) + self.unsuccessful_calls.add(ApiReference.GET_GROUP.value) return {'group_settings': []} def call_gws_providers(self, products: list, quiet) -> dict: @@ -492,11 +494,11 @@ def call_gws_providers(self, products: list, quiet) -> dict: product_to_logs, self.get_gws_logs(products=product_list, event=event) ) - self.successful_calls.add("reports/v1/activities/list") + self.successful_calls.add(ApiReference.LIST_ACTIVITIES.value) except Exception as exc: warnings.warn("Provider Exception thrown while getting the logs; "\ f"outputs will be incorrect: {exc}", RuntimeWarning) - self.unsuccessful_calls.add("reports/v1/activities/list") + self.unsuccessful_calls.add(ApiReference.LIST_ACTIVITIES.value) # repacks the main aggregator into the original form # that the api returns the data in; under an 'items' key. diff --git a/scubagoggles/reporter/reporter.py b/scubagoggles/reporter/reporter.py index d9b837a5..f5f83cd6 100644 --- a/scubagoggles/reporter/reporter.py +++ b/scubagoggles/reporter/reporter.py @@ -9,6 +9,7 @@ from datetime import datetime import pandas as pd from scubagoggles.utils import rel_abs_path +from scubagoggles.api_reference import API_LINKS SCUBA_GITHUB_URL = "https://github.com/cisagov/scubagoggles" @@ -167,21 +168,6 @@ def get_failed_prereqs(test : dict, successful_calls : set, unsuccessful_calls : return failed_prereqs -def get_reference_a_tag(api_call : str) -> str: - ''' - Craft the link to the documentation page for a given API call. - - :param api_call: a string representing the API call, such as "directory/v1/users/list". - ''' - if api_call == "groups-settings/v1/groups/get": - # The reference URL for this API is structured differently than the others - return '{api_call}' - api = api_call.split('/')[0] - call = '/'.join(api_call.split('/')[1:]) - return f'' \ - f'{api_call}' - def get_failed_details(failed_prereqs : set) -> str: ''' Create the string used for the Details column of the report when one @@ -190,19 +176,9 @@ def get_failed_details(failed_prereqs : set) -> str: :param failed_prereqs: A set of strings with the API calls/function prerequisites that were not met for a given test. ''' - api_links = { - api_call: get_reference_a_tag(api_call) for api_call in [ - "directory/v1/users/list", - "directory/v1/orgunits/list", - "directory/v1/domains/list", - "directory/v1/groups/list", - "reports/v1/activities/list", - "groups-settings/v1/groups/get" - ] - } - failed_apis = [api_links[api] for api in failed_prereqs if api in api_links] - failed_functions = [call for call in failed_prereqs if call not in api_links] + failed_apis = [API_LINKS[api] for api in failed_prereqs if api in API_LINKS] + failed_functions = [call for call in failed_prereqs if call not in API_LINKS] failed_details = "" if len(failed_apis) > 0: links = ', '.join(failed_apis)