import logging
import numpy as np
import math as math
from dataclasses import dataclass
from scipy.linalg import sqrtm
from scipy.stats import chi2, gamma, rankdata, norm
from robpy.covariance.base import RobustCovarianceEstimator
from robpy.utils.distance import mahalanobis_distance
from robpy.utils.logging import get_logger
from robpy.univariate.qn import QnEstimator
from robpy.covariance.ogk import OGKEstimator
from robpy.univariate.tau import TauEstimator
[docs]
@dataclass
class HSubset:
indices: np.ndarray
location: np.ndarray
scale: np.ndarray
determinant: float
n_c_steps: int = 0
[docs]
class FastMCDEstimator(RobustCovarianceEstimator):
def __init__(
self,
*,
h_size: float | int | None = None,
n_initial_subsets: int = 500,
n_initial_c_steps: int = 2,
n_best_subsets: int = 10,
n_partitions: int | None = None,
tolerance: float = 1e-8,
correct_covariance: bool = True,
reweighting: bool = True,
verbosity: int = logging.WARNING,
store_precision=True,
assume_centered=False,
):
"""
Fast MCD estimator based on the algorithm proposed in Rousseeuw and Van Driessen (1999)
Args:
h_size (int | None, optional):
size of the h subset.
If an integer between n/2 and n is passed, it is interpreted as an absolute value.
If a float between 0.5 and 1 is passed, it is interpreted as a proportation
of n (the training set size).
If None, it is set to (n+p+1) / 2.
Defaults to None.
n_initial_subsets (int, optional):
number of initial random subsets of size p+1
n_initial_c_steps (int, optional):
number of initial c steps to perform on all initial subsets
n_best_subsets (int, optional):
number of best subsets to keep and perform c steps on until convergence
n_partitions (int, optional):
Number of partitions to split the data into.
This can speed up the algorithm for large datasets (n > 600 suggested in paper)
If None, 5 partitions are used if n > 600, otherwise 1 partition is used.
tolerance (float, optional):
Minimum difference in determinant between two iterations to stop the C-step
correct_covariance (bool, optional):
Whether to apply a consistency correction to the raw covariance estimate
reweighting (bool, optional):
Whether to apply reweighting to the raw covariance estimate
References:
Rousseeuw and Van Driessen, A Fast Algorithm for the Minimum Covariance Determinant
Estimator, 1999, American Statistical Association and
the American Society for Quality, TECHNOMETRICS
"""
super().__init__(store_precision=store_precision, assume_centered=assume_centered)
self.h_size = h_size
self.n_initial_subsets = n_initial_subsets
self.n_initial_c_steps = n_initial_c_steps
self.n_best_subsets = n_best_subsets
self.n_partitions = n_partitions
self.tolerance = tolerance
self.correct_covariance = correct_covariance
self.reweighting = reweighting
self.logger = get_logger("FastMCDEstimator", level=verbosity)
self.verbosity = verbosity
[docs]
def calculate_covariance(self, X) -> np.ndarray:
if self.h_size == 1 or self.h_size == X.shape[0]:
self.logger.warning(f"Default covariance is returned as h_size is {self.h_size}.")
self.location_ = X.mean(0)
return np.cov(X, rowvar=False)
# partition data (n_partitions > 1 can speed up algorithm for large datasets)
partitions = self._partition_data(X)
self.logger.info(f"Partitioned data into {len(partitions)} partitions")
n_initial_subsets = self.n_initial_subsets // len(partitions)
best_subsets = []
# perform initial c steps on all initial subsets
for data in partitions:
subsets = self._get_initial_subsets(data, n_initial_subsets)
subsets = [
self._perform_multiple_c_steps(subset, data, n_iterations=self.n_initial_c_steps)
for subset in subsets
]
best_subsets.extend(sorted(subsets, key=lambda x: x.determinant)[: self.n_best_subsets])
self.logger.info(f"Selecting {self.n_best_subsets} best subsets from {len(best_subsets)}")
# perform additional c-steps on the best subsets
best_subsets = [
self._perform_multiple_c_steps(subset, X, n_iterations=self.n_initial_c_steps)
for subset in best_subsets
]
best_subsets = sorted(best_subsets, key=lambda x: x.determinant)[: self.n_best_subsets]
best_subset = best_subsets[0] # reference subset
# perform c-steps until convergence
for subset in best_subsets:
while True:
new_subset = self._perform_c_step(subset, X)
if (
np.all(new_subset.indices == subset.indices)
or (new_subset.determinant - subset.determinant) < self.tolerance
):
break
subset = new_subset
if subset.determinant < best_subset.determinant:
best_subset = subset
self.best_subset = best_subset
# post processing
self.location_ = best_subset.location
scale = best_subset.scale
if self.correct_covariance:
self.logger.debug("Applying consistency correction to the raw covariance estimate")
scale = self._correct_covariance(X, best_subset)
if self.reweighting:
self.logger.debug("Applying reweighting to the raw covariance estimate")
distances = mahalanobis_distance(X, best_subset.location, scale)
self._mask = distances < np.sqrt(chi2.ppf(0.975, X.shape[1]))
self.location_ = X[self._mask].mean(axis=0)
scale = np.cov(X[self._mask], rowvar=False)
if self.correct_covariance:
# See Croux & Haesbroeck 1999 and R code for robustbase>covMcd>MCDCons
# (https://rdrr.io/cran/robustbase/src/R/covMcd.R)
self.logger.debug("Applying consistency correction after reweighting.")
factor = 1 / (
gamma.cdf((chi2.ppf(0.975, X.shape[1]) / 2), X.shape[1] / 2 + 1) / 0.975
)
scale *= factor
return scale
def _correct_covariance(self, X: np.ndarray, best_subset: HSubset) -> np.ndarray:
self._rescale_factor = np.median(
np.square(mahalanobis_distance(X, best_subset.location, best_subset.scale))
) / chi2.ppf(0.5, X.shape[1])
return self._rescale_factor * best_subset.scale
def _perform_multiple_c_steps(
self, subset: HSubset, X: np.ndarray, n_iterations: int
) -> HSubset:
for _ in range(n_iterations):
subset = self._perform_c_step(subset, X)
return subset
def _perform_c_step(self, subset: HSubset, X: np.ndarray) -> HSubset:
"""
Perform a single C-step on the subset of the data
Args:
subset_idx: indices of the current subset
X: data
Returns:
indices for new h subset
"""
# Calculate the Mahalanobis distances
mahalanobis = mahalanobis_distance(X, subset.location, subset.scale)
# Find the h_size smallest distances
h = self._get_h(X)
idx = np.argsort(mahalanobis)[:h]
return self._get_subset(indices=idx, X=X, n_c_steps=subset.n_c_steps + 1)
def _get_h(self, X: np.ndarray) -> int:
if self.h_size is None:
return int((X.shape[0] + X.shape[1] + 1) / 2)
elif isinstance(self.h_size, int) and (X.shape[0] / 2 <= self.h_size <= X.shape[0]):
return self.h_size
elif (isinstance(self.h_size, float) and (0.5 <= self.h_size <= 1)) or self.h_size == 1:
return int(self.h_size * X.shape[0])
else:
raise ValueError(
f"h_size must be an integer between n/2 ({X.shape[0] // 2}) and n ({X.shape[0]}) or"
f" a float between 0.5 and 1, but received {self.h_size}."
)
def _partition_data(self, X: np.ndarray) -> list[np.ndarray]:
if self.n_partitions is None:
n_partitions = 5 if X.shape[0] > 600 else 1
else:
n_partitions = self.n_partitions
return np.array_split(X, n_partitions)
def _get_subset(
self,
indices: np.ndarray,
X: np.ndarray,
n_c_steps: int = 0,
ensure_non_singular: bool = False,
) -> HSubset:
"""
Construct an HSubset from a set of data indices and calculate location, scale and
determinant.
Args:
- indices: data indices
- X: complete dataset
- n_c_steps: will be passed directly to the HSubset
- ensure_non_singular: whether to resample in case the determinant is 0 (relevant for
sampling initial subsets)
"""
mu = X[indices].mean(axis=0)
cov = np.cov(X[indices], rowvar=False)
det = np.linalg.det(cov)
if ensure_non_singular:
while math.isclose(det, 0):
new_index = np.random.choice(np.delete(np.arange(X.shape[0]), indices))
indices = np.append(indices, new_index)
mu = X[indices].mean(axis=0)
cov = np.cov(X[indices], rowvar=False)
det = np.linalg.det(cov)
return HSubset(indices, mu, cov, det, n_c_steps=n_c_steps)
def _get_initial_subsets(self, X: np.ndarray, n_subsets: int) -> list[HSubset]:
return [
self._get_subset(
indices=np.random.choice(X.shape[0], X.shape[1] + 1, replace=False),
X=X,
ensure_non_singular=True,
)
for _ in range(n_subsets)
]
[docs]
class DetMCDEstimator(RobustCovarianceEstimator):
def __init__(
self,
*,
h_size: float | int | None = None,
tolerance: float = 1e-8,
correct_covariance: bool = True,
reweighting: bool = True,
verbosity: int = logging.WARNING,
):
"""
Deterministic MCD estimator (DetMCD) based on the algorithm proposed in
Hubert, Rousseeuw and Verdonck (2012)
Args:
h_size (int | None, optional):
size of the h subset.
If an integer between n/2 and n is passed, it is interpreted as an absolute value.
If a float between 0.5 and 1 is passed, it is interpreted as a proportation
of n (the training set size).
If None, it is set to (n+p+1) / 2.
Defaults to None.
tolerance (float, optional):
Minimum difference in determinant between two iterations to stop the C-step
correct_covariance (bool, optional):
Whether to apply a consistency correction to the raw covariance estimate
reweighting (bool, optional):
Whether to apply reweighting to the raw covariance estimate
References:
Hubert, Rousseeuw and Verdonck, A deterministic algorithm for robust location
and scatter, 2012, Journal of Computational and Graphical Statistics
"""
super().__init__()
self.h_size = h_size
self.tolerance = tolerance
self.correct_covariance = correct_covariance
self.reweighting = reweighting
self.logger = get_logger("DetMCDEstimator", level=verbosity)
self.verbosity = verbosity
[docs]
def calculate_covariance(self, X: np.ndarray) -> np.ndarray:
if self.h_size == 1 or self.h_size == X.shape[0]:
self.logger.warning(f"Default covariance is returned as h_size is {self.h_size}.")
self.location_ = X.mean(0)
return np.cov(X, rowvar=False)
n, p = X.shape
# Step 0: standardize X
if n < 1000:
Z = (X - np.median(X, axis=0)) / self._Qn_scale(X)
else:
Z = (X - np.median(X, axis=0)) / self._tau_scale(X)
# Steps 1-3:
best_subsets = self._get_initial_best_subsets(Z, X, n, p)
# Step 4: C-steps until convergence
best_subset = best_subsets[0] # reference subset
for subset in best_subsets:
while True:
new_subset = self._perform_c_step(subset, X)
if (
np.all(new_subset.indices == subset.indices)
or (new_subset.determinant - subset.determinant) < self.tolerance
):
break
subset = new_subset
if subset.determinant < best_subset.determinant:
best_subset = subset
self.best_subset = best_subset
# Step 5: post processing
self.location_ = best_subset.location
scale = best_subset.scale
if self.correct_covariance:
self.logger.debug("Applying consistency correction to the raw covariance estimate")
scale = self._correct_covariance(X, best_subset)
if self.reweighting:
self.logger.debug("Applying reweighting to the raw covariance estimate")
distances = mahalanobis_distance(X, best_subset.location, scale)
self._mask = distances < np.sqrt(chi2.ppf(0.975, X.shape[1]))
self.location_ = X[self._mask].mean(axis=0)
scale = np.cov(X[self._mask], rowvar=False)
if self.correct_covariance:
# See Croux & Haesbroeck 1999 and R code for robustbase>covMcd>MCDCons
# (https://rdrr.io/cran/robustbase/src/R/covMcd.R)
self.logger.debug("Applying consistency correction after reweighting.")
factor = 1 / (
gamma.cdf((chi2.ppf(0.975, X.shape[1]) / 2), X.shape[1] / 2 + 1) / 0.975
)
scale *= factor
return scale
def _Qn_scale(self, X: np.ndarray, axis=0):
if X.ndim == 1:
return QnEstimator().fit(X).scale
elif axis == 0:
return [QnEstimator().fit(col).scale for col in X.T]
elif axis == 1:
return [QnEstimator().fit(col).scale for col in X]
else:
raise ValueError(f"axis {axis} not supported")
def _tau_scale(self, X: np.ndarray, axis=0):
if X.ndim == 1:
return TauEstimator().fit(X).scale
elif axis == 0:
return [TauEstimator().fit(col).scale for col in X.T]
elif axis == 1:
return [TauEstimator().fit(col).scale for col in X]
else:
raise ValueError(f"axis {axis} not supported")
def _get_initial_best_subsets(self, Z: np.ndarray, X: np.ndarray, n: int, p: int):
# Step 1: construct 6 preliminary estimates Sk of covariance or correlation
Y = np.tanh(Z)
S1 = np.corrcoef(Y, rowvar=False)
R = rankdata(Z, axis=0)
S2 = np.corrcoef(R, rowvar=False)
S3 = np.corrcoef(norm.ppf((R - 1 / 3) / (n + 1 / 3)), rowvar=False)
znorm = np.sqrt(np.sum(Z * Z, axis=1))
w = 1 / znorm
S4 = np.dot((Z * w[:, np.newaxis]).T, (Z * w[:, np.newaxis])) / n
idx = np.argsort(znorm)[: math.ceil(n / 2)]
S5 = np.cov(Z[idx, :], rowvar=False)
S6 = OGKEstimator(
location_estimator=np.median, scale_estimator=self._Qn_scale, reweighting=False
).calculate_covariance(Z)
estimates_S = [S1, S2, S3, S4, S5, S6]
# Step 2: construct 6 initial location and scatter estimates
estimates_sigma = []
estimates_mu = []
for S in estimates_S:
_, E = np.linalg.eigh(S)
E = E[:, np.arange(p - 1, -1, -1)]
B = Z @ E
L = np.diag(np.power(self._Qn_scale(B), 2))
cov = E @ L @ E.T
estimates_sigma.append(cov)
root_cov = sqrtm(cov)
inv_root_cov = np.linalg.inv(root_cov)
mu = root_cov @ np.median(Z @ inv_root_cov, axis=0)
estimates_mu.append(mu)
# Step 3: calculate statistical distances
best_subsets = []
for mu, cov in zip(estimates_mu, estimates_sigma):
idx_h0 = np.argsort(mahalanobis_distance(Z, mu, cov))[: math.ceil(n / 2)]
H = self._get_subset(idx_h0, X) # h0
H = self._perform_c_step(H, X) # h
best_subsets.append(H)
return best_subsets
# TODO: de functies hieronder moeten nog naar ergens anders.
def _get_h(self, X: np.ndarray) -> int:
if self.h_size is None:
return int((X.shape[0] + X.shape[1] + 1) / 2)
elif isinstance(self.h_size, int) and (X.shape[0] / 2 <= self.h_size <= X.shape[0]):
return self.h_size
elif (isinstance(self.h_size, float) and (0.5 <= self.h_size <= 1)) or self.h_size == 1:
return int(self.h_size * X.shape[0])
else:
raise ValueError(
f"h_size must be an integer between n/2 ({X.shape[0] // 2}) and n ({X.shape[0]}) or"
f" a float between 0.5 and 1, but received {self.h_size}."
)
def _get_subset(
self,
indices: np.ndarray,
X: np.ndarray,
n_c_steps: int = 0,
ensure_non_singular: bool = False,
) -> HSubset:
"""Construct an HSubset from a set of data indices and calculate location, scale and
determinant.
Args:
- indices: data indices
- X: complete dataset
- n_c_steps: will be passed directly to the HSubset
- ensure_non_singular: whether to resample in case the determinant is 0 (relevant for
sampling initial subsets)
"""
mu = X[indices].mean(axis=0)
cov = np.cov(X[indices], rowvar=False)
det = np.linalg.det(cov)
if ensure_non_singular:
while math.isclose(det, 0):
new_index = np.random.choice(np.delete(np.arange(X.shape[0]), indices))
indices = np.append(indices, new_index)
mu = X[indices].mean(axis=0)
cov = np.cov(X[indices], rowvar=False)
det = np.linalg.det(cov)
return HSubset(indices, mu, cov, det, n_c_steps=n_c_steps)
def _perform_c_step(self, subset: HSubset, X: np.ndarray) -> HSubset:
"""Perform a single C-step on the subset of the data
Args:
subset_idx: indices of the current subset
X: data
Returns:
indices for new h subset
"""
# Calculate the Mahalanobis distances
mahalanobis = mahalanobis_distance(X, subset.location, subset.scale)
# Find the h_size smallest distances
h = self._get_h(X)
idx = np.argsort(mahalanobis)[:h]
return self._get_subset(indices=idx, X=X, n_c_steps=subset.n_c_steps + 1)
def _correct_covariance(self, X: np.ndarray, best_subset: HSubset) -> np.ndarray:
self._rescale_factor = np.median(
np.square(mahalanobis_distance(X, best_subset.location, best_subset.scale))
) / chi2.ppf(0.5, X.shape[1])
return self._rescale_factor * best_subset.scale