Skip to content

Commit

Permalink
Add tests for target encoder + bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ClaudioSalvatoreArcidiacono committed Dec 10, 2024
1 parent c34eae0 commit 284f525
Show file tree
Hide file tree
Showing 4 changed files with 472 additions and 70 deletions.
3 changes: 2 additions & 1 deletion sklearo/encoding/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .target import TargetEncoder
from .woe import WOEEncoder

__all__ = ["WOEEncoder"]
__all__ = ["WOEEncoder", "TargetEncoder"]
130 changes: 63 additions & 67 deletions sklearo/encoding/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,29 @@ def __init__(
nw.Categorical,
nw.String,
),
underrepresented_categories: Literal["raise", "fill"] = "raise",
fill_values_underrepresented: Sequence[int | float | None] = (
-999.0,
999.0,
),
unseen: Literal["raise", "ignore"] = "raise",
fill_value_unseen: int | float | None | Literal["mean"] = "mean",
missing_values: Literal["encode", "ignore", "raise"] = "encode",
type_of_target: Literal["auto", "binary", "multiclass", "continuous"] = "auto",
) -> None:
self.columns = columns
self.underrepresented_categories = underrepresented_categories
self.missing_values = missing_values
self.fill_values_underrepresented = fill_values_underrepresented or (None, None)
self.unseen = unseen
self.fill_value_unseen = fill_value_unseen
self.type_of_target = type_of_target

def _calculate_mean_target(
self, x_y: IntoFrameT, target_cols: Sequence[str], column: str
self, x_y: IntoFrameT, target_col: Sequence[str], column: str
) -> dict:
debug_df = x_y.to_native()
mean_target_all_categories = (
x_y.group_by(column)
.agg(nw.col(target_col).mean() for target_col in target_cols)
.rows(named=True)
x_y.group_by(column).agg(nw.col(target_col).mean()).rows(named=True)
)

if len(target_cols) == 1:
mean_target = {}
[target_column_name] = target_cols
for mean_target_per_category in mean_target_all_categories:
mean_target[mean_target_per_category[column]] = (
mean_target_per_category[target_column_name]
)
else:
mean_target = defaultdict(dict)
for target_column in target_cols:
class_ = target_column.split("_")[-1]
for mean_target_per_category in mean_target_all_categories:
mean_target[class_][mean_target_per_category[column]] = (
mean_target_per_category[target_column]
)
mean_target = dict(mean_target)
mean_target = {}
for mean_target_per_category in mean_target_all_categories:
mean_target[mean_target_per_category[column]] = mean_target_per_category[
target_col
]

return mean_target

Expand All @@ -78,8 +58,7 @@ def fit(self, X: IntoFrameT, y: IntoSeriesT) -> "TargetEncoder":
"""

self.columns_ = list(select_columns(X, self.columns))
if not self.columns_:
return self
self.encoding_map_ = {}

X = self._handle_missing_values(X)

Expand All @@ -106,48 +85,47 @@ def fit(self, X: IntoFrameT, y: IntoSeriesT) -> "TargetEncoder":
else:
self.is_zero_one_target_ = False

X = X[self.columns_]

if "target" in X.columns:
target_col_name = "__target__"

else:
target_col_name = "target"

X_y = X.with_columns(**{target_col_name: y})
if not self.columns_:
return self

X_y = X[self.columns_].with_columns(**{target_col_name: y})

if self.type_of_target_ == "multiclass":
unique_classes = y.unique().sort().to_list()
self.unique_classes_ = unique_classes

X_y = X_y.with_columns(
nw.when(nw.col(target_col_name) == class_)
.then(1)
.otherwise(0)
.alias(f"{target_col_name}_is_class_{class_}")
for class_ in unique_classes
)
target_cols = [
f"{target_col_name}_is_class_{class_}" for class_ in unique_classes
]

self.encoding_map_ = defaultdict(dict)
if self.unseen == "fill" and self.fill_value_unseen == "mean":
mean_targets = [X_y[target_cols].mean().rows(named=True)]
mean_target_per_class = {}
for target_col, class_ in zip(target_cols, unique_classes):
mean_target_per_class[class_] = mean_targets[target_col]
self.mean_target_ = mean_target_per_class
self.mean_target_ = {}
for class_ in unique_classes:
X_y_binarized = X_y.with_columns(
nw.when(nw.col(target_col_name) == class_)
.then(1)
.otherwise(0)
.alias(target_col_name)
)
for column in self.columns_:
debug_df = X_y_binarized[[column, target_col_name]].to_native()
self.encoding_map_[column][class_] = self._calculate_mean_target(
X_y_binarized[[column, target_col_name]],
target_col=target_col_name,
column=column,
)
if self.unseen == "fill" and self.fill_value_unseen == "mean":
self.mean_target_[class_] = X_y_binarized[target_col_name].mean()

else:
target_cols = [target_col_name]
if self.unseen == "fill" and self.fill_value_unseen == "mean":
self.mean_target_ = X_y[target_col_name].mean()

self.encoding_map_ = {}
for column in self.columns_:
self.encoding_map_[column] = self._calculate_mean_target(
X_y[target_cols + [column]], target_cols=target_cols, column=column
)
for column in self.columns_:
self.encoding_map_[column] = self._calculate_mean_target(
X_y[[column, target_col_name]],
target_col=target_col_name,
column=column,
)

self.feature_names_in_ = list(X.columns)
return self
Expand All @@ -157,7 +135,7 @@ def _transform_binary_continuous(
) -> IntoFrameT:
fill_value_unseen = (
self.fill_value_unseen
if self.fill_value_unseen != "mean"
if self.fill_value_unseen != "mean" or self.unseen != "fill"
else self.mean_target_
)
return X.with_columns(
Expand All @@ -177,11 +155,12 @@ def _transform_multiclass(
) -> IntoFrameT:
fill_value_unseen = (
{class_: self.fill_value_unseen for class_ in self.unique_classes_}
if self.fill_value_unseen != "mean"
if self.fill_value_unseen != "mean" or self.unseen != "fill"
else self.mean_target_
)
return X.with_columns(
nw.col(column).replace_strict(
nw.col(column)
.replace_strict(
{
**mapping,
**{
Expand All @@ -190,9 +169,24 @@ def _transform_multiclass(
},
}
)
.alias(f"{column}_mean_target_class_{class_}")
for column, class_mapping in self.encoding_map_.items()
for class_, mapping in class_mapping.items()
)
).drop(self.columns_)

@check_if_fitted
def get_feature_names_out(self) -> list[str]:
    """Return the output feature names of the fitted encoder.

    For ``"binary"`` and ``"continuous"`` targets the input feature names are
    returned unchanged. For any other target type (multiclass), the names in
    ``self.columns_`` are removed and one
    ``{column}_mean_target_class_{class_}`` name per encoded column and per
    class in ``self.unique_classes_`` is appended after the remaining input
    feature names.

    Returns:
        list[str]: A new list on every call, so mutating the result does not
            affect the fitted ``feature_names_in_`` attribute.
    """
    if self.type_of_target_ in ("binary", "continuous"):
        # Hand out a copy: returning the attribute itself would let callers
        # mutate the fitted state through the returned list.
        return list(self.feature_names_in_)

    # multiclass
    # Build the lookup once instead of scanning the list for every feature.
    encoded_columns = set(self.columns_)
    passthrough_names = [
        feat for feat in self.feature_names_in_ if feat not in encoded_columns
    ]
    encoded_names = [
        f"{column}_mean_target_class_{class_}"
        for column in self.columns_
        for class_ in self.unique_classes_
    ]
    return passthrough_names + encoded_names

@nw.narwhalify
@check_if_fitted
Expand All @@ -205,12 +199,14 @@ def transform(self, X: IntoFrameT) -> IntoFrameT:
X = self._handle_missing_values(X)
unseen_per_col = {}
for column, mapping in self.encoding_map_.items():
if self.type_of_target_ in ("binary", "continuous"):
seen_categories = mapping.keys()
else:
seen_categories = next(iter(mapping.values())).keys()

uniques = X[column].unique()
unseen_cats = uniques.filter(
(
~uniques.is_in(next(iter(mapping.values())).keys())
& ~uniques.is_null()
)
(~uniques.is_in(seen_categories) & ~uniques.is_null())
).to_list()
if unseen_cats:
unseen_per_col[column] = unseen_cats
Expand Down
4 changes: 2 additions & 2 deletions sklearo/encoding/woe.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ def fit(self, X: IntoFrameT, y: IntoSeriesT) -> "WOEEncoder":
X (DataFrame): The input data.
y (Series): The target variable.
"""
self.feature_names_in_ = list(X.columns)
self.columns_ = list(select_columns(X, self.columns))

X = self._handle_missing_values(X)

self.feature_names_in_ = list(X.columns)
self.columns_ = list(select_columns(X, self.columns))
self.encoding_map_ = {}
self.is_zero_one_target_ = False
unique_classes = sorted(y.unique().to_list())
Expand Down
Loading

0 comments on commit 284f525

Please sign in to comment.