Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remember already submitted htcondor jobs to avoid re-submitting #167

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added
* Remember already submitted htcondor jobs ID's and don't re-submit them as long as they are running.

### Added
* [#166](https://github.com/nils-braun/b2luigi/pull/166): add automatic need-changelog PR labeller as github workflow

Expand Down
36 changes: 36 additions & 0 deletions b2luigi/batch/processes/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import enum

from retry import retry
from xdg import xdg_cache_home
import dataset

from b2luigi.core.settings import get_setting
from b2luigi.batch.processes import BatchProcess, JobStatus
Expand Down Expand Up @@ -151,6 +153,11 @@ def __init__(self, *args, **kwargs):

self._batch_job_id = None

# define path to local cache database that maps b2luigi task ID's to HTCODO
job_db_path = xdg_cache_home() / "b2luigi/batch_job_cache.db"
job_db_path.parent.mkdir(parents=True, exist_ok=True) # create directory if it does not exist
self._job_cache_uri = f"sqlite:///{job_db_path}"

def get_job_status(self):
if not self._batch_job_id:
return JobStatus.aborted
Expand All @@ -170,6 +177,22 @@ def get_job_status(self):
raise ValueError(f"Unknown HTCondor Job status: {job_status}")

def start_job(self):
# Check if job with this with task ID is already running on the batch.
# For that, we have to first check if there is a job ID stored in the cache.
with dataset.connect(self._job_cache_uri) as job_db:
existing_job = job_db["htcondor_jobs"].find_one(task_id=self.task.task_id)

if existing_job and "htcondor_id" in existing_job:
self._batch_job_id = existing_job["htcondor_id"]

# If the existing job is running, abort submission and continue monitoring existing job
if self.get_job_status() == JobStatus.running:
return

# else if htcondor job is not successful, remove htcondor job id from local cache
with dataset.connect(self._job_cache_uri) as job_db:
job_db["htcondor_jobs"].delete(task_id=self.task.task_id)

submit_file = self._create_htcondor_submit_file()

# HTCondor submit needs to be called in the folder of the submit file
Expand All @@ -183,12 +206,25 @@ def start_job(self):

self._batch_job_id = int(match.group(0)[:-1])

# store htcondor job ids for newly submitted tasks in database
with dataset.connect(self._job_cache_uri) as job_db:
job_db["htcondor_jobs"].insert(
dict(
task_id=self.task.task_id,
htcondor_id=self._batch_job_id
)
)

def kill_job(self):
if not self._batch_job_id:
return

subprocess.run(["condor_rm", str(self._batch_job_id)], stdout=subprocess.DEVNULL)

# remove task from htcondor job id cache
with dataset.connect(self._job_cache_uri) as job_db:
job_db["htcondor_jobs"].delete(task_id=self.task.task_id)

def _create_htcondor_submit_file(self):
submit_file_content = []

Expand Down
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,14 @@ author-email = "[email protected]"
home-page = "https://github.com/nils-braun/b2luigi"
classifiers = ["License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)"]

requires=['luigi>=3.0.2', 'parse>=1.8.4', 'GitPython>=2.1.11', "colorama>=0.3.9", "cachetools>=2.1.0", "jinja2", "retry2>=0.9.3"]
requires=[
"GitPython>=2.1.11",
"cachetools>=2.1.0",
"colorama>=0.3.9",
"dataset>=1.5.2",
"jinja2",
"luigi>=3.0.2",
"parse>=1.8.4",
"retry2>=0.9.3",
"xdg>=5.1.1",
]