Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task duration intervals to the scheduler #2451

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
11a7504
Add task events
jpbruinsslot Dec 21, 2023
26bc615
Merge branch 'main' into feature/mula/task-events-alternative
jpbruinsslot Dec 28, 2023
76b3759
Implement alternative task events
jpbruinsslot Dec 28, 2023
dffc491
Set index for task_id
jpbruinsslot Jan 2, 2024
740ad93
Merge branch 'main' into feature/mula/task-events-alternative
jpbruinsslot Jan 2, 2024
8d6718f
Implement alternative
jpbruinsslot Jan 31, 2024
3f25a24
Fix and update tests
jpbruinsslot Feb 5, 2024
f3b257c
Add tests
jpbruinsslot Feb 5, 2024
e9875f0
Pre commit
jpbruinsslot Feb 7, 2024
8a64695
Merge branch 'main' into feature/mula/task-events-alternative-2
jpbruinsslot Feb 7, 2024
7754dc8
Fix
jpbruinsslot Feb 7, 2024
1139f27
Merge branch 'main' into feature/mula/task-events-alternative-2
jpbruinsslot Feb 8, 2024
b559b2b
Merge branch 'main' into feature/mula/task-events-alternative-2
jpbruinsslot Feb 12, 2024
ccc315d
Merge branch 'main' into feature/mula/task-events-alternative-2
jpbruinsslot Feb 12, 2024
7c60699
Change datetime fields
jpbruinsslot Mar 4, 2024
33bf243
Merge branch 'main' into feature/mula/task-events-alternative-2
jpbruinsslot Mar 5, 2024
49abda1
Pre-commit
jpbruinsslot Mar 5, 2024
657e936
Feature/efficient reporting for all reports (#2586)
Donnype Mar 5, 2024
4b8df88
Filter out undeserializable objects from xtdb query in `construct_nei…
originalsouth Mar 5, 2024
fb71c62
TLS Report unit tests (#2593)
madelondohmen Mar 5, 2024
6015d7c
Translations update from Hosted Weblate (#2594)
weblate Mar 5, 2024
c5e79db
Fix missing finding_type table (#2596)
madelondohmen Mar 5, 2024
33f8aab
add extra checks for findings to dns report (#2506)
underdarknl Mar 5, 2024
c8590ab
fix deprecated warning due to old env in .env-defaults (#2597)
underdarknl Mar 5, 2024
5cd2e5d
Merge branch 'main' into feature/mula/task-events-alternative-2
jpbruinsslot Mar 5, 2024
03ec671
Fix tests
jpbruinsslot Mar 6, 2024
5abb1ae
Fix tests
jpbruinsslot Mar 6, 2024
39c0704
Fix tests
jpbruinsslot Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion boefjes/boefjes/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import os
from typing import Any, Dict, List, Tuple, Union

from octopoes.models import OOI
from pydantic import ValidationError

from boefjes.clients.scheduler_client import SchedulerAPIClient, TaskStatus
from boefjes.config import settings
from boefjes.job_models import (
BoefjeMeta,
InvalidReturnValueNormalizer,
Expand All @@ -18,7 +21,6 @@
)
from boefjes.katalogus.local_repository import LocalPluginRepository
from boefjes.runtime_interfaces import BoefjeJobRunner, JobRuntimeError, NormalizerJobRunner
from octopoes.models import OOI

logger = logging.getLogger(__name__)

Expand All @@ -40,10 +42,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
class LocalBoefjeJobRunner(BoefjeJobRunner):
def __init__(self, local_repository: LocalPluginRepository):
    """Keep the plugin repository and build a scheduler API client.

    Args:
        local_repository: Repository stored on the instance as
            ``self.local_repository``.
    """
    self.local_repository = local_repository
    # The client is constructed from the configured scheduler API setting, passed as a string.
    self.scheduler_client = SchedulerAPIClient(str(settings.scheduler_api))

def run(self, boefje_meta: BoefjeMeta, environment: Dict[str, str]) -> List[Tuple[set, Union[bytes, str]]]:
logger.info("Running local boefje plugin")

task_id = str(boefje_meta.id)
self.scheduler_client.patch_task(task_id, TaskStatus.RUNNING)

Donnype marked this conversation as resolved.
Show resolved Hide resolved
boefjes = self.local_repository.resolve_boefjes()
boefje_resource = boefjes[boefje_meta.boefje.id]

Expand All @@ -58,10 +64,14 @@ def run(self, boefje_meta: BoefjeMeta, environment: Dict[str, str]) -> List[Tupl
class LocalNormalizerJobRunner(NormalizerJobRunner):
def __init__(self, local_repository: LocalPluginRepository):
    """Keep the plugin repository and build a scheduler API client.

    Args:
        local_repository: Repository stored on the instance as
            ``self.local_repository``.
    """
    self.local_repository = local_repository
    # The client is constructed from the configured scheduler API setting, passed as a string.
    self.scheduler_client = SchedulerAPIClient(str(settings.scheduler_api))

def run(self, normalizer_meta, raw) -> NormalizerOutput:
logger.info("Running local normalizer plugin")

task_id = str(normalizer_meta.id)
self.scheduler_client.patch_task(task_id, TaskStatus.RUNNING)

underdarknl marked this conversation as resolved.
Show resolved Hide resolved
normalizers = self.local_repository.resolve_normalizers()
normalizer = normalizers[normalizer_meta.normalizer.id]

Expand Down
1 change: 1 addition & 0 deletions mula/.ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:

ci_postgres:
image: postgres:15
command: ["postgres", "-c", "log_statement=all", "-c", "log_destination=stderr"]
healthcheck:
test: ["CMD", "gosu", "postgres", "pg_isready"]
interval: 3s
Expand Down
40 changes: 40 additions & 0 deletions mula/scheduler/alembic/versions/0008_update_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Update tasks

Revision ID: 0008
Revises: 0007
Create Date: 2024-01-31 16:30:14.891313

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '0008'
down_revision = '0007'
branch_labels = None
depends_on = None


def upgrade():
    """Add duration and metadata columns to ``tasks`` and swap the hash index.

    Adds four nullable interval columns (``pending``, ``queued``,
    ``dispatched``, ``running``) and a nullable JSONB ``meta`` column, then
    drops ``ix_tasks_p_item_hash`` and creates ``ix_p_item_hash``.
    """
    # Originally auto-generated by Alembic; the operations and their order are unchanged.
    interval_columns = ("pending", "queued", "dispatched", "running")
    for column_name in interval_columns:
        op.add_column("tasks", sa.Column(column_name, sa.Interval(), nullable=True))

    meta_column = sa.Column("meta", postgresql.JSONB(astext_type=sa.Text()), nullable=True)
    op.add_column("tasks", meta_column)

    # Drop the index under its old name and create it under the new name.
    op.drop_index("ix_tasks_p_item_hash", table_name="tasks")
    op.create_index(
        "ix_p_item_hash",
        "tasks",
        [sa.text("(p_item->>'hash')"), sa.text("created_at DESC")],
        unique=False,
    )


def downgrade():
    """Revert revision 0008: restore the old index name and drop the added columns.

    Drops ``ix_p_item_hash``, recreates ``ix_tasks_p_item_hash``, then removes
    ``meta`` and the four interval columns from ``tasks``.
    """
    # Originally auto-generated by Alembic; the operations and their order are unchanged.
    op.drop_index("ix_p_item_hash", table_name="tasks")
    op.create_index(
        "ix_tasks_p_item_hash",
        "tasks",
        [sa.text("(p_item ->> 'hash'::text)"), sa.text("created_at DESC")],
        unique=False,
    )

    # Columns are removed in the reverse of the order in which upgrade() adds them.
    for column_name in ("meta", "running", "dispatched", "queued", "pending"):
        op.drop_column("tasks", column_name)
67 changes: 62 additions & 5 deletions mula/scheduler/models/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import enum
import uuid
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import ClassVar, List, Optional

import mmh3
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Enum, String
from pydantic import BaseModel, ConfigDict, Field, computed_field
from sqlalchemy import Column, DateTime, Enum, Interval, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import relationship
from sqlalchemy.schema import Index
from sqlalchemy.sql import func
from sqlalchemy.sql.expression import text
from sqlalchemy.sql.schema import ForeignKey

from scheduler.utils import GUID

Expand Down Expand Up @@ -57,10 +59,58 @@ class Task(BaseModel):

status: TaskStatus

# Durations
pending: Optional[timedelta] = None
queued: Optional[timedelta] = None
dispatched: Optional[timedelta] = None
running: Optional[timedelta] = None
underdarknl marked this conversation as resolved.
Show resolved Hide resolved

meta: Optional[dict] = None

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed if the only state changes are for the status that you use the ..._at fields for?


def update_status(self, status: TaskStatus) -> None:
    """Move the task to ``status`` and record how long the previous phase took.

    The start of the current phase is taken to be ``created_at`` plus every
    duration already recorded (``pending``, ``queued``, ``dispatched``,
    ``running``; unset values count as zero). When the current status and
    ``status`` form one of the listed transitions, the time between that
    start and now (UTC) is stored in the duration field named after the
    status being left. For any other combination no duration is written.
    ``self.status`` is always set to ``status``.

    Args:
        status: The status the task transitions to.
    """
    previous = self.status
    now_utc = datetime.now(timezone.utc)

    # Walk forward from creation over the phases that already have a duration.
    phase_start = self.created_at
    for recorded in (self.pending, self.queued, self.dispatched, self.running):
        phase_start = phase_start + (recorded or timedelta())

    # (status being left, duration field to fill, statuses that close that phase)
    transitions = (
        (
            TaskStatus.PENDING,
            "pending",
            (TaskStatus.QUEUED, TaskStatus.CANCELLED, TaskStatus.FAILED),
        ),
        (
            TaskStatus.QUEUED,
            "queued",
            (TaskStatus.DISPATCHED, TaskStatus.CANCELLED, TaskStatus.FAILED),
        ),
        (
            TaskStatus.DISPATCHED,
            "dispatched",
            (TaskStatus.RUNNING, TaskStatus.CANCELLED, TaskStatus.FAILED),
        ),
        (
            TaskStatus.RUNNING,
            "running",
            (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED),
        ),
    )

    # Entries are matched with == in declaration order; only the first match is applied.
    for leaving, duration_field, closing_statuses in transitions:
        if previous == leaving and status in closing_statuses:
            setattr(self, duration_field, now_utc - phase_start)
            break

    self.status = status

def __repr__(self):
return f"Task(id={self.id}, scheduler_id={self.scheduler_id}, type={self.type}, status={self.status})"

Expand All @@ -82,6 +132,13 @@ class TaskDB(Base):
default=TaskStatus.PENDING,
)

pending = Column(Interval)
queued = Column(Interval)
dispatched = Column(Interval)
running = Column(Interval)

meta = Column(JSONB)
Copy link
Contributor Author

@jpbruinsslot jpbruinsslot Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added meta column for runners to relay and persist information about a task, e.g. cpu, mem load, run times etc. If this info is already available through bytes we can opt to reference that. However, it might be more difficult/time-consuming to retrieve that information when we want to make rankings based on that information since we don't have it readily available in the scheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can opt for removing this field for now since it depends on changes or implementation in runners

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather collect this info in the scheduler for fast access, as it's something the user wants to filter on and see on the task list. Having those details only in Bytes would mean we end up reading those job metas for the whole list, or worse, for all jobs when rendering a filtered task list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question of where and how we want to store these meta values in the scheduler's database is still very valid though.


created_at = Column(
DateTime(timezone=True),
nullable=False,
Expand Down Expand Up @@ -109,7 +166,7 @@ class NormalizerTask(BaseModel):

type: ClassVar[str] = "normalizer"

id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
id: uuid.UUID = Field(default_factory=uuid.uuid4)
normalizer: Normalizer
raw_data: RawData

Expand All @@ -128,7 +185,7 @@ class BoefjeTask(BaseModel):

type: ClassVar[str] = "boefje"

id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
id: uuid.UUID = Field(default_factory=uuid.uuid4)
boefje: Boefje
input_ooi: Optional[str]
organization: str
Expand Down
4 changes: 2 additions & 2 deletions mula/scheduler/schedulers/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def push_tasks_for_scan_profile_mutations(self, body: bytes) -> None:
if task is None:
continue

task.status = TaskStatus.CANCELLED
task.update_status(TaskStatus.CANCELLED)
self.ctx.datastores.task_store.update_task(task)

return
Expand Down Expand Up @@ -593,7 +593,7 @@ def push_task(self, boefje: Plugin, ooi: OOI, caller: str = "") -> None:

# Update task in datastore to be failed
task_db = self.ctx.datastores.task_store.get_latest_task_by_hash(task.hash)
task_db.status = TaskStatus.FAILED
task_db.update_status(TaskStatus.FAILED)
self.ctx.datastores.task_store.update_task(task_db)
except Exception as exc_stalled:
self.logger.warning(
Expand Down
35 changes: 19 additions & 16 deletions mula/scheduler/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,12 @@ def post_push(self, p_item: models.PrioritizedItem) -> None:
Args:
p_item: The prioritized item from the priority queue.
"""
# NOTE: we set the id of the task the same as the p_item, for easier
# lookup.
task = models.Task(
id=p_item.id,
scheduler_id=self.scheduler_id,
type=self.queue.item_type.type,
p_item=p_item,
status=models.TaskStatus.QUEUED,
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
)

task_db = self.ctx.datastores.task_store.get_task_by_id(str(p_item.id))
if task_db is not None:
self.ctx.datastores.task_store.update_task(task)
if task_db is None:
return

self.ctx.datastores.task_store.create_task(task)
task_db.update_status(models.TaskStatus.QUEUED)
self.ctx.datastores.task_store.update_task(task_db)

self.last_activity = datetime.now(timezone.utc)

Expand All @@ -133,7 +121,7 @@ def post_pop(self, p_item: models.PrioritizedItem) -> None:
)
return

task.status = models.TaskStatus.DISPATCHED
task.update_status(models.TaskStatus.DISPATCHED)
self.ctx.datastores.task_store.update_task(task)

self.last_activity = datetime.now(timezone.utc)
Expand Down Expand Up @@ -195,6 +183,21 @@ def push_item_to_queue(self, p_item: models.PrioritizedItem) -> None:
)
raise queues.errors.NotAllowedError("Scheduler is disabled")

# Create task
#
# NOTE: we set the id of the task the same as the p_item, for easier
# lookup.
task = models.Task(
id=p_item.id,
scheduler_id=self.scheduler_id,
type=self.queue.item_type.type,
p_item=p_item,
status=models.TaskStatus.PENDING,
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
)
self.ctx.datastores.task_store.create_task(task)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to make the sequence pending > queued > dispatched work correctly within the scheduler

try:
self.queue.push(p_item)
except queues.errors.NotAllowedError as exc:
Expand Down
14 changes: 14 additions & 0 deletions mula/scheduler/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ def patch_task(self, task_id: str, item: Dict) -> Any:
detail="task not found",
)

# Check if status changed and update duration
if "status" in item and item["status"] != task_db.status:
task_db.update_status(item["status"])

updated_task = task_db.model_copy(update=item)

# Update task in database
Expand All @@ -470,6 +474,16 @@ def patch_task(self, task_id: str, item: Dict) -> Any:
detail="failed to update task",
) from exc

# Retrieve updated task
try:
updated_task = self.ctx.datastores.task_store.get_task_by_id(task_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you are trying to achieve with fetching the just saved task from the database.

except Exception as exc:
self.logger.error(exc)
raise fastapi.HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="failed to get updated task",
) from exc

return updated_task

def get_task_stats(self, scheduler_id: Optional[str] = None) -> Optional[Dict[str, Dict[str, int]]]:
Expand Down
1 change: 1 addition & 0 deletions mula/scheduler/storage/task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List, Optional, Tuple

from sqlalchemy import exc, func
from sqlalchemy.orm import joinedload

from scheduler import models

Expand Down
9 changes: 8 additions & 1 deletion mula/tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from unittest import mock

from fastapi.testclient import TestClient
from scheduler import config, models, server, storage

from scheduler import config, models, server, storage
from tests.factories import OrganisationFactory
from tests.mocks import queue as mock_queue
from tests.mocks import scheduler as mock_scheduler
Expand Down Expand Up @@ -599,6 +599,13 @@ def test_patch_tasks(self):
self.assertEqual(200, response.status_code)
self.assertEqual("completed", response.json().get("status"))

def test_patch_task_update_status(self):
    """Patching a queued task to dispatched returns a populated ``queued`` duration."""
    # Precondition: the fixture task starts out queued.
    initial_status = self.first_item_api.get("status")
    self.assertEqual("queued", initial_status)

    response = self.client.patch(
        f"/tasks/{self.first_item_api.get('id')}",
        json={"status": "dispatched"},
    )
    self.assertEqual(200, response.status_code)

    # The response should carry the new status and a duration for the phase just left.
    body = response.json()
    self.assertEqual("dispatched", body.get("status"))
    self.assertIsNotNone(body.get("queued"))

def test_patch_task_empty(self):
# Patch a task with empty body
response = self.client.patch(f"/tasks/{self.first_item_api.get('id')}", json={})
Expand Down
Loading
Loading