Skip to content

Commit

Permalink
Add mogpr timestep option (#104)
Browse files Browse the repository at this point in the history
* remove whitespace

* simplify import, remove whitespaces

* fix bug when limiting input variables

* add option to provide time step

* hide unused variables

* add type hints

* fix default output time dimension name

* add parameters to the mogpr UDF
  • Loading branch information
Matic Lubej authored Oct 23, 2023
1 parent 63c59aa commit 7233f41
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
48 changes: 23 additions & 25 deletions src/fusets/mogpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import numpy as np
import pandas as pd
import xarray
from xarray import Dataset

from fusets._xarray_utils import _extract_dates, _time_dimension
from fusets._xarray_utils import _extract_dates, _time_dimension, _output_dates
from fusets.base import BaseEstimator

_openeo_exists = importlib.util.find_spec("openeo") is not None
Expand All @@ -23,10 +22,10 @@

class MOGPRTransformer(BaseEstimator):
"""
MOGPR (multi-output gaussia-process regression) integrates various timeseries and delivers the same amount of reconstructed timeseries. This allows to
fill gaps based on other indicators that are correlated with each other.
"""

def __init__(self) -> None:
Expand Down Expand Up @@ -125,15 +124,15 @@ def transform(self, X):
))
return out_ds

def fit_transform(self, X: Union[Dataset, DataCube], y=None, **fit_params):
def fit_transform(self, X: Union[xarray.Dataset, DataCube], y=None, **fit_params):
if _openeo_exists and isinstance(X, DataCube):
from .openeo import mogpr as mogpr_openeo
return mogpr_openeo(X)

return mogpr(X)


def mogpr(array: Dataset, variables: List[str] = None, time_dimension="t") -> xarray.Dataset:
def mogpr(array: xarray.Dataset, variables: List[str] = None, time_dimension: str="t", prediction_period: str=None) -> xarray.Dataset:
"""
MOGPR (multi-output gaussian-process regression) integrates various timeseries into a single values. This allows to
fill gaps based on other indicators that are correlated with each other.
Expand All @@ -144,34 +143,33 @@ def mogpr(array: Dataset, variables: List[str] = None, time_dimension="t") -> xa
array: An input datacube having at least a temporal dimension over which the smoothing will be applied.
variables: The list of variable names that should be included, or None to use all variables
time_dimension: The name of the time dimension of this datacube. Only needs to be specified to resolve ambiguities.
prediction_period: The duration specified as ISO-8601, e.g. P5D: 5-daily, P1M: monthly. Defaults to input dates.
Returns: A gapfilled datacube.
"""

dates = _extract_dates(array)
time_dimension = _time_dimension(array, time_dimension)
output_time_dimension = 't_new'

dates_np = [d.toordinal() for d in dates]
output_dates = dates
output_time_dimension = "t_new"

if prediction_period is not None:
output_dates = _output_dates(prediction_period, dates[0], dates[-1])

if isinstance(array, xarray.Dataset):
selected_values = [v.values for v in array.values() if variables is None or v.name in variables]
else:
selected_values = [array]
dates_np = np.array([d.toordinal() for d in dates], dtype=np.float64)
output_dates_np = np.array([d.toordinal() for d in output_dates], dtype=np.float64)

tstep = 5
time_vec_min = np.min(dates_np)
time_vec_max = np.max(dates_np)
output_timevec = np.array(range(int(time_vec_min), int(time_vec_max), tstep), dtype=np.float64)
output_time = [datetime.fromordinal(int(_)) for _ in output_timevec]
if variables is not None:
array = array.drop_vars([var for var in list(array.data_vars) if var not in variables])

if len(output_time) == 0:
if len(output_dates) == 0:
raise Exception('The result does not contain any output times, please select a larger range')

def callback(timeseries):
out_mean, out_std, out_qflag, out_model = mogpr_1D(timeseries, list([np.array(dates_np) for i in timeseries]),
0, output_timevec=output_timevec, nt=1, trained_model=None)
out_mean, _, _, _ = mogpr_1D(timeseries, list([dates_np for _ in timeseries]),
0, output_timevec=output_dates_np, nt=1, trained_model=None)
result = np.array(out_mean)
return result

Expand All @@ -180,7 +178,7 @@ def callback(timeseries):
input_core_dims=[["variable", time_dimension]],
output_core_dims=[["variable", output_time_dimension]], vectorize=True)

result = result.assign_coords({output_time_dimension: output_time})
result = result.assign_coords({output_time_dimension: output_dates})
result = result.rename({output_time_dimension: time_dimension, "variable": "bands"})

return result.to_dataset(dim="bands")
Expand Down Expand Up @@ -308,7 +306,7 @@ def _MOGPR_GPY_retrieval(data_in, time_in, master_ind, output_timevec, nt):
def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=None):
"""
Function performing the multioutput gaussian-process regression at pixel level for gapfilling purposes
Args:
data_in (list): List of numpy 1D arrays containing data to be processed
time_in (list): List of numpy 1D arrays containing the (ordinal)dates of each variable in the time dimension
Expand Down Expand Up @@ -356,7 +354,7 @@ def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=Non
Y_vec.append(Y_tmp)
del X_tmp, Y_tmp

# Data Normalization
# Data Normalization
Y_mean_vec.append(np.mean(Y_vec[ind]))
Y_std_vec.append(np.std(Y_vec[ind]))
Y_vec[ind] = (Y_vec[ind] - Y_mean_vec[ind]) / Y_std_vec[ind]
Expand All @@ -376,7 +374,7 @@ def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=Non
# Linear Coregionalization
LCM = LCM(input_dim=1, num_outputs=noutputs, kernels_list=[K] * noutputs, W_rank=1)
if trained_model is None:
# Linear coregionalization
# Linear coregionalization
out_model = GPCoregionalizedRegression(Xtrain, Ytrain, kernel=LCM.copy())
out_model.optimize()
else:
Expand Down Expand Up @@ -418,7 +416,7 @@ def mogpr_1D(data_in, time_in, master_ind, output_timevec, nt, trained_model=Non

del Yp, Vp

# Flatten the series
# Flatten the series
out_mean_list = []
out_std_list = []

Expand Down
6 changes: 5 additions & 1 deletion src/fusets/openeo/mogpr_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ def apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:
home = write_gpy_cfg()

from fusets.mogpr import mogpr
variables = context.get('variables')
time_dimension = context.get('time_dimension', 't')
prediction_period = context.get('prediction_period', '5D')

dims = cube.get_array().dims
result = mogpr(cube.get_array().to_dataset(dim="bands"))
result = mogpr(cube.get_array().to_dataset(dim="bands"), variables=variables, time_dimension=time_dimension, prediction_period=prediction_period)
result_dc = XarrayDataCube(result.to_array(dim="bands").transpose(*dims))
set_home(home)
return result_dc
Expand Down

0 comments on commit 7233f41

Please sign in to comment.