Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 changes of platform settings #447

Merged
merged 10 commits into from
Jul 15, 2024
Empty file.
1 change: 1 addition & 0 deletions backend/platform_settings_v2/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Register your models here.
6 changes: 6 additions & 0 deletions backend/platform_settings_v2/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class PlatformSettingsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "platform_settings_v2"
14 changes: 14 additions & 0 deletions backend/platform_settings_v2/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class PlatformServiceConstants:
IS_ACTIVE = "is_active"
KEY = "key"
ORGANIZATION = "organization"
ID = "id"
ACTIVATE = "ACTIVATE"
DEACTIVATE = "DEACTIVATE"
ACTION = "action"
KEY_NAME = "key_name"


class ErrorMessage:
KEY_EXIST = "Key name already exists"
DUPLICATE_API = "It appears that a duplicate call may have been made."
49 changes: 49 additions & 0 deletions backend/platform_settings_v2/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Optional

from rest_framework.exceptions import APIException


class InternalServiceError(APIException):
status_code = 500
default_detail = "Internal error occurred while performing platform key operations."


class UserForbidden(APIException):
status_code = 403
default_detail = (
"User is forbidden from performing this action. Please contact admin."
)


class KeyCountExceeded(APIException):
status_code = 403
default_detail = (
"Maximum key count is exceeded. Please delete one before generation."
)


class FoundActiveKey(APIException):
status_code = 403
default_detail = "Only one active key allowed at a time."


class ActiveKeyNotFound(APIException):
status_code = 404
default_detail = "At least one active platform key should be available"


class InvalidRequest(APIException):
status_code = 401
default_detail = "Invalid Request"


class DuplicateData(APIException):
status_code = 400
default_detail = "Duplicate Data"

def __init__(self, detail: Optional[str] = None, code: Optional[int] = None):
if detail is not None:
self.detail = detail
if code is not None:
self.code = code
super().__init__(detail, code)
Empty file.
Empty file.
51 changes: 51 additions & 0 deletions backend/platform_settings_v2/platform_auth_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging

from account_v2.authentication_controller import AuthenticationController
from account_v2.models import Organization, PlatformKey, User
from platform_settings_v2.exceptions import KeyCountExceeded, UserForbidden
from tenant_account_v2.models import OrganizationMember

PLATFORM_KEY_COUNT = 2

logger = logging.getLogger(__name__)


class PlatformAuthHelper:
"""Class to hold helper functions for Platform settings authentication."""

@staticmethod
def validate_user_role(user: User) -> None:
"""This method validates if the logged in user has admin role for
performing appropriate actions.

Args:
user (User): Logged in user from context
"""
auth_controller = AuthenticationController()
member: OrganizationMember = auth_controller.get_organization_members_by_user(
user=user
)
if not auth_controller.is_admin_by_role(member.role):
logger.error("User is not having right access to perform this operation.")
raise UserForbidden()
else:
pass

@staticmethod
def validate_token_count(organization: Organization) -> None:
"""This method validates if the organization has reached the maximum
platform key count.

Args:
organization (Organization):
Organization for which the key is being created.
"""
key_count = PlatformKey.objects.filter(organization=organization).count()
if key_count >= PLATFORM_KEY_COUNT:
logger.error(
f"Key count exceeded: {key_count}/{PLATFORM_KEY_COUNT} keys for "
f"organization ID {organization.id}."
)
raise KeyCountExceeded()
else:
pass
242 changes: 242 additions & 0 deletions backend/platform_settings_v2/platform_auth_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import logging
import uuid
from typing import Any, Optional

from account_v2.models import Organization, PlatformKey, User
from account_v2.organization import OrganizationService
from django.db import IntegrityError
from platform_settings_v2.exceptions import (
ActiveKeyNotFound,
DuplicateData,
InternalServiceError,
InvalidRequest,
)
from tenant_account_v2.constants import ErrorMessage, PlatformServiceConstants
from utils.user_context import UserContext

logger = logging.getLogger(__name__)


class PlatformAuthenticationService:
"""Service class to hold Platform service authentication and validation.

Supports generation, refresh, revoke and toggle of active keys.
"""

@staticmethod
def generate_platform_key(
is_active: bool,
key_name: str,
user: User,
organization: Optional[Organization] = None,
) -> dict[str, Any]:
"""Method to support generation of new platform key. Throws error when
maximum count is exceeded. Forbids for user other than admin
permission.

Args:
key_name (str): Value of the key
is_active (bool): By default the key is False
user (User): User object representing the user generating the key
organization (Optional[Organization], optional):
Org the key belongs to. Defaults to None.

Returns:
dict[str, Any]:
A dictionary containing the generated platform key details,
including the id, key name, and key value.
Raises:
DuplicateData: If a platform key with the same key name
already exists for the organization.
InternalServiceError: If an internal error occurs while
generating the platform key.
"""
organization: Organization = organization or UserContext.get_organization()
if not organization:
raise InternalServiceError("No valid organization provided")
try:
# TODO : Add encryption to Platform keys
# id is added here to avoid passing of keys in transactions.
platform_key: PlatformKey = PlatformKey(
id=str(uuid.uuid4()),
key=str(uuid.uuid4()),
is_active=is_active,
organization=organization,
key_name=key_name,
created_by=user,
modified_by=user,
)
platform_key.save()
result: dict[str, Any] = {}
result[PlatformServiceConstants.ID] = platform_key.id
result[PlatformServiceConstants.KEY_NAME] = platform_key.key_name
result[PlatformServiceConstants.KEY] = platform_key.key

logger.info(f"platform_key is generated for {organization.id}")
return result
except IntegrityError as error:
logger.error(
"Failed to generate platform key for "
f"organization {organization}, Integrity error: {error}"
)
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, \
{ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def delete_platform_key(id: str) -> None:
muhammad-ali-e marked this conversation as resolved.
Show resolved Hide resolved
"""Method to delete a platform key by id.

Args:
id (str): platform key primary id

Raises:
error: IntegrityError
"""
try:
platform_key: PlatformKey = PlatformKey.objects.get(pk=id)
platform_key.delete()
# TODO: Add organization details in logs in possible places once v2 enabled
logger.info(f"platform_key {id} is deleted for {platform_key.organization}")
except IntegrityError as error:
logger.error(f"Failed to delete platform key : {error}")
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, \
{ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def refresh_platform_key(id: str, user: User) -> dict[str, Any]:
"""Method to refresh a platform key.

Args:
id (str): Unique id of the key to be refreshed
new_key (str): Value to be updated.

Raises:
error: IntegrityError
"""
try:
result: dict[str, Any] = {}
platform_key: PlatformKey = PlatformKey.objects.get(pk=id)
platform_key.key = str(uuid.uuid4())
platform_key.modified_by = user
platform_key.save()
result[PlatformServiceConstants.ID] = platform_key.id
result[PlatformServiceConstants.KEY_NAME] = platform_key.key_name
result[PlatformServiceConstants.KEY] = platform_key.key

logger.info(f"platform_key {id} is updated by user {user.id}")
return result
except IntegrityError as error:
logger.error(
f"Failed to refresh platform key {id} "
f"by user {user.id}, Integrity error: {error}"
)
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, \
{ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def toggle_platform_key_status(
platform_key: PlatformKey, action: str, user: User
) -> None:
"""Method to activate/deactivate a platform key. Only one active key is
allowed at a time. On change or setting, other keys are deactivated.

Args:
platform_key (PlatformKey): The platform key to be toggled.
action (str): activate/deactivate
user (User): The user performing the action.

Raises:
InvalidRequest: If no valid organization is found.
DuplicateData: If an IntegrityError occurs during the save operation.
"""
try:
organization: Organization = UserContext.get_organization()
if not organization:
logger.error(
f"No valid organization provided to toggle status of platform key "
f"{platform_key.id} for user {user.id}"
)
raise InvalidRequest("Invalid organization")
platform_key.modified_by = user
if action == PlatformServiceConstants.ACTIVATE:
# Deactivate all active keys for the organization
PlatformKey.objects.filter(
is_active=True, organization=organization
).update(is_active=False, modified_by=user)
# Activate the chosen key
platform_key.is_active = True
elif action == PlatformServiceConstants.DEACTIVATE:
platform_key.is_active = False
else:
logger.error(
f"Invalid action: {action} for platform key {platform_key.id} "
f"by user {user.id}"
)
raise InvalidRequest(f"Invalid action: {action}")
platform_key.save()
except IntegrityError as error:
logger.error(
f"IntegrityError - Failed to {action} platform key {platform_key.id}"
f": {error}"
)
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, {ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def list_platform_key_ids() -> list[PlatformKey]:
"""Method to fetch list of platform keys unique ids for internal usage.

Returns:
Any: List of platform keys.
"""
organization_id = UserContext.get_organization_identifier()
organization: Organization = OrganizationService.get_organization_by_org_id(
org_id=organization_id
)
organization_pk = organization.id

platform_keys: list[PlatformKey] = PlatformKey.objects.filter(
organization=organization_pk
)
return platform_keys

@staticmethod
def fetch_platform_key_id() -> Any:
"""Method to fetch list of platform keys unique ids for internal usage.

Returns:
Any: List of platform keys.
"""
platform_key: list[PlatformKey] = PlatformKey.objects.all()
return platform_key

@staticmethod
def get_active_platform_key(
organization_id: Optional[str] = None,
) -> PlatformKey:
"""Method to fetch active key.

Considering only one active key is allowed at a time
Returns:
Any: platformKey.
"""
try:
organization_id = (
organization_id or UserContext.get_organization_identifier()
)
organization: Organization = OrganizationService.get_organization_by_org_id(
org_id=organization_id
)
platform_key: PlatformKey = PlatformKey.objects.get(
organization=organization, is_active=True
)
return platform_key
except PlatformKey.DoesNotExist:
raise ActiveKeyNotFound()
24 changes: 24 additions & 0 deletions backend/platform_settings_v2/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from account_v2.models import PlatformKey
from rest_framework import serializers

from backend.serializers import AuditSerializer


class PlatformKeySerializer(AuditSerializer):
class Meta:
model = PlatformKey
fields = "__all__"


class PlatformKeyGenerateSerializer(serializers.Serializer):
# Adjust these fields based on your actual serializer
is_active = serializers.BooleanField()

key_name = serializers.CharField()


class PlatformKeyIDSerializer(serializers.Serializer):
id = serializers.CharField()
key_name = serializers.CharField()
key = serializers.CharField()
is_active = serializers.BooleanField()
1 change: 1 addition & 0 deletions backend/platform_settings_v2/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Create your tests here.
Loading
Loading