Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features/distances #694

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
241 changes: 241 additions & 0 deletions feature_engine/creation/distance_features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from feature_engine._base_transformers.base_numerical import BaseNumericalTransformer
from feature_engine._base_transformers.mixins import (
FitFromDictMixin,
GetFeatureNamesOutMixin,
)


class DistanceFeatures(
BaseNumericalTransformer, FitFromDictMixin, GetFeatureNamesOutMixin
):
"""
DistanceFeatures() computes the distance between pairs of columns containing
coordinates. The distance between two pairs of coordinates is computed using the
Haversine formula (or the great circle formula).

The Haversine formula is not the most precise way to compute the distance between
two points on the Earth. However, it is precise enough for our purposes and is fast.

DistanceFeatures() requires a list of column names of coordinates, i.e., a list of
lists of 4 elements, where each 4-list represents the column names of the pair of
coordinates for which we should compute the distance. Additionally, it is possible
to provide the names of the columns containing the distances and choose whether the
coordinate columns are dropped or not.

Missing data should be imputed before using this transformer.



Parameters
----------
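coordinate_columns: List[List[Union[str, int]]],
List of 4-element lists of column names, one list per distance to compute. Each
list is read in the order [latitude of point a, longitude of point a,
latitude of point b, longitude of point b].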

output_column_names: List[Union[str, None]], default=None
List of names for the column with the computed distance. Note that the list
must have equal length to the `coordinate_columns` list. This is because the
transformer needs to know which distance column has which name. If none, the
default names are generated.

drop_original: Optional[bool], default=False
If True, then the `coordinate_columns` columns are dropped. Otherwise,
they are left in the dataframe.

Attributes
----------
...

Methods
-------
fit:
This transformer does not learn any parameters.

transform:
Compute the distance using the coordinates provided in the `coordinate_columns`.

References
----------
https://en.wikipedia.org/wiki/Haversine_formula

Examples
--------

>>> import pandas as pd
>>> from feature_engine.creation import DistanceFeatures
>>> X = pd.DataFrame({
'a_latitude': [0., 0., 46.948579],
'a_longitude': [0., 0., 7.436925],
'b_latitude': [0., 12.34, 59.91054],
'b_longitude': [0., 123.45, 10.752695],
})
>>> cf = DistanceFeatures(
coordinate_columns=[['a_latitude', 'a_longitude', 'b_latitude', 'b_longitude']],
output_column_names=['distance_between_a_and_b'],
drop_original=False,
)
>>> cf.fit(X)
>>> cf.transform(X)
a_latitude a_longitude b_latitude b_longitude distance_between_a_and_b
0 0. 0. 0. 0. 0.
1 0. 0. 12.34 123.45 13630.28
2 46.94 7.43 59.91 10.75 1457.49
"""

_EARTH_RADIUS: float = 6371.0 # radius of Earth in kms

def __init__(
    self,
    coordinate_columns: List[List[Union[str, int]]],
    output_column_names: Optional[List[Union[str, None]]] = None,
    drop_original: bool = False,
) -> None:
    """
    Validate and store the configuration of the transformer.

    Parameters
    ----------
    coordinate_columns:
        One 4-element list per distance to compute. Each entry is unpacked
        positionally into the `a_latitudes`, `a_longitudes`, `b_latitudes` and
        `b_longitudes` attributes.

    output_column_names:
        Names of the distance columns, one per entry of `coordinate_columns`.
        If None, default names are generated.

    drop_original:
        Whether the coordinate columns are dropped in `transform`.

    Raises
    ------
    ValueError
        If any of the validation helpers rejects its argument.
    """
    # Keep the raw arguments under their own parameter names.
    # NOTE(review): assumes the base class follows the scikit-learn estimator
    # convention, where get_params()/clone() read every __init__ argument back
    # from an attribute of the same name -- confirm against the base class.
    self.coordinate_columns = coordinate_columns
    self.output_column_names = output_column_names

    # The coordinate columns are regrouped per role (all a-latitudes together,
    # and so on) so that the distance can later be computed in a vectorized way.
    # NOTE(review): a collaborator asked that __init__ only checks the allowed
    # values and assigns them, and that this unpacking moves to fit(). It is
    # kept here so the attributes below stay available right after construction.
    (
        self.a_latitudes,
        self.a_longitudes,
        self.b_latitudes,
        self.b_longitudes,
    ) = self._check_coordinate_columns(columns=coordinate_columns)

    self.output_column_name = self._check_output_columns_names(
        column_name=output_column_names,
        coordinate_columns=coordinate_columns,
    )

    self.drop_original = self._check_drop_original(parameter=drop_original)

    self.variables = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this parameter. I'd suggest using RelativeFeatures as template to model this class: https://github.com/VascoSch92/feature_engine/blob/e1e927625678ee73c5c3a9edcf79e955ff9c5e8e/feature_engine/creation/relative_features.py

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variables is a parameter that we have in all transformers, so I would stick to this name instead of using coordinate_columns

In short, let's replace coordinate_columns by variables.


def _check_drop_original(self, parameter: bool) -> bool:
    """
    Check that `drop_original` is a boolean and return it unchanged.

    Parameters
    ----------
    parameter:
        The value received for `drop_original`.

    Returns
    -------
    parameter: bool
        The same value that was passed in.

    Raises
    ------
    ValueError
        If `parameter` is not an instance of bool.
    """
    if not isinstance(parameter, bool):
        raise ValueError(
            "Expected boolean value for parameter `drop_original`, "
            f"but got {parameter} with type {type(parameter)}"
        )
    return parameter

def _check_coordinate_columns(
    self, columns: List[List[Union[str, int]]]
) -> Tuple[List[Union[str, int]], ...]:
    """
    Validate `coordinate_columns` and regroup the column names by role.

    Parameters
    ----------
    columns:
        A non-empty list whose entries are lists (or tuples) of exactly 4 column
        names.

    Returns
    -------
    Tuple of 4 lists
        The first, second, third and fourth element of every entry, in that
        order. The caller stores them as a-latitudes, a-longitudes, b-latitudes
        and b-longitudes.

    Raises
    ------
    ValueError
        If `columns` is empty, if an entry is not a list or tuple, or if an entry
        does not contain exactly 4 elements.
    """
    # NOTE(review): a collaborator asked whether this is a list of lists or just
    # a list. It is a list of lists: one 4-element entry per distance.
    if not columns:
        raise ValueError("Empty list for `coordinate_columns` not allowed!")

    for idx, coordinate_column in enumerate(columns):
        # A flat list of names such as ["lat1", "lon1", "lat2", "lon2"] would
        # otherwise slip through the length check below, because every one of
        # those strings happens to have length 4.
        if not isinstance(coordinate_column, (list, tuple)):
            raise ValueError(
                "Expected a list of 4 column names "
                f"at the index {idx} of the list coordinate columns, "
                f"but got {coordinate_column!r} "
                f"with type {type(coordinate_column)}."
            )
        if len(coordinate_column) != 4:
            raise ValueError(
                "Needed 4 values to compute a distance, "
                f"but got {len(coordinate_column)} columns \n"
                f"at the index {idx} of the list coordinate columns."
            )

    # Transpose: every entry has exactly 4 elements, so zip yields 4 groups.
    a_latitudes, a_longitudes, b_latitudes, b_longitudes = (
        list(group) for group in zip(*columns)
    )
    return a_latitudes, a_longitudes, b_latitudes, b_longitudes

def _check_output_columns_names(
    self,
    column_name: Optional[List[Union[str, None]]],
    coordinate_columns: List[List[Union[str, int]]],
) -> Optional[List[Union[str, None]]]:
    """
    Validate the names of the distance columns, generating defaults if needed.

    Parameters
    ----------
    column_name:
        The user-provided names, one per entry of `coordinate_columns`, or None.
        Individual entries may also be None.

    coordinate_columns:
        The 4-element lists of coordinate column names.

    Returns
    -------
    List of names
        One name per entry of `coordinate_columns`. Whenever a name is missing
        (the whole argument or a single entry is None), the default
        `distance_<c0>_<c1>_<c2>_<c3>` is used for that entry.

    Raises
    ------
    ValueError
        If the number of names differs from the number of coordinate entries.
    """

    def _default_name(coordinates) -> str:
        return (
            f"distance_{coordinates[0]}_{coordinates[1]}"
            f"_{coordinates[2]}_{coordinates[3]}"
        )

    if column_name is None:
        return [_default_name(c) for c in coordinate_columns]

    # NOTE(review): a collaborator questioned this check on the premise that
    # only one distance variable is returned. One distance column is created
    # per entry of `coordinate_columns`, so the two lengths have to match.
    if len(column_name) != len(coordinate_columns):
        # The mismatch can go either way, so the message does not claim that
        # there are "not enough" names.
        raise ValueError(
            "Wrong number of output column names provided.\n"
            f"Expected {len(coordinate_columns)} column names, "
            f"but got {len(column_name)}."
        )

    # Fill in the default only for the entries left as None; a new list is
    # built so the caller's list is never modified.
    return [
        _default_name(c) if name is None else name
        for name, c in zip(column_name, coordinate_columns)
    ]

VascoSch92 marked this conversation as resolved.
Show resolved Hide resolved
def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None):
    """
    Run the parent class' fit; this transformer itself learns nothing.

    Parameters
    ----------
    X: Pandas DataFrame
        The training data, forwarded to the parent class' `fit`.

    y: Pandas Series, default=None
        Not used. It is accepted so that callers passing a target, as in
        `fit(X, y)`, do not fail with a TypeError.

    Returns
    -------
    self
    """
    # there is no fit for this transformer; only the parent's fit is run
    super().fit(X)
    return self

def transform(self, X: pd.DataFrame):
    """
    Compute the distance on Earth using the Haversine formula.

    Parameters
    ----------
    X: Pandas DataFrame of shape = [n_samples, n_features]
        The data to be transformed.

    Returns
    -------
    X_new: Pandas dataframe.
        The dataframe plus the distances between the given coordinates. If
        `drop_original` is True, the coordinate columns are removed.

    Raises
    ------
    ValueError
        If a coordinate column is missing from `X`, or if a latitude or a
        longitude lies outside its valid range.
    """
    X = self._check_transform_input_and_state(X)
    X = self._check_lat_lon_columns_are_in_df(X)
    X = self._check_correctness_of_coordinates(X)

    # adds the distance columns to X in place
    self._compute_distance(X)

    if self.drop_original:
        # Reassign instead of dropping in place; the returned columns are the
        # same as with `inplace=True`.
        X = X.drop(
            columns=[
                *self.a_latitudes,
                *self.a_longitudes,
                *self.b_latitudes,
                *self.b_longitudes,
            ],
        )

    return X

def _check_lat_lon_columns_are_in_df(self, X) -> pd.DataFrame:
    """
    Check that every configured coordinate column exists in the dataframe.

    Parameters
    ----------
    X: Pandas DataFrame
        The dataframe to inspect.

    Returns
    -------
    X: Pandas DataFrame
        The same dataframe, unchanged.

    Raises
    ------
    ValueError
        If at least one coordinate column is not a column of `X`. The message
        lists the missing columns.
    """
    coordinate_columns = [
        *self.a_latitudes,
        *self.a_longitudes,
        *self.b_latitudes,
        *self.b_longitudes,
    ]
    available = set(X.columns)
    # dict.fromkeys removes repeated names while keeping the configured order,
    # so the error message lists each missing column once.
    missing = [
        column
        for column in dict.fromkeys(coordinate_columns)
        if column not in available
    ]
    if missing:
        # Report the missing names themselves rather than the boolean result of
        # the subset test.
        raise ValueError(
            f"The columns {missing} "
            f"were not found in the dataframe."
        )
    return X

def _check_correctness_of_coordinates(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Check that latitudes and longitudes lie within their valid ranges.

    Parameters
    ----------
    X: Pandas DataFrame
        The dataframe containing the coordinate columns.

    Returns
    -------
    X: Pandas DataFrame
        The same dataframe, unchanged.

    Raises
    ------
    ValueError
        If any latitude has an absolute value above 90, or any longitude has an
        absolute value above 180. Latitudes are checked first.
    """
    # recall that the latitude is a number between -90 and +90,
    # while longitudes is between -180 and +180.
    latitudes = X[[*self.a_latitudes, *self.b_latitudes]]
    if (latitudes.abs() > 90).to_numpy().any():
        raise ValueError("The dataframe contains irregular latitudes")

    longitudes = X[[*self.a_longitudes, *self.b_longitudes]]
    if (longitudes.abs() > 180).to_numpy().any():
        raise ValueError("The dataframe contains irregular longitudes")

    return X

def _compute_distance(self, X: pd.DataFrame):
    """
    Add the Haversine distance columns to `X` in place.

    One distance is computed per configured pair of points: the i-th columns of
    `a_latitudes`/`a_longitudes` against the i-th columns of
    `b_latitudes`/`b_longitudes`. The results are written to the columns named
    in `output_column_name`, in units of `_EARTH_RADIUS`.

    Parameters
    ----------
    X: Pandas DataFrame
        The dataframe holding the coordinates, in degrees (they are converted
        to radians here).

    Returns
    -------
    None
        `X` is modified in place.
    """
    # convert latitude and longitude in radians; each array has one column per
    # configured pair, so all pairs are processed at once
    phi_1 = np.radians(X[self.a_latitudes].to_numpy(dtype=float))
    phi_2 = np.radians(X[self.b_latitudes].to_numpy(dtype=float))
    lambda_1 = np.radians(X[self.a_longitudes].to_numpy(dtype=float))
    lambda_2 = np.radians(X[self.b_longitudes].to_numpy(dtype=float))

    # compute delta, i.e., difference, between radians
    delta_phi = phi_2 - phi_1
    delta_lambda = lambda_2 - lambda_1

    # compute distance using Haversine formula
    inner_part = (
        np.sin(delta_phi / 2) ** 2
        + np.cos(phi_1) * np.cos(phi_2) * np.sin(delta_lambda / 2) ** 2
    )
    # Mathematically this term lies in [0, 1], but floating-point rounding can
    # push it slightly outside, which would make arcsin return NaN.
    inner_part = np.clip(inner_part, 0.0, 1.0)

    X[self.output_column_name] = (
        self._EARTH_RADIUS * 2 * np.arcsin(np.sqrt(inner_part))
    )
Loading