Skip to content

Commit

Permalink
Add unit tests for utility functions (#4)
Browse files Browse the repository at this point in the history
* add python workflow

* add tests for utils
  • Loading branch information
gmgeorg authored Apr 24, 2024
1 parent 033377c commit 9cab675
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 5 deletions.
3 changes: 0 additions & 3 deletions pypsps/tests/test_losses.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,5 @@ def test_end_to_end_dataset_model_fit():
assert preds.shape[0] == ks_data.n_samples

outcome_pred, scale_pred, weights, prop_score = utils.split_y_pred(preds)

preds_comb = np.hstack([outcome_pred, scale_pred, weights, prop_score])
np.testing.assert_allclose(preds, preds_comb)
ate = inference.predict_ate(model, inputs[0])
assert ate > 0
88 changes: 88 additions & 0 deletions pypsps/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Test utils"""

from typing import Tuple

import numpy as np
import pytest
import tensorflow as tf
import random

from .. import datasets
from ..keras import losses, models
from .. import utils, inference


def test_split_y_does_not_drop_columns():
    """Splitting predictions and re-stacking the parts must reproduce them.

    Builds a toy model (not fitted in this test) on a Kang-Schafer sample,
    predicts, splits the prediction matrix with ``utils.split_y_pred`` and
    checks that horizontally stacking the four parts gives back the original
    prediction matrix.
    """
    np.random.seed(13)
    sample = datasets.KangSchafer(true_ate=10).sample(n_samples=1000)
    tf.random.set_seed(13)
    n_features = sample.features.shape[1]
    toy_model = models.build_toy_model(
        n_states=5, n_features=n_features, compile=True
    )
    # Only the model inputs are needed here; the outputs are discarded.
    model_inputs, _ = sample.to_keras_inputs_outputs()
    y_pred = toy_model.predict(model_inputs)

    # One prediction row per sampled unit.
    assert y_pred.shape[0] == sample.n_samples

    outcome, scale, state_weights, propensity = utils.split_y_pred(y_pred)
    pieces = [outcome, scale, state_weights, propensity]
    restacked = np.hstack(pieces)
    np.testing.assert_allclose(y_pred, restacked)


def test_agg_outcome_preds_works():
    """Check ``utils.agg_outcome_pred`` against a manually computed weighted sum.

    After a two-epoch fit of the toy model, the aggregated outcome prediction
    must have one entry per sample, equal the row-wise sum of outcome
    predictions times weights, and have correlation above 0.4 with the first
    column of the keras outputs.
    """
    # Seed the TensorFlow, stdlib and numpy RNGs before sampling/training.
    tf.random.set_seed(0)
    random.seed(0)
    np.random.seed(0)
    sample = datasets.KangSchafer(true_ate=10).sample(n_samples=1000)
    tf.random.set_seed(13)
    n_features = sample.features.shape[1]
    toy_model = models.build_toy_model(
        n_states=5, n_features=n_features, compile=True
    )
    x, y = sample.to_keras_inputs_outputs()

    # The training history returned by fit() is not needed.
    toy_model.fit(
        x,
        y,
        epochs=2,
        batch_size=64,
        verbose=2,
        validation_split=0.2,
    )

    y_pred = toy_model.predict(x)
    assert y_pred.shape[0] == sample.n_samples

    outcome, _, state_weights, _ = utils.split_y_pred(y_pred)
    aggregated = utils.agg_outcome_pred(y_pred)
    assert aggregated.shape[0] == sample.n_samples

    # Reference value: weight each outcome prediction and sum across columns.
    expected = (outcome * state_weights).sum(axis=1)
    np.testing.assert_allclose(aggregated, expected)

    corr_matrix = np.corrcoef(aggregated, y[:, 0])
    print(corr_matrix)
    assert corr_matrix[0, 1] > 0.4


def test_prepare_keras_inputs_outputs():
    """``utils.prepare_keras_inputs_outputs`` must agree with the dataset export.

    The helper's result is compared both against the raw dataset columns and
    against what ``to_keras_inputs_outputs()`` returns for the same sample.
    """
    random.seed(0)
    np.random.seed(0)
    sample = datasets.KangSchafer(true_ate=10).sample(n_samples=1000)

    prepared = utils.prepare_keras_inputs_outputs(
        sample.features, sample.treatments, sample.outcomes
    )
    direct = sample.to_keras_inputs_outputs()

    # Result is an (inputs, outputs) pair whose inputs hold two arrays.
    assert len(prepared) == 2
    assert len(prepared[0]) == 2

    prepared_inputs = prepared[0]
    prepared_outputs = prepared[1]

    # Compare against the raw dataset columns.
    expected_features = sample.features.values.astype("float32")
    np.testing.assert_allclose(prepared_inputs[0], expected_features)
    np.testing.assert_allclose(prepared_inputs[1], sample.treatments.values)
    expected_outcomes = sample.outcomes.values.astype("float32")
    np.testing.assert_allclose(prepared_outputs[:, 0:1], expected_outcomes)

    # Compare against the dataset's own keras export.
    direct_inputs = direct[0]
    direct_outputs = direct[1]
    np.testing.assert_allclose(
        prepared_inputs[0], direct_inputs[0].astype("float32")
    )
    np.testing.assert_allclose(
        prepared_outputs.astype("float32"), direct_outputs.astype("float32")
    )
30 changes: 28 additions & 2 deletions pypsps/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from typing import Tuple, Union
import numpy as np
import tensorflow as tf
import pandas as pd

# Annotation alias for `y_pred` arguments: a numpy array or a TensorFlow tensor.
_Y_PRED_DTYPE = Union[np.ndarray, tf.Tensor]
# Annotation alias for raw data arguments (features / treatments / outcomes):
# a numpy array or a pandas DataFrame.
_DATA_DTYPE = Union[np.ndarray, pd.DataFrame]


def get_n_states(y_pred: _Y_PRED_DTYPE) -> int:
Expand Down Expand Up @@ -40,11 +42,35 @@ def agg_outcome_pred(y_pred: _Y_PRED_DTYPE) -> np.ndarray:
of outcome prediction in state j equals the state level weight of the causal
state simplex predictions.
"""
_, outcome_pred, _, weights = split_y_pred(y_pred)
outcome_pred, _, weights, _ = split_y_pred(y_pred)

if isinstance(weights, np.ndarray):
weighted_outcome = (weights * outcome_pred).sum(axis=1)[:, np.newaxis]
weighted_outcome = (weights * outcome_pred).sum(axis=1)
else:
weighted_outcome = tf.sum(weights * outcome_pred, axis=1)

return weighted_outcome


def prepare_keras_inputs_outputs(
    features: _DATA_DTYPE,
    treatments: _DATA_DTYPE,
    outcomes: Union[_DATA_DTYPE, None] = None,
) -> Tuple[list, Union[np.ndarray, None]]:
    """Prepares inputs/outputs for the keras model training and prediction interface.

    Args:
      features: feature data; a DataFrame is replaced by its ``.values`` array
        and the result is cast to float32.
      treatments: treatment data; a DataFrame is replaced by its ``.values``
        array, otherwise it is passed through unchanged.
      outcomes: outcome data, or None (the default) when only model inputs are
        needed. A DataFrame is replaced by its ``.values`` array.

    Returns:
      A tuple ``(input_data, output_data)``:
        * ``input_data`` is the list ``[features (float32), treatments]``.
        * ``output_data`` is ``np.hstack([outcomes (float32), treatments])``,
          i.e. outcome column(s) first and treatment column(s) after them, or
          None if ``outcomes`` is None. Its dtype follows numpy's promotion
          of float32 with the dtype of ``treatments``.
    """
    if isinstance(features, pd.DataFrame):
        features = features.values
    if isinstance(treatments, pd.DataFrame):
        treatments = treatments.values
    # isinstance() is False for None, so no separate None guard is needed here.
    if isinstance(outcomes, pd.DataFrame):
        outcomes = outcomes.values

    input_data = [features.astype("float32"), treatments]
    if outcomes is None:
        return input_data, None

    output_data = np.hstack([outcomes.astype("float32"), treatments])
    return input_data, output_data

0 comments on commit 9cab675

Please sign in to comment.