Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype of the remote boefje runner #3223

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions boefjes/boefjes/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def create_boefje_meta(task, local_repository):
arguments = {"oci_arguments": boefje_resource.oci_arguments}

if input_ooi:
reference = Reference.from_str(input_ooi)
reference = Reference.from_str(input_ooi) # TODO SOUF why is this here? Just giving `input_ooi` works too
try:
ooi = get_octopoes_api_connector(organization).get(reference, valid_time=datetime.now(timezone.utc))
except ObjectNotFoundException as e:
Expand All @@ -170,7 +170,7 @@ def create_boefje_meta(task, local_repository):
boefje_meta = BoefjeMeta(
id=task.id,
boefje=boefje,
input_ooi=input_ooi,
input_ooi=input_ooi, # `input_ooi` is of type str, and we just give it an instance here?
arguments=arguments,
organization=organization,
environment=environment,
Expand Down
4 changes: 1 addition & 3 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue):
all_queues_empty = True

for queue_type in queues:
logger.debug("Popping from queue %s", queue_type.id)

try:
p_item = self.scheduler_client.pop_item(queue_type.id)
p_item = self.scheduler_client.pop_non_remote_item(queue_type.id)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
Expand Down
22 changes: 19 additions & 3 deletions boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import uuid
from enum import Enum
from typing import Literal

from httpx import Client, HTTPTransport, Response
from pydantic import BaseModel, TypeAdapter
Expand All @@ -26,6 +27,7 @@ class QueuePrioritizedItem(BaseModel):
priority: int
hash: str | None = None
data: BoefjeMeta | NormalizerMeta
remote: bool


class TaskStatus(Enum):
Expand All @@ -47,13 +49,26 @@ class Task(BaseModel):
status: TaskStatus
created_at: datetime.datetime
modified_at: datetime.datetime
remote: bool


# TODO: SOUF ask where to put this
class Filter(BaseModel):
column: str
field: str | None = None
operator: Literal["=="] = "=="
value: bool


class QueuePopModel(BaseModel):
filters: list[Filter]


class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()

def pop_item(self, queue: str) -> QueuePrioritizedItem | None:
def pop_non_remote_item(self, queue: str) -> QueuePrioritizedItem | None:
raise NotImplementedError()

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand All @@ -80,8 +95,9 @@ def get_queues(self) -> list[Queue]:

return TypeAdapter(list[Queue]).validate_json(response.content)

def pop_item(self, queue: str) -> QueuePrioritizedItem | None:
response = self._session.post(f"/queues/{queue}/pop")
def pop_non_remote_item(self, queue: str) -> QueuePrioritizedItem | None:
non_remote_filter = QueuePopModel(filters=[Filter(column="remote", operator="==", value=False)])
response = self._session.post(f"/queues/{queue}/pop", json=non_remote_filter.model_dump())
Comment on lines +98 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its better to keep the pop_item name the same, but configure the self._session (eg, the schedulerclient) to have a different url on which it can find the scheduler for remote runners.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a new url or changing the method's name, I have thought of adding a new parameter to pop_item that would change the query. The change would look like something like this:

def pop_non_remote_item(self, queue: str) -> QueuePrioritizedItem | None:
	non_remote_filter = QueuePopModel(filters=[Filter(column="remote", operator="==", value=False)])
	response = self._session.post(f"/queues/{queue}/pop", json=non_remote_filter.model_dump())

=>

def pop_item(self, queue: str, remote: bool = False) -> QueuePrioritizedItem | None:
	non_remote_filter = QueuePopModel(filters=[Filter(column="remote", operator="==", value=remote)])
	response = self._session.post(f"/queues/{queue}/pop", json=non_remote_filter.model_dump())

(added a new parameter used for the filter and changed the name back)

Or do you think it'd be better to add a new URLs to the scheduler which would only pop non-remote tasks and remote tasks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its the other way round.
The Scheduler is located on one location. and has one url (possibly two if you where to use a local url and an external url).

It's the boefjes runners that are hosted somewhere else. They need to contact the scheduler for Jobs. to do so, they need to know the url for the scheduler (eg, remote accessible, or local url).
With that request for jobs they need to send any limitations they themselves know about. Eg, Can they reach network X, can they reach internet, Can they perform ipv6 lookups etc.
Which limitations they have, and where the scheduler can be found needs to be configurable per Boefje Runner in a config file they can read locally (or via Env vars)

The scheduler then returns this filtered list of jobs, and the boefje runner starts processing them.
Once done, the boefje needs to return it's raw files to Bytes. For that a url/path to bytes also needs to be present, much in the same way the url/path to the scheduler needs to be present.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scheduler is located on one location. and has one url (possibly two if you where to use a local url and an external url).

I don't understand what you mean with the scheduler potentially having 2 URLs.
Do you mean with "local url" and "external url" for example http://localhost:8004 and https://www.example.com/scheduler?

With that request for jobs they need to send any limitations they themselves know about. Eg, Can they reach network X, can they reach internet, Can they perform ipv6 lookups etc.

Would the information about those jobs all be inside the same tasks-queue that the current boefje-runner uses?
(With "boefjes runner" you mean the BoefjeHandler I assume).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, for some the scheduler can be found on localhost, or on a local IP, for others it might be located on https://somesite.com/scheduler A boefje runner needs one to contact the scheduler for jobs, and one to send raw files to bytes once the jobs are done.
The task-queue exists in the scheduler, the boefje runner just picks up jobs from the scheduler and executes them locally. A boefje runner does not have more than one job active per thread at the moment, so no queue exists there.

self._verify_response(response)

return TypeAdapter(QueuePrioritizedItem | None).validate_json(response.content)
Expand Down
2 changes: 2 additions & 0 deletions boefjes/boefjes/job_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Job(BaseModel):
id: UUID
started_at: AwareDatetime | None = Field(default=None)
ended_at: AwareDatetime | None = Field(default=None)
remote: bool = Field(default=False)

@property
def runtime(self) -> timedelta | None:
Expand All @@ -33,6 +34,7 @@ class Boefje(BaseModel):

id: Annotated[str, StringConstraints(min_length=1)]
version: str | None = Field(default=None)
remote: bool = Field(default=False)


class Normalizer(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions boefjes/boefjes/katalogus/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Plugin(BaseModel):
environment_keys: list[str] = Field(default_factory=list)
related: list[str] | None = None
enabled: bool = False
remote: bool = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not the boefje that is 'remote', its the job in a specific context (eg, on a given IP, or connected to a specific network) that needs/can be ran remotely.

Jobs for Nmap-tcp that are scheduled for an IP in a network|Soufyan-home should not be returned to the job-runner who's limitations only mention Network|Internet, they should be returned to the boefje-runner that asks for jobs with the filter: network|Soufyan-home

N.B. This scoping on networks still needs to be build in the scheduler.

A simpler limitation/job filter could be the following:
The Nmap-TCP boefje can create two types of jobs for Internet connected IP's, one for ipv4 and one for ipv6 input objects.
To be able to succesfully run all Nmap-TCP jobs, the boefje runner needs to be able to access ipv4 and ipv6 'internet'.
I can see a way forward where there's two specific NMAP-TCP boefjes configured, one for each protocol. The Boefje manifest then lists the following needed Traits: ipv4-connectivity, ipv6-connectivity.
The boefje-runner can then from its config file read that it can only reach ipv4 networks, it can reach the network Internet and produces a job-query for the scheduler to receive only jobs that have the ipv4-connectivity trait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not the boefje that is 'remote', its the job in a specific context (eg, on a given IP, or connected to a specific network) that needs/can be ran remotely.

This has been implemented this way because of the fact that tasks currently created with information from the boefje. (so my idea was to create boefjes that state that all tasks created for this type of boefje will have to be ran remotely).

they should be returned to the boefje-runner that asks for jobs with the filter: network|Soufyan-home

Do you have an idea when these tasks should be created? Would the user manually have to create these tasks and mention in what network they should run?

To be able to succesfully run all Nmap-TCP jobs, the boefje runner needs to be able to access ipv4 and ipv6 'internet'.
I can see a way forward where there's two specific NMAP-TCP boefjes configured, one for each protocol.

Do you want OpenKAT give the option to run the same boefje multiple times (with each their own settings)?

The boefje-runner can then from its config file read that it can only reach ipv4 networks, it can reach the network Internet and produces a job-query for the scheduler to receive only jobs that have the ipv4-connectivity trait.

I don't understand what you mean with a boefje-runner only being able to reach ipv4 networks. Shouldn't the boefje decide what kind of IPAddress it can take in?


Right now I have only focused on implementing a "remote" boefje that can request a job whenever that boefje is ready from the outside.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not the boefje that is 'remote', its the job in a specific context (eg, on a given IP, or connected to a specific network) that needs/can be ran remotely.

This has been implemented this way because of the fact that tasks currently created with information from the boefje. (so my idea was to create boefjes that state that all tasks created for this type of boefje will have to be ran remotely).

The way you implemented the Remote trait now, can work, but I would add a more general traits system to the boefje config and allow filtering on any/all of them.

they should be returned to the boefje-runner that asks for jobs with the filter: network|Soufyan-home

Do you have an idea when these tasks should be created? Would the user manually have to create these tasks and mention in what network they should run?

Ideally the scheduler just makes jobs, one for each boefje+input-ooi combination. Where they are executed is not up to the user. The user can add their own jobs, but again they won't be able to steer which runner runs these jobs.
All boefjes runners wlll try to execute jobs from the queue as created by the scheduler, but since not all jobs can be excecuted from everywhere, we need to make sure we have enough logic in the scheduler to give the correct jobs to the correct runner based on what networks/hosts they can reach (scopes), and what traits they have (ipv4, ipv6)

To be able to succesfully run all Nmap-TCP jobs, the boefje runner needs to be able to access ipv4 and ipv6 'internet'.
I can see a way forward where there's two specific NMAP-TCP boefjes configured, one for each protocol.

Do you want OpenKAT give the option to run the same boefje multiple times (with each their own settings)?

The boefje-runner can then from its config file read that it can only reach ipv4 networks, it can reach the network Internet and produces a job-query for the scheduler to receive only jobs that have the ipv4-connectivity trait.

I don't understand what you mean with a boefje-runner only being able to reach ipv4 networks. Shouldn't the boefje decide what kind of IPAddress it can take in?

A boefje like Nmap can consume all IP-addresses and should list both types as valid input-oois. However, if the machine running that specific boefje does not have ipv6 connectivity, running Nmap on an ipv6 address just spits out errors. The same would be true for trying to download an ipv6 version of a website.

Right now I have only focused on implementing a "remote" boefje that can request a job whenever that boefje is ready from the outside.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am starting to understand it better, thank you 👍.

I will try to find a way of adding flags to tasks which can be filtered by the boefje runners.

It is still unclear how these tasks should be created with these flags. Should the nmap boefje specify that it will potentially run with IPv6 addresses or should the nmap boefje see that it's runner is not able to run with IPv6 addresses and skip those OOIs?


def __str__(self):
return f"{self.id}:{self.version}"
Expand Down
1 change: 1 addition & 0 deletions boefjes/boefjes/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from octopoes.models import OOI, DeclaredScanProfile

logger = logging.getLogger(__name__)
# TODO: SOUF change filename to `local_boefjes_runner` for consistency


class TemporaryEnvironment:
Expand Down
Empty file.
13 changes: 13 additions & 0 deletions boefjes/boefjes/plugins/kat_remote_scanner/boefje.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"id": "remote-scanner",
"name": "Remote scanner",
"description": "Scans from the remote",
"consumes": [
"HTTPResource",
"IPAddressV4",
"IPAddressV6"
],
"environment_keys": [],
"scan_level": 4,
"remote": true
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions boefjes/boefjes/plugins/kat_remote_scanner/description.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# REMOTE SCANNER

TODO: SOUF ASK
WHERE DO I GET DISPLAYED
2 changes: 2 additions & 0 deletions boefjes/boefjes/plugins/kat_remote_scanner/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run(boefje_meta: dict):
return [(set(), "[ERROR] This should never be ran")]
4 changes: 3 additions & 1 deletion boefjes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ def get_queues(self) -> list[Queue]:
time.sleep(self.sleep_time)
return TypeAdapter(list[Queue]).validate_json(self.queue_response)

def pop_item(self, queue: str) -> QueuePrioritizedItem | None:
def pop_non_remote_item(self, queue: str) -> QueuePrioritizedItem | None:
time.sleep(self.sleep_time)

try:
if WorkerManager.Queue.BOEFJES.value in queue:
print(self.boefje_responses[0].decode())
p_item = TypeAdapter(QueuePrioritizedItem).validate_json(self.boefje_responses.pop(0))
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
Expand Down Expand Up @@ -86,6 +87,7 @@ def _task_from_id(self, task_id: UUID):
status=TaskStatus.DISPATCHED,
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
remote=False,
)

def push_item(self, queue_id: str, p_item: QueuePrioritizedItem) -> None:
Expand Down
9 changes: 6 additions & 3 deletions boefjes/tests/examples/scheduler/pop_response_boefje.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
"version": null,
"remote": false
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
}
"ended_at": null,
"remote": false
},
"remote": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
"version": null,
"remote": false
},
"input_ooi": "",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
}
"ended_at": null,
"remote": false
},
"remote": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"priority": 1,
"scheduler_id": "normalizer-_dev",
"hash": "7e698c377cfd85015c0d7086b76b76b4",
"remote": false,
"data": {
"id": "60da7d4ff41f4940901bd98a92e9014b",
"raw_data": {
Expand Down Expand Up @@ -33,7 +34,8 @@
"DNSSOARecord",
"DNSCNAMERecord"
],
"dispatches": null
"dispatches": null,
"remote": false
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
Expand Down
9 changes: 6 additions & 3 deletions boefjes/tests/examples/scheduler/should_crash.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
"DNSSOARecord",
"DNSCNAMERecord"
],
"dispatches": null
"dispatches": null,
"remote": false
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"dispatches": []
}
"dispatches": [],
"remote": false
},
"remote": false
}
4 changes: 2 additions & 2 deletions boefjes/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_healthz(api):

def test_boefje_input_running(api, tmp_path):
scheduler_client = _mocked_scheduler_client(tmp_path)
task = scheduler_client.pop_item("boefje")
task = scheduler_client.pop_non_remote_item("boefje")
scheduler_client.patch_task(task.id, TaskStatus.RUNNING)
api.app.dependency_overrides[boefjes.api.get_scheduler_client] = lambda: scheduler_client

Expand All @@ -49,7 +49,7 @@ def test_boefje_input_running(api, tmp_path):

def test_boefje_input_not_running(api, tmp_path):
scheduler_client = _mocked_scheduler_client(tmp_path)
scheduler_client.pop_item("boefje")
scheduler_client.pop_non_remote_item("boefje")
api.app.dependency_overrides[boefjes.api.get_scheduler_client] = lambda: scheduler_client

response = api.get("/api/v0/tasks/70da7d4f-f41f-4940-901b-d98a92e9014b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def upgrade():
),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("modified_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("remote", sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
# ### end Alembic commands ###
Expand Down
1 change: 1 addition & 0 deletions mula/scheduler/alembic/versions/0002_update_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def upgrade():
sa.Column("data", sa.JSON(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("modified_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("remote", sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)

Expand Down
1 change: 1 addition & 0 deletions mula/scheduler/models/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Boefje(BaseModel):
id: str
name: str | None = Field(default=None)
version: str | None = Field(default=None)
remote: bool = False


class BoefjeMeta(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions mula/scheduler/models/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ class Plugin(BaseModel):
consumes: str | list[str]
options: list[str] | None = None
produces: list[str]
remote: bool = False
6 changes: 5 additions & 1 deletion mula/scheduler/models/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timezone

from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func

Expand Down Expand Up @@ -34,6 +34,8 @@ class PrioritizedItem(BaseModel):

modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

remote: bool = Field(default=False)


class PrioritizedItemDB(Base):
__tablename__ = "items"
Expand Down Expand Up @@ -61,6 +63,8 @@ class PrioritizedItemDB(Base):
onupdate=func.now(),
)

remote = Column(Boolean)


class Queue(BaseModel):
id: str
Expand Down
7 changes: 6 additions & 1 deletion mula/scheduler/models/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import mmh3
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Column, DateTime, Enum, String
from sqlalchemy import Boolean, Column, DateTime, Enum, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.schema import Index
from sqlalchemy.sql import func
Expand Down Expand Up @@ -61,6 +61,8 @@ class Task(BaseModel):

modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

remote: bool

def __repr__(self):
return f"Task(id={self.id}, scheduler_id={self.scheduler_id}, type={self.type}, status={self.status})"

Expand Down Expand Up @@ -95,6 +97,8 @@ class TaskDB(Base):
onupdate=func.now(),
)

remote = Column(Boolean)

__table_args__ = (
Index(
"ix_p_item_hash",
Expand Down Expand Up @@ -132,6 +136,7 @@ class BoefjeTask(BaseModel):
boefje: Boefje
input_ooi: str | None
organization: str
remote: bool = False

dispatches: list[Normalizer] = Field(default_factory=list)

Expand Down
5 changes: 3 additions & 2 deletions mula/scheduler/schedulers/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,11 +559,11 @@ def push_task(self, boefje: Plugin, ooi: OOI, caller: str = "") -> None:

"""
task = BoefjeTask(
boefje=Boefje.parse_obj(boefje.dict()),
boefje=Boefje.model_validate(boefje.model_dump()),
input_ooi=ooi.primary_key,
organization=self.organisation.id,
remote=boefje.remote,
)

if not self.is_task_allowed_to_run(boefje, ooi):
self.logger.debug(
"Task is not allowed to run: %s",
Expand Down Expand Up @@ -690,6 +690,7 @@ def push_task(self, boefje: Plugin, ooi: OOI, caller: str = "") -> None:
priority=score,
data=task.model_dump(),
hash=task.hash,
remote=boefje.remote,
)

try:
Expand Down
2 changes: 2 additions & 0 deletions mula/scheduler/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def post_push(self, p_item: models.PrioritizedItem) -> None:
status=models.TaskStatus.QUEUED,
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
remote=p_item.remote,
)

task_db = self.ctx.datastores.task_store.get_task_by_id(str(p_item.id))
Expand Down Expand Up @@ -232,6 +233,7 @@ def push_item_to_queue(self, p_item: models.PrioritizedItem) -> None:
scheduler_id=self.scheduler_id,
)
raise exc
# TODO: SOUF WHEN PULLING FROM TASKSTORE WITH INVALID MODEL EXCEPTION DOES NOT GET CAUGHT

self.logger.debug(
"Pushed item %s to queue %s with priority %s ",
Expand Down
1 change: 1 addition & 0 deletions mula/scheduler/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ def push_queue(self, queue_id: str, item: models.PrioritizedItemRequest) -> Any:
p_item.scheduler_id = s.scheduler_id

p_item.priority = item.priority
p_item.remote = item.data["remote"]

if s.queue.item_type == models.BoefjeTask:
p_item.data = models.BoefjeTask(**item.data).dict()
Expand Down
Loading