Source code for robpy.preprocessing.utils
import numpy as np
from typing import Callable
from scipy.stats import median_abs_deviation
[docs]
def wrapping_transformation(
X: np.ndarray,
b: float = 1.5,
c: float = 4.0,
q1: float = 1.540793,
q2: float = 0.8622731,
rescale: bool = True,
location_estimator: Callable[[np.ndarray, int], np.ndarray] = np.median,
scale_estimator: Callable[[np.ndarray, int], np.ndarray] = median_abs_deviation,
) -> np.ndarray:
"""
Implementation of the wrapping transformation using the following function:
.. math::
\\Psi_{b, c}(z) =
\\begin{cases}
z & \\text{if } \\ 0 \\leq |z| < b, \\\\
q_1 \\tanh\\left(q_2 (c - |z|)\\right) \\mathrm{sign}(z) & \\text{if } \\ b \\leq |z|
\\leq c, \\\\
0 & \\text{if } \\ c < |z|.
\\end{cases}
Args:
X (np.ndarray):
Data to be transformed, must have shape (N, D).
b (float, optional):
Lower cutoff. Defaults to 1.5.
c (float, optional):
Upper cutoff. Defaults to 4.0.
q1 (float, optional):
Transformation parameter (see formula). Defaults to 1.540793.
q2 (float, optional):
Transformation parameter (see formula). Defaults to 0.8622731.
rescale (bool, optional):
Whether to rescale the wrapped data such that the robust location and
scale of the transformed data are the same as the original data.
Defaults to True.
location_estimator (Callable[[np.ndarray, int], np.ndarray], optional):
Function to estimate the location of the data, should accept an array like input as
first value and a named argument axis. Defaults to np.median.
scale_estimator (Callable[[np.ndarray, int], np.ndarray], optional):
Function to estimate the scale of the data, should accept an array like input as
first value and a named argument axis. Defaults to median_abs_deviation.
Returns:
np.ndarray: The transformed data.
References:
- Raymaekers, J., & Rousseeuw, P. J. (2021). Fast robust correlation for
high-dimensional data. Technometrics, 63(2), 184-198.
"""
locations = location_estimator(X, axis=0)
scales = scale_estimator(X, axis=0)
scales_no_zero = np.where(scales == 0, 1, scales)
z = (X - locations) / scales_no_zero
z_wrapped = np.where(
np.abs(z) < b,
z,
np.where(np.abs(z) <= c, q1 * np.tanh(q2 * (c - np.abs(z))) * np.sign(z), 0),
)
if rescale:
z_wrapped_mean = np.mean(z_wrapped, axis=0)
z_wrapped_std = np.std(z_wrapped, axis=0)
z_wrapped_std_no_zero = np.where(z_wrapped_std == 0, 1, z_wrapped_std)
return (
z_wrapped * (scales / z_wrapped_std_no_zero)
+ locations
- (z_wrapped_mean * (scales / z_wrapped_std_no_zero))
)
else:
return z_wrapped * scales + locations