Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mogpr timestep option #104

Merged
merged 9 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions src/fusets/mogpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import numpy as np
import pandas as pd
import xarray
from xarray import Dataset

from fusets._xarray_utils import _extract_dates, _time_dimension
from fusets._xarray_utils import _extract_dates, _time_dimension, _output_dates
from fusets.base import BaseEstimator

_openeo_exists = importlib.util.find_spec("openeo") is not None
Expand All @@ -23,10 +22,10 @@

class MOGPRTransformer(BaseEstimator):
"""

MOGPR (multi-output gaussia-process regression) integrates various timeseries and delivers the same amount of reconstructed timeseries. This allows to
fill gaps based on other indicators that are correlated with each other.

"""

def __init__(self) -> None:
Expand Down Expand Up @@ -125,15 +124,15 @@ def transform(self, X):
))
return out_ds

def fit_transform(self, X: Union[Dataset, DataCube], y=None, **fit_params):
def fit_transform(self, X: Union[xarray.Dataset, DataCube], y=None, **fit_params):
if _openeo_exists and isinstance(X, DataCube):
from .openeo import mogpr as mogpr_openeo
return mogpr_openeo(X)

return mogpr(X)


def mogpr(array: Dataset, variables: List[str] = None, time_dimension="t") -> xarray.Dataset:
def mogpr(array: xarray.Dataset, variables: List[str] = None, time_dimension: str="t", prediction_period: str=None) -> xarray.Dataset:
"""
MOGPR (multi-output gaussian-process regression) integrates various timeseries into a single values. This allows to
fill gaps based on other indicators that are correlated with each other.
Expand All @@ -144,34 +143,33 @@ def mogpr(array: Dataset, variables: List[str] = None, time_dimension="t") -> xa
array: An input datacube having at least a temporal dimension over which the smoothing will be applied.
variables: The list of variable names that should be included, or None to use all variables
time_dimension: The name of the time dimension of this datacube. Only needs to be specified to resolve ambiguities.
prediction_period: The duration specified as ISO-8601, e.g. P5D: 5-daily, P1M: monthly. Defaults to input dates.

Returns: A gapfilled datacube.

"""

dates = _extract_dates(array)
time_dimension = _time_dimension(array, time_dimension)
output_time_dimension = 't_new'

dates_np = [d.toordinal() for d in dates]
output_dates = dates
output_time_dimension = "t_new"

if prediction_period is not None:
output_dates = _output_dates(prediction_period, dates[0], dates[-1])

if isinstance(array, xarray.Dataset):
selected_values = [v.values for v in array.values() if variables is None or v.name in variables]
else:
selected_values = [array]
dates_np = np.array([d.toordinal() for d in dates], dtype=np.float64)
output_dates_np = np.array([d.toordinal() for d in output_dates], dtype=np.float64)

tstep = 5
time_vec_min = np.min(dates_np)
time_vec_max = np.max(dates_np)
output_timevec = np.array(range(int(time_vec_min), int(time_vec_max), tstep), dtype=np.float64)
output_time = [datetime.fromordinal(int(_)) for _ in output_timevec]
if variables is not None:
array = array.drop_vars([var for var in list(array.data_vars) if var not in variables])

if len(output_time) == 0:
if len(output_dates) == 0:
raise Exception('The result does not contain any output times, please select a larger range')

def callback(timeseries):
out_mean, out_std, out_qflag, out_model = mogpr_1D(timeseries, list([np.array(dates_np) for i in timeseries]),
0, output_timevec=output_timevec, nt=1, trained_model=None)
out_mean, _, _, _ = mogpr_1D(timeseries, list([dates_np for _ in timeseries]),
0, output_timevec=output_dates_np, nt=1, trained_model=None)
result = np.array(out_mean)
return result

Expand All @@ -180,7 +178,7 @@ def callback(timeseries):
input_core_dims=[["variable", time_dimension]],
output_core_dims=[["variable", output_time_dimension]], vectorize=True)

result = result.assign_coords({output_time_dimension: output_time})
result = result.assign_coords({output_time_dimension: output_dates})
result = result.rename({output_time_dimension: time_dimension, "variable": "bands"})

return result.to_dataset(dim="bands")
Expand Down Expand Up @@ -308,7 +306,7 @@ def _MOGPR_GPY_retrieval(data_in, time_in, master_ind, output_timevec, nt):
def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=None):
"""
Function performing the multioutput gaussian-process regression at pixel level for gapfilling purposes

Args:
data_in (list): List of numpy 1D arrays containing data to be processed
time_in (list): List of numpy 1D arrays containing the (ordinal)dates of each variable in the time dimension
Expand Down Expand Up @@ -356,7 +354,7 @@ def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=Non
Y_vec.append(Y_tmp)
del X_tmp, Y_tmp

# Data Normalization
# Data Normalization
Y_mean_vec.append(np.mean(Y_vec[ind]))
Y_std_vec.append(np.std(Y_vec[ind]))
Y_vec[ind] = (Y_vec[ind] - Y_mean_vec[ind]) / Y_std_vec[ind]
Expand All @@ -376,7 +374,7 @@ def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=Non
# Linear Coregionalization
LCM = LCM(input_dim=1, num_outputs=noutputs, kernels_list=[K] * noutputs, W_rank=1)
if trained_model is None:
# Linear coregionalization
# Linear coregionalization
out_model = GPCoregionalizedRegression(Xtrain, Ytrain, kernel=LCM.copy())
out_model.optimize()
else:
Expand Down Expand Up @@ -418,7 +416,7 @@ def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=Non

del Yp, Vp

# Flatten the series
# Flatten the series
out_mean_list = []
out_std_list = []

Expand Down
6 changes: 5 additions & 1 deletion src/fusets/openeo/mogpr_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
home = write_gpy_cfg()

from fusets.mogpr import mogpr
variables = context.get('variables')
time_dimension = context.get('time_dimension', 't')
prediction_period = context.get('prediction_period', '5D')

dims = cube.get_array().dims
result = mogpr(cube.get_array().to_dataset(dim="bands"))
result = mogpr(cube.get_array().to_dataset(dim="bands"), variables=variables, time_dimension=time_dimension, prediction_period=prediction_period)
result_dc = XarrayDataCube(result.to_array(dim="bands").transpose(*dims))
set_home(home)
return result_dc
Expand Down