diff --git a/requirements.txt b/requirements.txt
index 336c0fc0..6994bea9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,4 +35,5 @@ dask[dataframe]
 pyarrow >= 14.0.1  # 14.0.0 has security vulnerability
 osmium  # has dependencies on `cmake` and `boost` which require brew install
 tqdm
+polars==0.20.13
 -e .
diff --git a/src/transport_performance/_metrics/tp_utils.py b/src/transport_performance/_metrics/tp_utils.py
index 574f3677..2a746e56 100644
--- a/src/transport_performance/_metrics/tp_utils.py
+++ b/src/transport_performance/_metrics/tp_utils.py
@@ -1,13 +1,15 @@
 """Transport performance helper functions."""
+import os
+import pathlib
+import warnings
+from typing import Union
+
 import geopandas as gpd
 import numpy as np
 import pandas as pd
-import pathlib
-import warnings
-
+import polars as pl
 from haversine import haversine_vector
-from typing import Union
+from numpy import arccos, cos, radians, sin
 
 
 def _transport_performance_pandas(
@@ -18,7 +20,7 @@ def _transport_performance_pandas(
     distance_threshold: float = 11.25,
     sources_col: str = "from_id",
     destinations_col: str = "to_id",
-) -> pd.DataFrame:
+) -> gpd.GeoDataFrame:
     """Calculate transport performance using pandas.
 
     Parameters
@@ -45,7 +47,7 @@
 
     Returns
     -------
-    pd.DataFrame
+    gpd.GeoDataFrame
         Transport performance metrics, grouped by destination column IDs.
 
     """
@@ -128,12 +130,182 @@ def _transport_performance_pandas(
     )
 
     # merge on population geospatial data
-    perf_gdf = populations.merge(
-        perf_df,
-        left_on="id",
-        right_on=destinations_col,
-        how="right",
-    ).drop([destinations_col], axis=1)
+    perf_gdf = (
+        populations.merge(
+            perf_df,
+            left_on="id",
+            right_on=destinations_col,
+            how="right",
+        )
+        .drop([destinations_col], axis=1)
+        .sort_values("id", ascending=True)
+    )
+
+    return perf_gdf
+
+
+def _transport_performance_polars(
+    filepath_or_dirpath: Union[str, pathlib.Path],
+    centroids: gpd.GeoDataFrame,
+    populations: gpd.GeoDataFrame,
+    travel_time_threshold: int = 45,
+    distance_threshold: float = 11.25,
+    sources_col: str = "from_id",
+    destinations_col: str = "to_id",
+) -> gpd.GeoDataFrame:
+    """Calculate transport performance using polars (and some pandas).
+
+    Parameters
+    ----------
+    filepath_or_dirpath : Union[str, pathlib.Path]
+        File path or directory path to `analyse_network` output(s). Files must
+        be in '.parquet' format.
+    centroids : gpd.GeoDataFrame
+        Populations geodataframe containing centroid geometries.
+    populations : gpd.GeoDataFrame
+        Populations geodataframe containing population data and cell
+        geometries.
+    travel_time_threshold : int, optional
+        Maximum threshold for travel times, by default 45 (minutes). Used when
+        calculating accessibility.
+    distance_threshold : float, optional
+        Maximum threshold for source/destination distance, by default 11.25
+        (km). Used when calculating accessibility and proximity.
+    sources_col : str, optional
+        The sources column name in the travel time data, by default "from_id".
+    destinations_col : str, optional
+        The destinations column name in the travel time data, by default
+        "to_id".
+
+    Returns
+    -------
+    gpd.GeoDataFrame
+        Transport performance metrics, grouped by destination column IDs.
+
+    """
+
+    def _haversine(lat1, lon1, lat2, lon2):
+        """Haversine-style distance function for use with polars.
+
+        Notes
+        -----
+        Returns the great-circle distance in km via the spherical law of
+        cosines. Assumes coordinates are in degrees.
+        """
+        return 6371 * arccos(
+            (sin(radians(lat1)) * sin(radians(lat2)))
+            + cos(radians(lat1))
+            * cos(radians(lat2))
+            * cos(radians(lon2) - radians(lon1))
+        )
+
+    # create a local copy before manipulation since `centroids` is mutable
+    # - this gives a pass-by-value effect and won't impact the input.
+    centroids_gdf = centroids.copy()
+    # make centroid coords individual columns
+    centroids_gdf["centroid_x"] = centroids_gdf["centroid"].apply(
+        lambda coord: coord.x
+    )
+    centroids_gdf["centroid_y"] = centroids_gdf["centroid"].apply(
+        lambda coord: coord.y
+    )
+    centroids_gdf.drop("centroid", axis=1, inplace=True)
+    # glob all parquet files inside if a directory was provided
+    if os.path.splitext(filepath_or_dirpath)[1] == "":
+        filepath_or_dirpath = os.path.join(filepath_or_dirpath, "*")
+    # create the relevant polars LazyFrames
+    batch_lf = (
+        pl.scan_parquet(filepath_or_dirpath)
+        .select([sources_col, destinations_col, "travel_time"])
+        .lazy()
+    )
+    pop_lf = pl.from_pandas(populations[["population", "id"]]).lazy()
+    centroids_lf = pl.from_pandas(centroids_gdf).lazy()
+    # combine centroid and population data so the source join is done once
+    cent_pop_lf = centroids_lf.join(
+        pop_lf.select(["id", "population"]), on="id", how="left"
+    )
+    # merge all datasets
+    merged = (
+        batch_lf.select(pl.exclude("within_urban_centre"))
+        .join(
+            cent_pop_lf.select(
+                ["id", "centroid_x", "centroid_y", "population"]
+            ),
+            left_on=sources_col,
+            right_on="id",
+            how="left",
+        )
+        .rename(
+            {
+                "centroid_x": "from_centroid_x",
+                "centroid_y": "from_centroid_y",
+                "population": "from_population",
+            }
+        )
+        .join(
+            centroids_lf.select(["id", "centroid_x", "centroid_y"]),
+            left_on=destinations_col,
+            right_on="id",
+            how="left",
+        )
+        .rename({"centroid_x": "to_centroid_x", "centroid_y": "to_centroid_y"})
+        .with_columns(
+            _haversine(
+                pl.col("from_centroid_y"),
+                pl.col("from_centroid_x"),
+                pl.col("to_centroid_y"),
+                pl.col("to_centroid_x"),
+            ).alias("dist")
+        )
+    )
+    # calculate accessible and proximity populations for TP
+    accessibility = (
+        merged.filter(pl.col("dist") <= distance_threshold)
+        .filter(pl.col("travel_time") <= travel_time_threshold)
+        .group_by(destinations_col)
+        .sum()
+        .select(destinations_col, "from_population")
+        .rename({"from_population": "accessible_population"})
+    )
+
+    proximity = (
+        merged.filter(pl.col("dist") <= distance_threshold)
+        .group_by(destinations_col)
+        .sum()
+        .select(destinations_col, "from_population")
+        .rename({"from_population": "proximity_population"})
+    )
+    # calculate TP and convert back to pandas
+    perf_df = (
+        (
+            accessibility.join(
+                proximity, on=destinations_col, validate="1:1"
+            ).with_columns(
+                (
+                    pl.col("accessible_population").truediv(
+                        pl.col("proximity_population")
+                    )
+                )
+                .mul(100)
+                .alias("transport_performance")
+            )
+        )
+        .collect()
+        .to_pandas()
+    )
+    # merge on population geospatial data
+    perf_gdf = (
+        populations.merge(
+            perf_df,
+            left_on="id",
+            right_on=destinations_col,
+            how="right",
+        )
+        .drop([destinations_col], axis=1)
+        .sort_values("id", ascending=True)
+        .reset_index(drop=True)
+    )
 
     return perf_gdf
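Reviewer note: the `_haversine` helper above actually implements the spherical law of cosines rather than the haversine formula proper, so its agreement with the `haversine` package (used by the pandas path) is worth a spot check. A minimal sketch, not part of the patch; `_slc_km` and the coordinate pair are illustrative:

```python
import numpy as np
from haversine import haversine


def _slc_km(lat1, lon1, lat2, lon2):
    """Spherical-law-of-cosines distance in km, mirroring `_haversine`."""
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    return 6371 * np.arccos(
        np.sin(lat1) * np.sin(lat2)
        + np.cos(lat1) * np.cos(lat2) * np.cos(lon2 - lon1)
    )


# illustrative coordinate pair (roughly Cardiff to Newport, ~18 km)
print(_slc_km(51.48, -3.18, 51.59, -2.99))
print(haversine((51.48, -3.18), (51.59, -2.99)))  # should agree closely
```

Any small residual difference comes from the radius constants (6371 km here versus the package's 6371.0088 km mean radius) and from the law-of-cosines form losing precision for near-zero distances.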
+ """ + return 6371 * arccos( + (sin(radians(lat1)) * sin(radians(lat2))) + + cos(radians(lat1)) + * cos(radians(lat2)) + * cos(radians(lon2) - radians(lon1)) + ) + + # create local copy before manipulation since `centroids` is a mutable + # dtype - create pass-by-value effect and won't impact input variable. + centroids_gdf = centroids.copy() + # make centroid coords individual columns + centroids_gdf["centroid_x"] = centroids_gdf["centroid"].apply( + lambda coord: coord.x + ) + centroids_gdf["centroid_y"] = centroids_gdf["centroid"].apply( + lambda coord: coord.y + ) + centroids_gdf.drop("centroid", axis=1, inplace=True) + # fix batch path if dir + if os.path.splitext(filepath_or_dirpath)[1] == "": + filepath_or_dirpath = os.path.join(filepath_or_dirpath, "*") + # create relevant polars LazyFrame's + batch_lf = ( + pl.scan_parquet(filepath_or_dirpath) + .select([sources_col, destinations_col, "travel_time"]) + .lazy() + ) + pop_lf = pl.from_pandas(populations[["population", "id"]]).lazy() + centroids_lf = pl.from_pandas(centroids_gdf).lazy() + # combine for a faster join + cent_pop_lf = centroids_lf.join( + pop_lf.select({"id", "population"}), on="id", how="left" + ) + # merge all datasetss + merged = ( + batch_lf.select(pl.exclude("within_urban_centre")) + .join( + cent_pop_lf.select( + ["id", "centroid_x", "centroid_y", "population"] + ), + left_on=sources_col, + right_on="id", + how="left", + ) + .rename( + { + "centroid_x": "from_centroid_x", + "centroid_y": "from_centroid_y", + "population": "from_population", + } + ) + .join( + centroids_lf.select(["id", "centroid_x", "centroid_y"]), + left_on=destinations_col, + right_on="id", + how="left", + ) + .rename({"centroid_x": "to_centroid_x", "centroid_y": "to_centroid_y"}) + .with_columns( + _haversine( + pl.col("from_centroid_y"), + pl.col("from_centroid_x"), + pl.col("to_centroid_y"), + pl.col("to_centroid_x"), + ).alias("dist") + ) + ) + # calculate accessible and proximity populations for TP + accessibility = ( + merged.filter(pl.col("dist") <= distance_threshold) + .filter(pl.col("travel_time") <= travel_time_threshold) + .group_by(destinations_col) + .sum() + .select(destinations_col, "from_population") + .rename({"from_population": "accessible_population"}) + ) + + proximity = ( + merged.filter(pl.col("dist") <= distance_threshold) + .group_by(destinations_col) + .sum() + .select(destinations_col, "from_population") + .rename({"from_population": "proximity_population"}) + ) + # calculate TP and covert back to pandas + perf_df = ( + ( + accessibility.join( + proximity, on=destinations_col, validate="1:1" + ).with_columns( + ( + pl.col("accessible_population").truediv( + pl.col("proximity_population") + ) + ) + .mul(100) + .alias("transport_performance") + ) + ) + .collect() + .to_pandas() + ) + # merge on population geospatial data + perf_gdf = ( + populations.merge( + perf_df, + left_on="id", + right_on=destinations_col, + how="right", + ) + .drop([destinations_col], axis=1) + .sort_values("id", ascending=True) + .reset_index(drop=True) + ) return perf_gdf diff --git a/tests/_metrics/metrics_fixtures.py b/tests/_metrics/metrics_fixtures.py index 22806c79..72cd2a01 100644 --- a/tests/_metrics/metrics_fixtures.py +++ b/tests/_metrics/metrics_fixtures.py @@ -14,6 +14,7 @@ import pathlib import pandas as pd import pytest +import shutil from pyprojroot import here from typing import Union @@ -111,6 +112,8 @@ def expected_transport_performance() -> Union[ def multi_tt_fixture(tt_fixture, tmp_path) -> pathlib.Path: """Build a mock 
diff --git a/tests/_metrics/test_tp_utils.py b/tests/_metrics/test_tp_utils.py
index e08a3e2f..aec4acf8 100644
--- a/tests/_metrics/test_tp_utils.py
+++ b/tests/_metrics/test_tp_utils.py
@@ -4,6 +4,7 @@
 from pytest_lazyfixture import lazy_fixture
 
 from transport_performance._metrics.tp_utils import (
     _transport_performance_pandas,
+    _transport_performance_polars,
     _transport_performance_stats,
 )
@@ -93,6 +94,87 @@ def test__transport_performance_pandas_source_dest_cols(
         assert_frame_equal(tp_df[test_subset_cols], expected_tp)
 
 
+class TestTransportPerformancePolars:
+    """Unit tests for `_transport_performance_polars()`."""
+
+    @pytest.mark.parametrize(
+        "tt_path",
+        [lazy_fixture("tt_fixture"), lazy_fixture("multi_tt_fixture")],
+    )
+    def test__transport_performance_polars(
+        self,
+        centroid_gdf_fixture,
+        pop_gdf_fixture,
+        tt_path,
+        expected_transport_performance,
+    ) -> None:
+        """Test main behaviour of _transport_performance_polars().
+
+        Test with both single and multiple travel time input parquet files.
+
+        Parameters
+        ----------
+        centroid_gdf_fixture
+            A mock centroid test fixture.
+        pop_gdf_fixture
+            A mock population test fixture.
+        tt_path
+            A path to mock travel time fixture(s).
+        expected_transport_performance
+            Expected transport performance results.
+
+        """
+        # call transport_performance() using the test fixtures
+        tp_df = _transport_performance_polars(
+            tt_path,
+            centroid_gdf_fixture,
+            pop_gdf_fixture,
+            travel_time_threshold=3,
+            distance_threshold=0.11,
+        )
+
+        # unpack expected results and confirm equivalence
+        test_subset_cols, expected_tp, _ = expected_transport_performance
+        assert_frame_equal(tp_df[test_subset_cols], expected_tp)
+
+    def test__transport_performance_polars_source_dest_cols(
+        self,
+        centroid_gdf_fixture,
+        pop_gdf_fixture,
+        change_tt_cols_fixture,
+        expected_transport_performance,
+    ) -> None:
+        """Test non-default `sources_col` and `destinations_col`.
+
+        Parameters
+        ----------
+        centroid_gdf_fixture
+            A mock centroid test fixture.
+        pop_gdf_fixture
+            A mock population test fixture.
+        change_tt_cols_fixture
+            A mock travel time fixture with alternative column names. See
+            `change_tt_cols_fixture` for more details.
+        expected_transport_performance
+            Expected transport performance results.
+
+        """
+        # call transport_performance() using the test fixtures
+        tp_df = _transport_performance_polars(
+            change_tt_cols_fixture,
+            centroid_gdf_fixture,
+            pop_gdf_fixture,
+            sources_col="from",
+            destinations_col="to",
+            travel_time_threshold=3,
+            distance_threshold=0.11,
+        )
+
+        # unpack expected results and confirm equivalence
+        test_subset_cols, expected_tp, _ = expected_transport_performance
+        assert_frame_equal(tp_df[test_subset_cols], expected_tp)
+
+
 class TestTransportPerformanceStats:
     """Unit tests for `_transport_performance_stats()`."""
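With both backends now returning GeoDataFrames sorted by `"id"`, a direct cross-implementation check is also possible. A sketch of one further test reusing the fixtures above; `test_pandas_polars_equivalence` is a hypothetical addition, not part of this patch, and assumes both backends emit the same columns:

```python
from geopandas.testing import assert_geodataframe_equal

from transport_performance._metrics.tp_utils import (
    _transport_performance_pandas,
    _transport_performance_polars,
)


def test_pandas_polars_equivalence(
    centroid_gdf_fixture, pop_gdf_fixture, tt_fixture
) -> None:
    """Both backends should agree on the same inputs."""
    kwargs = dict(travel_time_threshold=3, distance_threshold=0.11)
    pandas_gdf = _transport_performance_pandas(
        tt_fixture, centroid_gdf_fixture, pop_gdf_fixture, **kwargs
    )
    polars_gdf = _transport_performance_polars(
        tt_fixture, centroid_gdf_fixture, pop_gdf_fixture, **kwargs
    )
    # the polars path resets the index after sorting; align the pandas
    # result before comparing
    assert_geodataframe_equal(
        pandas_gdf.reset_index(drop=True),
        polars_gdf,
        check_dtype=False,  # polars sums may come back with wider dtypes
    )
```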