From caba809a46955e8d416d620a6049ae34b6d5cec3 Mon Sep 17 00:00:00 2001 From: Michael Eliachevitch Date: Tue, 5 Apr 2022 21:05:36 +0200 Subject: [PATCH 1/2] Remember already submitted htcondor jobs to avoid re-submitting --- CHANGELOG.md | 3 +++ b2luigi/batch/processes/htcondor.py | 36 +++++++++++++++++++++++++++++ pyproject.toml | 12 +++++++++- 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14e29870..545358c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added +* Remember already submitted htcondor jobs ID's and don't re-submit them as long as they are running. + ### Added * [#166](https://github.com/nils-braun/b2luigi/pull/166): add automatic need-changelog PR labeller as github workflow diff --git a/b2luigi/batch/processes/htcondor.py b/b2luigi/batch/processes/htcondor.py index 7ee2a2f7..7b8c8266 100644 --- a/b2luigi/batch/processes/htcondor.py +++ b/b2luigi/batch/processes/htcondor.py @@ -5,6 +5,8 @@ import enum from retry import retry +from xdg import xdg_cache_home +import dataset from b2luigi.core.settings import get_setting from b2luigi.batch.processes import BatchProcess, JobStatus @@ -151,6 +153,11 @@ def __init__(self, *args, **kwargs): self._batch_job_id = None + # define path to local cache database that maps b2luigi task ID's to HTCODO + job_db_path = xdg_cache_home() / "b2luigi/batch_job_cache.db" + job_db_path.parent.mkdir(parents=True, exist_ok=True) # create directory if it does not exist + self._job_cache_uri = f"sqlite:///{job_db_path}" + def get_job_status(self): if not self._batch_job_id: return JobStatus.aborted @@ -170,6 +177,22 @@ def get_job_status(self): raise ValueError(f"Unknown HTCondor Job status: {job_status}") def start_job(self): + # Check if job with this with task ID is already running on the batch. + # For that, we have to first check if there is a job ID stored in the cache. + with dataset.connect(self._job_cache_uri) as job_db: + existing_job = job_db["htcondor_jobs"].find_one(task_id=self.task.task_id) + + if existing_job and "htcondor_id" in existing_job: + self._batch_job_id = existing_job["htcondor_id"] + + # If the existing job is running, abort submission and continue monitoring existing job + if self.get_job_status() == JobStatus.running: + return + + # if htcondor job is not successful, remove htcondor job id from local cache + with dataset.connect(self._job_cache_uri) as job_db: + job_db["htcondor_jobs"].delete(task_id=self.task.task_id) + submit_file = self._create_htcondor_submit_file() # HTCondor submit needs to be called in the folder of the submit file @@ -183,12 +206,25 @@ def start_job(self): self._batch_job_id = int(match.group(0)[:-1]) + # store htcondor job ids for newly submitted tasks in database + with dataset.connect(self._job_cache_uri) as job_db: + job_db["htcondor_jobs"].insert( + dict( + task_id=self.task.task_id, + htcondor_id=self._batch_job_id + ) + ) + def kill_job(self): if not self._batch_job_id: return subprocess.run(["condor_rm", str(self._batch_job_id)], stdout=subprocess.DEVNULL) + # remove task from htcondor job id cache + with dataset.connect(self._job_cache_uri) as job_db: + job_db["htcondor_jobs"].delete(task_id=self.task.task_id) + def _create_htcondor_submit_file(self): submit_file_content = [] diff --git a/pyproject.toml b/pyproject.toml index cdfd5acb..f824a88e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,4 +9,14 @@ author-email = "nils.braun@kit.edu" home-page = "https://github.com/nils-braun/b2luigi" classifiers = ["License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)"] -requires=['luigi>=3.0.2', 'parse>=1.8.4', 'GitPython>=2.1.11', "colorama>=0.3.9", "cachetools>=2.1.0", "jinja2", "retry2>=0.9.3"] +requires=[ + "GitPython>=2.1.11", + "cachetools>=2.1.0", + "colorama>=0.3.9", + "dataset>=1.5.2", + "jinja2", + "luigi>=3.0.2", + "parse>=1.8.4", + "retry2>=0.9.3", + "xdg>=5.1.1", +] From 4c0178a6ccc601b7be3defcb7abff11c56cf684e Mon Sep 17 00:00:00 2001 From: Michael Eliachevitch Date: Tue, 5 Apr 2022 22:47:26 +0200 Subject: [PATCH 2/2] Exit database transaction directly after getting job entry --- b2luigi/batch/processes/htcondor.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/b2luigi/batch/processes/htcondor.py b/b2luigi/batch/processes/htcondor.py index 7b8c8266..e652f9ec 100644 --- a/b2luigi/batch/processes/htcondor.py +++ b/b2luigi/batch/processes/htcondor.py @@ -182,16 +182,16 @@ def start_job(self): with dataset.connect(self._job_cache_uri) as job_db: existing_job = job_db["htcondor_jobs"].find_one(task_id=self.task.task_id) - if existing_job and "htcondor_id" in existing_job: - self._batch_job_id = existing_job["htcondor_id"] + if existing_job and "htcondor_id" in existing_job: + self._batch_job_id = existing_job["htcondor_id"] - # If the existing job is running, abort submission and continue monitoring existing job - if self.get_job_status() == JobStatus.running: - return + # If the existing job is running, abort submission and continue monitoring existing job + if self.get_job_status() == JobStatus.running: + return - # if htcondor job is not successful, remove htcondor job id from local cache - with dataset.connect(self._job_cache_uri) as job_db: - job_db["htcondor_jobs"].delete(task_id=self.task.task_id) + # else if htcondor job is not successful, remove htcondor job id from local cache + with dataset.connect(self._job_cache_uri) as job_db: + job_db["htcondor_jobs"].delete(task_id=self.task.task_id) submit_file = self._create_htcondor_submit_file()