Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMP] queue_job: change jobs to paused channel #10

Open
wants to merge 1 commit into
base: 16.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"views/queue_job_views.xml",
"views/queue_job_channel_views.xml",
"views/queue_job_function_views.xml",
"wizards/queue_job_queue_jobs_pause_channel_views.xml",
"wizards/queue_jobs_to_done_views.xml",
"wizards/queue_jobs_to_cancelled_views.xml",
"wizards/queue_requeue_job_views.xml",
Expand Down
4 changes: 4 additions & 0 deletions queue_job/data/queue_data.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,9 @@
<record model="queue.job.channel" id="channel_root">
<field name="name">root</field>
</record>
<record model="queue.job.channel" id="channel_pause">
<field name="name">pause</field>
<field name="parent_id" ref="channel_root" />
</record>
</data>
</odoo>
6 changes: 6 additions & 0 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
DEFAULT_PRIORITY = 10 # used by the PriorityQueue to sort the jobs
DEFAULT_MAX_RETRIES = 5
RETRY_INTERVAL = 10 * 60 # seconds
PAUSE_CHANNEL = "root.pause"

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -624,6 +625,8 @@ def _store_values(self, create=False):
vals["eta"] = self.eta
if self.identity_key:
vals["identity_key"] = self.identity_key
if self.channel:
vals["channel"] = self.channel

dependencies = {
"depends_on": [parent.uuid for parent in self.depends_on],
Expand Down Expand Up @@ -840,6 +843,9 @@ def set_failed(self, **kw):
if v is not None:
setattr(self, k, v)

def change_job_channel(self, to_channel):
self.channel = to_channel

def __repr__(self):
return "<Job %s, priority:%d>" % (self.uuid, self.priority)

Expand Down
30 changes: 26 additions & 4 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@
from weakref import WeakValueDictionary

from ..exception import ChannelNotFound
from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES
from ..job import (
CANCELLED,
DONE,
ENQUEUED,
FAILED,
PAUSE_CHANNEL,
PENDING,
STARTED,
WAIT_DEPENDENCIES,
)

NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED)

Expand Down Expand Up @@ -451,7 +460,13 @@ def get_subchannel_by_name(self, subchannel_name):
return self.children.get(subchannel_name)

def __str__(self):
capacity = "∞" if self.capacity is None else str(self.capacity)
if not self.capacity:
if self.name == PAUSE_CHANNEL:
capacity = "0"
else:
capacity = "∞"
else:
capacity = str(self.capacity)
return "%s(C:%s,Q:%d,R:%d,F:%d)" % (
self.fullname,
capacity,
Expand Down Expand Up @@ -517,7 +532,7 @@ def has_capacity(self):
if self.sequential and self._failed:
# a sequential queue blocks on failed jobs
return False
if not self.capacity:
if not self.capacity and self.fullname != PAUSE_CHANNEL:
# unlimited capacity
return True
return len(self._running) < self.capacity
Expand Down Expand Up @@ -873,6 +888,10 @@ def parse_simple_config(cls, config_string):
capacity = config_items[1]
try:
config["capacity"] = int(capacity)
if name == PAUSE_CHANNEL and config["capacity"] != 0:
raise Exception(
"Channel 'pause' must be capacity equal to zero"
)
except Exception as ex:
raise ValueError(
"Invalid channel config %s: "
Expand All @@ -896,7 +915,10 @@ def parse_simple_config(cls, config_string):
)
config[k] = v
else:
config["capacity"] = 1
if name == PAUSE_CHANNEL:
config["capacity"] = 0
else:
config["capacity"] = 1
res.append(config)
return res

Expand Down
29 changes: 29 additions & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
CANCELLED,
DONE,
FAILED,
PAUSE_CHANNEL,
PENDING,
STARTED,
STATES,
Expand Down Expand Up @@ -506,3 +507,31 @@ def _test_job(self, failure_rate=0):
_logger.info("Running test job.")
if random.random() <= failure_rate:
raise JobError("Job failed")

def _change_job_pause_channel(self):
"""Change the state of the `Job` object
Changing the channel of the Job will automatically change some fields
(date, result, ...).
"""
for record in self:
job_ = Job.load(record.env, record.uuid)
to_channel = ""
if record.channel == PAUSE_CHANNEL:
# Get original channel
to_channel = record.job_function_id.channel
record.channel = record.job_function_id.channel
else:
to_channel = PAUSE_CHANNEL
record.channel = to_channel
job_.change_job_channel(to_channel)
job_.store()

def _validate_state_jobs(self):
if any(job.state in ("done", "started") for job in self):
raise exceptions.ValidationError(
_("Some selected jobs are in invalid states to pause.")
)

def set_channel_pause(self):
self._change_job_pause_channel()
return True
1 change: 1 addition & 0 deletions queue_job/security/ir.model.access.csv
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue
access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1
access_queue_jobs_to_done,queue jobs to done manager,queue_job.model_queue_jobs_to_done,queue_job.group_queue_job_manager,1,1,1,1
access_queue_jobs_to_cancelled,queue jobs to cancelled manager,queue_job.model_queue_jobs_to_cancelled,queue_job.group_queue_job_manager,1,1,1,1
access_queue_channel_pause,access_queue_channel_pause,model_queue_channel_pause,queue_job.group_queue_job_manager,1,1,1,1
1 change: 1 addition & 0 deletions queue_job/wizards/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import queue_requeue_job
from . import queue_jobs_to_done
from . import queue_jobs_to_cancelled
from . import queue_jobs_pause_channel
34 changes: 34 additions & 0 deletions queue_job/wizards/queue_job_queue_jobs_pause_channel_views.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="utf-8" ?>
<odoo>

<record id="view_set_channel_pause_job" model="ir.ui.view">
<field name="name">Pause Jobs</field>
<field name="model">queue.channel.pause</field>
<field name="arch" type="xml">
<form string="Pause/Resume Jobs">
<group string="The selected jobs will be paused/resumed.">
<field name="job_ids" nolabel="1" />
</group>
<footer>
<button
name="set_channel_paused"
string="Pause/Resume"
type="object"
class="oe_highlight"
/>
<button string="Cancel" class="oe_link" special="cancel" />
</footer>
</form>
</field>
</record>

<record id="action_set_channel_pause_job" model="ir.actions.act_window">
<field name="name">Pause/Resume Jobs</field>
<field name="res_model">queue.channel.pause</field>
<field name="view_mode">form</field>
<field name="view_id" ref="view_set_channel_pause_job" />
<field name="target">new</field>
<field name="binding_model_id" ref="queue_job.model_queue_job" />
</record>

</odoo>
22 changes: 22 additions & 0 deletions queue_job/wizards/queue_jobs_pause_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from odoo import fields, models


class QueueChannelPause(models.TransientModel):
_name = "queue.channel.pause"
_description = "Wizard to change jobs to channel paused"

job_ids = fields.Many2many(
comodel_name="queue.job", string="Jobs", default=lambda r: r._default_job_ids()
)

def _default_job_ids(self):
res = False
context = self.env.context
if context.get("active_model") == "queue.job" and context.get("active_ids"):
res = context["active_ids"]
return res

def set_channel_paused(self):
self.job_ids._validate_state_jobs()
self.job_ids.set_channel_pause()
return {"type": "ir.actions.act_window_close"}
Loading