Source code for robpy.preprocessing.transfo
import numpy as np
import logging
from sklearn.base import (
BaseEstimator,
OneToOneFeatureMixin,
TransformerMixin,
)
from typing import Literal
from robpy.utils.rho import TukeyBisquare
from scipy.stats import median_abs_deviation, norm, chi2
from scipy.optimize import minimize_scalar
from robpy.univariate import HuberOneStepM
[docs]
class RobustPowerTransformer(OneToOneFeatureMixin, TransformerMixin, BaseEstimator):
def __init__(
self,
method: Literal["boxcox", "yeojohnson", "auto"] = "yeojohnson",
standardize: bool = True,
lambda_range: tuple[float, float] = (-4.0, 6.0),
quantile: float = 0.99,
nsteps: int = 2,
):
"""
Apply a robust power transformation using reweighted maximum likelihood to transform the
features closer to normality. Uses the Box-Cox or the Yeo-Johnson transformation.
Args:
method (Literal str, optional): Method used for the power transformation.
Can be "boxcox" for Box-Cox, "yeojohnson" for Yeo-Johnson, or "auto"
for the solution with the lowest objective function. Box-Cox can only be used for
strictly positive features. Defaults to "auto".
standardize (boolean, optional): Whether to standardize the features before and after
the power transformation. Defaults to True.
quantile (float, optional): Quantile used to calculate the weights. Defaults to 0.99.
nsteps (int, optional): Number of steps used in the reweighted maximum likelihood.
Defaults to 2.
References:
- Raymaekers, J., & Rousseeuw, P. J. (2024). Transforming variables to central
normality. Machine Learning, 113(8), 4953-4975.
"""
self.method = method
self.standardize = standardize
self.lambda_range = list(lambda_range)
self.quantile = quantile
self.nsteps = nsteps
self.logger = logging.getLogger("RobustPowerTransformer")
[docs]
def fit(self, x: np.ndarray):
"""Calculates lambda, the transformation parameter depending on the method.
Args:
x (np.ndarray): The data.
"""
self._get_method(x)
x_sorted = np.sort(x)
if self.method in ["boxcox", "auto"]:
lambda_boxcox, mu_rew_boxcox, sd_rew_boxcox, scale_boxcox = self._fit_boxcox(x_sorted)
if self.method == "auto":
crit_val_boxcox = self._robnormality(x, "boxcox", lambda_boxcox)
if self.method in ["yeojohnson", "auto"]:
(
lambda_yeojohnson,
mu_rew_yeojohnson,
sd_rew_yeojohnson,
loc_yeojohnson,
scale_yeojohnson,
) = self._fit_yeojohnson(x_sorted)
if self.method == "auto":
crit_val_yeojohnson = self._robnormality(x, "yeojohnson", lambda_yeojohnson)
if self.method == "auto":
if crit_val_boxcox < crit_val_yeojohnson:
self.method = "boxcox"
else:
self.method = "yeojohnson"
if self.method == "boxcox":
if self.standardize:
self.scale_pre = scale_boxcox
self.location_post = mu_rew_boxcox
self.scale_post = sd_rew_boxcox
self.lambda_rew = lambda_boxcox
else: # yeojohnson
if self.standardize:
self.location_pre = loc_yeojohnson
self.scale_pre = scale_yeojohnson
self.location_post = mu_rew_yeojohnson
self.scale_post = sd_rew_yeojohnson
self.lambda_rew = lambda_yeojohnson
return self
[docs]
def transform(self, x: np.ndarray) -> np.ndarray:
"""Transforms the data using the calculated lambda estimate and the corresponding method.
Args:
x (np.ndarray): The data.
"""
if self.method == "boxcox":
if np.min(x) <= 0:
raise ValueError(
"The data is not strictly positive. Box-Cox transformation "
"cannot be applied."
)
if self.standardize:
x = x / self.scale_pre
x = self._transf_boxcox(x, self.lambda_rew)[0]
if self.standardize:
x = (x - self.location_post) / self.scale_post
else: # yeojohnson
if self.standardize:
x = (x - self.location_pre) / self.scale_pre
x = self._transf_yeojohnson(x, self.lambda_rew)[0]
if self.standardize:
x = (x - self.location_post) / self.scale_post
return x
[docs]
def inverse_transform(self, x: np.ndarray) -> np.ndarray:
"""Transforms the data back using inverse Yeo-Johnson/Box-Cox. The previously fitted lambda
estimate and the corresponding method are used.
Args:
x (np.ndarray): The data.
"""
if self.method == "boxcox":
if self.standardize:
x = x * self.scale_post + self.location_post
x = self._handle_bounds(x)
x = self._inv_transf_boxcox(x, self.lambda_rew)
if self.standardize:
x = x * self.scale_pre
else: # yeojohnson
if self.standardize:
x = x * self.scale_post + self.location_post
x = self._handle_bounds(x)
x = self._inv_transf_yeojohnson(x, self.lambda_rew)
if self.standardize:
x = x * self.scale_pre + self.location_pre
return x
def _fit_boxcox(self, x: np.ndarray):
"""Fits the Box-Cox power transformation.
Args:
x (np.ndarray): The data.
"""
lambdarange = self.lambda_range
converged = False
while not converged:
if self.standardize:
scale_boxcox = np.median(x)
x = x / scale_boxcox
else:
scale_boxcox = None
lambda_raw = self._calculate_lambda_0(x, "boxcox_rect", lambdarange)
lambda_rew, mu_rew_boxcox, sd_rew_boxcox = self._reweighted_max_likelihood_robust(
x,
lambda_raw,
"boxcox",
lambdarange,
self.quantile,
self.nsteps,
)
converged = np.min(np.abs(lambda_rew - lambdarange)) > np.diff(lambdarange) * 0.05
if not converged:
lambdarange = 1 + (lambdarange - 1) * (
1 + (abs(lambda_rew - lambdarange) == min(abs(lambda_rew - lambdarange)))
)
return lambda_rew, mu_rew_boxcox, sd_rew_boxcox, scale_boxcox
def _fit_yeojohnson(self, x: np.ndarray):
"""Fits the Yeo-Johnson power transformation.
Args:
x (np.ndarray): The data.
"""
lambdarange = self.lambda_range
converged = False
while not converged:
if self.standardize:
loc_yeojohnson = np.median(x)
scale_yeojohnson = median_abs_deviation(x, scale="normal")
x = (x - loc_yeojohnson) / scale_yeojohnson
else:
loc_yeojohnson, scale_yeojohnson = None, None
lambda_raw = self._calculate_lambda_0(x, "yeojohnson_rect", lambdarange)
(
lambda_rew,
mu_rew_yeojohnson,
sd_rew_yeojohnson,
) = self._reweighted_max_likelihood_robust(
x,
lambda_raw,
"yeojohnson",
lambdarange,
self.quantile,
self.nsteps,
)
converged = np.min(np.abs(lambda_rew - lambdarange)) > np.diff(lambdarange) * 0.05
if not converged:
lambdarange = 1 + (lambdarange - 1) * (
1 + (abs(lambda_rew - lambdarange) == min(abs(lambda_rew - lambdarange)))
)
return lambda_rew, mu_rew_yeojohnson, sd_rew_yeojohnson, loc_yeojohnson, scale_yeojohnson
def _calculate_bounds(self):
"""Calculates the bounds of the range of the power transformation."""
lower_bound, upper_bound = None, None
if self.method == "boxcox" and self.lambda_rew > 0.0:
lower_bound = -1.0 / np.abs(self.lambda_rew)
elif self.method == "boxcox" and self.lambda_rew < 0.0:
upper_bound = 1.0 / np.abs(self.lambda_rew)
elif self.method == "yeojohnson" and self.lambda_rew > 2.0:
lower_bound = -1.0 / np.abs(self.lambda_rew - 2.0)
elif self.method == "yeojohnson" and self.lambda_rew < 0.0:
upper_bound = 1.0 / np.abs(self.lambda_rew)
return lower_bound, upper_bound
def _handle_bounds(self, x: np.ndarray):
lower_bound, upper_bound = self._calculate_bounds()
if lower_bound is not None and np.min(x) < lower_bound:
new_bound = 0.95 * lower_bound
self.logger.info(
f"Some (standardized) values in x are below the range of the {self.method} "
f"transformation. These values were replaced by {new_bound} so they "
"can be transformed back."
)
x = np.where(x < lower_bound, new_bound, x)
if upper_bound is not None and np.max(x) > upper_bound:
new_bound = 0.95 * upper_bound
self.logger.info(
f"Some (standardized) values in x are above the range of the {self.method} "
f"transformation. These values were replaced by {new_bound} so they "
"can be transformed back."
)
x = np.where(x > upper_bound, new_bound, x)
return x
def _transf_boxcox_rectified(
self, x: np.ndarray, my_lambda: float, standardize_too: bool = False
):
"""Rectified Box-Cox transformation."""
if np.min(x) <= 0:
raise ValueError(
"Data values should be strictly positive for a Box-Cox transformation."
)
xt = np.zeros_like(x)
if my_lambda == 1.0:
xt = x - 1.0
else:
changepoint = self._get_rectified_boxcox_changepoint(x, my_lambda)
if my_lambda > 1.0:
mask = x < changepoint
else:
mask = x > changepoint
if my_lambda == 0:
xt[~mask] = np.log(x[~mask])
changepoint_transf = np.log(changepoint)
xt[mask] = changepoint_transf + (x[mask] - changepoint) / changepoint
else:
xt[~mask] = (x[~mask] ** my_lambda - 1) / my_lambda
changepoint_transf = (changepoint**my_lambda - 1) / my_lambda
xt[mask] = changepoint_transf + (x[mask] - changepoint) * changepoint ** (
my_lambda - 1
)
if standardize_too:
loc_scale = HuberOneStepM().fit(xt)
zt = (xt - loc_scale.location) / loc_scale.scale
else:
zt = None
return xt, zt
def _get_rectified_boxcox_changepoint(
self, x: np.ndarray, my_lambda: float, factor: float = 1.5, eps: float = 1e-5
) -> float:
"""Get C_u or C_l for the rectified Box-Cox transform."""
n = len(x)
Q1 = x[int(np.ceil(n / 4.0) - 1.0)]
Q3 = x[int(n - np.ceil(n / 4.0))]
if my_lambda < 1:
changepoint = self._transf_boxcox(Q3, my_lambda)[0] * factor
elif my_lambda > 1:
changepoint = self._transf_boxcox(Q1, my_lambda)[0] * factor
else:
raise ValueError("There is no changepoint for lambda equals 1.")
if my_lambda < 0.0:
changepoint = min(changepoint, np.abs(1.0 / my_lambda) - eps)
elif my_lambda > 0.0:
changepoint = max(changepoint, -1.0 / my_lambda + eps)
changepoint = self._inv_transf_boxcox(changepoint, my_lambda)
changepoint = min(max(changepoint, x[0]), x[n - 1])
return changepoint
def _transf_boxcox(self, x: np.ndarray, my_lambda: float, standardize_too: bool = False):
"""Classical Box-Cox transformation."""
if np.min(x) <= 0:
raise ValueError(
"Data values should be strictly positive for a Box-Cox transformation."
)
if my_lambda == 0:
xt = np.log(x)
else:
xt = (x**my_lambda - 1) / my_lambda
if standardize_too:
loc_scale = HuberOneStepM().fit(xt)
zt = (xt - loc_scale.location) / loc_scale.scale
else:
zt = None
return xt, zt
def _transf_yeojohnson(self, x: np.ndarray, my_lambda: float, standardize_too: bool = False):
"""Classical Yeo-Johnson transformation."""
positive_mask = x >= 0.0
xt = np.zeros_like(x)
if my_lambda == 0.0:
xt[positive_mask] = np.log(1.0 + x[positive_mask])
xt[~positive_mask] = -((1 - x[~positive_mask]) ** (2 - my_lambda) - 1) / (2 - my_lambda)
elif my_lambda == 2.0:
xt[positive_mask] = ((1 + x[positive_mask]) ** my_lambda - 1) / my_lambda
xt[~positive_mask] = -np.log(1 - x[~positive_mask])
else:
xt[positive_mask] = ((1 + x[positive_mask]) ** my_lambda - 1) / my_lambda
xt[~positive_mask] = -((1 - x[~positive_mask]) ** (2 - my_lambda) - 1) / (2 - my_lambda)
if standardize_too:
loc_scale = HuberOneStepM().fit(xt)
zt = (xt - loc_scale.location) / loc_scale.scale
else:
zt = None
return xt, zt
def _transf_yeojohnson_rectified(
self, x: np.ndarray, my_lambda: float, standardize_too: bool = False
):
"""Rectified Yeo-Johnson transformation."""
xt = np.zeros_like(x)
if my_lambda == 1.0:
xt = x
else:
changepoint = self._get_rectified_yeojohnson_changepoint(x, my_lambda)
positive_mask = x >= 0.0
negative_mask = x < 0.0
if my_lambda > 1.0:
mask_lower = x < changepoint
mask_upper = (changepoint <= x) & (x < 0)
xt[positive_mask] = ((1 + x[positive_mask]) ** my_lambda - 1.0) / my_lambda
if my_lambda == 2:
xt[mask_upper] = -np.log(1 - x[mask_upper])
xt[mask_lower] = -np.log(1 - changepoint) + (
x[mask_lower] - changepoint
) * 1 / (1 - changepoint)
else:
xt[mask_upper] = -((1 - x[mask_upper]) ** (2 - my_lambda) - 1) / (2 - my_lambda)
changepoint_transf = self._transf_yeojohnson(changepoint, my_lambda)[0]
xt[mask_lower] = changepoint_transf + (x[mask_lower] - changepoint) * (
1 + np.abs(changepoint)
) ** (1 - my_lambda)
else:
mask_upper = x > changepoint
mask_lower = (x >= 0) & (x <= changepoint)
xt[negative_mask] = -((1 - x[negative_mask]) ** (2 - my_lambda) - 1) / (
2 - my_lambda
)
if my_lambda == 0.0:
xt[mask_lower] = np.log(1 + x[mask_lower])
xt[mask_upper] = np.log(1 + changepoint) + (x[mask_upper] - changepoint) * 1 / (
1 + changepoint
)
else:
xt[mask_lower] = ((1 + x[mask_lower]) ** my_lambda - 1) / my_lambda
changepoint_transf = self._transf_yeojohnson(changepoint, my_lambda)[0]
xt[mask_upper] = changepoint_transf + (x[mask_upper] - changepoint) * (
1 + np.abs(changepoint)
) ** (my_lambda - 1)
if standardize_too:
loc_scale = HuberOneStepM().fit(xt)
zt = (xt - loc_scale.location) / loc_scale.scale
else:
zt = None
return xt, zt
def _get_rectified_yeojohnson_changepoint(
self, x: np.ndarray, my_lambda: float, factor: float = 1.5, eps: float = 1e-5
) -> float:
"""Get C_u or C_l for the rectified Box-Cox transformation."""
n = len(x)
Q1 = x[int(np.ceil(n / 4.0) - 1.0)]
Q3 = x[int(n - np.ceil(n / 4.0))]
if my_lambda < 1:
changepoint = self._transf_yeojohnson(Q3, my_lambda)[0] * factor
elif my_lambda > 1:
changepoint = self._transf_yeojohnson(Q1, my_lambda)[0] * factor
else:
raise ValueError("There is no changepoint for lambda equals 1.")
if my_lambda < 0.0:
changepoint = min(changepoint, np.abs(1.0 / my_lambda) - eps)
elif my_lambda > 2.0:
changepoint = max(changepoint, (1.0 / (2 - my_lambda)) + eps)
changepoint = self._inv_transf_yeojohnson(changepoint, my_lambda)
changepoint = min(max(changepoint, x[0]), x[n - 1])
return changepoint
def _inv_transf_boxcox(self, x: np.ndarray, my_lambda: float) -> np.ndarray:
"""Classical Box-Cox transformation inversed."""
if my_lambda == 0:
xt = np.exp(x)
else:
xt = (x * my_lambda + 1) ** (1.0 / my_lambda)
return xt
def _inv_transf_yeojohnson(self, x: np.ndarray, my_lambda: float) -> np.ndarray:
"""Classical Yeo-Johnson transformation inversed."""
positive_mask = x >= 0.0
xt = np.zeros_like(x)
if my_lambda == 0.0:
xt[positive_mask] = np.exp(x[positive_mask]) - 1.0
xt[~positive_mask] = 1.0 - (1 + (my_lambda - 2) * x[~positive_mask]) ** (
1 / (2 - my_lambda)
)
elif my_lambda == 2.0:
xt[positive_mask] = (my_lambda * x[positive_mask] + 1) ** (1 / my_lambda) - 1
xt[~positive_mask] = 1 - np.exp(-x[~positive_mask])
else:
xt[positive_mask] = (my_lambda * x[positive_mask] + 1) ** (1 / my_lambda) - 1
xt[~positive_mask] = 1.0 - (1 + (my_lambda - 2) * x[~positive_mask]) ** (
1 / (2 - my_lambda)
)
return xt
def _robnormality(
self,
x: np.ndarray,
transf: Literal["boxcox_rect", "yeojohnson_rect", "boxcox", "yeojohnson"],
my_lambda: float,
) -> float:
"""Objective function of Equation (5) in Raymaekers, J., & Rousseeuw, P. J. (2024)."""
if not np.all(np.diff(x) >= 0):
x = np.sort(x)
if transf == "boxcox_rect":
x = self._transf_boxcox_rectified(x, my_lambda)[0]
elif transf == "yeojohnson_rect":
x = self._transf_yeojohnson_rectified(x, my_lambda)[0]
elif transf == "boxcox":
x = self._transf_boxcox(x, my_lambda)[0]
elif transf == "yeojohnson":
x = self._transf_yeojohnson(x, my_lambda)[0]
else:
raise NotImplementedError("Other transformations not yet implemented.")
x = x[~np.isnan(x)]
n = len(x)
loc_scale = HuberOneStepM().fit(x)
x = (x - loc_scale.location) / np.where(loc_scale.scale == 0, 1, loc_scale.scale)
theo_quantile = norm.ppf((np.arange(1, n + 1) - 1 / 3) / (n + 1 / 3))
obj = TukeyBisquare(c=0.5).rho(x - theo_quantile)
crit = np.sum(obj)
return crit
def _calculate_lambda_0(
self,
x: np.ndarray,
transf: Literal["boxcox_rect", "yeojohnson_rect"] = "boxcox_rect",
lambdarange: list[float] = [-4.0, 6.0],
):
"""Computes the initial estimate for lambda by optimizing the objective function."""
lambda_0 = minimize_scalar(
lambda lambdatemp: self._robnormality(x, transf, lambdatemp),
bounds=lambdarange,
method="bounded",
)
return lambda_0.x
def _reweighted_max_likelihood_robust(
self,
x: np.ndarray,
lambda_raw: float,
transf: Literal["boxcox", "yeojohnson"],
lambdarange: list[float] = [-4.0, 6.0],
quantile: float = 0.99,
nsteps: int = 2,
):
"""Computes the reweighted estimate for lambda by performing reweighted ML."""
if transf == "boxcox":
zt = self._transf_boxcox_rectified(x, lambda_raw, standardize_too=True)[1]
elif transf == "yeojohnson":
zt = self._transf_yeojohnson_rectified(x, lambda_raw, standardize_too=True)[1]
for _ in range(nsteps):
w = np.abs(zt) <= np.sqrt(chi2.ppf(quantile, 1))
lambda_rew = self._estimate_max_likelihood(x[w], transf, lambdarange)
if transf == "boxcox":
xt = self._transf_boxcox(x, lambda_rew, standardize_too=False)[0]
elif transf == "yeojohnson":
xt = self._transf_yeojohnson(x, lambda_rew, standardize_too=False)[0]
zt = (xt - np.mean(xt)) / np.std(xt)
mu_rew = np.mean(xt[w])
sd_rew = np.std(xt[w])
return lambda_rew, mu_rew, sd_rew
def _estimate_max_likelihood(
self,
x: np.ndarray,
transf: Literal["boxcox", "yeojohnson"],
lambdarange: list[float] = [-4.0, 6.0],
):
"""Computes an estimate for lambda by using ML."""
n = len(x)
if transf == "boxcox":
def obj_func(lambdatemp):
x_bc = self._transf_boxcox(x, lambdatemp)[0]
mu = np.mean(x_bc)
sigma2 = np.mean((x_bc - mu) ** 2)
return (n / 2) * np.log(sigma2) - (lambdatemp - 1) * np.sum(np.log(x))
elif transf == "yeojohnson":
def obj_func(lambdatemp):
x_yj = self._transf_yeojohnson(x, lambdatemp)[0]
mu = np.mean(x_yj)
sigma2 = np.mean((x_yj - mu) ** 2)
return (n / 2) * np.log(sigma2) - (lambdatemp - 1) * np.sum(
np.sign(x) * np.log(1 + np.abs(x))
)
else:
raise NotImplementedError("Wrong transformation.")
lambda_est = minimize_scalar(obj_func, bounds=lambdarange, method="bounded").x
return lambda_est
def _get_method(self, x: np.ndarray):
if self.method == "boxcox":
if np.min(x) <= 0:
raise ValueError("The data is not strictly positive. Box-Cox cannot be applied.")
elif self.method == "auto":
if np.min(x) <= 0:
self.method = "yeojohnson"
elif self.method != "yeojohnson":
raise ValueError(
'The only supported methods of transformation are "boxcox", "yeojohnson" or "auto".'
)
return self