Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple job progress estimation #340

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ and start a new "In Progress" section above it.

## In progress


## 0.122.0

- `load_collection`: more consistent cube extent handling when a buffer is applied. ([#334](https://github.com/Open-EO/openeo-python-driver/issues/334))
- `load_collection`: collapse multiple `load_collection` calls into a single one in cases with buffers. ([#336](https://github.com/Open-EO/openeo-python-driver/issues/336))
- Add `simple_job_progress_estimation` config for simple job progress estimation ([Open-EO/openeo-geopyspark-driver#772](https://github.com/Open-EO/openeo-geopyspark-driver/issues/772))

## 0.121.0

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.121.0a1"
__version__ = "0.122.0a1"
26 changes: 21 additions & 5 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from openeo_driver.processes import ProcessRegistry
from openeo_driver.users import User
from openeo_driver.users.oidc import OidcProvider
from openeo_driver.util.date_math import simple_job_progress_estimation
from openeo_driver.util.logging import just_log_exceptions
from openeo_driver.utils import read_json, dict_item, EvalEnv, extract_namedtuple_fields_from_dict, \
get_package_versions

Expand Down Expand Up @@ -178,12 +180,12 @@ class LoadParameters(dict):
"""
A buffer provided in the units of the target CRS. If target CRS is not provided, then it is assumed to be the native CRS
of the collection.
This buffer is applied to AOI when constructing the datacube, allowing operations that require neighbouring pixels

This buffer is applied to AOI when constructing the datacube, allowing operations that require neighbouring pixels
to be implemented correctly. Examples are apply_kernel and apply_neighborhood, but also certain resampling operations
could be affected by this.
The buffer has to be considered in the global extent!

The buffer has to be considered in the global extent!
"""
pixel_buffer = dict_item(default=None)

Expand Down Expand Up @@ -285,6 +287,7 @@ class BatchJobMetadata(NamedTuple):
job_options: Optional[dict] = None
title: Optional[str] = None
description: Optional[str] = None
# Progress as percent value: 0.0 (just started) - 100.0 (fully completed)
progress: Optional[float] = None
updated: Optional[datetime] = None
plan: Optional[str] = None
Expand Down Expand Up @@ -360,8 +363,21 @@ def to_api_dict(self, full=True, api_version: ComparableVersion = None) -> dict:
result["created"] = rfc3339.datetime(self.created) if self.created else None
result["updated"] = rfc3339.datetime(self.updated) if self.updated else None

progress = self.progress
if (
self.status == JOB_STATUS.RUNNING
and self.started
and progress is None
and get_backend_config().simple_job_progress_estimation
):
# TODO: is there a cleaner place to inject this fallback progress estimation?
with just_log_exceptions(log=logger, name="simple_job_progress_estimation"):
progress = 100 * simple_job_progress_estimation(
started=self.started, average_run_time=get_backend_config().simple_job_progress_estimation
)
# Clamp "progress" for certain "status" values according to the spec.
result["progress"] = {"created": 0, "queued": 0, "finished": 100}.get(self.status, self.progress)
progress = {"created": 0, "queued": 0, "finished": 100}.get(self.status, progress)
result["progress"] = progress

if full:
usage = self.usage or {}
Expand Down
3 changes: 3 additions & 0 deletions openeo_driver/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class OpenEoBackendConfig:

ejr_retry_settings: dict = attrs.Factory(lambda: dict(tries=4, delay=2, backoff=2))

"Experimental: simple job progress fallback estimation. Specify average batch job completion time (wall clock) in seconds."
simple_job_progress_estimation: Optional[float] = None


def check_config_definition(config_class: Type[OpenEoBackendConfig]):
"""
Expand Down
30 changes: 30 additions & 0 deletions openeo_driver/util/date_math.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,33 @@ def month_shift(
except ValueError:
# Handle month overflow (e.g clip Feb 31 to 28)
return month_shift(d=d.replace(day=1), months=months + 1) - dt.timedelta(days=1)


def simple_job_progress_estimation(started: dt.datetime, average_run_time: float) -> float:
"""
Simple progress estimation,
assuming job run time is distributed exponentially (with lambda = 1 / average run time)

- estimated remaining run time = average run time
(note that this is regardless of current run time so far,
this is mathematical consequence of assuming an exponential distribution)
- estimated total run time = current run time + average run time
- estimated progress = current run time / (current run time + average run time)

:param started: start time of the job
:param average_run_time: average run time of jobs in seconds
:return: progress as a fraction in range [0, 1]
"""
# TODO: also support string input?
# TODO: also support other timezones than UTC or naive?

if started.tzinfo is None:
# Convert naive to UTC
started = started.replace(tzinfo=dt.timezone.utc)

now = dt.datetime.now(tz=dt.timezone.utc)
elapsed = (now - started).total_seconds()
if elapsed <= 0 or average_run_time <= 0:
return 0.0
progress = elapsed / (elapsed + average_run_time)
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
return progress
14 changes: 14 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pystac.validation.stac_validator
import pytest
import re_assert
import time_machine
import werkzeug.exceptions
from openeo.capabilities import ComparableVersion

Expand Down Expand Up @@ -1182,6 +1183,7 @@ def _fresh_job_registry(next_job_id="job-1234", output_root: Optional[Path] = No
status='running',
process={'process_graph': {'foo': {'process_id': 'foo', 'arguments': {}}}},
created=datetime(2017, 1, 1, 9, 32, 12),
started=datetime(2017, 1, 1, 12, 0, 0),
),
(TEST_USER, '53c71345-09b4-46b4-b6b0-03fd6fe1f199'): BatchJobMetadata(
id='53c71345-09b4-46b4-b6b0-03fd6fe1f199',
Expand Down Expand Up @@ -1391,6 +1393,18 @@ def test_get_job_info_invalid(self, api):
resp = api.get('/jobs/deadbeef-f00', headers=self.AUTH_HEADER).assert_error(404, "JobNotFound")
assert resp.json["message"] == "The batch job 'deadbeef-f00' does not exist."

@pytest.mark.parametrize("backend_config_overrides", [{"simple_job_progress_estimation": 600}])
def test_get_job_info_simple_job_progress_estimation(self, api100, backend_config_overrides):
with self._fresh_job_registry(), time_machine.travel(datetime(2017, 1, 1, 12, 5, 0)):
resp = api100.get("/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc", headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == {
"id": "07024ee9-7847-4b8a-b260-6c879a2b3cdc",
"status": "running",
"progress": pytest.approx(33.33, abs=0.1),
"created": "2017-01-01T09:32:12Z",
"process": {"process_graph": {"foo": {"process_id": "foo", "arguments": {}}}},
}

def test_list_user_jobs_100(self, api100):
with self._fresh_job_registry():
resp = api100.get('/jobs', headers=self.AUTH_HEADER)
Expand Down
42 changes: 41 additions & 1 deletion tests/util/test_date_math.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime as dt

import pandas as pd
import pytest
import time_machine

from openeo_driver.util.date_math import month_shift
from openeo_driver.util.date_math import month_shift, simple_job_progress_estimation


def test_month_shift_date():
Expand Down Expand Up @@ -63,3 +65,41 @@ def test_month_shift_pandas_timestamp():
def test_month_shift_overflow_pandas_timestamp():
assert month_shift(pd.to_datetime("2022-01-31"), months=1) == pd.Timestamp(2022, 2, 28)
assert month_shift(pd.to_datetime("2022-03-31"), months=-1) == pd.Timestamp(2022, 2, 28)


@time_machine.travel("2024-12-06T12:00:00+00")
@pytest.mark.parametrize(
"tzinfo",
[
None, # Naive
dt.timezone.utc, # Explicit UTC
],
)
def test_simple_job_progress_estimation_basic(tzinfo):
# Started 1 second ago
assert simple_job_progress_estimation(
dt.datetime(2024, 12, 6, 11, 59, 59, tzinfo=tzinfo),
average_run_time=600,
) == pytest.approx(0.0, abs=0.01)
# Started 5 minutes ago
assert simple_job_progress_estimation(
dt.datetime(2024, 12, 6, 11, 55, tzinfo=tzinfo),
average_run_time=600,
) == pytest.approx(0.33, abs=0.01)
# Long overdue
assert simple_job_progress_estimation(
dt.datetime(2024, 12, 5, tzinfo=tzinfo),
average_run_time=600,
) == pytest.approx(1.0, abs=0.01)


@time_machine.travel("2024-12-06T12:00:00+00")
def test_simple_job_progress_estimation_negative():
# OMG a job from the future.
assert (
simple_job_progress_estimation(
started=dt.datetime(2024, 12, 8),
average_run_time=600,
)
== 0.0
)