Merge branch 'master' into issue693-sar_backscatter-get-coefficients-from-schema
ElienVandermaesenVITO committed Jan 9, 2025
2 parents 71b2b79 + d5180c3 commit 53936a6
Showing 12 changed files with 208 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `show_error_logs` argument to `cube.execute_batch()`/`job.start_and_wait()`/... to toggle the automatic printing of error logs on failure ([#505](https://github.com/Open-EO/openeo-python-client/issues/505))
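For illustration (not part of the changelog entry itself), a minimal sketch of how the new toggle could be used; the backend URL and collection id below are placeholders, and on failure the error logs can still be inspected via the raised `JobFailedException`:

```python
import logging

import openeo
from openeo.rest import JobFailedException

connection = openeo.connect("https://openeo.example").authenticate_oidc()  # hypothetical backend URL
cube = connection.load_collection("SENTINEL2_L2A")  # hypothetical collection id

try:
    # Suppress the automatic printing of error logs when the batch job fails.
    job = cube.execute_batch(outputfile="result.nc", show_error_logs=False)
except JobFailedException as exc:
    # Inspect the error logs explicitly instead.
    print(exc.job.logs(level=logging.ERROR))
```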

### Changed

### Removed
4 changes: 2 additions & 2 deletions docs/batch_jobs.rst
@@ -292,8 +292,8 @@ When using
:py:meth:`job.start_and_wait() <openeo.rest.job.BatchJob.start_and_wait>`
or :py:meth:`cube.execute_batch() <openeo.rest.datacube.DataCube.execute_batch>`
to run a batch job and it fails,
the openEO Python client library will automatically
print the batch job logs and instructions to help with further investigation:
the openEO Python client library will print (by default)
the batch job's error logs and instructions to help with further investigation:
.. code-block:: pycon
28 changes: 22 additions & 6 deletions openeo/extra/job_management/__init__.py
@@ -658,16 +658,25 @@ def on_job_cancel(self, job: BatchJob, row):

def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
if elapsed > self._cancel_running_job_after:
try:
try:
# Ensure running start time is valid
job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

# Parse the current time into a datetime object with timezone info
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

# Calculate the elapsed time between job start and now
elapsed = current_time - job_running_start_time

if elapsed > self._cancel_running_job_after:

_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()
except OpenEoApiError as e:
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")

except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
@@ -728,6 +737,13 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
self.on_job_cancel(the_job, active.loc[i])

if self._cancel_running_job_after and new_status == "running":
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
_log.warning(
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
)
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()

self._cancel_prolonged_job(the_job, active.loc[i])

active.loc[i, "status"] = new_status
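For context, a rough sketch of the job-manager feature these changes harden; the connection, collection id and `start_job` body are hypothetical, while the `cancel_running_job_after` setup mirrors the test further down. With this change, a running job whose `running_start_time` is missing now falls back to the current time instead of breaking the tracking loop.

```python
import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager


def start_job(row, connection_provider, connection, **kwargs):
    # Hypothetical job factory: build and create a batch job from one dataframe row.
    cube = connection.load_collection("SENTINEL2_L2A")  # hypothetical collection id
    return cube.create_job(title=f"job-{row['year']}")


connection = openeo.connect("https://openeo.example").authenticate_oidc()  # hypothetical backend URL

# Cancel any job that has been in "running" state for more than 6 hours.
manager = MultiBackendJobManager(root_dir="jobs/", cancel_running_job_after=6 * 60 * 60)
manager.add_backend("foo", connection=connection)

df = pd.DataFrame({"year": [2024, 2025]})  # hypothetical job definitions
manager.run_jobs(df=df, start_job=start_job, job_db="jobs.csv")
```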
5 changes: 4 additions & 1 deletion openeo/rest/connection.py
@@ -84,6 +84,7 @@
ContextTimer,
LazyLoadCache,
dict_no_none,
ensure_dir,
ensure_list,
load_json_resource,
repr_truncate,
@@ -1771,7 +1772,9 @@ def download(
)

if outputfile is not None:
with Path(outputfile).open(mode="wb") as f:
target = Path(outputfile)
ensure_dir(target.parent)
with target.open(mode="wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
f.write(chunk)
else:
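A small sketch of what this `ensure_dir` change enables, assuming `DataCube.download()` delegates to `Connection.download()` as in current releases; the backend URL, collection id and output path are placeholders:

```python
from pathlib import Path

import openeo

connection = openeo.connect("https://openeo.example").authenticate_oidc()  # hypothetical backend URL
cube = connection.load_collection("SENTINEL2_L2A")  # hypothetical collection id

# Hypothetical nested output path whose parent directories do not exist yet.
target = Path("results/2024/site_a/result.nc")

# The synchronous download ends up in Connection.download(), which now calls
# ensure_dir(target.parent) before opening the file, so the missing
# "results/2024/site_a/" directories are created instead of raising an error.
cube.download(target)
```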
10 changes: 9 additions & 1 deletion openeo/rest/datacube.py
@@ -2484,6 +2484,7 @@ def execute_batch(
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
show_error_logs: bool = True,
# TODO: deprecate `format_options` as keyword arguments
**format_options,
) -> BatchJob:
@@ -2501,12 +2502,16 @@
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param show_error_logs: whether to automatically print error logs when the batch job failed.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added argument ``additional``.
.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
# TODO: start showing deprecation warnings about these inconsistent argument names
if "format" in format_options and not out_format:
@@ -2536,7 +2541,10 @@
)
return job.run_synchronous(
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)

def create_job(
50 changes: 38 additions & 12 deletions openeo/rest/job.py
@@ -235,20 +235,43 @@ def logs(
return VisualList("logs", data=entries)

def run_synchronous(
self, outputfile: Union[str, Path, None] = None,
print=print, max_poll_interval=60, connection_retry_interval=30
self,
outputfile: Union[str, Path, None] = None,
print=print,
max_poll_interval=60,
connection_retry_interval=30,
show_error_logs: bool = True,
) -> BatchJob:
"""Start the job, wait for it to finish and download result"""
"""
Start the job, wait for it to finish and download result
:param outputfile: The path of a file to which a result can be written
:param print: print/logging function to show progress/status
:param max_poll_interval: maximum number of seconds to sleep between status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param show_error_logs: whether to automatically print error logs when the batch job failed.
.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
self.start_and_wait(
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)
# TODO #135 support multi file result sets too?
if outputfile is not None:
self.download_result(outputfile)
return self

def start_and_wait(
self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10
self,
print=print,
max_poll_interval: int = 60,
connection_retry_interval: int = 30,
soft_error_max=10,
show_error_logs: bool = True,
) -> BatchJob:
"""
Start the batch job, poll its status and wait till it finishes (or fails)
@@ -257,7 +280,10 @@ def start_and_wait(
:param max_poll_interval: maximum number of seconds to sleep between status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow
:return:
:param show_error_logs: whether to automatically print error logs when the batch job failed.
.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
# TODO rename `connection_retry_interval` to something more generic?
start_time = time.time()
@@ -314,13 +340,13 @@ def soft_error(message: str):
poll_interval = min(1.25 * poll_interval, max_poll_interval)

if status != "finished":
# TODO: allow to disable this printing logs (e.g. in non-interactive contexts)?
# TODO: render logs jupyter-aware in a notebook context?
print(f"Your batch job {self.job_id!r} failed. Error logs:")
print(self.logs(level=logging.ERROR))
print(
f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
)
if show_error_logs:
print(f"Your batch job {self.job_id!r} failed. Error logs:")
print(self.logs(level=logging.ERROR))
print(
f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
)
raise JobFailedException(
f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).",
job=self,
12 changes: 10 additions & 2 deletions openeo/rest/mlmodel.py
@@ -71,12 +71,13 @@ def execute_batch(
connection_retry_interval=30,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
show_error_logs: bool = True,
) -> BatchJob:
"""
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.
For very long running jobs, you probably do not want to keep the client running.
For very long-running jobs, you probably do not want to keep the client running.
:param job_options:
:param outputfile: The path of a file to which a result can be written
@@ -85,9 +86,13 @@
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param show_error_logs: whether to automatically print error logs when the batch job failed.
.. versionadded:: 0.36.0
Added argument ``additional``.
.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
job = self.create_job(
title=title,
Expand All @@ -100,7 +105,10 @@ def execute_batch(
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)

def create_job(
10 changes: 9 additions & 1 deletion openeo/rest/vectorcube.py
@@ -259,6 +259,7 @@ def execute_batch(
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
show_error_logs: bool = True,
# TODO: avoid using kwargs as format options
**format_options,
) -> BatchJob:
@@ -277,6 +278,7 @@
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param show_error_logs: whether to automatically print error logs when the batch job failed.
.. versionchanged:: 0.21.0
When not specified explicitly, output format is guessed from output file extension.
@@ -286,6 +288,9 @@
.. versionadded:: 0.36.0
Added argument ``additional``.
.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
cube = self
if auto_add_save_result:
@@ -310,7 +315,10 @@
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)

def create_job(
1 change: 1 addition & 0 deletions setup.py
@@ -22,6 +22,7 @@
"mock",
"requests-mock>=1.8.0",
"httpretty>=1.1.4",
"urllib3<2.3.0", # httpretty doesn't work properly with urllib3>=2.3.0. See #700 and https://github.com/gabrielfalcao/HTTPretty/issues/484
"netCDF4>=1.7.0",
"matplotlib", # TODO: eliminate matplotlib as test dependency
"geopandas",
76 changes: 76 additions & 0 deletions tests/extra/job_management/test_job_management.py
@@ -7,6 +7,7 @@
from time import sleep
from typing import Callable, Union
from unittest import mock
import datetime

import dirty_equals
import geopandas
@@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs):
12 * 60 * 60,
"finished",
),
],
)
def test_automatic_cancel_of_too_long_running_jobs(
@@ -645,6 +647,80 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
assert needle.search(caplog.text)


@pytest.mark.parametrize(
["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
[
# Scenario 1: Missing running_start_time (None)
(
"2024-09-01T09:00:00Z", # Job creation time
"2024-09-01T09:00:00Z", # Job start time (should be 1 hour after create_time)
None, # Missing running_start_time
"2024-09-01T20:00:00Z", # Job end time
"finished", # Job final status
6 * 60 * 60, # Cancel after 6 hours
),
# Scenario 2: NaN running_start_time
(
"2024-09-01T09:00:00Z",
"2024-09-01T09:00:00Z",
float("nan"), # NaN running_start_time
"2024-09-01T20:00:00Z", # Job end time
"finished", # Job final status
6 * 60 * 60, # Cancel after 6 hours
),
]
)
def test_ensure_running_start_time_is_datetime(
self,
tmp_path,
time_machine,
create_time,
start_time,
running_start_time,
end_time,
end_status,
cancel_after_seconds,
dummy_backend_foo,
job_manager_root_dir,
):
def get_status(job_id, current_status):
if rfc3339.utcnow() < start_time:
return "queued"
elif rfc3339.utcnow() < end_time:
return "running"
return end_status

# Set the job status updater function for the mock backend
dummy_backend_foo.job_status_updater = get_status

job_manager = MultiBackendJobManager(
root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
)
job_manager.add_backend("foo", connection=dummy_backend_foo.connection)

# Create a DataFrame representing the job database
df = pd.DataFrame({
"year": [2024],
"running_start_time": [running_start_time], # Initial running_start_time
})

# Move the time machine to the job creation time
time_machine.move_to(create_time)

job_db_path = tmp_path / "jobs.csv"

# Mock sleep() to skip one hour at a time instead of actually sleeping
with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)

final_df = CsvJobDatabase(job_db_path).read()

# Validate running_start_time is a valid datetime object
filled_running_start_time = final_df.iloc[0]["running_start_time"]
assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime)



JOB_DB_DF_BASICS = pd.DataFrame(
{
"numbers": [3, 2, 1],