Alternative: Add task events to the scheduler #2214

Closed
wants to merge 5 commits into from
1 change: 1 addition & 0 deletions mula/.ci/docker-compose.yml
@@ -31,6 +31,7 @@ services:

ci_postgres:
image: postgres:15
command: ["postgres", "-c", "log_statement=all", "-c", "log_destination=stderr"]
healthcheck:
test: ["CMD", "gosu", "postgres", "pg_isready"]
interval: 3s
53 changes: 53 additions & 0 deletions mula/scheduler/alembic/versions/0008_create_task_events.py
@@ -0,0 +1,53 @@
"""Create task events

Revision ID: 0008
Revises: 0007
Create Date: 2024-01-02 11:17:29.273133

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

import scheduler

# revision identifiers, used by Alembic.
revision = "0008"
down_revision = "0007"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"task_events",
sa.Column("id", scheduler.utils.datastore.GUID(), nullable=False),
sa.Column("task_id", scheduler.utils.datastore.GUID(), nullable=False),
sa.Column("event_type", sa.String(), nullable=False),
sa.Column("event_data", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column("timestamp", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
["task_id"],
["tasks.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_task_events_task_id"), "task_events", ["task_id"], unique=False)
op.drop_index("ix_tasks_p_item_hash", table_name="tasks")
op.create_index("ix_p_item_hash", "tasks", [sa.text("(p_item->>'hash')"), sa.text("created_at DESC")], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_p_item_hash", table_name="tasks")
op.create_index(
"ix_tasks_p_item_hash",
"tasks",
[sa.text("(p_item ->> 'hash'::text)"), sa.text("created_at DESC")],
unique=False,
)
op.drop_index(op.f("ix_task_events_task_id"), table_name="task_events")
op.drop_table("task_events")
# ### end Alembic commands ###
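For reference, a minimal sketch (not part of this diff) of the query shape the rebuilt ix_p_item_hash index serves: the task store's "latest task for a given hash" lookup filters on p_item->>'hash' and orders by created_at DESC. The helper name and session handling are illustrative; the p_item JSONB column is assumed from the existing TaskDB model.

from typing import Optional

from sqlalchemy import desc
from sqlalchemy.orm import Session

from scheduler import models


def latest_task_for_hash(session: Session, task_hash: str) -> Optional[models.TaskDB]:
    # Filter on p_item->>'hash' and order by created_at DESC, matching the
    # (p_item->>'hash', created_at DESC) expression index created above.
    return (
        session.query(models.TaskDB)
        .filter(models.TaskDB.p_item["hash"].astext == task_hash)
        .order_by(desc(models.TaskDB.created_at))
        .first()
    )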
2 changes: 1 addition & 1 deletion mula/scheduler/models/__init__.py
@@ -9,4 +9,4 @@
from .queue import PrioritizedItem, PrioritizedItemDB, Queue
from .request import PrioritizedItemRequest
from .scheduler import Scheduler
from .tasks import BoefjeTask, NormalizerTask, Task, TaskDB, TaskStatus
from .tasks import BoefjeTask, NormalizerTask, Task, TaskDB, TaskEvent, TaskEventDB, TaskEventType, TaskStatus
141 changes: 136 additions & 5 deletions mula/scheduler/models/tasks.py
@@ -1,15 +1,17 @@
import enum
import uuid
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import ClassVar, List, Optional

import mmh3
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, computed_field
from sqlalchemy import Column, DateTime, Enum, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import relationship
from sqlalchemy.schema import Index
from sqlalchemy.sql import func
from sqlalchemy.sql.expression import text
from sqlalchemy.sql.schema import ForeignKey

from scheduler.utils import GUID

@@ -44,6 +46,40 @@ class TaskStatus(str, enum.Enum):
CANCELLED = "cancelled"


class TaskEventType(str, enum.Enum):
STATUS_CHANGE = "status_change"


class TaskEvent(BaseModel):
"""TaskEvent represent an event that happened to a Task."""

model_config = ConfigDict(from_attributes=True)

id: uuid.UUID = Field(default_factory=uuid.uuid4)
task_id: uuid.UUID
event_type: str
event_data: dict
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class TaskEventDB(Base):
__tablename__ = "task_events"

id = Column(GUID, primary_key=True)

task_id = Column(GUID, ForeignKey("tasks.id"), index=True, nullable=False)
task = relationship("TaskDB", back_populates="events")

event_type = Column(String, nullable=False)
event_data = Column(JSONB, nullable=False)

timestamp = Column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)


class Task(BaseModel):
model_config = ConfigDict(from_attributes=True)

@@ -57,13 +93,104 @@ class Task(BaseModel):

status: TaskStatus

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
events: List[TaskEvent] = []

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

@computed_field # type: ignore
@property
def queued(self) -> Optional[timedelta]:
"""Get the time the task has been queued in seconds. From the time the
task has been QUEUED to the time it has been DISPATCHED."""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

# From the events, get the timestamp of the first QUEUED event
for event in self.events:
if event.event_type == TaskEventType.STATUS_CHANGE and event.event_data["to_status"] == TaskStatus.QUEUED:
start_time = event.timestamp
break

# From the events, get the timestamp of the first DISPATCHED event
for event in self.events:
if (
event.event_type == TaskEventType.STATUS_CHANGE
and event.event_data["to_status"] == TaskStatus.DISPATCHED
):
end_time = event.timestamp
break

if start_time and end_time:
return end_time - start_time

return None

@computed_field # type: ignore
@property
def runtime(self) -> Optional[timedelta]:
"""Get the runtime of the task in seconds. From the time the task has
been DISPATCHED to the time it has been COMPLETED or FAILED."""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

# From the events, get the timestamp of the first DISPATCHED event
for event in self.events:
if (
event.event_type == TaskEventType.STATUS_CHANGE
and event.event_data["to_status"] == TaskStatus.DISPATCHED
):
start_time = event.timestamp
break

# From the events, get the timestamp of the last COMPLETED or FAILED event
for event in reversed(self.events):
if event.event_type == TaskEventType.STATUS_CHANGE and event.event_data["to_status"] in [
TaskStatus.COMPLETED,
TaskStatus.FAILED,
]:
end_time = event.timestamp
break

if start_time and end_time:
return end_time - start_time

return None

@computed_field # type: ignore
@property
def duration(self) -> Optional[timedelta]:
"""Get the duration of the task in seconds. From the time the task has
been QUEUED to the time it has been COMPLETED or FAILED."""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

# From the events, get the timestamp of the first QUEUED event
for event in self.events:
if event.event_type == TaskEventType.STATUS_CHANGE and event.event_data["to_status"] == TaskStatus.QUEUED:
start_time = event.timestamp
break

# From the events, get the timestamp of the last COMPLETED or FAILED event
for event in reversed(self.events):
if event.event_type == TaskEventType.STATUS_CHANGE and event.event_data["to_status"] in [
TaskStatus.COMPLETED,
TaskStatus.FAILED,
]:
end_time = event.timestamp
break

if start_time and end_time:
return end_time - start_time

return None

def __repr__(self):
return f"Task(id={self.id}, scheduler_id={self.scheduler_id}, type={self.type}, status={self.status})"

def model_dump_db(self):
return self.model_dump(exclude={"events", "runtime", "duration", "queued"})


class TaskDB(Base):
__tablename__ = "tasks"
@@ -82,6 +209,10 @@ class TaskDB(Base):
default=TaskStatus.PENDING,
)

events = relationship(
"TaskEventDB", back_populates="task", cascade="all, delete-orphan", order_by="TaskEventDB.timestamp"
)

created_at = Column(
DateTime(timezone=True),
nullable=False,
@@ -109,7 +240,7 @@ class NormalizerTask(BaseModel):

type: ClassVar[str] = "normalizer"

id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
id: uuid.UUID = Field(default_factory=uuid.uuid4)
normalizer: Normalizer
raw_data: RawData

@@ -128,7 +259,7 @@ class BoefjeTask(BaseModel):

type: ClassVar[str] = "boefje"

id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
id: uuid.UUID = Field(default_factory=uuid.uuid4)
boefje: Boefje
input_ooi: Optional[str]
organization: str
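A minimal sketch (not part of this diff) of how the new queued, runtime and duration fields are derived from a task's event history. model_construct is used only to avoid spelling out the remaining Task fields, and the timestamps are illustrative.

import uuid
from datetime import datetime, timedelta, timezone

from scheduler.models import Task, TaskEvent, TaskEventType, TaskStatus

task_id = uuid.uuid4()
t0 = datetime(2024, 1, 2, 12, 0, 0, tzinfo=timezone.utc)


def status_change(offset: int, from_status: TaskStatus, to_status: TaskStatus) -> TaskEvent:
    # Build a status_change event `offset` seconds after t0.
    return TaskEvent(
        task_id=task_id,
        event_type=TaskEventType.STATUS_CHANGE,
        event_data={"from_status": from_status, "to_status": to_status},
        timestamp=t0 + timedelta(seconds=offset),
    )


task = Task.model_construct(
    events=[
        status_change(0, TaskStatus.PENDING, TaskStatus.QUEUED),
        status_change(5, TaskStatus.QUEUED, TaskStatus.DISPATCHED),
        status_change(65, TaskStatus.DISPATCHED, TaskStatus.COMPLETED),
    ]
)

print(task.queued)    # 0:00:05  (first QUEUED -> first DISPATCHED)
print(task.runtime)   # 0:01:00  (first DISPATCHED -> last COMPLETED/FAILED)
print(task.duration)  # 0:01:05  (first QUEUED -> last COMPLETED/FAILED)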
18 changes: 18 additions & 0 deletions mula/scheduler/schedulers/scheduler.py
@@ -106,8 +106,18 @@ def post_push(self, p_item: models.PrioritizedItem) -> None:
self.ctx.datastores.task_store.update_task(task)
return

# Create task
self.ctx.datastores.task_store.create_task(task)

# Create event
self.ctx.datastores.task_store.log_event(
models.TaskEvent(
task_id=task.id,
event_type=models.TaskEventType.STATUS_CHANGE,
event_data={"from_status": models.TaskStatus.PENDING, "to_status": task.status},
)
)

self.last_activity = datetime.now(timezone.utc)

def post_pop(self, p_item: models.PrioritizedItem) -> None:
@@ -136,6 +146,14 @@ def post_pop(self, p_item: models.PrioritizedItem) -> None:
task.status = models.TaskStatus.DISPATCHED
self.ctx.datastores.task_store.update_task(task)

self.ctx.datastores.task_store.log_event(
models.TaskEvent(
task_id=task.id,
event_type=models.TaskEventType.STATUS_CHANGE,
event_data={"from_status": models.TaskStatus.QUEUED, "to_status": task.status},
)
)

self.last_activity = datetime.now(timezone.utc)

def pop_item_from_queue(
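Taken together with the status patch handled in server.py below, a single task would typically accumulate status_change events along these lines. The payloads are illustrative only; the final transition depends on what the task runner reports back.

# Illustrative event_data payloads for one task, in insertion order:
lifecycle = [
    {"from_status": "pending", "to_status": "queued"},        # post_push, after create_task
    {"from_status": "queued", "to_status": "dispatched"},     # post_pop, once the item is picked up
    {"from_status": "dispatched", "to_status": "completed"},  # PATCH of the task status when it finishes
]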
27 changes: 27 additions & 0 deletions mula/scheduler/server/server.py
@@ -460,6 +460,23 @@ def patch_task(self, task_id: str, item: Dict) -> Any:

updated_task = task_db.model_copy(update=item)

# Check if status changed and log event
if "status" in item and item["status"] != task_db.status:
try:
self.ctx.datastores.task_store.log_event(
models.TaskEvent(
task_id=task_id,
event_type=models.TaskEventType.STATUS_CHANGE,
event_data={"from_status": task_db.status, "to_status": item["status"]},
)
)
except Exception as exc:
self.logger.error(exc)
raise fastapi.HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="failed to log event",
) from exc

# Update task in database
try:
self.ctx.datastores.task_store.update_task(updated_task)
@@ -470,6 +487,16 @@ def patch_task(self, task_id: str, item: Dict) -> Any:
detail="failed to update task",
) from exc

# Retrieve updated task
try:
updated_task = self.ctx.datastores.task_store.get_task_by_id(task_id)
except Exception as exc:
self.logger.error(exc)
raise fastapi.HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="failed to get updated task",
) from exc

return updated_task

def get_task_stats(self, scheduler_id: Optional[str] = None) -> Optional[Dict[str, Dict[str, int]]]:
18 changes: 15 additions & 3 deletions mula/scheduler/storage/task_store.py
@@ -2,6 +2,7 @@
from typing import Dict, List, Optional, Tuple

from sqlalchemy import exc, func
from sqlalchemy.orm import joinedload

from scheduler import models

@@ -61,7 +62,12 @@ def get_tasks(
@retry()
def get_task_by_id(self, task_id: str) -> Optional[models.Task]:
with self.dbconn.session.begin() as session:
task_orm = session.query(models.TaskDB).filter(models.TaskDB.id == task_id).first()
task_orm = (
session.query(models.TaskDB)
.options(joinedload(models.TaskDB.events))
.filter(models.TaskDB.id == task_id)
.first()
)
if task_orm is None:
return None

@@ -106,7 +112,7 @@ def get_latest_task_by_hash(self, task_hash: str) -> Optional[models.Task]:
@retry()
def create_task(self, task: models.Task) -> Optional[models.Task]:
with self.dbconn.session.begin() as session:
task_orm = models.TaskDB(**task.model_dump())
task_orm = models.TaskDB(**task.model_dump_db())
session.add(task_orm)

created_task = models.Task.model_validate(task_orm)
@@ -116,7 +122,7 @@ def update_task(self, task: models.Task) -> None:
@retry()
def update_task(self, task: models.Task) -> None:
with self.dbconn.session.begin() as session:
(session.query(models.TaskDB).filter(models.TaskDB.id == task.id).update(task.model_dump()))
(session.query(models.TaskDB).filter(models.TaskDB.id == task.id).update(task.model_dump_db()))

@retry()
def cancel_tasks(self, scheduler_id: str, task_ids: List[str]) -> None:
Expand All @@ -125,6 +131,12 @@ def cancel_tasks(self, scheduler_id: str, task_ids: List[str]) -> None:
models.TaskDB.scheduler_id == scheduler_id, models.TaskDB.id.in_(task_ids)
).update({"status": models.TaskStatus.CANCELLED.name})

@retry()
def log_event(self, event: models.TaskEvent) -> None:
with self.dbconn.session.begin() as session:
event_orm = models.TaskEventDB(**event.model_dump())
session.add(event_orm)

@retry()
def get_status_count_per_hour(
self,
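Finally, a minimal usage sketch tying log_event to the events that get_task_by_id now eager-loads. It is not part of this diff; store stands in for a configured TaskStore and task for an existing models.Task that has just finished.

from scheduler import models

store.log_event(
    models.TaskEvent(
        task_id=task.id,
        event_type=models.TaskEventType.STATUS_CHANGE,
        event_data={"from_status": task.status, "to_status": models.TaskStatus.COMPLETED},
    )
)

# get_task_by_id() joinedloads TaskDB.events, so the returned Task carries its full
# event history and the computed queued/runtime/duration fields.
refreshed = store.get_task_by_id(str(task.id))
print(refreshed.events[-1].event_data["to_status"])  # "completed"
print(refreshed.runtime)  # a timedelta once both DISPATCHED and COMPLETED/FAILED events exist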