From 2e2ef66c1bafcaf22846f69b8556a7bdf92c541c Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 6 Jan 2025 15:02:09 +0100 Subject: [PATCH 01/13] Issue #700: pin down urllib3 to workaround httpretty incompatibility issue --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 81a51ffd9..99f18e43e 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ "mock", "requests-mock>=1.8.0", "httpretty>=1.1.4", + "urllib3<2.3.0", # httpretty doesn't work properly with urllib3>2.3.0. See https://github.com/Open-EO/openeo-python-client/issues/700 "netCDF4>=1.7.0", "matplotlib", # TODO: eliminate matplotlib as test dependency "geopandas", From 84c073c4577b5cf47bb7ab67d81d1ed429454acc Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 6 Jan 2025 15:04:53 +0100 Subject: [PATCH 02/13] fixup! Issue #700: pin down urllib3 to workaround httpretty incompatibility issue --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 99f18e43e..56e3220cd 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ "mock", "requests-mock>=1.8.0", "httpretty>=1.1.4", - "urllib3<2.3.0", # httpretty doesn't work properly with urllib3>2.3.0. See https://github.com/Open-EO/openeo-python-client/issues/700 + "urllib3<2.3.0", # httpretty doesn't work properly with urllib3>=2.3.0. See #700 and https://github.com/gabrielfalcao/HTTPretty/issues/484 "netCDF4>=1.7.0", "matplotlib", # TODO: eliminate matplotlib as test dependency "geopandas", From 036ec7d2563b266c6329f17d2378206eddcfe9b2 Mon Sep 17 00:00:00 2001 From: Elien Vandermaesen Date: Thu, 2 Jan 2025 08:05:48 +0100 Subject: [PATCH 03/13] issue #689 create directory in connection.download if it doesn't exist --- openeo/rest/connection.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 28de15dfd..8b4663a3b 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -83,6 +83,7 @@ ContextTimer, LazyLoadCache, dict_no_none, + ensure_dir, ensure_list, load_json_resource, repr_truncate, @@ -1748,7 +1749,9 @@ def download( ) if outputfile is not None: - with Path(outputfile).open(mode="wb") as f: + target = Path(outputfile) + ensure_dir(target.parent) + with target.open(mode="wb") as f: for chunk in response.iter_content(chunk_size=chunk_size): f.write(chunk) else: From 30c57bf9a65097efe795ac01c81bba3004cb9e37 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 7 Jan 2025 11:48:28 +0100 Subject: [PATCH 04/13] Issue #702 improve assert in `test_persist_multiple_chunks` for easier debugging --- tests/extra/job_management/test_stac_job_db.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/extra/job_management/test_stac_job_db.py b/tests/extra/job_management/test_stac_job_db.py index 69c9d866c..e1968a3f6 100644 --- a/tests/extra/job_management/test_stac_job_db.py +++ b/tests/extra/job_management/test_stac_job_db.py @@ -363,8 +363,7 @@ def handle_row(series): job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items) # 10 items in total, 3 items per chunk, should result in 4 calls - assert mock_requests_post.call_count == 4 - expected_calls = [ + assert [c.kwargs for c in mock_requests_post.call_args_list] == [ { "url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items", "auth": None, @@ -386,6 +385,3 @@ def handle_row(series): "json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[9:]}}, }, ] - - for i, call in enumerate(mock_requests_post.call_args_list): - assert call[1] == expected_calls[i] From fed61cc2b19cae558cdbfdb31dcce3f8aef23f69 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 7 Jan 2025 13:01:38 +0100 Subject: [PATCH 05/13] Issue #702 make `test_persist_multiple_chunks` less flaky --- tests/extra/job_management/test_stac_job_db.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/extra/job_management/test_stac_job_db.py b/tests/extra/job_management/test_stac_job_db.py index e1968a3f6..2833f630c 100644 --- a/tests/extra/job_management/test_stac_job_db.py +++ b/tests/extra/job_management/test_stac_job_db.py @@ -363,7 +363,10 @@ def handle_row(series): job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items) # 10 items in total, 3 items per chunk, should result in 4 calls - assert [c.kwargs for c in mock_requests_post.call_args_list] == [ + assert sorted( + (c.kwargs for c in mock_requests_post.call_args_list), + key=lambda d: sorted(d["json"]["items"].keys()), + ) == [ { "url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items", "auth": None, From cf9e4ec4488d10a9ad3c4e985f9a2779e346bf9a Mon Sep 17 00:00:00 2001 From: Elien Vandermaesen Date: Mon, 16 Dec 2024 10:24:50 +0100 Subject: [PATCH 06/13] issue #505 option to disable error logs in batch jobs --- openeo/rest/datacube.py | 5 +++- openeo/rest/job.py | 32 ++++++++++++++++++-------- tests/rest/test_job.py | 51 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index db087f854..83a9f1f3d 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -2477,6 +2477,7 @@ def execute_batch( job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, + log_error=True, # TODO: deprecate `format_options` as keyword arguments **format_options, ) -> BatchJob: @@ -2494,6 +2495,7 @@ def execute_batch( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. + :param log_error: whether to print error logs .. versionchanged:: 0.32.0 Added ``auto_add_save_result`` option @@ -2529,7 +2531,8 @@ def execute_batch( ) return job.run_synchronous( outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, + log_error=log_error ) def create_job( diff --git a/openeo/rest/job.py b/openeo/rest/job.py index e3f307a71..8575fa00c 100644 --- a/openeo/rest/job.py +++ b/openeo/rest/job.py @@ -236,11 +236,21 @@ def logs( def run_synchronous( self, outputfile: Union[str, Path, None] = None, - print=print, max_poll_interval=60, connection_retry_interval=30 + print=print, max_poll_interval=60, connection_retry_interval=30, log_error=True ) -> BatchJob: - """Start the job, wait for it to finish and download result""" + """ + Start the job, wait for it to finish and download result + + :param outputfile: The path of a file to which a result can be written + :param print: print/logging function to show progress/status + :param max_poll_interval: maximum number of seconds to sleep between status polls + :param connection_retry_interval: how long to wait when status poll failed due to connection issue + :param log_error: whether to print error logs + :return: + """ self.start_and_wait( - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, + log_error=log_error ) # TODO #135 support multi file result sets too? if outputfile is not None: @@ -248,7 +258,8 @@ def run_synchronous( return self def start_and_wait( - self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10 + self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10, + log_error=True ) -> BatchJob: """ Start the batch job, poll its status and wait till it finishes (or fails) @@ -257,6 +268,7 @@ def start_and_wait( :param max_poll_interval: maximum number of seconds to sleep between status polls :param connection_retry_interval: how long to wait when status poll failed due to connection issue :param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow + :param log_error: whether to print error logs :return: """ # TODO rename `connection_retry_interval` to something more generic? @@ -314,13 +326,13 @@ def soft_error(message: str): poll_interval = min(1.25 * poll_interval, max_poll_interval) if status != "finished": - # TODO: allow to disable this printing logs (e.g. in non-interactive contexts)? # TODO: render logs jupyter-aware in a notebook context? - print(f"Your batch job {self.job_id!r} failed. Error logs:") - print(self.logs(level=logging.ERROR)) - print( - f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`." - ) + if log_error: + print(f"Your batch job {self.job_id!r} failed. Error logs:") + print(self.logs(level=logging.ERROR)) + print( + f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`." + ) raise JobFailedException( f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).", job=self, diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py index 775f1b38a..9f2a47841 100644 --- a/tests/rest/test_job.py +++ b/tests/rest/test_job.py @@ -150,6 +150,57 @@ def test_execute_batch_with_error(con100, requests_mock, tmpdir): "Full logs can be inspected in an openEO (web) editor or with `connection.job('f00ba5').logs()`.", ] +def test_execute_batch_with_error_with_error_logs_disabled(con100, requests_mock, tmpdir): + requests_mock.get(API_URL + "/file_formats", json={"output": {"GTiff": {"gis_data_types": ["raster"]}}}) + requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"}) + requests_mock.post(API_URL + "/jobs", status_code=201, headers={"OpenEO-Identifier": "f00ba5"}) + requests_mock.post(API_URL + "/jobs/f00ba5/results", status_code=202) + requests_mock.get( + API_URL + "/jobs/f00ba5", + [ + {"json": {"status": "submitted"}}, + {"json": {"status": "queued"}}, + {"json": {"status": "running", "progress": 15}}, + {"json": {"status": "running", "progress": 80}}, + {"json": {"status": "error", "progress": 100}}, + ], + ) + requests_mock.get( + API_URL + "/jobs/f00ba5/logs", + json={ + "logs": [ + {"id": "12", "level": "info", "message": "starting"}, + {"id": "34", "level": "error", "message": "nope"}, + ] + }, + ) + + path = tmpdir.join("tmp.tiff") + log = [] + + try: + with fake_time(): + con100.load_collection("SENTINEL2").execute_batch( + outputfile=path, out_format="GTIFF", + max_poll_interval=.1, print=log.append, log_error=False + ) + pytest.fail("execute_batch should fail") + except JobFailedException as e: + assert e.job.status() == "error" + assert [(l.level, l.message) for l in e.job.logs()] == [ + ("info", "starting"), + ("error", "nope"), + ] + + assert log == [ + "0:00:01 Job 'f00ba5': send 'start'", + "0:00:02 Job 'f00ba5': submitted (progress N/A)", + "0:00:04 Job 'f00ba5': queued (progress N/A)", + "0:00:07 Job 'f00ba5': running (progress 15%)", + "0:00:12 Job 'f00ba5': running (progress 80%)", + "0:00:20 Job 'f00ba5': error (progress 100%)", + ] + @pytest.mark.parametrize(["error_response", "expected"], [ ( From f18dc627910a60589c05bc024aae8dfd7d5e8959 Mon Sep 17 00:00:00 2001 From: Elien Vandermaesen Date: Thu, 19 Dec 2024 07:50:09 +0100 Subject: [PATCH 07/13] issue #687 rename log_error to show_error logs --- openeo/rest/datacube.py | 6 +++--- openeo/rest/job.py | 12 ++++++------ tests/rest/test_job.py | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 83a9f1f3d..9190b2d5a 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -2477,7 +2477,7 @@ def execute_batch( job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, - log_error=True, + show_error_logs: bool = True, # TODO: deprecate `format_options` as keyword arguments **format_options, ) -> BatchJob: @@ -2495,7 +2495,7 @@ def execute_batch( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. - :param log_error: whether to print error logs + :param show_error_logs: whether to automatically print error logs when the batch job failed. .. versionchanged:: 0.32.0 Added ``auto_add_save_result`` option @@ -2532,7 +2532,7 @@ def execute_batch( return job.run_synchronous( outputfile=outputfile, print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, - log_error=log_error + show_error_logs=show_error_logs ) def create_job( diff --git a/openeo/rest/job.py b/openeo/rest/job.py index 8575fa00c..09e6611a2 100644 --- a/openeo/rest/job.py +++ b/openeo/rest/job.py @@ -236,7 +236,7 @@ def logs( def run_synchronous( self, outputfile: Union[str, Path, None] = None, - print=print, max_poll_interval=60, connection_retry_interval=30, log_error=True + print=print, max_poll_interval=60, connection_retry_interval=30, show_error_logs: bool = True ) -> BatchJob: """ Start the job, wait for it to finish and download result @@ -245,12 +245,12 @@ def run_synchronous( :param print: print/logging function to show progress/status :param max_poll_interval: maximum number of seconds to sleep between status polls :param connection_retry_interval: how long to wait when status poll failed due to connection issue - :param log_error: whether to print error logs + :param show_error_logs: whether to automatically print error logs when the batch job failed. :return: """ self.start_and_wait( print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, - log_error=log_error + show_error_logs=show_error_logs ) # TODO #135 support multi file result sets too? if outputfile is not None: @@ -259,7 +259,7 @@ def run_synchronous( def start_and_wait( self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10, - log_error=True + show_error_logs: bool = True ) -> BatchJob: """ Start the batch job, poll its status and wait till it finishes (or fails) @@ -268,7 +268,7 @@ def start_and_wait( :param max_poll_interval: maximum number of seconds to sleep between status polls :param connection_retry_interval: how long to wait when status poll failed due to connection issue :param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow - :param log_error: whether to print error logs + :param show_error_logs: whether to automatically print error logs when the batch job failed. :return: """ # TODO rename `connection_retry_interval` to something more generic? @@ -327,7 +327,7 @@ def soft_error(message: str): if status != "finished": # TODO: render logs jupyter-aware in a notebook context? - if log_error: + if show_error_logs: print(f"Your batch job {self.job_id!r} failed. Error logs:") print(self.logs(level=logging.ERROR)) print( diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py index 9f2a47841..4e6768b89 100644 --- a/tests/rest/test_job.py +++ b/tests/rest/test_job.py @@ -182,7 +182,7 @@ def test_execute_batch_with_error_with_error_logs_disabled(con100, requests_mock with fake_time(): con100.load_collection("SENTINEL2").execute_batch( outputfile=path, out_format="GTIFF", - max_poll_interval=.1, print=log.append, log_error=False + max_poll_interval=.1, print=log.append, show_error_logs=False ) pytest.fail("execute_batch should fail") except JobFailedException as e: From bd9160c07bf781bd426aa747615cd069becf9a75 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 7 Jan 2025 18:26:02 +0100 Subject: [PATCH 08/13] Issue #505 fintune #687 - add changelog entry - doc finetuning - cover mlmodel and vectorcube too - darker code style - make test more to the point --- CHANGELOG.md | 2 ++ docs/batch_jobs.rst | 4 +-- openeo/rest/datacube.py | 9 ++++-- openeo/rest/job.py | 30 ++++++++++++++------ openeo/rest/mlmodel.py | 12 ++++++-- openeo/rest/vectorcube.py | 10 ++++++- tests/rest/test_job.py | 58 +++++++++++++-------------------------- 7 files changed, 71 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b1badc822..905c4b75a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `show_error_logs` argument to `cube.execute_batch()`/`job.start_and_wait()`/... to toggle the automatic printing of error logs on failure ([#505](https://github.com/Open-EO/openeo-python-client/issues/505)) + ### Changed ### Removed diff --git a/docs/batch_jobs.rst b/docs/batch_jobs.rst index 85b9953f2..b2cf1fc47 100644 --- a/docs/batch_jobs.rst +++ b/docs/batch_jobs.rst @@ -292,8 +292,8 @@ When using :py:meth:`job.start_and_wait() ` or :py:meth:`cube.execute_batch() ` to run a batch job and it fails, -the openEO Python client library will automatically -print the batch job logs and instructions to help with further investigation: +the openEO Python client library will print (by default) +the batch job's error logs and instructions to help with further investigation: .. code-block:: pycon diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 9190b2d5a..ceee3503b 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -2502,6 +2502,9 @@ def execute_batch( .. versionadded:: 0.36.0 Added argument ``additional``. + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ # TODO: start showing deprecation warnings about these inconsistent argument names if "format" in format_options and not out_format: @@ -2531,8 +2534,10 @@ def execute_batch( ) return job.run_synchronous( outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, - show_error_logs=show_error_logs + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) def create_job( diff --git a/openeo/rest/job.py b/openeo/rest/job.py index 09e6611a2..63bce4a8a 100644 --- a/openeo/rest/job.py +++ b/openeo/rest/job.py @@ -235,8 +235,12 @@ def logs( return VisualList("logs", data=entries) def run_synchronous( - self, outputfile: Union[str, Path, None] = None, - print=print, max_poll_interval=60, connection_retry_interval=30, show_error_logs: bool = True + self, + outputfile: Union[str, Path, None] = None, + print=print, + max_poll_interval=60, + connection_retry_interval=30, + show_error_logs: bool = True, ) -> BatchJob: """ Start the job, wait for it to finish and download result @@ -246,11 +250,15 @@ def run_synchronous( :param max_poll_interval: maximum number of seconds to sleep between status polls :param connection_retry_interval: how long to wait when status poll failed due to connection issue :param show_error_logs: whether to automatically print error logs when the batch job failed. - :return: + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ self.start_and_wait( - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, - show_error_logs=show_error_logs + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) # TODO #135 support multi file result sets too? if outputfile is not None: @@ -258,8 +266,12 @@ def run_synchronous( return self def start_and_wait( - self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10, - show_error_logs: bool = True + self, + print=print, + max_poll_interval: int = 60, + connection_retry_interval: int = 30, + soft_error_max=10, + show_error_logs: bool = True, ) -> BatchJob: """ Start the batch job, poll its status and wait till it finishes (or fails) @@ -269,7 +281,9 @@ def start_and_wait( :param connection_retry_interval: how long to wait when status poll failed due to connection issue :param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow :param show_error_logs: whether to automatically print error logs when the batch job failed. - :return: + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ # TODO rename `connection_retry_interval` to something more generic? start_time = time.time() diff --git a/openeo/rest/mlmodel.py b/openeo/rest/mlmodel.py index 1220a7701..0db516694 100644 --- a/openeo/rest/mlmodel.py +++ b/openeo/rest/mlmodel.py @@ -71,12 +71,13 @@ def execute_batch( connection_retry_interval=30, additional: Optional[dict] = None, job_options: Optional[dict] = None, + show_error_logs: bool = True, ) -> BatchJob: """ Evaluate the process graph by creating a batch job, and retrieving the results when it is finished. This method is mostly recommended if the batch job is expected to run in a reasonable amount of time. - For very long running jobs, you probably do not want to keep the client running. + For very long-running jobs, you probably do not want to keep the client running. :param job_options: :param outputfile: The path of a file to which a result can be written @@ -85,9 +86,13 @@ def execute_batch( :param additional: additional (top-level) properties to set in the request body :param job_options: dictionary of job options to pass to the backend (under top-level property "job_options") + :param show_error_logs: whether to automatically print error logs when the batch job failed. .. versionadded:: 0.36.0 Added argument ``additional``. + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ job = self.create_job( title=title, @@ -100,7 +105,10 @@ def execute_batch( return job.run_synchronous( # TODO #135 support multi file result sets too outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) def create_job( diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py index 51c2bf69e..de6aeb60c 100644 --- a/openeo/rest/vectorcube.py +++ b/openeo/rest/vectorcube.py @@ -259,6 +259,7 @@ def execute_batch( job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, + show_error_logs: bool = True, # TODO: avoid using kwargs as format options **format_options, ) -> BatchJob: @@ -277,6 +278,7 @@ def execute_batch( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. + :param show_error_logs: whether to automatically print error logs when the batch job failed. .. versionchanged:: 0.21.0 When not specified explicitly, output format is guessed from output file extension. @@ -286,6 +288,9 @@ def execute_batch( .. versionadded:: 0.36.0 Added argument ``additional``. + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ cube = self if auto_add_save_result: @@ -310,7 +315,10 @@ def execute_batch( return job.run_synchronous( # TODO #135 support multi file result sets too outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) def create_job( diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py index 4e6768b89..4990d9180 100644 --- a/tests/rest/test_job.py +++ b/tests/rest/test_job.py @@ -150,56 +150,36 @@ def test_execute_batch_with_error(con100, requests_mock, tmpdir): "Full logs can be inspected in an openEO (web) editor or with `connection.job('f00ba5').logs()`.", ] -def test_execute_batch_with_error_with_error_logs_disabled(con100, requests_mock, tmpdir): + +@pytest.mark.parametrize("show_error_logs", [True, False]) +def test_execute_batch_show_error_logs(con100, requests_mock, show_error_logs): requests_mock.get(API_URL + "/file_formats", json={"output": {"GTiff": {"gis_data_types": ["raster"]}}}) requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"}) requests_mock.post(API_URL + "/jobs", status_code=201, headers={"OpenEO-Identifier": "f00ba5"}) requests_mock.post(API_URL + "/jobs/f00ba5/results", status_code=202) - requests_mock.get( - API_URL + "/jobs/f00ba5", - [ - {"json": {"status": "submitted"}}, - {"json": {"status": "queued"}}, - {"json": {"status": "running", "progress": 15}}, - {"json": {"status": "running", "progress": 80}}, - {"json": {"status": "error", "progress": 100}}, - ], - ) + requests_mock.get(API_URL + "/jobs/f00ba5", json={"status": "error", "progress": 100}) requests_mock.get( API_URL + "/jobs/f00ba5/logs", - json={ - "logs": [ - {"id": "12", "level": "info", "message": "starting"}, - {"id": "34", "level": "error", "message": "nope"}, - ] - }, + json={"logs": [{"id": "34", "level": "error", "message": "nope"}]}, ) - path = tmpdir.join("tmp.tiff") - log = [] - - try: - with fake_time(): - con100.load_collection("SENTINEL2").execute_batch( - outputfile=path, out_format="GTIFF", - max_poll_interval=.1, print=log.append, show_error_logs=False - ) - pytest.fail("execute_batch should fail") - except JobFailedException as e: - assert e.job.status() == "error" - assert [(l.level, l.message) for l in e.job.logs()] == [ - ("info", "starting"), - ("error", "nope"), - ] + stdout = [] + with fake_time(), pytest.raises(JobFailedException): + con100.load_collection("SENTINEL2").execute_batch( + max_poll_interval=0.1, print=stdout.append, show_error_logs=show_error_logs + ) - assert log == [ + expected = [ "0:00:01 Job 'f00ba5': send 'start'", - "0:00:02 Job 'f00ba5': submitted (progress N/A)", - "0:00:04 Job 'f00ba5': queued (progress N/A)", - "0:00:07 Job 'f00ba5': running (progress 15%)", - "0:00:12 Job 'f00ba5': running (progress 80%)", - "0:00:20 Job 'f00ba5': error (progress 100%)", + "0:00:02 Job 'f00ba5': error (progress 100%)", ] + if show_error_logs: + expected += [ + "Your batch job 'f00ba5' failed. Error logs:", + [{"id": "34", "level": "error", "message": "nope"}], + "Full logs can be inspected in an openEO (web) editor or with `connection.job('f00ba5').logs()`.", + ] + assert stdout == expected @pytest.mark.parametrize(["error_response", "expected"], [ From 43424d060c2ca6c4d6aeca64b7aafbff4cf91ae3 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Fri, 29 Nov 2024 16:13:27 +0100 Subject: [PATCH 09/13] hardening the cancelation functionality --- openeo/extra/job_management/__init__.py | 29 ++++++++++++++++--------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py index 8f7f4b7df..8ae3e8cca 100644 --- a/openeo/extra/job_management/__init__.py +++ b/openeo/extra/job_management/__init__.py @@ -658,16 +658,25 @@ def on_job_cancel(self, job: BatchJob, row): def _cancel_prolonged_job(self, job: BatchJob, row): """Cancel the job if it has been running for too long.""" - job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True) - elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time - if elapsed > self._cancel_running_job_after: - try: - _log.info( - f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" - ) - job.stop() - except OpenEoApiError as e: - _log.error(f"Failed to cancel long-running job {job.job_id}: {e}") + try: + running_start_time_str = row.get("running_start_time") + if not running_start_time_str or pd.isna(running_start_time_str): + _log.warning(f"Job {job.job_id} does not have a valid running start time. Cancellation skipped.") + return + + job_running_start_time = rfc3339.parse_datetime(running_start_time_str, with_timezone=True) + elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time + + if elapsed > self._cancel_running_job_after: + try: + _log.info( + f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" + ) + job.stop() + except OpenEoApiError as e: + _log.error(f"Failed to cancel long-running job {job.job_id}: {e}") + except Exception as e: + _log.error(f"Unexpected error while handling job {job.job_id}: {e}") def get_job_dir(self, job_id: str) -> Path: """Path to directory where job metadata, results and error logs are be saved.""" From 9300f83a99fc7c9ead4114de6a4b48b67d67a0f9 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 3 Dec 2024 15:01:59 +0100 Subject: [PATCH 10/13] influence the job_database_directly --- openeo/extra/job_management/__init__.py | 46 ++++++++--- .../job_management/test_job_management.py | 79 +++++++++++++++++++ 2 files changed, 116 insertions(+), 9 deletions(-) diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py index 8ae3e8cca..fed17b222 100644 --- a/openeo/extra/job_management/__init__.py +++ b/openeo/extra/job_management/__init__.py @@ -656,21 +656,25 @@ def on_job_cancel(self, job: BatchJob, row): """ pass - def _cancel_prolonged_job(self, job: BatchJob, row): + def _cancel_prolonged_job(self, job: BatchJob, row, df): """Cancel the job if it has been running for too long.""" try: - running_start_time_str = row.get("running_start_time") - if not running_start_time_str or pd.isna(running_start_time_str): - _log.warning(f"Job {job.job_id} does not have a valid running start time. Cancellation skipped.") - return + # Ensure running start time is valid + running_start_time = self._ensure_running_start_time(job, row, df) - job_running_start_time = rfc3339.parse_datetime(running_start_time_str, with_timezone=True) - elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time + # Get the current time in RFC 3339 format (timezone-aware) + current_time_rfc3339 = rfc3339.utcnow() + + # Parse the current time into a datetime object with timezone info + current_time = rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True) + + # Calculate the elapsed time between job start and now + elapsed = current_time - running_start_time if elapsed > self._cancel_running_job_after: try: _log.info( - f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" + f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {running_start_time})" ) job.stop() except OpenEoApiError as e: @@ -678,6 +682,30 @@ def _cancel_prolonged_job(self, job: BatchJob, row): except Exception as e: _log.error(f"Unexpected error while handling job {job.job_id}: {e}") + def _ensure_running_start_time(self, job: BatchJob, row, df) -> datetime.datetime: + """ + Ensures the running start time is valid. If missing, approximates with the current time. + Returns the parsed running start time as a datetime object. + """ + running_start_time_str = row.get("running_start_time") + + if not running_start_time_str or pd.isna(running_start_time_str): + _log.warning( + f"Job {job.job_id} does not have a valid running start time. Setting the current time as an approximation." + ) + # Generate the current time in RFC 3339 format + current_time_rfc3339 = rfc3339.utcnow() + + # Update the DataFrame safely using `.loc` + df.loc[df.index[row.name], "running_start_time"] = current_time_rfc3339 + + # Parse and return the datetime object with UTC timezone + return rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True) + + # Parse the existing time string and return it + return rfc3339.parse_datetime(running_start_time_str, with_timezone=True) + + def get_job_dir(self, job_id: str) -> Path: """Path to directory where job metadata, results and error logs are be saved.""" return self._root_dir / f"job_{job_id}" @@ -737,7 +765,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = self.on_job_cancel(the_job, active.loc[i]) if self._cancel_running_job_after and new_status == "running": - self._cancel_prolonged_job(the_job, active.loc[i]) + self._cancel_prolonged_job(the_job, active.loc[i], active) active.loc[i, "status"] = new_status diff --git a/tests/extra/job_management/test_job_management.py b/tests/extra/job_management/test_job_management.py index 8a9d2f694..002830d54 100644 --- a/tests/extra/job_management/test_job_management.py +++ b/tests/extra/job_management/test_job_management.py @@ -7,6 +7,7 @@ from time import sleep from typing import Callable, Union from unittest import mock +import datetime import dirty_equals import geopandas @@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs): 12 * 60 * 60, "finished", ), + ], ) def test_automatic_cancel_of_too_long_running_jobs( @@ -645,6 +647,83 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep assert needle.search(caplog.text) + @pytest.mark.parametrize( + ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"], + [ + # Scenario 1: Missing running_start_time (None) + ( + "2024-09-01T09:00:00Z", # Job creation time + "2024-09-01T09:00:00Z", # Job start time (should be 1 hour after create_time) + None, # Missing running_start_time + "2024-09-01T20:00:00Z", # Job end time + "finished", # Job final status + 6 * 60 * 60, # Cancel after 6 hours + "finished", # Expected final status + ), + # Scenario 2: NaN running_start_time + ( + "2024-09-01T09:00:00Z", + "2024-09-01T09:00:00Z", + float("nan"), # NaN running_start_time + "2024-09-01T20:00:00Z", # Job end time + "finished", # Job final status + 6 * 60 * 60, # Cancel after 6 hours + "finished", # Expected final status + ), + ] + ) + def test_ensure_running_start_time_is_datetime( + self, + tmp_path, + time_machine, + create_time, + start_time, + running_start_time, + end_time, + end_status, + cancel_after_seconds, + expected_status, + dummy_backend_foo, + job_manager_root_dir, + ): + def get_status(job_id, current_status): + if rfc3339.utcnow() < start_time: + return "queued" + elif rfc3339.utcnow() < end_time: + return "running" + return end_status + + # Set the job status updater function for the mock backend + dummy_backend_foo.job_status_updater = get_status + + job_manager = MultiBackendJobManager( + root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds + ) + job_manager.add_backend("foo", connection=dummy_backend_foo.connection) + + # Create a DataFrame representing the job database + df = pd.DataFrame({ + "year": [2024], + "running_start_time": [running_start_time], # Initial running_start_time + }) + + # Move the time machine to the job creation time + time_machine.move_to(create_time) + + job_db_path = tmp_path / "jobs.csv" + + # Mock sleep() to skip one hour at a time instead of actually sleeping + with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)): + job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path) + + final_df = CsvJobDatabase(job_db_path).read() + + # Validate running_start_time is a valid datetime object + filled_running_start_time = final_df.iloc[0]["running_start_time"] + assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime) + + + JOB_DB_DF_BASICS = pd.DataFrame( { "numbers": [3, 2, 1], From 4f4437e6420330a7b0549e14d2393c3fe86a69a8 Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Tue, 3 Dec 2024 15:05:43 +0100 Subject: [PATCH 11/13] clean up --- tests/extra/job_management/test_job_management.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/extra/job_management/test_job_management.py b/tests/extra/job_management/test_job_management.py index 002830d54..d88a589a3 100644 --- a/tests/extra/job_management/test_job_management.py +++ b/tests/extra/job_management/test_job_management.py @@ -648,7 +648,7 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep @pytest.mark.parametrize( - ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"], + ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"], [ # Scenario 1: Missing running_start_time (None) ( @@ -658,7 +658,6 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep "2024-09-01T20:00:00Z", # Job end time "finished", # Job final status 6 * 60 * 60, # Cancel after 6 hours - "finished", # Expected final status ), # Scenario 2: NaN running_start_time ( @@ -668,7 +667,6 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep "2024-09-01T20:00:00Z", # Job end time "finished", # Job final status 6 * 60 * 60, # Cancel after 6 hours - "finished", # Expected final status ), ] ) @@ -682,7 +680,6 @@ def test_ensure_running_start_time_is_datetime( end_time, end_status, cancel_after_seconds, - expected_status, dummy_backend_foo, job_manager_root_dir, ): From f889eefe3507bf34baef71ba26a8c0a8140cb34b Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Thu, 5 Dec 2024 11:44:50 +0100 Subject: [PATCH 12/13] simplify --- openeo/extra/job_management/__init__.py | 48 ++++++++----------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py index fed17b222..26cd66f27 100644 --- a/openeo/extra/job_management/__init__.py +++ b/openeo/extra/job_management/__init__.py @@ -656,25 +656,22 @@ def on_job_cancel(self, job: BatchJob, row): """ pass - def _cancel_prolonged_job(self, job: BatchJob, row, df): + def _cancel_prolonged_job(self, job: BatchJob, row): """Cancel the job if it has been running for too long.""" try: # Ensure running start time is valid - running_start_time = self._ensure_running_start_time(job, row, df) - - # Get the current time in RFC 3339 format (timezone-aware) - current_time_rfc3339 = rfc3339.utcnow() - + job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True) + # Parse the current time into a datetime object with timezone info - current_time = rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True) + current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True) # Calculate the elapsed time between job start and now - elapsed = current_time - running_start_time + elapsed = current_time - job_running_start_time if elapsed > self._cancel_running_job_after: try: _log.info( - f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {running_start_time})" + f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" ) job.stop() except OpenEoApiError as e: @@ -682,30 +679,6 @@ def _cancel_prolonged_job(self, job: BatchJob, row, df): except Exception as e: _log.error(f"Unexpected error while handling job {job.job_id}: {e}") - def _ensure_running_start_time(self, job: BatchJob, row, df) -> datetime.datetime: - """ - Ensures the running start time is valid. If missing, approximates with the current time. - Returns the parsed running start time as a datetime object. - """ - running_start_time_str = row.get("running_start_time") - - if not running_start_time_str or pd.isna(running_start_time_str): - _log.warning( - f"Job {job.job_id} does not have a valid running start time. Setting the current time as an approximation." - ) - # Generate the current time in RFC 3339 format - current_time_rfc3339 = rfc3339.utcnow() - - # Update the DataFrame safely using `.loc` - df.loc[df.index[row.name], "running_start_time"] = current_time_rfc3339 - - # Parse and return the datetime object with UTC timezone - return rfc3339.parse_datetime(current_time_rfc3339, with_timezone=True) - - # Parse the existing time string and return it - return rfc3339.parse_datetime(running_start_time_str, with_timezone=True) - - def get_job_dir(self, job_id: str) -> Path: """Path to directory where job metadata, results and error logs are be saved.""" return self._root_dir / f"job_{job_id}" @@ -765,7 +738,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = self.on_job_cancel(the_job, active.loc[i]) if self._cancel_running_job_after and new_status == "running": - self._cancel_prolonged_job(the_job, active.loc[i], active) + if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])): + _log.warning( + f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation." + ) + stats["job started running"] += 1 + active.loc[i, "running_start_time"] = rfc3339.utcnow() + + self._cancel_prolonged_job(the_job, active.loc[i]) active.loc[i, "status"] = new_status From 75c3797a381fd9c5fce5c553a84a432942a04f6f Mon Sep 17 00:00:00 2001 From: Hans Vanrompay Date: Wed, 8 Jan 2025 09:32:46 +0100 Subject: [PATCH 13/13] remove try, except --- openeo/extra/job_management/__init__.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py index 26cd66f27..41b8fdfd3 100644 --- a/openeo/extra/job_management/__init__.py +++ b/openeo/extra/job_management/__init__.py @@ -669,13 +669,12 @@ def _cancel_prolonged_job(self, job: BatchJob, row): elapsed = current_time - job_running_start_time if elapsed > self._cancel_running_job_after: - try: - _log.info( - f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" - ) - job.stop() - except OpenEoApiError as e: - _log.error(f"Failed to cancel long-running job {job.job_id}: {e}") + + _log.info( + f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})" + ) + job.stop() + except Exception as e: _log.error(f"Unexpected error while handling job {job.job_id}: {e}")