Skip to content

Commit

Permalink
v2 changes of Filemanagement And Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e committed Jul 10, 2024
1 parent d8e731d commit 8c78008
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 16 deletions.
8 changes: 7 additions & 1 deletion backend/file_management/file_management_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Any

import magic
from connector.models import ConnectorInstance
from django.conf import settings
from django.http import StreamingHttpResponse
from file_management.exceptions import (
Expand All @@ -24,8 +23,15 @@
from fsspec import AbstractFileSystem
from pydrive2.files import ApiRequestError

from backend.constants import FeatureFlag
from unstract.connectors.filesystems import connectors as fs_connectors
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
from connector_v2.models import ConnectorInstance
else:
from connector.models import ConnectorInstance


class FileManagerHelper:
Expand Down
12 changes: 10 additions & 2 deletions backend/file_management/views.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from typing import Any

from connector.models import ConnectorInstance
from django.http import HttpRequest
from file_management.exceptions import (
ConnectorInstanceNotFound,
Expand All @@ -17,15 +16,24 @@
FileUploadSerializer,
)
from oauth2client.client import HttpAccessTokenRefreshError
from prompt_studio.prompt_studio_document_manager.models import DocumentManager
from rest_framework import serializers, status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.versioning import URLPathVersioning
from utils.user_session import UserSessionUtils

from backend.constants import FeatureFlag
from unstract.connectors.exceptions import ConnectorError
from unstract.connectors.filesystems.local_storage.local_storage import LocalStorageFS
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
from connector_v2.models import ConnectorInstance
from prompt_studio.prompt_studio_document_manager_v2.models import DocumentManager

else:
from connector.models import ConnectorInstance
from prompt_studio.prompt_studio_document_manager.models import DocumentManager

logger = logging.getLogger(__name__)

Expand Down
25 changes: 19 additions & 6 deletions backend/scheduler/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from typing import Any

from django.db import connection
from pipeline.models import Pipeline
from pipeline.pipeline_processor import PipelineProcessor
from rest_framework.serializers import ValidationError
from scheduler.constants import SchedulerConstants as SC
from scheduler.exceptions import JobDeletionError, JobSchedulingError
Expand All @@ -15,9 +13,21 @@
disable_task,
enable_task,
)
from workflow_manager.workflow.constants import WorkflowExecutionKey, WorkflowKey
from workflow_manager.workflow.serializers import ExecuteWorkflowSerializer

from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
from pipeline_v2.models import Pipeline
from pipeline_v2.pipeline_processor import PipelineProcessor
from utils.user_context import UserContext
from workflow_manager.workflow_v2.constants import WorkflowExecutionKey, WorkflowKey
from workflow_manager.workflow_v2.serializers import ExecuteWorkflowSerializer
else:
from pipeline.models import Pipeline
from pipeline.pipeline_processor import PipelineProcessor
from workflow_manager.workflow.constants import WorkflowExecutionKey, WorkflowKey
from workflow_manager.workflow.serializers import ExecuteWorkflowSerializer
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -46,15 +56,18 @@ def _schedule_task_job(pipeline_id: str, job_data: Any) -> None:
workflow_id = serializer.get_workflow_id(serializer.validated_data)
# TODO: Remove unused argument in execute_pipeline_task
execution_action = serializer.get_execution_action(serializer.validated_data)
org_schema = connection.tenant.schema_name
if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
organization_id = UserContext.get_organization_identifier()
else:
organization_id = connection.tenant.schema_name

create_periodic_task(
cron_string=cron_string,
task_name=pipeline.pk,
task_path="scheduler.tasks.execute_pipeline_task",
task_args=[
str(workflow_id),
org_schema,
organization_id,
execution_action or "",
execution_id,
str(pipeline.pk),
Expand Down
8 changes: 7 additions & 1 deletion backend/scheduler/serializer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import logging
from typing import Any

from pipeline.manager import PipelineManager
from rest_framework import serializers
from scheduler.constants import SchedulerConstants as SC

from backend.constants import FeatureFlag
from backend.constants import FieldLengthConstants as FieldLength
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
from pipeline_v2.manager import PipelineManager
else:
from pipeline.manager import PipelineManager

logger = logging.getLogger(__name__)

Expand Down
89 changes: 83 additions & 6 deletions backend/scheduler/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@
import logging
from typing import Any

from account.models import Organization
from account.subscription_loader import load_plugins, validate_etl_run
from celery import shared_task
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from django_tenants.utils import get_tenant_model, tenant_context
from pipeline.models import Pipeline
from pipeline.pipeline_processor import PipelineProcessor
from workflow_manager.workflow.models.workflow import Workflow
from workflow_manager.workflow.workflow_helper import WorkflowHelper

from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
from account_v2.subscription_loader import load_plugins, validate_etl_run
from pipeline_v2.models import Pipeline
from pipeline_v2.pipeline_processor import PipelineProcessor
from utils.user_context import UserContext
from workflow_manager.workflow_v2.models.workflow import Workflow
from workflow_manager.workflow_v2.workflow_helper import WorkflowHelper
else:
from account.models import Organization
from account.subscription_loader import load_plugins, validate_etl_run
from pipeline.models import Pipeline
from pipeline.pipeline_processor import PipelineProcessor
from workflow_manager.workflow.models.workflow import Workflow
from workflow_manager.workflow.workflow_helper import WorkflowHelper


logger = logging.getLogger(__name__)
subscription_loader = load_plugins()
Expand Down Expand Up @@ -58,6 +71,15 @@ def execute_pipeline_task(
with_logs: Any,
name: Any,
) -> None:
if check_feature_flag_status(FeatureFlag.MULTI_TENANCY_V2):
execute_pipeline_task_v2(
workflow_id=workflow_id,
organization_id=org_schema,
execution_id=execution_id,
pipeline_id=pipepline_id,
pipeline_name=name,
)
return
logger.info(f"Executing pipeline name: {name}")
try:
logger.info(f"Executing workflow id: {workflow_id}")
Expand Down Expand Up @@ -92,6 +114,61 @@ def execute_pipeline_task(
logger.error(f"Failed to execute pipeline: {name}. Error: {e}")


def execute_pipeline_task_v2(
    workflow_id: Any,
    organization_id: Any,
    execution_id: Any,
    pipeline_id: Any,
    pipeline_name: Any,
) -> None:
    """V2 of the execute_pipeline_task method.

    Sets the organization identifier in the user context, optionally
    disables the scheduled task when the subscription check fails, and
    otherwise marks the pipeline as in progress and runs the workflow
    execution to completion. Any exception raised along the way is logged
    and swallowed; nothing is propagated to the caller.

    Args:
        workflow_id (Any): UID of workflow entity
        organization_id (Any): Organization identifier
        execution_id (Any): UID of execution entity
        pipeline_id (Any): UID of pipeline entity
        pipeline_name (Any): Pipeline name (used for logging only)
    """
    try:
        logger.info(
            f"Executing workflow id: {workflow_id} for pipeline {pipeline_name}"
        )
        # Set organization in state store for execution
        UserContext.set_organization_identifier(organization_id)
        if (
            subscription_loader
            and subscription_loader[0]
            and not validate_etl_run(organization_id)
        ):
            # Subscription plugin is loaded and rejected this run: try to
            # disable the periodic task (best effort).
            try:
                logger.info(f"Disabling ETL task: {pipeline_id}")
                disable_task(pipeline_id)
            except Exception as e:
                logger.warning(f"Failed to disable task: {pipeline_id}. Error: {e}")
            # NOTE(review): the scraped source lost its indentation; this
            # return is assumed to sit after the try/except, so the run is
            # skipped whether or not disabling succeeded - confirm.
            return
        workflow = WorkflowHelper.get_workflow_by_id(
            id=workflow_id, organization_id=organization_id
        )
        logger.info(f"Executing workflow: {workflow}")
        PipelineProcessor.update_pipeline(
            pipeline_id, Pipeline.PipelineStatus.INPROGRESS
        )
        execution_response = WorkflowHelper.complete_execution(
            workflow, execution_id, pipeline_id
        )
        logger.info(
            f"Execution response for pipeline {pipeline_name} of organization "
            f"{organization_id}: {execution_response}"
        )
        logger.info(
            f"Execution completed for pipeline {pipeline_name} of organization: "
            f"{organization_id}"
        )
    except Exception as e:
        # logger.exception logs at ERROR level with the same message and also
        # records the traceback, which is otherwise lost because the error
        # is swallowed here.
        logger.exception(f"Failed to execute pipeline: {pipeline_name}. Error: {e}")


def delete_periodic_task(task_name: str) -> None:
try:
task = PeriodicTask.objects.get(name=task_name)
Expand Down

0 comments on commit 8c78008

Please sign in to comment.