From f741056105a8f88b8370d7e65c3efb34bd3e19bb Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 14 Nov 2023 21:04:53 +0100 Subject: [PATCH] Issue #424 Add `DataCube.apply_polygon` --- CHANGELOG.md | 1 + openeo/rest/datacube.py | 43 ++- tests/rest/__init__.py | 8 +- tests/rest/datacube/test_datacube.py | 1 + tests/rest/datacube/test_datacube100.py | 377 +++++++++++++++++------- 5 files changed, 326 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bd39ec84..0f16251cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#310](https://github.com/Open-EO/openeo-python-client/issues/310)) - Add `collection_property()` helper to easily build collection metadata property filters for `Connection.load_collection()` ([#331](https://github.com/Open-EO/openeo-python-client/pull/331)) +- Add `DataCube.apply_polygon()` (standardized version of experimental `chunk_polygon`) ([#424](https://github.com/Open-EO/openeo-python-client/issues/424)) ### Changed diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 1d686052b..fd02db53c 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -1193,11 +1193,11 @@ def reduce_spatial( metadata=self.metadata.reduce_spatial(), ) - # @openeo_process + @deprecated("Use :py:meth:`apply_polygon`.", version="0.26.0") def chunk_polygon( self, chunks: Union[shapely.geometry.base.BaseGeometry, dict, str, pathlib.Path, Parameter, VectorCube], - process: Union[str, PGNode, typing.Callable], + process: Union[str, PGNode, typing.Callable, UDF], mask_value: float = None, context: Optional[dict] = None, ) -> DataCube: @@ -1237,6 +1237,45 @@ def chunk_polygon( ), ) + @openeo_process + def apply_polygon( + self, + polygons: Union[shapely.geometry.base.BaseGeometry, dict, str, pathlib.Path, Parameter, VectorCube], + process: Union[str, PGNode, typing.Callable, UDF], + mask_value: Optional[float] = None, 
+ context: Optional[dict] = None, + ) -> DataCube: + """ + Apply a process to segments of the data cube that are defined by the given polygons. + For each polygon provided, all pixels for which the point at the pixel center intersects + with the polygon (as defined in the Simple Features standard by the OGC) are collected into sub data cubes. + If a pixel is part of more than one of the provided polygons (e.g., when the polygons overlap), + the GeometriesOverlap exception is thrown. + Each sub data cube is passed individually to the given process. + + .. warning:: experimental process: not generally supported, API subject to change. + + :param polygons: Polygons, provided as a shapely geometry, a GeoJSON-style dictionary, + a public GeoJSON URL, or a path (that is valid for the back-end) to a GeoJSON file. + :param process: "child callback" function, see :ref:`callbackfunctions` + :param mask_value: The value used for pixels outside the polygon. + :param context: Additional data to be passed to the process. 
+ """ + process = build_child_callback(process, parent_parameters=["data"], connection=self.connection) + valid_geojson_types = ["Polygon", "MultiPolygon", "Feature", "FeatureCollection"] + polygons = self._get_geometry_argument(polygons, valid_geojson_types=valid_geojson_types) + mask_value = float(mask_value) if mask_value is not None else None + return self.process( + process_id="apply_polygon", + data=THIS, + polygons=polygons, + process=process, + arguments=dict_no_none( + mask_value=mask_value, + context=context, + ), + ) + def reduce_bands(self, reducer: Union[str, PGNode, typing.Callable, UDF]) -> DataCube: """ Shortcut for :py:meth:`reduce_dimension` along the band dimension diff --git a/tests/rest/__init__.py b/tests/rest/__init__.py index a85ccf3e7..caf08ff15 100644 --- a/tests/rest/__init__.py +++ b/tests/rest/__init__.py @@ -5,13 +5,15 @@ from openeo.rest.datacube import DataCube -def get_download_graph(cube: DataCube, drop_save_result: bool = False) -> dict: +def get_download_graph(cube: DataCube, *, drop_save_result: bool = False, drop_load_collection: bool = False) -> dict: """ Do fake download of a cube and intercept the process graph :param cube: cube to download - :param connection: connection object + :param drop_save_result: whether to drop the save_result node + :param drop_load_collection: whether to drop the load_collection node :return: """ + # TODO: move this to a testing utility module with mock.patch.object(cube.connection, 'download') as download: cube.download("out.geotiff", format="GTIFF") download.assert_called_once() @@ -21,6 +23,8 @@ def get_download_graph(cube: DataCube, drop_save_result: bool = False) -> dict: save_results_nodes = [k for k, v in actual_graph.items() if v["process_id"] == "save_result"] assert len(save_results_nodes) == 1 del actual_graph[save_results_nodes[0]] + if drop_load_collection: + actual_graph = {k: v for (k, v) in actual_graph.items() if v["process_id"] != "load_collection"} return actual_graph diff 
--git a/tests/rest/datacube/test_datacube.py b/tests/rest/datacube/test_datacube.py index c4d31149f..7d247d39b 100644 --- a/tests/rest/datacube/test_datacube.py +++ b/tests/rest/datacube/test_datacube.py @@ -65,6 +65,7 @@ def test_min_time(s2cube, api_version): def _get_leaf_node(cube, force_flat=True) -> dict: """Get leaf node (node with result=True), supporting old and new style of graph building.""" + # TODO: replace this with get_download_graph if isinstance(cube, DataCube): if force_flat: flat_graph = cube.flat_graph() diff --git a/tests/rest/datacube/test_datacube100.py b/tests/rest/datacube/test_datacube100.py index 9f38946e5..f17eeb8fd 100644 --- a/tests/rest/datacube/test_datacube100.py +++ b/tests/rest/datacube/test_datacube100.py @@ -38,30 +38,35 @@ basic_geometry_types = [ ( shapely.geometry.box(0, 0, 1, 1), - {"type": "Polygon", "coordinates": (((1.0, 0.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)),)}, + {"type": "Polygon", "coordinates": [[[1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0], [1.0, 0.0]]]}, ), ( - {"type": "Polygon", "coordinates": (((1, 0), (1, 1), (0, 1), (0, 0), (1, 0)),)}, - {"type": "Polygon", "coordinates": (((1, 0), (1, 1), (0, 1), (0, 0), (1, 0)),)}, + {"type": "Polygon", "coordinates": [[[1, 0], [1, 1], [0, 1], [0, 0], [1, 0]]]}, + {"type": "Polygon", "coordinates": [[[1, 0], [1, 1], [0, 1], [0, 0], [1, 0]]]}, ), ( shapely.geometry.MultiPolygon([shapely.geometry.box(0, 0, 1, 1)]), - {"type": "MultiPolygon", "coordinates": [(((1.0, 0.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)),)]}, + {"type": "MultiPolygon", "coordinates": [[[[1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0], [1.0, 0.0]]]]}, ), ( shapely.geometry.GeometryCollection([shapely.geometry.box(0, 0, 1, 1)]), - {"type": "GeometryCollection", "geometries": [ - {"type": "Polygon", "coordinates": (((1.0, 0.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)),)} - ]}, + { + "type": "GeometryCollection", + "geometries": [ + {"type": "Polygon", "coordinates": [[[1.0, 
0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0], [1.0, 0.0]]]} + ], + }, ), ( { - "type": "Feature", "properties": {}, - "geometry": {"type": "Polygon", "coordinates": (((1, 0), (1, 1), (0, 1), (0, 0), (1, 0)),)}, + "type": "Feature", + "properties": {}, + "geometry": {"type": "Polygon", "coordinates": [[[1, 0], [1, 1], [0, 1], [0, 0], [1, 0]]]}, }, { - "type": "Feature", "properties": {}, - "geometry": {"type": "Polygon", "coordinates": (((1, 0), (1, 1), (0, 1), (0, 0), (1, 0)),)}, + "type": "Feature", + "properties": {}, + "geometry": {"type": "Polygon", "coordinates": [[[1, 0], [1, 1], [0, 1], [0, 0], [1, 0]]]}, }, ), ] @@ -206,6 +211,7 @@ def _get_normalizable_crs_inputs(): def _get_leaf_node(cube: DataCube) -> dict: """Get leaf node (node with result=True), supporting old and new style of graph building.""" + # TODO replace this with get_download_graph flat_graph = cube.flat_graph() (node,) = [n for n in flat_graph.values() if n.get("result")] return node @@ -661,16 +667,13 @@ def test_mask_polygon_basic(con100: Connection): @pytest.mark.parametrize(["polygon", "expected_mask"], basic_geometry_types) def test_mask_polygon_types(con100: Connection, polygon, expected_mask): - img = con100.load_collection("S2") - masked = img.mask_polygon(mask=polygon) - assert sorted(masked.flat_graph().keys()) == ["loadcollection1", "maskpolygon1"] - assert masked.flat_graph()["maskpolygon1"] == { - "process_id": "mask_polygon", - "arguments": { - "data": {"from_node": "loadcollection1"}, - "mask": expected_mask - }, - "result": True + cube = con100.load_collection("S2") + masked = cube.mask_polygon(mask=polygon) + assert get_download_graph(masked, drop_save_result=True, drop_load_collection=True) == { + "maskpolygon1": { + "process_id": "mask_polygon", + "arguments": {"data": {"from_node": "loadcollection1"}, "mask": expected_mask}, + } } @@ -1375,112 +1378,286 @@ def test_reduce_temporal_without_metadata(s2cube_without_metadata): def test_chunk_polygon_basic(con100: Connection): - 
img = con100.load_collection("S2") + cube = con100.load_collection("S2") polygon: shapely.geometry.Polygon = shapely.geometry.box(0, 0, 1, 1) process = lambda data: data.run_udf(udf="myfancycode", runtime="Python") - result = img.chunk_polygon(chunks=polygon, process=process) - assert sorted(result.flat_graph().keys()) == ['chunkpolygon1', 'loadcollection1'] - assert result.flat_graph()["chunkpolygon1"] == { - 'process_id': 'chunk_polygon', - 'arguments': { - 'chunks': { - 'type': 'Polygon', - 'coordinates': (((1.0, 0.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)),) - }, - 'data': {'from_node': 'loadcollection1'}, - 'process': { - 'process_graph': { - 'runudf1': { - 'process_id': 'run_udf', - 'arguments': {'data': {'from_parameter': 'data'}, 'runtime': 'Python', 'udf': 'myfancycode'}, - 'result': True + with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"): + result = cube.chunk_polygon(chunks=polygon, process=process) + assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { + "chunkpolygon1": { + "process_id": "chunk_polygon", + "arguments": { + "chunks": { + "type": "Polygon", + "coordinates": [[[1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0], [1.0, 0.0]]], + }, + "data": {"from_node": "loadcollection1"}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "runtime": "Python", + "udf": "myfancycode", + }, + "result": True, + } } - } - } - }, - 'result': True} + }, + }, + } + } @pytest.mark.parametrize(["polygon", "expected_chunks"], basic_geometry_types) def test_chunk_polygon_types(con100: Connection, polygon, expected_chunks): - img = con100.load_collection("S2") - process = lambda data: data.run_udf(udf="myfancycode", runtime="Python") - result = img.chunk_polygon(chunks=polygon, process=process) - assert sorted(result.flat_graph().keys()) == ['chunkpolygon1', 'loadcollection1'] - assert result.flat_graph()["chunkpolygon1"] 
== { - 'process_id': 'chunk_polygon', - 'arguments': { - 'chunks': expected_chunks, - 'data': {'from_node': 'loadcollection1'}, - 'process': { - 'process_graph': { - 'runudf1': { - 'process_id': 'run_udf', - 'arguments': {'data': {'from_parameter': 'data'}, 'runtime': 'Python', 'udf': 'myfancycode'}, - 'result': True + cube = con100.load_collection("S2") + process = UDF(code="myfancycode", runtime="Python") + with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"): + result = cube.chunk_polygon(chunks=polygon, process=process) + assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { + "chunkpolygon1": { + "process_id": "chunk_polygon", + "arguments": { + "chunks": expected_chunks, + "data": {"from_node": "loadcollection1"}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "runtime": "Python", + "udf": "myfancycode", + }, + "result": True, + } } - } - } - }, - 'result': True} + }, + }, + } + } def test_chunk_polygon_parameter(con100: Connection): - img = con100.load_collection("S2") + cube = con100.load_collection("S2") polygon = Parameter(name="shape", schema="object") process = lambda data: data.run_udf(udf="myfancycode", runtime="Python") - result = img.chunk_polygon(chunks=polygon, process=process) - assert sorted(result.flat_graph().keys()) == ['chunkpolygon1', 'loadcollection1'] - assert result.flat_graph()["chunkpolygon1"] == { - 'process_id': 'chunk_polygon', - 'arguments': { - 'chunks': {"from_parameter": "shape"}, - 'data': {'from_node': 'loadcollection1'}, - 'process': { - 'process_graph': { - 'runudf1': { - 'process_id': 'run_udf', - 'arguments': {'data': {'from_parameter': 'data'}, 'runtime': 'Python', 'udf': 'myfancycode'}, - 'result': True + with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"): + result = cube.chunk_polygon(chunks=polygon, process=process) + assert get_download_graph(result, 
drop_save_result=True, drop_load_collection=True) == { + "chunkpolygon1": { + "process_id": "chunk_polygon", + "arguments": { + "chunks": {"from_parameter": "shape"}, + "data": {"from_node": "loadcollection1"}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "runtime": "Python", + "udf": "myfancycode", + }, + "result": True, + } } - } - } - }, - 'result': True} + }, + }, + } + } def test_chunk_polygon_path(con100: Connection): - img = con100.load_collection("S2") + cube = con100.load_collection("S2") process = lambda data: data.run_udf(udf="myfancycode", runtime="Python") - result = img.chunk_polygon(chunks="path/to/polygon.json", process=process) - assert sorted(result.flat_graph().keys()) == ['chunkpolygon1', 'loadcollection1', 'readvector1'] - assert result.flat_graph()["chunkpolygon1"]['arguments']['chunks'] == {"from_node": "readvector1"} - assert result.flat_graph()["readvector1"] == { - "process_id": "read_vector", - "arguments": {"filename": "path/to/polygon.json"}, + with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"): + result = cube.chunk_polygon(chunks="path/to/polygon.json", process=process) + assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { + "readvector1": {"process_id": "read_vector", "arguments": {"filename": "path/to/polygon.json"}}, + "chunkpolygon1": { + "process_id": "chunk_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "chunks": {"from_node": "readvector1"}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "runtime": "Python", + "udf": "myfancycode", + }, + "result": True, + } + } + }, + }, + }, } def test_chunk_polygon_context(con100: Connection): - img = con100.load_collection("S2") + cube = con100.load_collection("S2") polygon = shapely.geometry.Polygon([(0, 0), (1, 0), (0, 1), (0, 0)]) 
process = lambda data: data.run_udf(udf="myfancycode", runtime="Python") - result = img.chunk_polygon(chunks=polygon, process=process, context={"foo": 4}) - assert result.flat_graph()["chunkpolygon1"] == { - 'process_id': 'chunk_polygon', - 'arguments': { - 'data': {'from_node': 'loadcollection1'}, - 'chunks': {'type': 'Polygon', 'coordinates': (((0, 0), (1, 0), (0, 1), (0, 0)),)}, - 'process': {'process_graph': { - 'runudf1': { - 'process_id': 'run_udf', - 'arguments': {'data': {'from_parameter': 'data'}, 'runtime': 'Python', 'udf': 'myfancycode'}, - 'result': True + with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"): + result = cube.chunk_polygon(chunks=polygon, process=process, context={"foo": 4}) + assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { + "chunkpolygon1": { + "process_id": "chunk_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "chunks": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [0, 1], [0, 0]]]}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "runtime": "Python", + "udf": "myfancycode", + }, + "result": True, + } + } + }, + "context": {"foo": 4}, + }, + } + } + + +def test_apply_polygon_basic(con100: Connection): + cube = con100.load_collection("S2") + polygons: shapely.geometry.Polygon = shapely.geometry.box(0, 0, 1, 1) + process = UDF(code="myfancycode", runtime="Python") + result = cube.apply_polygon(polygons=polygons, process=process) + assert get_download_graph(result)["applypolygon1"] == { + "process_id": "apply_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "polygons": { + "type": "Polygon", + "coordinates": [[[1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0], [1.0, 0.0]]], + }, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python", "udf": 
"myfancycode"}, + "result": True, + } } - }}, + }, + }, + } + + +@pytest.mark.parametrize(["polygons", "expected_polygons"], basic_geometry_types) +def test_apply_polygon_types(con100: Connection, polygons, expected_polygons): + if isinstance(polygons, shapely.geometry.GeometryCollection): + pytest.skip("apply_polygon does not support GeometryCollection") + cube = con100.load_collection("S2") + process = UDF(code="myfancycode", runtime="Python") + result = cube.apply_polygon(polygons=polygons, process=process) + assert get_download_graph(result)["applypolygon1"] == { + "process_id": "apply_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "polygons": expected_polygons, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python", "udf": "myfancycode"}, + "result": True, + } + } + }, + }, + } + + +def test_apply_polygon_parameter(con100: Connection): + cube = con100.load_collection("S2") + polygons = Parameter(name="shapes", schema="object") + process = UDF(code="myfancycode", runtime="Python") + result = cube.apply_polygon(polygons=polygons, process=process) + assert get_download_graph(result)["applypolygon1"] == { + "process_id": "apply_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "polygons": {"from_parameter": "shapes"}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python", "udf": "myfancycode"}, + "result": True, + } + } + }, + }, + } + + +def test_apply_polygon_path(con100: Connection): + cube = con100.load_collection("S2") + process = UDF(code="myfancycode", runtime="Python") + result = cube.apply_polygon(polygons="path/to/polygon.json", process=process) + assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { + "readvector1": { + "process_id": "read_vector", + "arguments": {"filename": 
"path/to/polygon.json"}, + }, + "applypolygon1": { + "process_id": "apply_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "polygons": {"from_node": "readvector1"}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "runtime": "Python", + "udf": "myfancycode", + }, + "result": True, + } + } + }, + }, + }, + } + + +def test_apply_polygon_context(con100: Connection): + cube = con100.load_collection("S2") + polygons = shapely.geometry.Polygon([(0, 0), (1, 0), (0, 1), (0, 0)]) + process = UDF(code="myfancycode", runtime="Python") + result = cube.apply_polygon(polygons=polygons, process=process, context={"foo": 4}) + assert get_download_graph(result)["applypolygon1"] == { + "process_id": "apply_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "polygons": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [0, 1], [0, 0]]]}, + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python", "udf": "myfancycode"}, + "result": True, + } + } + }, "context": {"foo": 4}, }, - 'result': True, }