Source code for robpy.covariance.cellmcd

import logging
import numpy as np
import matplotlib.pyplot as plt

from scipy.stats import chi2, median_abs_deviation
from typing import Literal, Optional
from robpy.covariance.utils.alter_covariance import truncated_covariance
from robpy.covariance.base import RobustCovariance
from robpy.utils.logging import get_logger
from robpy.covariance.utils.cellmcd_utils import objective_function
from robpy.utils.general import inverse_submatrix
from robpy.covariance.utils.cellmcd_visualization_utils import (
    annote_outliers,
    annote_outliers_ellipse,
    draw_ellipse,
    draw_threshold_lines,
    get_thresholds,
)
from robpy.preprocessing.scaling import RobustScaler
from robpy.univariate.onestep_m import OneStepWrapping
from robpy.covariance.initial_ddcw import InitialDDCW
from sklearn.exceptions import NotFittedError


[docs] class CellMCD(RobustCovariance): def __init__( self, *, alpha: float = 0.75, quantile: float = 0.99, crit: float = 1e-4, max_c_steps: int = 100, min_eigenvalue: float = 1e-4, verbosity: int = logging.WARNING, ): """ Cell MCD estimator based on the algorithm proposed in Raymaekers, J., & Rousseeuw, P. J. (2024). Args: alpha (float, optional): Percentage indicating how many cells must remain unflagged in each column. Must lie within 0.5 to 1.0. Defaults to 0.75. quantile (float, optional): Cutoff value to flag cells. Defaults to 0.99. crit (float, optional): Stop iterating when successive covariance matrices of the standardized data differ by less than crit. Defaults to 1e-4. max_c_steps (int, optional): Maximum number of C-steps used in the algorithm. Defaults to 100. min_eigenvalue (float, optional): Lower bound on the minimum eigenvalue of the covariance estimator on the standardized data. Should be at least 1e-6. Defaults to 1e-4. References: - Raymaekers, J., & Rousseeuw, P. J. (2024). The cellwise minimum covariance determinant estimator. Journal of the American Statistical Association, 119(548), 2610-2621. """ if min_eigenvalue < 1e-6: raise ValueError("The lower bound on the eigenvalues should be at least 1e-6.") super().__init__(store_precision=False, assume_centered=False, nans_allowed=True) self.alpha = alpha self.quantile = quantile self.crit = crit self.max_c_steps = max_c_steps self.min_eigenvalue = min_eigenvalue self.logger = get_logger("CellMCD", level=verbosity) self.verbosity = verbosity
[docs] def calculate_covariance(self, X: np.ndarray) -> np.ndarray: # check that alpha creates a h-subset larger than [n/2]+1 n = X.shape[0] if 0.5 <= self.alpha <= 1: if self.alpha < (int(n / 2) + 1.0) / n: self.logger.warning( f"h = alpha*n is too small and therefore set to [n/2] + 1" f" ({(int(n / 2) + 1.0) / n})." ) self.alpha = np.max([self.alpha, (int(n / 2) + 1.0) / n]) else: raise ValueError(f"alpha must a float between 0.5 and 1, but received {self.alpha}.") # Step 0: robustly standardize the data mads = median_abs_deviation(X, nan_policy="omit", axis=0) if np.min(mads) < 1e-8: raise ValueError("At least one variable has an almost zero median absolute deviation.") scaler = RobustScaler(scale_estimator=OneStepWrapping()) scaler.fit(X, ignore_nan=True) X_scaled = scaler.transform(X) # Step 1: Check that there aren't too many marginal outliers and too many nan's. # Check that there are enough observations compared to variables. self._check_data(X_scaled) # Step 2: calculate the initial estimates ddcw = InitialDDCW(alpha=self.alpha, min_eigenvalue=self.min_eigenvalue) ddcw.fit(X_scaled) initial_location = ddcw.location_ initial_cov = ddcw.covariance_ initial_cov_inv = ddcw.precision_ X_scaled[np.abs(X_scaled) > 3] = np.nan # Step 3: MCD iterations (we work with X_scaled) location, cov, cov_inv, W = self._C_steps_until_convergence( X_scaled, initial_location, initial_cov, initial_cov_inv ) # Step 4: make prediction predictions, conditional_variances = self._make_predictions( X_scaled, cov, cov_inv, location, W ) # Step 5: transform back cov = np.diag(scaler.scales_) @ cov @ np.diag(scaler.scales_) location = scaler.locations_ + location * scaler.scales_ predictions = scaler.locations_ + predictions * scaler.scales_ conditional_stds = np.sqrt(conditional_variances) * scaler.scales_ X_imputed = X.copy() X_imputed[W == 0] = predictions[W == 0] residuals = (X - predictions) / conditional_stds scaler_residuals = RobustScaler(scale_estimator=OneStepWrapping()) scaler_residuals.fit(residuals, ignore_nan=True) residuals = residuals / scaler_residuals.scales_ self.W = W self.location_ = location self.predictions = predictions self.conditional_stds = conditional_stds self.X_imputed = X_imputed self.residuals = residuals self.covariance_ = cov self.X = X return cov
[docs] def cell_MCD_plot( self, variable: int, variable_name: str = "variable", row_names: Optional[list] = None, second_variable: Optional[int] = None, second_variable_name: str = "second variable", plottype: Literal[ "indexplot", "residuals_vs_variable", "residuals_vs_predictions", "variable_vs_predictions", "bivariate", ] = "indexplot", figsize: tuple[int, int] = (8, 8), annotation_quantile: float | None = None, ): """ Function to plot the results of a cell MCD analysis: 5 types of diagnostic plots. Arguments: variable (int): Index of the variable under consideration. variable_name (str, optional): Name of the variable of interest for the axis label. Defaults to "variable". row_names (list of strings, optional): Row names of the observations if you want the outliers annoted with their name. second_variable (int, optional): Index of the second variable under consideration, only needed for plottype "bivariate". second_variable_name (str, optional): Name of the second variable for the axis label, only relevant for plottype "bivariate". Defaults to "second variable". plottype (Literal string, optional): * "indexplot": plots the residuals of a variable versus the case numbers. * "residuals_vs_variable": plots the residuals of a variable versus the variable itself. * "residuals_vs_predictions": plots the residuals of a variable versus the predictions of that variable. * "variable_vs_predictions": plots a variable against its predictions. * "bivariate": plots two variables against each other. Defaults to "indexplot". figsize (tuple[int,int], optional): Size of the figure. Defaults to (8,8). annotation_quantile (float | None, optional): The quantile used to draw an imaginary threshold around the data. Only points outside these thresholds will be annotated. If None, use self.quantile. """ if not hasattr(self, "covariance_"): raise NotFittedError() fig, ax = plt.subplots(1, 1, figsize=figsize) cutoff = float(np.sqrt(chi2.ppf(self.quantile, 1))) annotation_cutoff = ( cutoff if annotation_quantile is None else float(np.sqrt(chi2.ppf(annotation_quantile, 1))) ) if plottype == "indexplot": x, y = np.arange(self.W.shape[0]), self.residuals[:, variable] h_thresholds, v_thresholds = (-cutoff, cutoff), None h_thresholds_annotate, v_thresholds_annotate = ( -annotation_cutoff, annotation_cutoff, ), None xlabel, ylabel = "index", f"standardized residuals of {variable_name}" title = "Indexplot: standardized residuals" elif plottype == "residuals_vs_variable": x, y = self.X[:, variable], self.residuals[:, variable] h_thresholds, v_thresholds = (-cutoff, cutoff), get_thresholds(cutoff, x) h_thresholds_annotate, v_thresholds_annotate = ( -annotation_cutoff, annotation_cutoff, ), get_thresholds(annotation_cutoff, x) xlabel, ylabel = variable_name, f"standardized residuals of {variable_name}" title = f"Standardized residuals versus the {variable_name}" elif plottype == "residuals_vs_predictions": x, y = self.predictions[:, variable], self.residuals[:, variable] h_thresholds, v_thresholds = (-cutoff, cutoff), get_thresholds(cutoff, x) h_thresholds_annotate, v_thresholds_annotate = ( -annotation_cutoff, annotation_cutoff, ), get_thresholds(annotation_cutoff, x) xlabel, ylabel = ( f"predictions of {variable_name}", f"standardized residuals of {variable_name}", ) title = f"Standardized residuals versus the predictions of the {variable_name}" elif plottype == "variable_vs_predictions": x, y = self.predictions[:, variable], self.X[:, variable] h_thresholds, v_thresholds = get_thresholds(cutoff, y), get_thresholds(cutoff, x) h_thresholds_annotate, v_thresholds_annotate = get_thresholds( annotation_cutoff, y ), get_thresholds(annotation_cutoff, x) xlabel, ylabel = f"predictions of {variable_name}", f"observed {variable_name}" ax.axline((x[0], x[0]), slope=1, color="grey", linestyle="-.") title = f"{variable_name} versus its predictions" elif plottype == "bivariate": if second_variable is None: raise ValueError("second_variable must be provided for bivariate plot.") x, y = self.X[:, variable], self.X[:, second_variable] h_thresholds, v_thresholds = get_thresholds(cutoff, y), get_thresholds(cutoff, x) xlabel, ylabel = variable_name, second_variable_name draw_ellipse( self.covariance_[np.ix_([variable, second_variable], [variable, second_variable])], self.location_[[variable, second_variable]], ax, self.quantile, ) title = f"{second_variable_name} versus {variable_name}" ax.scatter(x=x, y=y) ax.set(xlabel=xlabel, ylabel=ylabel, title=title) if row_names is not None: if plottype != "bivariate": annote_outliers( ax, row_names, x, y, h_thresholds_annotate, v_thresholds_annotate, ) else: annote_outliers_ellipse( ax, row_names, self.location_, self.covariance_, variable, second_variable, x, y, self.quantile if annotation_quantile is None else annotation_quantile, ) draw_threshold_lines(ax, h_thresholds, v_thresholds) return fig
def _make_predictions( self, X: np.ndarray, sigma: np.ndarray, sigma_inv: np.ndarray, mu: np.ndarray, W: np.ndarray ) -> tuple[np.ndarray, np.ndarray]: """Calculate the predictions of the cells given the other clean cells in the same row.""" n, p = X.shape predictions = np.zeros([n, p]) conditional_variances = np.zeros([n, p]) unique_rows = np.unique(W, axis=0) for w in unique_rows: # iterate over all w patterns row_idx_w = np.where(np.all(W == w, axis=1))[0] # rows with the corresponding w pattern missing_col_idx = np.where(w == 0)[0] observed_col_idx = np.where(w == 1)[0] if len(missing_col_idx) < p and len(missing_col_idx) > 0: # there are both observed and missing values # predict the missing values predictions, conditional_variances = self._predict_missing_values( X, predictions, conditional_variances, row_idx_w, missing_col_idx, observed_col_idx, mu, sigma, sigma_inv, ) # predict the observed values predictions, conditional_variances = self._predict_observed_values( X, predictions, conditional_variances, row_idx_w, observed_col_idx, mu, sigma, sigma_inv, ) elif len(missing_col_idx) == 0: # no missings in the w-pattern predictions, conditional_variances = self._predict_no_missing( X, predictions, conditional_variances, row_idx_w, mu, sigma, sigma_inv ) else: # all missings in the w-pattern predictions, conditional_variances = self._predict_all_missing( predictions, conditional_variances, row_idx_w, mu, sigma_inv ) return predictions, conditional_variances def _C_steps_until_convergence( self, X_scaled: np.ndarray, initial_location: np.array, initial_cov: np.ndarray, initial_cov_inv: np.ndarray, ): """Perform C-steps (update W, location and covariance) until convergence. Arguments: - X_scaled (np.ndarray): scaled data - initial_location (np.array): initial location estimate - initial_cov (np.ndarray): initial covariance estimate - initial_cov_inv (np.ndarray): inverse of the initial covariance estimate """ n, p = X_scaled.shape h = round(self.alpha * n) initial_W = np.ones([n, p]) initial_W[np.isnan(X_scaled)] = 0 objective_values = np.full(self.max_c_steps + 1, np.nan) step = 0 q = ( chi2.ppf(0.99, 1) - np.log(np.diag(initial_cov_inv)) + np.log(2 * np.pi) ) # equation (21) penalty = np.sum(q * np.sum(1 - initial_W, axis=0)) objective_values[step] = ( objective_function(X_scaled, initial_W, initial_location, initial_cov, initial_cov_inv) + penalty ) convergence_criteria = 1 W_old = initial_W location_old = initial_location cov_old = initial_cov cov_inv_old = initial_cov_inv while convergence_criteria > self.crit and step < self.max_c_steps: W, location, cov = self._C_step( X_scaled, W_old, location_old, cov_old, cov_inv_old, q, h ) convergence_criteria = np.max(np.abs(cov - cov_old)) cov = truncated_covariance(cov, self.min_eigenvalue) cov_inv = np.linalg.inv(cov) penalty = np.sum(q * np.sum(1 - W, axis=0)) objective_value = objective_function(X_scaled, W, location, cov, cov_inv) + penalty if objective_value > objective_values[step]: return location_old, cov_old, cov_inv_old, W_old step = step + 1 objective_values[step] = objective_value W_old = W location_old = location cov_old = cov cov_inv_old = cov_inv return location, cov, cov_inv, W def _C_step( self, X: np.ndarray, W: np.ndarray, mu: np.array, sigma: np.ndarray, sigma_inv: np.ndarray, q: np.array, h: int, ): n, p = X.shape # first update W W = self._update_W(X, W, mu, sigma, sigma_inv, q, h) # next: update mu & sigma X_imputed = np.copy(X) bias = np.zeros([p, p]) for i in range(n): missing_col_idx = np.where(W[i, :] == 0)[0] observed_col_idx = np.where(W[i, :] == 1)[0] if len(missing_col_idx) > 0: if len(missing_col_idx) == p: # if all are missing X_imputed[i, :] = mu bias = bias + sigma else: sigma_inv_mis_inv = np.linalg.inv( sigma_inv[np.ix_(missing_col_idx, missing_col_idx)] ) X_imputed[np.ix_([i], missing_col_idx)] = ( mu[missing_col_idx] - ( sigma_inv_mis_inv @ sigma_inv[np.ix_(missing_col_idx, observed_col_idx)] @ (X[np.ix_([i], observed_col_idx)] - mu[observed_col_idx]).T ).flatten() ) bias[np.ix_(missing_col_idx, missing_col_idx)] = ( bias[np.ix_(missing_col_idx, missing_col_idx)] + sigma_inv_mis_inv ) mu = np.mean(X_imputed, axis=0) bias = bias / n sigma = np.cov(X_imputed, rowvar=False) * (n - 1) / n + bias return W, mu, sigma def _update_W( self, X: np.ndarray, W: np.ndarray, mu: np.array, sigma: np.ndarray, sigma_inv: np.ndarray, q: np.array, h: int, ) -> np.ndarray: """Part (a) of the C-step: updating the matrix W while keeping mu and sigma unchanged. This is done per column.""" n = W.shape[0] ordering = np.argsort(np.sum(W, axis=0), kind="stable") for j in ordering: delta = self._calculate_delta(X, W, sigma, sigma_inv, mu, j) good_cells = np.where(delta <= q[j])[0] if len(good_cells) < h: # cannot set less than h cells to 1 delta_rank = np.argsort(delta, kind="stable") good_cells = delta_rank[:h] w_new = np.zeros(n) w_new[good_cells] = 1 W[:, j] = w_new return W def _calculate_delta( self, X: np.ndarray, W: np.ndarray, sigma: np.ndarray, sigma_inv: np.ndarray, mu: np.ndarray, j, ): """Calculates the delta's from equation (20) for the column j (without q_j).""" n = X.shape[0] delta = np.full(n, np.inf) unique_rows = np.unique(W, axis=0) for w in unique_rows: x = X[:, j] row_idx_w = np.where(np.all(W == w, axis=1))[0] # rows with current w pattern finite_rows = row_idx_w[np.isfinite(x[row_idx_w])] # row_idx_w with finite/not-nan x_j if len(finite_rows) > 0: # if there are rows that aren't nans or infs x = x[finite_rows] # the relevant entries w1 = w.copy() # current W pattern w0 = w.copy() w0[j] = 0 # current W pattern, but W_ij is set to zero if np.any(w0): # current W pattern contains nonzeros (without looking at j) w1[j] = 1 # current W pattern, but W_ij is set to one index_0 = np.nonzero(w0)[0] # ones in current W pattern (without j) index_1 = np.nonzero(w1)[0] mu_0 = mu[index_0] mu_1 = mu[index_1] sigma_1 = sigma[np.ix_(index_1, index_1)] sigma_inv_0 = inverse_submatrix(sigma, sigma_inv, index_0) jtemp = np.where(index_1 == j)[0] # current j in index 1 jtemp_n = np.where(index_1 != j)[0] # ones in index 1 (without j) Xtemp = X[np.ix_(finite_rows, index_0)] - mu_0 x_hat = mu_1[jtemp] + (Xtemp @ sigma_inv_0 @ sigma_1[np.ix_(jtemp_n, jtemp)]).T c = ( sigma_1[np.ix_(jtemp, jtemp)] - sigma_1[np.ix_(jtemp, jtemp_n)] @ sigma_inv_0 @ sigma_1[np.ix_(jtemp_n, jtemp)] ) delta[finite_rows] = (x - x_hat) ** 2 / c + np.log(c) + np.log(2 * np.pi) else: # current W pattern has zeros everywhere (except on j) delta[finite_rows] = ( (x - mu[j]) ** 2 / sigma[j, j] + np.log(sigma[j, j]) + np.log(2 * np.pi) ) return delta def _predict_missing_values( self, X: np.ndarray, predictions: np.ndarray, conditional_variances: np.ndarray, row_idx_w: np.array, missing_col_idx: np.array, observed_col_idx: np.array, mu: np.array, sigma: np.ndarray, sigma_inv: np.ndarray, ): """Predict the missing values when there are both observed and missing values.""" sigma_observed_inv = inverse_submatrix(sigma, sigma_inv, observed_col_idx) conditional_variance_missing = np.diag( sigma[np.ix_(missing_col_idx, missing_col_idx)] - sigma[np.ix_(missing_col_idx, observed_col_idx)] @ sigma_observed_inv @ sigma[np.ix_(observed_col_idx, missing_col_idx)] ) # predict the missing values for index in row_idx_w: conditional_variances[np.ix_([index], missing_col_idx)] = conditional_variance_missing predictions[np.ix_([index], missing_col_idx)] = ( mu[missing_col_idx] + (X[np.ix_([index], observed_col_idx)] - mu[observed_col_idx]) @ sigma_observed_inv @ sigma[np.ix_(observed_col_idx, missing_col_idx)] ) return predictions, conditional_variances def _predict_observed_values( self, X: np.ndarray, predictions: np.ndarray, conditional_variances: np.ndarray, row_idx_w: np.ndarray, observed_col_idx: np.ndarray, mu: np.ndarray, sigma: np.ndarray, sigma_inv: np.ndarray, ): """Predict the observed values when there are both observed and missing values.""" if len(observed_col_idx) == 1: # only 1 observed value for index in row_idx_w: predictions[np.ix_([index], observed_col_idx)] = mu[observed_col_idx] conditional_variances[np.ix_([index], observed_col_idx)] = sigma[ observed_col_idx, observed_col_idx ] else: # multiple observed values for obs in observed_col_idx: other_observations = observed_col_idx[observed_col_idx != obs] sigma_others_inv = inverse_submatrix(sigma, sigma_inv, other_observations) conditional_variance_others = ( sigma[obs, obs] - sigma[np.ix_([obs], other_observations)] @ sigma_others_inv @ sigma[np.ix_(other_observations, [obs])] ) for index in row_idx_w: predictions[index, obs] = ( mu[obs] + (X[np.ix_([index], other_observations)] - mu[other_observations]) @ sigma_others_inv @ sigma[np.ix_(other_observations, [obs])] ) conditional_variances[index, obs] = conditional_variance_others return predictions, conditional_variances def _predict_no_missing( self, X: np.ndarray, predictions: np.ndarray, conditional_variances: np.ndarray, row_idx_w: np.ndarray, mu: np.ndarray, sigma: np.ndarray, sigma_inv: np.ndarray, ): """Make predictions when no values in the w-pattern are missing.""" p = X.shape[1] conditional_variance_nomissings = 1.0 / np.diag(sigma_inv) for index in row_idx_w: conditional_variances[index, :] = conditional_variance_nomissings for j in range(p): j_neg = np.arange(p) j_neg = j_neg[j_neg != j] sigma_jneg_inv = inverse_submatrix(sigma, sigma_inv, j_neg) Xtemp = X[np.ix_(row_idx_w, j_neg)] - mu[j_neg] predictions[np.ix_(row_idx_w, [j])] = ( mu[j] + Xtemp @ sigma_jneg_inv @ sigma[np.ix_(j_neg, [j])] ) return predictions, conditional_variances def _predict_all_missing( self, predictions: np.ndarray, conditional_variances: np.ndarray, row_idx_w: np.array, mu: np.array, sigma_inv: np.ndarray, ): """Make predictions when all values in the w-pattern are missing.""" for index in row_idx_w: predictions[index, :] = mu conditional_variances[index, :] = np.diag(sigma_inv) return predictions, conditional_variances def _check_data(self, X_scaled: np.ndarray): """Checks that there aren't too many nans or marginal outliers per column. Checks that there are enough observations. Arguments: X_scaled (np.ndarray): robustly standardized data set.""" X = np.copy(X_scaled) cutoff = np.sqrt(chi2.ppf(self.quantile, 1)) n_marginal_outliers = np.sum(np.abs(X) > cutoff, axis=0) / X.shape[0] if np.max(n_marginal_outliers) > 1 - self.alpha: raise ValueError( f"\nAt least one variable has more than {100 * (1 - self.alpha)}% of " "marginal outliers." ) X[np.abs(X) > cutoff] = np.nan n_bad_values = np.isnan(X).sum(axis=0) / X.shape[0] if np.max(n_bad_values) > 1 - self.alpha: raise ValueError( f"\nAt least one variable has more than {100 * (1 - self.alpha)}% of nan's or " "marginal outliers." ) if X.shape[0] < 5 * X.shape[1]: raise ValueError( "There are not enough observations compared to the number of " f"variables, n/p = {X.shape[0]/X.shape[1]} < 5." )