Polars TP #257

Open · wants to merge 3 commits into base: dev
1 change: 1 addition & 0 deletions requirements.txt
@@ -35,4 +35,5 @@ dask[dataframe]
pyarrow >= 14.0.1 # 14.0.0 has security vulnerability
osmium # has dependencies on `cmake` and `boost` which require brew install
tqdm
polars==0.20.13
-e .
196 changes: 184 additions & 12 deletions src/transport_performance/_metrics/tp_utils.py
@@ -1,13 +1,15 @@
"""Transport performance helper functions."""
import pathlib
import warnings
from typing import Union
import os

import polars as pl
import geopandas as gpd
import numpy as np
import pandas as pd
import pathlib
import warnings

from haversine import haversine_vector
from typing import Union
from numpy import arccos, cos, radians, sin


@@ -18,7 +20,7 @@ def _transport_performance_pandas(
    distance_threshold: float = 11.25,
    sources_col: str = "from_id",
    destinations_col: str = "to_id",
-) -> pd.DataFrame:
+) -> gpd.GeoDataFrame:
    """Calculate transport performance using pandas.

    Parameters
@@ -45,7 +47,7 @@

    Returns
    -------
-    pd.DataFrame
+    gpd.GeoDataFrame
        Transport performance metrics, grouped by destination column IDs.

    """
@@ -128,12 +130,182 @@ def _transport_performance_pandas(
    )

    # merge on population geospatial data
-    perf_gdf = populations.merge(
-        perf_df,
-        left_on="id",
-        right_on=destinations_col,
-        how="right",
-    ).drop([destinations_col], axis=1)
+    perf_gdf = (
+        populations.merge(
+            perf_df,
+            left_on="id",
+            right_on=destinations_col,
+            how="right",
+        )
+        .drop([destinations_col], axis=1)
+        .sort_values("id", ascending=True)
+    )

    return perf_gdf


def _transport_performance_polars(
    filepath_or_dirpath: Union[str, pathlib.Path],
    centroids: gpd.GeoDataFrame,
    populations: gpd.GeoDataFrame,
    travel_time_threshold: int = 45,
    distance_threshold: float = 11.25,
    sources_col: str = "from_id",
    destinations_col: str = "to_id",
) -> gpd.GeoDataFrame:
    """Calculate transport performance using polars (and some pandas).

    Parameters
    ----------
    filepath_or_dirpath : Union[str, pathlib.Path]
        File path or directory path to `analyse_network` output(s). Files
        must be in '.parquet' format.
    centroids : gpd.GeoDataFrame
        Populations geodataframe containing centroid geometries.
    populations : gpd.GeoDataFrame
        Populations geodataframe containing population data and cell
        geometries.
    travel_time_threshold : int, optional
        Maximum threshold for travel times, by default 45 (minutes). Used
        when calculating accessibility.
    distance_threshold : float, optional
        Maximum threshold for source/destination distance, by default
        11.25 (km). Used when calculating accessibility and proximity.
    sources_col : str, optional
        The sources column name in the travel time data, by default
        "from_id".
    destinations_col : str, optional
        The destinations column name in the travel time data, by default
        "to_id".

    Returns
    -------
    gpd.GeoDataFrame
        Transport performance metrics, grouped by destination column IDs.

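    Examples
    --------
    A minimal sketch with hypothetical inputs, assuming `outputs/` holds
    `analyse_network` parquet batches and `centroids_gdf`/`pop_gdf` are
    prepared as described above:

    >>> tp_gdf = _transport_performance_polars(  # doctest: +SKIP
    ...     "outputs/",
    ...     centroids_gdf,
    ...     pop_gdf,
    ... )
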
"""

def _haversine(lat1, lon1, lat2, lon2):
"""Haversine function for use with polars.

Description
-----------
Return an array of the haversine distance in KM. Assumes coordinates
are in degrees.
"""
return 6371 * arccos(
(sin(radians(lat1)) * sin(radians(lat2)))
+ cos(radians(lat1))
* cos(radians(lat2))
* cos(radians(lon2) - radians(lon1))
)
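    # sanity check (hypothetical values): one degree of latitude spans
    # 6371 * radians(1) ≈ 111.2 km, i.e. _haversine(0.0, 0.0, 1.0, 0.0)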

    # take a local copy before manipulating - `centroids` is mutable, so
    # copying gives a pass-by-value effect and the input stays unchanged
    centroids_gdf = centroids.copy()
    # make centroid coords individual columns
    centroids_gdf["centroid_x"] = centroids_gdf["centroid"].apply(
        lambda coord: coord.x
    )
    centroids_gdf["centroid_y"] = centroids_gdf["centroid"].apply(
        lambda coord: coord.y
    )
    centroids_gdf.drop("centroid", axis=1, inplace=True)
    # if a directory was given, glob all batch files within it
    if os.path.splitext(filepath_or_dirpath)[1] == "":
        filepath_or_dirpath = os.path.join(filepath_or_dirpath, "*")
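        # e.g. a hypothetical "outputs" dir becomes "outputs/*", which
        # pl.scan_parquet() below expands to every parquet batch inside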
    # create the relevant polars LazyFrames
    batch_lf = (
        pl.scan_parquet(filepath_or_dirpath)
        .select([sources_col, destinations_col, "travel_time"])
        .lazy()
    )
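    # note: scan_parquet() already returns a LazyFrame, so nothing is read
    # from disk until .collect() is called on the final query below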
    pop_lf = pl.from_pandas(populations[["population", "id"]]).lazy()
    centroids_lf = pl.from_pandas(centroids_gdf).lazy()
    # combine for a faster join
    cent_pop_lf = centroids_lf.join(
        pop_lf.select(["id", "population"]), on="id", how="left"
    )
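    # pre-joining here means the sources join below attaches coordinates
    # and population to each OD pair in one pass rather than two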
    # merge all datasets
    merged = (
        batch_lf.select(pl.exclude("within_urban_centre"))
        .join(
            cent_pop_lf.select(
                ["id", "centroid_x", "centroid_y", "population"]
            ),
            left_on=sources_col,
            right_on="id",
            how="left",
        )
        .rename(
            {
                "centroid_x": "from_centroid_x",
                "centroid_y": "from_centroid_y",
                "population": "from_population",
            }
        )
        .join(
            centroids_lf.select(["id", "centroid_x", "centroid_y"]),
            left_on=destinations_col,
            right_on="id",
            how="left",
        )
        .rename(
            {"centroid_x": "to_centroid_x", "centroid_y": "to_centroid_y"}
        )
        .with_columns(
            _haversine(
                pl.col("from_centroid_y"),
                pl.col("from_centroid_x"),
                pl.col("to_centroid_y"),
                pl.col("to_centroid_x"),
            ).alias("dist")
        )
    )
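    # `merged` now holds one row per OD pair: travel time, source and
    # destination centroid coordinates, source population, and the
    # centroid-to-centroid distance in km ("dist")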
    # calculate accessible and proximity populations for TP
    accessibility = (
        merged.filter(pl.col("dist") <= distance_threshold)
        .filter(pl.col("travel_time") <= travel_time_threshold)
        .group_by(destinations_col)
        .sum()
        .select(destinations_col, "from_population")
        .rename({"from_population": "accessible_population"})
    )

    proximity = (
        merged.filter(pl.col("dist") <= distance_threshold)
        .group_by(destinations_col)
        .sum()
        .select(destinations_col, "from_population")
        .rename({"from_population": "proximity_population"})
    )
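    # transport performance = 100 * accessible population / proximity
    # population: the share of the nearby population that is reachable
    # within the travel time threshold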
    # calculate TP and convert back to pandas
    perf_df = (
        accessibility.join(proximity, on=destinations_col, validate="1:1")
        .with_columns(
            pl.col("accessible_population")
            .truediv(pl.col("proximity_population"))
            .mul(100)
            .alias("transport_performance")
        )
        .collect()
        .to_pandas()
    )
    # merge on population geospatial data
    perf_gdf = (
        populations.merge(
            perf_df,
            left_on="id",
            right_on=destinations_col,
            how="right",
        )
        .drop([destinations_col], axis=1)
        .sort_values("id", ascending=True)
        .reset_index(drop=True)
    )

    return perf_gdf

3 changes: 3 additions & 0 deletions tests/_metrics/metrics_fixtures.py
@@ -14,6 +14,7 @@
import pathlib
import pandas as pd
import pytest
import shutil

from pyprojroot import here
from typing import Union
@@ -111,6 +112,8 @@ def expected_transport_performance() -> Union[
def multi_tt_fixture(tt_fixture, tmp_path) -> pathlib.Path:
    """Build a mock travel time input across multiple parquet files."""
    multi_tt_path = os.path.join(tmp_path, "mock_multi_tt")
    if os.path.exists(multi_tt_path):
        shutil.rmtree(multi_tt_path)
    os.makedirs(multi_tt_path)
    tt = pd.read_parquet(tt_fixture)
    for id in tt.to_id.unique():
82 changes: 82 additions & 0 deletions tests/_metrics/test_tp_utils.py
@@ -4,6 +4,7 @@
from pytest_lazyfixture import lazy_fixture

from transport_performance._metrics.tp_utils import (
    _transport_performance_polars,
    _transport_performance_pandas,
    _transport_performance_stats,
)
@@ -93,6 +94,87 @@ def test__transport_performance_pandas_source_dest_cols(
    assert_frame_equal(tp_df[test_subset_cols], expected_tp)


class TestTransportPerformancePolars:
    """Unit tests for _transport_performance_polars()."""

    @pytest.mark.parametrize(
        "tt_path",
        [lazy_fixture("tt_fixture"), lazy_fixture("multi_tt_fixture")],
    )
    def test__transport_performance_polars(
        self,
        centroid_gdf_fixture,
        pop_gdf_fixture,
        tt_path,
        expected_transport_performance,
    ) -> None:
        """Test main behaviour of _transport_performance_polars().

        Test with both single and multiple travel time input parquet files.

        Parameters
        ----------
        centroid_gdf_fixture
            A mock centroid test fixture.
        pop_gdf_fixture
            A mock population test fixture.
        tt_path
            A path to mock travel time fixture(s).
        expected_transport_performance
            Expected transport performance results.

        """
        # call _transport_performance_polars() using the test fixtures
        tp_df = _transport_performance_polars(
            tt_path,
            centroid_gdf_fixture,
            pop_gdf_fixture,
            travel_time_threshold=3,
            distance_threshold=0.11,
        )

        # unpack expected results and confirm equivalence
        test_subset_cols, expected_tp, _ = expected_transport_performance
        assert_frame_equal(tp_df[test_subset_cols], expected_tp)

    def test__transport_performance_polars_source_dest_cols(
        self,
        centroid_gdf_fixture,
        pop_gdf_fixture,
        change_tt_cols_fixture,
        expected_transport_performance,
    ) -> None:
        """Test non-default `sources_col` and `destinations_col`.

        Parameters
        ----------
        centroid_gdf_fixture
            A mock centroid test fixture.
        pop_gdf_fixture
            A mock population test fixture.
        change_tt_cols_fixture
            A mock travel time fixture with alternative column names. See
            `change_tt_cols_fixture` for more details.
        expected_transport_performance
            Expected transport performance results.

        """
        # call _transport_performance_polars() using the test fixtures
        tp_df = _transport_performance_polars(
            change_tt_cols_fixture,
            centroid_gdf_fixture,
            pop_gdf_fixture,
            sources_col="from",
            destinations_col="to",
            travel_time_threshold=3,
            distance_threshold=0.11,
        )

        # unpack expected results and confirm equivalence
        test_subset_cols, expected_tp, _ = expected_transport_performance
        assert_frame_equal(tp_df[test_subset_cols], expected_tp)


class TestTransportPerformanceStats:
    """Unit tests for `_transport_performance_stats()`."""
