diff --git a/CHANGELOG.md b/CHANGELOG.md
index b1badc822..905c4b75a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Added `show_error_logs` argument to `cube.execute_batch()`/`job.start_and_wait()`/... to toggle the automatic printing of error logs on failure ([#505](https://github.com/Open-EO/openeo-python-client/issues/505))
+
 ### Changed
 
 ### Removed
diff --git a/docs/batch_jobs.rst b/docs/batch_jobs.rst
index 85b9953f2..b2cf1fc47 100644
--- a/docs/batch_jobs.rst
+++ b/docs/batch_jobs.rst
@@ -292,8 +292,8 @@ When using
 :py:meth:`job.start_and_wait() <openeo.rest.job.BatchJob.start_and_wait>`
 or :py:meth:`cube.execute_batch() <openeo.rest.datacube.DataCube.execute_batch>`
 to run a batch job and it fails,
-the openEO Python client library will automatically
-print the batch job logs and instructions to help with further investigation:
+the openEO Python client library will print (by default)
+the batch job's error logs and instructions to help with further investigation:
 
 .. code-block:: pycon
 
diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py
index 8f7f4b7df..41b8fdfd3 100644
--- a/openeo/extra/job_management/__init__.py
+++ b/openeo/extra/job_management/__init__.py
@@ -658,16 +658,25 @@ def on_job_cancel(self, job: BatchJob, row):
 
     def _cancel_prolonged_job(self, job: BatchJob, row):
         """Cancel the job if it has been running for too long."""
-        job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
-        elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
-        if elapsed > self._cancel_running_job_after:
-            try:
+        try:
+            # Ensure running start time is valid
+            job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)
+
+            # Parse the current time into a datetime object with timezone info
+            current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)
+
+            # Calculate the elapsed time between job start and now
+            elapsed = current_time - job_running_start_time
+
+            if elapsed > self._cancel_running_job_after:
+                _log.info(
                     f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
                 )
                 job.stop()
-        except OpenEoApiError as e:
-            _log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
+
+        except Exception as e:
+            _log.error(f"Unexpected error while handling job {job.job_id}: {e}")
 
     def get_job_dir(self, job_id: str) -> Path:
         """Path to directory where job metadata, results and error logs are be saved."""
@@ -728,6 +737,13 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                     self.on_job_cancel(the_job, active.loc[i])
 
                 if self._cancel_running_job_after and new_status == "running":
+                    if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
+                        _log.warning(
+                            f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
+                        )
+                        stats["job started running"] += 1
+                        active.loc[i, "running_start_time"] = rfc3339.utcnow()
+
                     self._cancel_prolonged_job(the_job, active.loc[i])
 
                 active.loc[i, "status"] = new_status
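Taken together, the two `openeo/extra/job_management` hunks above make long-running-job cancellation tolerant of a missing or NaN `running_start_time`: `_track_statuses()` falls back to the current time (with a warning), and `_cancel_prolonged_job()` wraps the whole check in a broad `try`/`except` so a bad timestamp no longer crashes the tracking loop. As an illustration only (not part of the patch), a minimal standalone sketch of the resulting check, where `should_cancel` and `cancel_after` are hypothetical stand-ins for the manager's internal logic and `_cancel_running_job_after` threshold:

```python
import datetime

from openeo.util import rfc3339


def should_cancel(running_start_time: str, cancel_after: datetime.timedelta) -> bool:
    """Illustrative helper (hypothetical, not in the library): has a running job exceeded its time budget?"""
    if not running_start_time:
        # Mirror the fallback in _track_statuses(): treat an unknown start as "just started now".
        running_start_time = rfc3339.utcnow()
    started = rfc3339.parse_datetime(running_start_time, with_timezone=True)
    elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - started
    return elapsed > cancel_after
```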
diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py
index 4b481e544..849da2e82 100644
--- a/openeo/rest/connection.py
+++ b/openeo/rest/connection.py
@@ -84,6 +84,7 @@
     ContextTimer,
     LazyLoadCache,
     dict_no_none,
+    ensure_dir,
     ensure_list,
     load_json_resource,
     repr_truncate,
@@ -1771,7 +1772,9 @@ def download(
         )
 
         if outputfile is not None:
-            with Path(outputfile).open(mode="wb") as f:
+            target = Path(outputfile)
+            ensure_dir(target.parent)
+            with target.open(mode="wb") as f:
                 for chunk in response.iter_content(chunk_size=chunk_size):
                     f.write(chunk)
         else:
diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py
index b5ca9c6d1..ebc4e93f7 100644
--- a/openeo/rest/datacube.py
+++ b/openeo/rest/datacube.py
@@ -2484,6 +2484,7 @@ def execute_batch(
         job_options: Optional[dict] = None,
         validate: Optional[bool] = None,
         auto_add_save_result: bool = True,
+        show_error_logs: bool = True,
         # TODO: deprecate `format_options` as keyword arguments
         **format_options,
     ) -> BatchJob:
@@ -2501,12 +2502,16 @@ def execute_batch(
         :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
             (overruling the connection's ``auto_validate`` setting).
         :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
 
         .. versionchanged:: 0.32.0
             Added ``auto_add_save_result`` option
 
         .. versionadded:: 0.36.0
             Added argument ``additional``.
+
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
         """
         # TODO: start showing deprecation warnings about these inconsistent argument names
         if "format" in format_options and not out_format:
@@ -2536,7 +2541,10 @@ def execute_batch(
         )
         return job.run_synchronous(
             outputfile=outputfile,
-            print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
         )
 
     def create_job(
diff --git a/openeo/rest/job.py b/openeo/rest/job.py
index e3f307a71..63bce4a8a 100644
--- a/openeo/rest/job.py
+++ b/openeo/rest/job.py
@@ -235,12 +235,30 @@ def logs(
         return VisualList("logs", data=entries)
 
     def run_synchronous(
-        self, outputfile: Union[str, Path, None] = None,
-        print=print, max_poll_interval=60, connection_retry_interval=30
+        self,
+        outputfile: Union[str, Path, None] = None,
+        print=print,
+        max_poll_interval=60,
+        connection_retry_interval=30,
+        show_error_logs: bool = True,
     ) -> BatchJob:
-        """Start the job, wait for it to finish and download result"""
+        """
+        Start the job, wait for it to finish and download result
+
+        :param outputfile: The path of a file to which a result can be written
+        :param print: print/logging function to show progress/status
+        :param max_poll_interval: maximum number of seconds to sleep between status polls
+        :param connection_retry_interval: how long to wait when status poll failed due to connection issue
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
+
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
+ """ self.start_and_wait( - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) # TODO #135 support multi file result sets too? if outputfile is not None: @@ -248,7 +266,12 @@ def run_synchronous( return self def start_and_wait( - self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10 + self, + print=print, + max_poll_interval: int = 60, + connection_retry_interval: int = 30, + soft_error_max=10, + show_error_logs: bool = True, ) -> BatchJob: """ Start the batch job, poll its status and wait till it finishes (or fails) @@ -257,7 +280,10 @@ def start_and_wait( :param max_poll_interval: maximum number of seconds to sleep between status polls :param connection_retry_interval: how long to wait when status poll failed due to connection issue :param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow - :return: + :param show_error_logs: whether to automatically print error logs when the batch job failed. + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ # TODO rename `connection_retry_interval` to something more generic? start_time = time.time() @@ -314,13 +340,13 @@ def soft_error(message: str): poll_interval = min(1.25 * poll_interval, max_poll_interval) if status != "finished": - # TODO: allow to disable this printing logs (e.g. in non-interactive contexts)? # TODO: render logs jupyter-aware in a notebook context? - print(f"Your batch job {self.job_id!r} failed. Error logs:") - print(self.logs(level=logging.ERROR)) - print( - f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`." - ) + if show_error_logs: + print(f"Your batch job {self.job_id!r} failed. Error logs:") + print(self.logs(level=logging.ERROR)) + print( + f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`." + ) raise JobFailedException( f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).", job=self, diff --git a/openeo/rest/mlmodel.py b/openeo/rest/mlmodel.py index 1220a7701..0db516694 100644 --- a/openeo/rest/mlmodel.py +++ b/openeo/rest/mlmodel.py @@ -71,12 +71,13 @@ def execute_batch( connection_retry_interval=30, additional: Optional[dict] = None, job_options: Optional[dict] = None, + show_error_logs: bool = True, ) -> BatchJob: """ Evaluate the process graph by creating a batch job, and retrieving the results when it is finished. This method is mostly recommended if the batch job is expected to run in a reasonable amount of time. - For very long running jobs, you probably do not want to keep the client running. + For very long-running jobs, you probably do not want to keep the client running. :param job_options: :param outputfile: The path of a file to which a result can be written @@ -85,9 +86,13 @@ def execute_batch( :param additional: additional (top-level) properties to set in the request body :param job_options: dictionary of job options to pass to the backend (under top-level property "job_options") + :param show_error_logs: whether to automatically print error logs when the batch job failed. .. versionadded:: 0.36.0 Added argument ``additional``. + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. 
""" job = self.create_job( title=title, @@ -100,7 +105,10 @@ def execute_batch( return job.run_synchronous( # TODO #135 support multi file result sets too outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) def create_job( diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py index 51c2bf69e..de6aeb60c 100644 --- a/openeo/rest/vectorcube.py +++ b/openeo/rest/vectorcube.py @@ -259,6 +259,7 @@ def execute_batch( job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, + show_error_logs: bool = True, # TODO: avoid using kwargs as format options **format_options, ) -> BatchJob: @@ -277,6 +278,7 @@ def execute_batch( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. + :param show_error_logs: whether to automatically print error logs when the batch job failed. .. versionchanged:: 0.21.0 When not specified explicitly, output format is guessed from output file extension. @@ -286,6 +288,9 @@ def execute_batch( .. versionadded:: 0.36.0 Added argument ``additional``. + + .. versionchanged:: 0.37.0 + Added argument ``show_error_logs``. """ cube = self if auto_add_save_result: @@ -310,7 +315,10 @@ def execute_batch( return job.run_synchronous( # TODO #135 support multi file result sets too outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, + max_poll_interval=max_poll_interval, + connection_retry_interval=connection_retry_interval, + show_error_logs=show_error_logs, ) def create_job( diff --git a/setup.py b/setup.py index 81a51ffd9..56e3220cd 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ "mock", "requests-mock>=1.8.0", "httpretty>=1.1.4", + "urllib3<2.3.0", # httpretty doesn't work properly with urllib3>=2.3.0. 
     "netCDF4>=1.7.0",
     "matplotlib",  # TODO: eliminate matplotlib as test dependency
     "geopandas",
diff --git a/tests/extra/job_management/test_job_management.py b/tests/extra/job_management/test_job_management.py
index 8a9d2f694..d88a589a3 100644
--- a/tests/extra/job_management/test_job_management.py
+++ b/tests/extra/job_management/test_job_management.py
@@ -7,6 +7,7 @@
 from time import sleep
 from typing import Callable, Union
 from unittest import mock
+import datetime
 
 import dirty_equals
 import geopandas
@@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs):
                 12 * 60 * 60,
                 "finished",
             ),
+
         ],
     )
     def test_automatic_cancel_of_too_long_running_jobs(
@@ -645,6 +647,80 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
 
         assert needle.search(caplog.text)
 
+    @pytest.mark.parametrize(
+        ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
+        [
+            # Scenario 1: Missing running_start_time (None)
+            (
+                "2024-09-01T09:00:00Z",  # Job creation time
+                "2024-09-01T09:00:00Z",  # Job start time (should be 1 hour after create_time)
+                None,  # Missing running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+            ),
+            # Scenario 2: NaN running_start_time
+            (
+                "2024-09-01T09:00:00Z",
+                "2024-09-01T09:00:00Z",
+                float("nan"),  # NaN running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+            ),
+        ]
+    )
+    def test_ensure_running_start_time_is_datetime(
+        self,
+        tmp_path,
+        time_machine,
+        create_time,
+        start_time,
+        running_start_time,
+        end_time,
+        end_status,
+        cancel_after_seconds,
+        dummy_backend_foo,
+        job_manager_root_dir,
+    ):
+        def get_status(job_id, current_status):
+            if rfc3339.utcnow() < start_time:
+                return "queued"
+            elif rfc3339.utcnow() < end_time:
+                return "running"
+            return end_status
+
+        # Set the job status updater function for the mock backend
+        dummy_backend_foo.job_status_updater = get_status
+
+        job_manager = MultiBackendJobManager(
+            root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
+        )
+        job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
+
+        # Create a DataFrame representing the job database
+        df = pd.DataFrame({
+            "year": [2024],
+            "running_start_time": [running_start_time],  # Initial running_start_time
+        })
+
+        # Move the time machine to the job creation time
+        time_machine.move_to(create_time)
+
+        job_db_path = tmp_path / "jobs.csv"
+
+        # Mock sleep() to skip one hour at a time instead of actually sleeping
+        with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
+            job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
+
+        final_df = CsvJobDatabase(job_db_path).read()
+
+        # Validate running_start_time is a valid datetime object
+        filled_running_start_time = final_df.iloc[0]["running_start_time"]
+        assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime)
+
+
 JOB_DB_DF_BASICS = pd.DataFrame(
     {
         "numbers": [3, 2, 1],
diff --git a/tests/extra/job_management/test_stac_job_db.py b/tests/extra/job_management/test_stac_job_db.py
index 69c9d866c..2833f630c 100644
--- a/tests/extra/job_management/test_stac_job_db.py
+++ b/tests/extra/job_management/test_stac_job_db.py
@@ -363,8 +363,10 @@ def handle_row(series):
         job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items)
 
         # 10 items in total, 3 items per chunk, should result in 4 calls
-        assert mock_requests_post.call_count == 4
-        expected_calls = [
+        assert sorted(
+            (c.kwargs for c in mock_requests_post.call_args_list),
+            key=lambda d: sorted(d["json"]["items"].keys()),
+        ) == [
             {
                 "url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
                 "auth": None,
@@ -386,6 +388,3 @@ def handle_row(series):
                 "json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[9:]}},
             },
         ]
-
-        for i, call in enumerate(mock_requests_post.call_args_list):
-            assert call[1] == expected_calls[i]
diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py
index 775f1b38a..4990d9180 100644
--- a/tests/rest/test_job.py
+++ b/tests/rest/test_job.py
@@ -151,6 +151,37 @@ def test_execute_batch_with_error(con100, requests_mock, tmpdir):
     ]
 
 
+@pytest.mark.parametrize("show_error_logs", [True, False])
+def test_execute_batch_show_error_logs(con100, requests_mock, show_error_logs):
+    requests_mock.get(API_URL + "/file_formats", json={"output": {"GTiff": {"gis_data_types": ["raster"]}}})
+    requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})
+    requests_mock.post(API_URL + "/jobs", status_code=201, headers={"OpenEO-Identifier": "f00ba5"})
+    requests_mock.post(API_URL + "/jobs/f00ba5/results", status_code=202)
+    requests_mock.get(API_URL + "/jobs/f00ba5", json={"status": "error", "progress": 100})
+    requests_mock.get(
+        API_URL + "/jobs/f00ba5/logs",
+        json={"logs": [{"id": "34", "level": "error", "message": "nope"}]},
+    )
+
+    stdout = []
+    with fake_time(), pytest.raises(JobFailedException):
+        con100.load_collection("SENTINEL2").execute_batch(
+            max_poll_interval=0.1, print=stdout.append, show_error_logs=show_error_logs
+        )
+
+    expected = [
+        "0:00:01 Job 'f00ba5': send 'start'",
+        "0:00:02 Job 'f00ba5': error (progress 100%)",
+    ]
+    if show_error_logs:
+        expected += [
+            "Your batch job 'f00ba5' failed. Error logs:",
+            [{"id": "34", "level": "error", "message": "nope"}],
+            "Full logs can be inspected in an openEO (web) editor or with `connection.job('f00ba5').logs()`.",
+        ]
+    assert stdout == expected
+
+
 @pytest.mark.parametrize(["error_response", "expected"], [
     (
         {"exc": requests.ConnectionError("time out")},
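For context (not part of the patch): with these changes, a user who does not want the client to dump error logs on failure can pass the new flag through `execute_batch()`. A hedged usage sketch, where the backend URL and collection id are placeholders:

```python
import openeo

connection = openeo.connect("https://openeo.example.org")  # placeholder backend URL
cube = connection.load_collection("SENTINEL2_EXAMPLE")  # placeholder collection id

# New in 0.37.0: suppress the automatic printing of error logs when the batch job fails;
# a JobFailedException is still raised and full logs remain available via job.logs().
job = cube.execute_batch(outputfile="result.tiff", show_error_logs=False)
```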