Skip to content

Commit

Permalink
v2 changes of platform settings (#447)
Browse files Browse the repository at this point in the history
* v2 changes of platform settings

* considered optional params

* Addressed Pr comments, Removed unnecessory handling of generic exceptions

* Update backend/platform_settings_v2/exceptions.py

Co-authored-by: Chandrasekharan M <[email protected]>
Signed-off-by: ali <[email protected]>

* Update backend/platform_settings_v2/exceptions.py

Co-authored-by: Chandrasekharan M <[email protected]>
Signed-off-by: ali <[email protected]>

* optimized toggle_platform_key_status function

* Added TODO comment for a task

---------

Signed-off-by: ali <[email protected]>
Co-authored-by: Chandrasekharan M <[email protected]>
Co-authored-by: Hari John Kuriakose <[email protected]>
  • Loading branch information
3 people authored Jul 15, 2024
1 parent 4a677f0 commit c805c80
Show file tree
Hide file tree
Showing 13 changed files with 536 additions and 0 deletions.
Empty file.
1 change: 1 addition & 0 deletions backend/platform_settings_v2/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Register your models here.
6 changes: 6 additions & 0 deletions backend/platform_settings_v2/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class PlatformSettingsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "platform_settings_v2"
14 changes: 14 additions & 0 deletions backend/platform_settings_v2/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class PlatformServiceConstants:
IS_ACTIVE = "is_active"
KEY = "key"
ORGANIZATION = "organization"
ID = "id"
ACTIVATE = "ACTIVATE"
DEACTIVATE = "DEACTIVATE"
ACTION = "action"
KEY_NAME = "key_name"


class ErrorMessage:
KEY_EXIST = "Key name already exists"
DUPLICATE_API = "It appears that a duplicate call may have been made."
49 changes: 49 additions & 0 deletions backend/platform_settings_v2/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Optional

from rest_framework.exceptions import APIException


class InternalServiceError(APIException):
status_code = 500
default_detail = "Internal error occurred while performing platform key operations."


class UserForbidden(APIException):
status_code = 403
default_detail = (
"User is forbidden from performing this action. Please contact admin."
)


class KeyCountExceeded(APIException):
status_code = 403
default_detail = (
"Maximum key count is exceeded. Please delete one before generation."
)


class FoundActiveKey(APIException):
status_code = 403
default_detail = "Only one active key allowed at a time."


class ActiveKeyNotFound(APIException):
status_code = 404
default_detail = "At least one active platform key should be available"


class InvalidRequest(APIException):
status_code = 401
default_detail = "Invalid Request"


class DuplicateData(APIException):
status_code = 400
default_detail = "Duplicate Data"

def __init__(self, detail: Optional[str] = None, code: Optional[int] = None):
if detail is not None:
self.detail = detail
if code is not None:
self.code = code
super().__init__(detail, code)
Empty file.
Empty file.
51 changes: 51 additions & 0 deletions backend/platform_settings_v2/platform_auth_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging

from account_v2.authentication_controller import AuthenticationController
from account_v2.models import Organization, PlatformKey, User
from platform_settings_v2.exceptions import KeyCountExceeded, UserForbidden
from tenant_account_v2.models import OrganizationMember

PLATFORM_KEY_COUNT = 2

logger = logging.getLogger(__name__)


class PlatformAuthHelper:
"""Class to hold helper functions for Platform settings authentication."""

@staticmethod
def validate_user_role(user: User) -> None:
"""This method validates if the logged in user has admin role for
performing appropriate actions.
Args:
user (User): Logged in user from context
"""
auth_controller = AuthenticationController()
member: OrganizationMember = auth_controller.get_organization_members_by_user(
user=user
)
if not auth_controller.is_admin_by_role(member.role):
logger.error("User is not having right access to perform this operation.")
raise UserForbidden()
else:
pass

@staticmethod
def validate_token_count(organization: Organization) -> None:
"""This method validates if the organization has reached the maximum
platform key count.
Args:
organization (Organization):
Organization for which the key is being created.
"""
key_count = PlatformKey.objects.filter(organization=organization).count()
if key_count >= PLATFORM_KEY_COUNT:
logger.error(
f"Key count exceeded: {key_count}/{PLATFORM_KEY_COUNT} keys for "
f"organization ID {organization.id}."
)
raise KeyCountExceeded()
else:
pass
242 changes: 242 additions & 0 deletions backend/platform_settings_v2/platform_auth_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import logging
import uuid
from typing import Any, Optional

from account_v2.models import Organization, PlatformKey, User
from account_v2.organization import OrganizationService
from django.db import IntegrityError
from platform_settings_v2.exceptions import (
ActiveKeyNotFound,
DuplicateData,
InternalServiceError,
InvalidRequest,
)
from tenant_account_v2.constants import ErrorMessage, PlatformServiceConstants
from utils.user_context import UserContext

logger = logging.getLogger(__name__)


class PlatformAuthenticationService:
"""Service class to hold Platform service authentication and validation.
Supports generation, refresh, revoke and toggle of active keys.
"""

@staticmethod
def generate_platform_key(
is_active: bool,
key_name: str,
user: User,
organization: Optional[Organization] = None,
) -> dict[str, Any]:
"""Method to support generation of new platform key. Throws error when
maximum count is exceeded. Forbids for user other than admin
permission.
Args:
key_name (str): Value of the key
is_active (bool): By default the key is False
user (User): User object representing the user generating the key
organization (Optional[Organization], optional):
Org the key belongs to. Defaults to None.
Returns:
dict[str, Any]:
A dictionary containing the generated platform key details,
including the id, key name, and key value.
Raises:
DuplicateData: If a platform key with the same key name
already exists for the organization.
InternalServiceError: If an internal error occurs while
generating the platform key.
"""
organization: Organization = organization or UserContext.get_organization()
if not organization:
raise InternalServiceError("No valid organization provided")
try:
# TODO : Add encryption to Platform keys
# id is added here to avoid passing of keys in transactions.
platform_key: PlatformKey = PlatformKey(
id=str(uuid.uuid4()),
key=str(uuid.uuid4()),
is_active=is_active,
organization=organization,
key_name=key_name,
created_by=user,
modified_by=user,
)
platform_key.save()
result: dict[str, Any] = {}
result[PlatformServiceConstants.ID] = platform_key.id
result[PlatformServiceConstants.KEY_NAME] = platform_key.key_name
result[PlatformServiceConstants.KEY] = platform_key.key

logger.info(f"platform_key is generated for {organization.id}")
return result
except IntegrityError as error:
logger.error(
"Failed to generate platform key for "
f"organization {organization}, Integrity error: {error}"
)
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, \
{ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def delete_platform_key(id: str) -> None:
"""Method to delete a platform key by id.
Args:
id (str): platform key primary id
Raises:
error: IntegrityError
"""
try:
platform_key: PlatformKey = PlatformKey.objects.get(pk=id)
platform_key.delete()
# TODO: Add organization details in logs in possible places once v2 enabled
logger.info(f"platform_key {id} is deleted for {platform_key.organization}")
except IntegrityError as error:
logger.error(f"Failed to delete platform key : {error}")
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, \
{ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def refresh_platform_key(id: str, user: User) -> dict[str, Any]:
"""Method to refresh a platform key.
Args:
id (str): Unique id of the key to be refreshed
new_key (str): Value to be updated.
Raises:
error: IntegrityError
"""
try:
result: dict[str, Any] = {}
platform_key: PlatformKey = PlatformKey.objects.get(pk=id)
platform_key.key = str(uuid.uuid4())
platform_key.modified_by = user
platform_key.save()
result[PlatformServiceConstants.ID] = platform_key.id
result[PlatformServiceConstants.KEY_NAME] = platform_key.key_name
result[PlatformServiceConstants.KEY] = platform_key.key

logger.info(f"platform_key {id} is updated by user {user.id}")
return result
except IntegrityError as error:
logger.error(
f"Failed to refresh platform key {id} "
f"by user {user.id}, Integrity error: {error}"
)
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, \
{ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def toggle_platform_key_status(
platform_key: PlatformKey, action: str, user: User
) -> None:
"""Method to activate/deactivate a platform key. Only one active key is
allowed at a time. On change or setting, other keys are deactivated.
Args:
platform_key (PlatformKey): The platform key to be toggled.
action (str): activate/deactivate
user (User): The user performing the action.
Raises:
InvalidRequest: If no valid organization is found.
DuplicateData: If an IntegrityError occurs during the save operation.
"""
try:
organization: Organization = UserContext.get_organization()
if not organization:
logger.error(
f"No valid organization provided to toggle status of platform key "
f"{platform_key.id} for user {user.id}"
)
raise InvalidRequest("Invalid organization")
platform_key.modified_by = user
if action == PlatformServiceConstants.ACTIVATE:
# Deactivate all active keys for the organization
PlatformKey.objects.filter(
is_active=True, organization=organization
).update(is_active=False, modified_by=user)
# Activate the chosen key
platform_key.is_active = True
elif action == PlatformServiceConstants.DEACTIVATE:
platform_key.is_active = False
else:
logger.error(
f"Invalid action: {action} for platform key {platform_key.id} "
f"by user {user.id}"
)
raise InvalidRequest(f"Invalid action: {action}")
platform_key.save()
except IntegrityError as error:
logger.error(
f"IntegrityError - Failed to {action} platform key {platform_key.id}"
f": {error}"
)
raise DuplicateData(
f"{ErrorMessage.KEY_EXIST}, {ErrorMessage.DUPLICATE_API}"
)

@staticmethod
def list_platform_key_ids() -> list[PlatformKey]:
"""Method to fetch list of platform keys unique ids for internal usage.
Returns:
Any: List of platform keys.
"""
organization_id = UserContext.get_organization_identifier()
organization: Organization = OrganizationService.get_organization_by_org_id(
org_id=organization_id
)
organization_pk = organization.id

platform_keys: list[PlatformKey] = PlatformKey.objects.filter(
organization=organization_pk
)
return platform_keys

@staticmethod
def fetch_platform_key_id() -> Any:
"""Method to fetch list of platform keys unique ids for internal usage.
Returns:
Any: List of platform keys.
"""
platform_key: list[PlatformKey] = PlatformKey.objects.all()
return platform_key

@staticmethod
def get_active_platform_key(
organization_id: Optional[str] = None,
) -> PlatformKey:
"""Method to fetch active key.
Considering only one active key is allowed at a time
Returns:
Any: platformKey.
"""
try:
organization_id = (
organization_id or UserContext.get_organization_identifier()
)
organization: Organization = OrganizationService.get_organization_by_org_id(
org_id=organization_id
)
platform_key: PlatformKey = PlatformKey.objects.get(
organization=organization, is_active=True
)
return platform_key
except PlatformKey.DoesNotExist:
raise ActiveKeyNotFound()
24 changes: 24 additions & 0 deletions backend/platform_settings_v2/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from account_v2.models import PlatformKey
from rest_framework import serializers

from backend.serializers import AuditSerializer


class PlatformKeySerializer(AuditSerializer):
class Meta:
model = PlatformKey
fields = "__all__"


class PlatformKeyGenerateSerializer(serializers.Serializer):
# Adjust these fields based on your actual serializer
is_active = serializers.BooleanField()

key_name = serializers.CharField()


class PlatformKeyIDSerializer(serializers.Serializer):
id = serializers.CharField()
key_name = serializers.CharField()
key = serializers.CharField()
is_active = serializers.BooleanField()
1 change: 1 addition & 0 deletions backend/platform_settings_v2/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Create your tests here.
Loading

0 comments on commit c805c80

Please sign in to comment.