Skip to content

Commit

Permalink
Feature: Scheduler pipeline management: Pause/Resume (#30)
Browse files Browse the repository at this point in the history
* Feature: Scheduler pipeline management: Pause/Resume

* Update serializer

---------

Co-authored-by: Jaseem Jas <[email protected]>
  • Loading branch information
athul-rs and jaseemjaskp authored Mar 1, 2024
1 parent 27ca54e commit 7aec6c5
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 10 deletions.
4 changes: 3 additions & 1 deletion backend/pipeline/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import uuid

from account.models import User
from backend.constants import FieldLengthConstants as FieldLength
from django.db import models
from utils.models.base_model import BaseModel
from workflow_manager.workflow.models.workflow import Workflow

from backend.constants import FieldLengthConstants as FieldLength

APP_ID_LENGTH = 32
PIPELINE_NAME_LENGTH = 32

Expand All @@ -25,6 +26,7 @@ class PipelineStatus(models.TextChoices):
INPROGRESS = "INPROGRESS", "Inprogress"
YET_TO_START = "YET_TO_START", "Yet to start"
RESTARTING = "RESTARTING", "Restarting"
PAUSED = "PAUSED", "Paused"

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
pipeline_name = models.CharField(
Expand Down
8 changes: 7 additions & 1 deletion backend/pipeline/pipeline_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Optional

from django.utils import timezone
from pipeline.exceptions import InactivePipelineError, PipelineSaveError
Expand Down Expand Up @@ -38,7 +39,10 @@ def fetch_pipeline(pipeline_id: str, check_active: bool = True) -> Pipeline:

@staticmethod
def update_pipeline_status(
pipeline: Pipeline, status: tuple[str, str], is_end: bool
pipeline: Pipeline,
status: tuple[str, str],
is_end: bool,
is_active: Optional[bool] = None,
) -> Pipeline:
"""Updates pipeline status during execution.
Expand All @@ -52,6 +56,8 @@ def update_pipeline_status(
pipeline.last_run_time = timezone.now()
if status:
pipeline.last_run_status = status
if is_active is not None:
pipeline.active = is_active

try:
pipeline.save()
Expand Down
14 changes: 14 additions & 0 deletions backend/pipeline/serializers/update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pipeline.models import Pipeline
from rest_framework import serializers


class PipelineUpdateSerializer(serializers.Serializer):
    """Validates the payload for pausing/resuming a pipeline.

    Fields:
        pipeline_id: UUID of an existing ``Pipeline`` row (required).
        active: ``True`` to resume the pipeline's scheduled job,
            ``False`` to pause it (required).
    """

    pipeline_id = serializers.UUIDField(required=True)
    active = serializers.BooleanField(required=True)

    def validate_pipeline_id(self, value: str) -> str:
        """Ensure the given UUID refers to an existing Pipeline.

        Raises:
            serializers.ValidationError: If no pipeline with this pk exists.
        """
        # exists() avoids fetching the whole row (and exception-driven
        # control flow) just to check presence by primary key.
        if not Pipeline.objects.filter(pk=value).exists():
            raise serializers.ValidationError("Invalid pipeline ID")
        return value
32 changes: 31 additions & 1 deletion backend/pipeline/views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional
from typing import Any, Optional

from account.custom_exceptions import DuplicateData
from cron_expression_generator.constants import CronKeys
Expand All @@ -22,6 +22,7 @@
from pipeline.serializers.execute import (
PipelineExecuteSerializer as ExecuteSerializer,
)
from pipeline.serializers.update import PipelineUpdateSerializer
from rest_framework import serializers, status, viewsets
from rest_framework.request import Request
from rest_framework.response import Response
Expand Down Expand Up @@ -118,3 +119,32 @@ def perform_destroy(self, instance: Pipeline) -> None:
pipeline_to_remove = str(instance.pk)
super().perform_destroy(instance)
return SchedulerHelper.remove_job(pipeline_to_remove)

def partial_update(self, request: Request, pk: Any = None) -> Response:
    """Pause or resume a pipeline's scheduled job (PATCH handler).

    Expects ``pipeline_id`` and ``active`` in the request body; resumes
    the job when ``active`` is true, pauses it otherwise.
    """
    serializer = PipelineUpdateSerializer(data=request.data)
    # Guard clause: reject malformed payloads before touching the scheduler.
    if not serializer.is_valid():
        return Response(
            serializer.errors, status=status.HTTP_400_BAD_REQUEST
        )

    pipeline_id = serializer.validated_data.get("pipeline_id")
    active = serializer.validated_data.get("active")
    try:
        scheduler_action = (
            SchedulerHelper.resume_job if active else SchedulerHelper.pause_job
        )
        scheduler_action(pipeline_id)
    except Exception as e:
        logger.error(f"Failed to update pipeline status: {e}")
        return Response(
            {"error": "Failed to update pipeline status"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    return Response(
        {
            "status": "success",
            "message": f"Pipeline {pipeline_id} status updated",
        },
        status=status.HTTP_200_OK,
    )
23 changes: 22 additions & 1 deletion backend/scheduler/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from scheduler.constants import SchedulerConstants as SC
from scheduler.exceptions import JobDeletionError, JobSchedulingError
from scheduler.serializer import AddJobSerializer
from scheduler.tasks import delete_periodic_task
from scheduler.tasks import delete_periodic_task, disable_task, enable_task

logger = logging.getLogger(__name__)


class SchedulerHelper:
@staticmethod
def add_job(
Expand Down Expand Up @@ -51,3 +52,23 @@ def remove_job(pipeline_id: str) -> None:
except Exception as e:
logger.error(f"Exception while removing job: {e}")
raise JobDeletionError

@staticmethod
def pause_job(pipeline_id: str) -> None:
    """Pause the scheduled (celery-beat) job for the given pipeline.

    Args:
        pipeline_id: Primary key of the pipeline whose periodic task
            should be disabled.

    Raises:
        JobSchedulingError: If disabling the underlying periodic task fails.
    """
    logger.info(f"Pausing job for {pipeline_id}")
    try:
        logger.info("Celery scheduler - pausing job")
        disable_task(pipeline_id)
    except Exception as e:
        logger.error(f"Exception while pausing job: {e}")
        # Chain the cause so the original traceback is preserved.
        raise JobSchedulingError from e

@staticmethod
def resume_job(pipeline_id: str) -> None:
    """Resume the scheduled (celery-beat) job for the given pipeline.

    Args:
        pipeline_id: Primary key of the pipeline whose periodic task
            should be re-enabled.

    Raises:
        JobSchedulingError: If enabling the underlying periodic task fails.
    """
    logger.info(f"Resuming job for {pipeline_id}")
    try:
        logger.info("Celery scheduler - resuming job")
        enable_task(pipeline_id)
    except Exception as e:
        logger.error(f"Exception while resuming job: {e}")
        # Chain the cause so the original traceback is preserved.
        raise JobSchedulingError from e
31 changes: 26 additions & 5 deletions backend/scheduler/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import logging
from typing import Any, Optional

from pipeline.models import Pipeline
from account.models import Organization
from celery import shared_task
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from django_tenants.utils import get_tenant_model, tenant_context
from pipeline.models import Pipeline
from pipeline.pipeline_processor import PipelineProcessor
from pymysql import IntegrityError
from workflow_manager.workflow.models.workflow import Workflow
from workflow_manager.workflow.workflow_helper import WorkflowHelper
from pipeline.pipeline_processor import PipelineProcessor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,14 +59,19 @@ def create_periodic_task(


def update_pipeline(
    pipeline_guid: Optional[str],
    status: tuple[str, str],
    is_active: Optional[bool] = None,
) -> Any:
    """Update a pipeline's last-run status and, optionally, its active flag.

    No-op when ``pipeline_guid`` is falsy. When ``is_active`` is True the
    pipeline is being re-activated, so the fetch skips the active check
    (the row is currently inactive and would otherwise be rejected).
    """
    if not pipeline_guid:
        return
    # Only bypass the active check when explicitly re-activating.
    skip_active_check = is_active is True
    pipeline: Pipeline = PipelineProcessor.fetch_pipeline(
        pipeline_id=pipeline_guid, check_active=not skip_active_check
    )
    PipelineProcessor.update_pipeline_status(
        pipeline=pipeline, is_end=True, status=status, is_active=is_active
    )
    logger.info(f"Updated pipeline status: {status}")

Expand Down Expand Up @@ -120,3 +125,19 @@ def delete_periodic_task(task_name: str) -> None:
logger.error(f"Periodic task does not exist: {task_name}")
except Exception as e:
logger.error(f"Failed to delete periodic task: {e}")


@shared_task
def disable_task(task_name: str) -> None:
    """Disable a periodic task and mark its pipeline as PAUSED/inactive.

    Args:
        task_name: Name of the ``PeriodicTask`` (the pipeline's GUID).
    """
    try:
        task = PeriodicTask.objects.get(name=task_name)
    except PeriodicTask.DoesNotExist:
        # Consistent with delete_periodic_task: log instead of letting the
        # lookup failure crash the celery worker.
        logger.error(f"Periodic task does not exist: {task_name}")
        return
    task.enabled = False
    task.save()
    update_pipeline(task_name, Pipeline.PipelineStatus.PAUSED, False)


@shared_task
def enable_task(task_name: str) -> None:
    """Enable a periodic task and mark its pipeline RESTARTING/active.

    Args:
        task_name: Name of the ``PeriodicTask`` (the pipeline's GUID).
    """
    try:
        task = PeriodicTask.objects.get(name=task_name)
    except PeriodicTask.DoesNotExist:
        # Consistent with delete_periodic_task: log instead of letting the
        # lookup failure crash the celery worker.
        logger.error(f"Periodic task does not exist: {task_name}")
        return
    task.enabled = True
    task.save()
    update_pipeline(task_name, Pipeline.PipelineStatus.RESTARTING, True)
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ function Pipelines({ type }) {
};

const handleEnablePipeline = (value, id) => {
const body = { active: value };
const body = { active: value, pipeline_id: id };
const requestOptions = {
method: "PATCH",
url: `/api/v1/unstract/${sessionDetails?.orgId}/pipeline/${id}/`,
Expand Down

0 comments on commit 7aec6c5

Please sign in to comment.