Source code for robpy.pca.robpca

from __future__ import annotations

import numpy as np
import logging

from sklearn.decomposition import PCA
from robpy.pca.base import RobustPCA, get_od_cutoff
from robpy.utils.outlyingness import stahel_donoho
from robpy.utils.logging import get_logger
from robpy.covariance import FastMCD


[docs] class ROBPCA(RobustPCA): def __init__( self, *, n_components: int | None = None, k_min_var_explained: float = 0.8, alpha: float = 0.75, final_MCD_step: bool = True, random_seed: int | None = None, verbosity: int = logging.WARNING, ): """Implementation of ROBPCA algorithm as described in Hubert, M., Rousseeuw, P. J., & Vanden Branden, K. (2005). Args: n_components (int | None, optional): Number of components to select. If None, it is set during fit. Defaults to None. k_min_var_explained (float, optional): Minimum variance explained by the components. Only used if n_components is None. Defaults to 0.8. alpha (float, optional): Coverage parameter, determines the robustness and efficiency trade off of the estimator. Smaller alpha gives more robust but less accurate estimates. Must be a number between 0.5 and 1. Defaults to 0.75. final_MCD_step (bool, optional): Whether to apply the final MCD step to get maximally robust estimates. If False, the eigenvectors after projection onto V1 (subspace determined by points with OD < cutoff) are used as the final estimates. Defaults to True. random_seed (int | None, optional): Can be used to provide a random seed. Defaults to None. References: - Hubert, M., Rousseeuw, P. J., & Vanden Branden, K. (2005). ROBPCA: a new approach to robust principal component analysis. Technometrics, 47(1), 64-79. """ super().__init__(n_components=n_components) self.k_min_var_explained = k_min_var_explained self.alpha = alpha self.final_MCD_step = final_MCD_step self.random_seed = random_seed self.logger = get_logger("ROBPCA", level=verbosity) self.verbosity = verbosity
[docs] def fit(self, X: np.ndarray) -> ROBPCA: # step 1: singular value decomposition = applying standard PCA pca = PCA() X = pca.fit_transform(X) self.location_ = np.mean(X, axis=0) loadings = pca.components_.T # step 2: stahel donoho outlyingness --> h subset outlyingness = stahel_donoho(X, random_seed=self.random_seed) if not (isinstance(self.alpha, (int, float)) and 0.5 <= self.alpha <= 1): raise ValueError( f"alpha must be between 0.5 and 1 (inclusive), but received {self.alpha}." ) elif self.n_components is not None: if int(self.alpha * X.shape[0]) < int((X.shape[0] + self.n_components + 1) / 2): self.logger.warning( f"h is too small and therefore set to [(n+k+1)/2]" f" ({int((X.shape[0] + self.n_components + 1) / 2)})." ) h = np.max( [int(self.alpha * X.shape[0]), int((X.shape[0] + self.n_components + 1) / 2)] ) else: if int(self.alpha * X.shape[0]) < int((X.shape[0] + 10 + 1) / 2): self.logger.warning( f"h is too small and therefore set to [(n+10+1)/2]" f" ({int((X.shape[0] + 10 + 1) / 2)})." ) h = np.max([int(self.alpha * X.shape[0]), int((X.shape[0] + 10 + 1) / 2)]) h_index = np.argsort(outlyingness)[:h] h_cov = np.cov(X[h_index], rowvar=False) # step 3: project on k-dimensional subspace eigvals_h, eigvecs_h = np.linalg.eigh(h_cov) sorted_eig_h_idx = np.argsort(eigvals_h)[::-1] var_explained_ratio = eigvals_h[sorted_eig_h_idx].cumsum() / eigvals_h.sum() if self.n_components is None: k = np.argmax(var_explained_ratio >= self.k_min_var_explained) + 1 else: k = self.n_components self.explained_variance_ = eigvals_h[sorted_eig_h_idx[:k]] self.explained_variance_ratio = self.explained_variance_[:k].cumsum() / eigvals_h.sum() h_components = eigvecs_h[:, sorted_eig_h_idx[:k]] # (p, k) X_proj = (X - self.location_) @ h_components @ h_components.T # step 4: orthogonal distance orth_dist = np.linalg.norm((X - self.location_) - X_proj, axis=1) v_index = np.argwhere(orth_dist < get_od_cutoff(orth_dist)).flatten() cov_v = np.cov(X[v_index], rowvar=False) eigvals_v, eigvecs_v = np.linalg.eigh(cov_v) k = min(k, np.linalg.matrix_rank(cov_v, hermitian=True)) sorted_eig_v_idx = np.argsort(eigvals_v)[::-1] self.components_ = eigvecs_v[:, sorted_eig_v_idx[:k]] if self.final_MCD_step and k > 1: # step 5: final MCD step mcd = FastMCD().fit(self.transform(X)) _, eigvecs_mcd = np.linalg.eigh(mcd.covariance) k = min(k, np.linalg.matrix_rank(mcd.covariance, hermitian=True)) self.components_ = self.components_ @ np.flip(eigvecs_mcd[:, -k:], axis=1) # step 6: transform back to original X self.n_components = k X = pca.inverse_transform(X) self.components_ = loadings @ self.components_ self.location_ = np.mean(X, axis=0) return self