from collections.abc import MutableMapping
from numbers import Real
from typing import Any, ClassVar, Literal, Self, TypeVar
import numpy as np
import scipy.stats as st
from numpy.typing import ArrayLike, NDArray
from sklearn.base import MetaEstimatorMixin, _fit_context, check_is_fitted, clone
from sklearn.linear_model import HuberRegressor
from sklearn.utils._available_if import available_if
from sklearn.utils._metadata_requests import RequestMethod
from sklearn.utils._param_validation import HasMethods, Interval, StrOptions
from ..._common import Parameter
from ..._types import FloatArrayLike, FloatNDArray, IntNDArray, ParameterConstraint
from ...metrics import Metric
from ...utils._sklearn_compat import _estimator_has, validate_data # type: ignore[attr-defined]
from ..csclassifier import CostSensitiveClassifier
CostStr = Literal['tp_cost', 'tn_cost', 'fn_cost', 'fp_cost']
K = TypeVar('K')
V = TypeVar('V')
CSCLASSIFIER_PARAMS = CostSensitiveClassifier._parameter_constraints.copy()
CSCLASSIFIER_PARAMS.pop('loss')
[docs]
class RobustCSClassifier(MetaEstimatorMixin, CostSensitiveClassifier): # type: ignore[misc]
"""
Cost-sensitive classifier that is robust to outliers in the instance-dependent costs.
The costs are adjusted by fitting an outlier estimator to the costs and imputing the costs for the outliers.
Outliers are detected by the standardized residuals of the cost and the predicted cost.
The costs passed to the cost-sensitive classifier are a combination of the original costs (for non-outliers) and
the imputed predicted costs (for outliers).
Read more in the :ref:`User Guide <robustcs>`.
Parameters
----------
estimator : Estimator
The cost-sensitive classifier to fit.
The estimator must take tp_cost, tn_cost, fn_cost, and fp_cost as keyword arguments in its fit method
or should use :class:`~empulse.metrics.Metric` as their loss/criterion.
outlier_estimator : Estimator, optional
The outlier estimator to fit to the costs.
If not provided, a :class:`sklearn:sklearn.linear_model.HuberRegressor` is used with default settings.
outlier_threshold : float, default=2.5
The threshold for the standardized residuals to detect outliers.
If the absolute value of the standardized residual is greater than the threshold,
the cost is an outlier and will be imputed with the predicted cost.
detect_outliers_for : {'all', 'tp_cost', 'tn_cost', 'fn_cost', 'fp_cost', list}, default='all'
The costs for which to detect outliers.
By default, all instance-dependent costs are used for outlier detection.
If a single cost is passed, only that cost is used for outlier detection.
If a list of costs is passed, only those costs are used for outlier detection.
.. note::
This parameter is ignored if the underlying estimator
uses :class:`~empulse.metrics.Metric` as its loss/criterion.
Then all costs that are marked as outlier-sensitive in the metric loss
are used for outlier detection.
This can be done through the :meth:`~empulse.metrics.Metric.mark_outlier_sensitive` method.
tp_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of true positives. If ``float``, then all true positives have the same cost.
If array-like, then it is the cost of each true positive classification.
Is overwritten if another `tp_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
fp_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of false positives. If ``float``, then all false positives have the same cost.
If array-like, then it is the cost of each false positive classification.
Is overwritten if another `fp_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
tn_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of true negatives. If ``float``, then all true negatives have the same cost.
If array-like, then it is the cost of each true negative classification.
Is overwritten if another `tn_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
fn_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of false negatives. If ``float``, then all false negatives have the same cost.
If array-like, then it is the cost of each false negative classification.
Is overwritten if another `fn_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
Attributes
----------
estimator_ : Estimator
The fitted cost-sensitive classifier.
outlier_estimators_ : dict{str, Estimator or None}
The fitted outlier estimators.
If no outliers are detected for this cost, the value is None.
The keys of the dictionary are 'tp_cost', 'tn_cost', 'fn_cost', and 'fp_cost'.
costs_ : dict
The imputed costs for the cost-sensitive classifier.
Notes
-----
Constant costs are not used for outlier detection and imputation.
Code adapted from [1]_.
Examples
--------
.. code-block:: python
import numpy as np
from empulse.models import CSLogitClassifier, RobustCSClassifier
from sklearn.datasets import make_classification
X, y = make_classification()
fn_cost = np.random.rand(y.size) # instance-dependent cost
fp_cost = 5 # constant cost
model = RobustCSClassifier(CSLogitClassifier(C=0.1))
model.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)
Example with Metric loss:
.. code-block:: python
import numpy as np
import sympy as sp
from empulse.metrics import Metric, Cost, CostMatrix
from empulse.models import CSLogitClassifier, RobustCSClassifier
from sklearn.datasets import make_classification
X, y = make_classification()
a, b = sp.symbols('a b')
cost_loss = Metric(
CostMatrix().add_fp_cost(a).add_fn_cost(b).mark_outlier_sensitive(a), Cost()
)
fn_cost = np.random.rand(y.size)
model = RobustCSClassifier(CSLogitClassifier(loss=cost_loss))
model.fit(X, y, a=np.random.rand(y.size), b=5)
Example with passing instance-dependent costs through cross-validation:
.. code-block:: python
import numpy as np
from empulse.models import CSBoostClassifier, RobustCSClassifier
from sklearn import set_config
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
set_config(enable_metadata_routing=True)
X, y = make_classification()
fn_cost = np.random.rand(y.size)
fp_cost = 5
pipeline = Pipeline([
('scaler', StandardScaler()),
(
'model',
RobustCSClassifier(CSBoostClassifier()).set_fit_request(
fn_cost=True, fp_cost=True
),
),
])
cross_val_score(pipeline, X, y, params={'fn_cost': fn_cost, 'fp_cost': fp_cost})
Example with passing instance-dependent costs through a grid search:
.. code-block:: python
import numpy as np
from empulse.metrics import expected_cost_loss
from empulse.models import CSLogitClassifier, RobustCSClassifier
from sklearn import set_config
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
set_config(enable_metadata_routing=True)
X, y = make_classification(n_samples=50)
fn_cost = np.random.rand(y.size)
fp_cost = 5
pipeline = Pipeline([
('scaler', StandardScaler()),
(
'model',
RobustCSClassifier(CSLogitClassifier()).set_fit_request(
fn_cost=True, fp_cost=True
),
),
])
param_grid = {'model__estimator__C': np.logspace(-5, 2, 5)}
scorer = make_scorer(
expected_cost_loss,
response_method='predict_proba',
greater_is_better=False,
normalize=True,
)
scorer = scorer.set_score_request(fn_cost=True, fp_cost=True)
grid_search = GridSearchCV(pipeline, param_grid=param_grid, scoring=scorer)
grid_search.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)
References
----------
.. [1] De Vos, S., Vanderschueren, T., Verdonck, T., & Verbeke, W. (2023).
Robust instance-dependent cost-sensitive classification.
Advances in Data Analysis and Classification, 17(4), 1057-1079.
"""
_parameter_constraints: ClassVar[ParameterConstraint] = {
**CSCLASSIFIER_PARAMS,
'estimator': [HasMethods(['fit', 'predict_proba']), None],
'outlier_estimator': [HasMethods(['fit', 'predict']), None],
'outlier_threshold': [Interval(Real, 0, None, closed='right')],
'detect_outliers_for': [StrOptions({'all', 'tp_cost', 'tn_cost', 'fn_cost', 'fp_cost'}), list],
}
def _get_metric_loss(self) -> Metric | None:
"""Get the metric loss function if available."""
return self.estimator._get_metric_loss() if isinstance(self.estimator, CostSensitiveClassifier) else None
def __init__(
self,
estimator: Any,
outlier_estimator: Any = None,
*,
outlier_threshold: float = 2.5,
detect_outliers_for: Literal['all'] | CostStr | list[CostStr] = 'all',
tp_cost: FloatArrayLike | float = 0.0,
tn_cost: FloatArrayLike | float = 0.0,
fn_cost: FloatArrayLike | float = 0.0,
fp_cost: FloatArrayLike | float = 0.0,
):
self.estimator = estimator
self.outlier_estimator = outlier_estimator
self.outlier_threshold = outlier_threshold
self.detect_outliers_for = detect_outliers_for
super().__init__(tp_cost=tp_cost, tn_cost=tn_cost, fp_cost=fp_cost, fn_cost=fn_cost, loss=None)
def __post_init__(self) -> None:
# Allow passing costs accepted by the metric loss through metadata routing
if isinstance(self._get_metric_loss(), Metric):
self.__class__.set_fit_request = RequestMethod(
'fit',
sorted(
self.get_metadata_routing().fit.requests.keys() | self._get_metric_loss()._all_symbols # type: ignore[union-attr]
),
)
[docs]
@_fit_context(prefer_skip_nested_validation=False) # type: ignore[misc]
def fit(
self,
X: FloatArrayLike,
y: ArrayLike,
*,
tp_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
tn_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
fn_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
fp_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
**fit_params: Any,
) -> Self:
"""
Fit the estimator with the adjusted costs.
Parameters
----------
X : array-like of shape (n_samples, n_features)
y : array-like of shape (n_samples,)
tp_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
Cost of true positives. If ``float``, then all true positives have the same cost.
If array-like, then it is the cost of each true positive classification.
fp_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
Cost of false positives. If ``float``, then all false positives have the same cost.
If array-like, then it is the cost of each false positive classification.
tn_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
Cost of true negatives. If ``float``, then all true negatives have the same cost.
If array-like, then it is the cost of each true negative classification.
fn_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
Cost of false negatives. If ``float``, then all false negatives have the same cost.
If array-like, then it is the cost of each false negative classification.
fit_params : dict
Additional keyword arguments to pass to the estimator's fit method.
Returns
-------
self : RobustCSClassifier
Fitted RobustCSClassifier model.
"""
X, y = validate_data(self, X, y)
if (
isinstance(self.estimator, CostSensitiveClassifier)
and (metric_loss := self.estimator._get_metric_loss()) is not None
):
self.costs_: dict[str, int | float | FloatNDArray] = {}
outlier_symbols = metric_loss.cost_matrix._outlier_sensitive_symbols
imputed_costs = {}
self.outlier_estimators_ = {}
for symbol in outlier_symbols:
target = fit_params.get(str(symbol))
if target is None:
alias = _invert_dict(metric_loss.cost_matrix._aliases)[str(symbol)]
target = fit_params.get(alias)
if target is None:
raise ValueError(f"Cost '{symbol}' is not provided in fit_params.")
if not isinstance(target, np.ndarray):
raise TypeError(f"Cost '{symbol}' is not an array. Cannot detect outliers for this cost.")
pos_symbols = metric_loss.tp_cost.free_symbols | metric_loss.fn_cost.free_symbols
neg_symbols = metric_loss.tn_cost.free_symbols | metric_loss.fp_cost.free_symbols
if symbol in pos_symbols and symbol not in neg_symbols:
X_relevant, target_relevant = X[y > 0], target[y > 0]
elif symbol in neg_symbols and symbol not in pos_symbols:
X_relevant, target_relevant = X[y == 0], target[y == 0]
else:
X_relevant, target_relevant = X.copy(), target.copy()
if X_relevant.size > 0:
outlier_estimator = clone(
self.outlier_estimator if self.outlier_estimator is not None else HuberRegressor()
).fit(X_relevant, target_relevant)
cost_predictions = outlier_estimator.predict(X)
residuals = np.abs(target - cost_predictions)
std_residuals = residuals / st.sem(target)
outliers = std_residuals > self.outlier_threshold
fit_params[str(symbol)] = np.where(outliers, cost_predictions, target)
self.costs_[str(symbol)] = fit_params[str(symbol)]
self.outlier_estimators_[str(symbol)] = outlier_estimator
else:
tp_cost, tn_cost, fn_cost, fp_cost = self._check_costs(
tp_cost=tp_cost, tn_cost=tn_cost, fn_cost=fn_cost, fp_cost=fp_cost
)
self.costs_ = {
'tp_cost': tp_cost if isinstance(tp_cost, int | float) else np.array(tp_cost), # take copy of the array
'tn_cost': tn_cost if isinstance(tn_cost, int | float) else np.array(tn_cost),
'fn_cost': fn_cost if isinstance(fn_cost, int | float) else np.array(fn_cost),
'fp_cost': fp_cost if isinstance(fp_cost, int | float) else np.array(fp_cost),
}
should_fit = self._determine_outlier_costs()
self._fit_outlier_estimators(X, y, should_fit)
imputed_costs = self.costs_.copy()
# with the imputed costs fit the estimator
self.estimator_ = clone(self.estimator).fit(X, y, **imputed_costs, **fit_params)
if hasattr(self.estimator_, 'n_features_in_'):
self.n_features_in_ = self.estimator_.n_features_in_
if hasattr(self.estimator_, 'feature_names_in_'):
self.feature_names_in_ = self.estimator_.feature_names_in_
return self
def _determine_outlier_costs(self) -> list[str]:
"""Determine which costs to fit the outlier estimator on."""
# only fit on the costs that are arrays and have a standard deviation greater than 0
should_fit: list[str] = [
cost_name for cost_name, cost in self.costs_.items() if isinstance(cost, np.ndarray) and np.std(cost) > 0
]
if self.detect_outliers_for != 'all':
if isinstance(self.detect_outliers_for, str):
if self.detect_outliers_for in self.costs_: # single cost
if self.detect_outliers_for not in should_fit:
raise ValueError(
f"Cost '{self.detect_outliers_for}' is not an array or has a standard deviation of 0."
' Cannot detect outliers for this cost.'
)
should_fit = [self.detect_outliers_for] # type: ignore[list-item]
else:
raise ValueError(
f"Invalid cost name '{self.detect_outliers_for}' in detect_outliers_for."
" Must be one of 'all', 'tp_cost', 'tn_cost', 'fn_cost', 'fp_cost', or a list of these."
)
elif isinstance(self.detect_outliers_for, list):
for cost_name in self.detect_outliers_for:
if cost_name not in self.costs_:
raise ValueError(f"Invalid cost name '{cost_name}' in detect_outliers_for.")
if cost_name not in should_fit:
raise ValueError(
f"Cost '{cost_name}' is not an array or has a standard deviation of 0."
' Cannot detect outliers for this cost.'
)
should_fit = [cost_name for cost_name in self.detect_outliers_for if cost_name in should_fit]
else:
raise TypeError(
f"Invalid type '{type(self.detect_outliers_for)}' for detect_outliers_for."
" Must be one of 'all', 'tp_cost', 'tn_cost', 'fn_cost', 'fp_cost', or a list of these."
)
return should_fit
def _fit_outlier_estimators(self, X: FloatNDArray, y: FloatNDArray, should_fit: list[str]) -> None:
self.outlier_estimators_ = {}
for cost_name in self.costs_:
if cost_name in should_fit:
target = self.costs_[cost_name]
if not isinstance(target, np.ndarray):
raise TypeError(f"Cost '{cost_name}' is not an array. Cannot detect outliers for this cost.")
if cost_name in {'tp_cost', 'fn_cost'}:
X_relevant, target_relevant = X[y > 0], target[y > 0]
else:
X_relevant, target_relevant = X[y == 0], target[y == 0]
if X_relevant.size > 0:
outlier_estimator = clone(
self.outlier_estimator if self.outlier_estimator is not None else HuberRegressor()
).fit(X_relevant, target_relevant)
cost_predictions = outlier_estimator.predict(X)
residuals = np.abs(target - cost_predictions)
std_residuals = residuals / st.sem(target)
outliers = std_residuals > self.outlier_threshold
self.costs_[cost_name] = np.where(outliers, cost_predictions, target)
self.outlier_estimators_[cost_name] = outlier_estimator
else:
self.outlier_estimators_[cost_name] = None
else:
self.outlier_estimators_[cost_name] = None
def _fit(self, X: FloatNDArray, y: IntNDArray, loss: Metric, **loss_params: Any) -> Self: # type: ignore[empty-body]
pass
[docs]
@available_if(_estimator_has('predict')) # type: ignore[misc]
def predict(self, X: FloatArrayLike) -> FloatNDArray: # noqa: D102
check_is_fitted(self, 'estimator_')
y_pred: FloatNDArray = self.estimator_.predict(X)
return y_pred
@available_if(_estimator_has('predict_proba')) # type: ignore[misc]
def predict_proba(self, X: FloatArrayLike) -> FloatNDArray: # noqa: D102
check_is_fitted(self, 'estimator_')
y_proba: FloatNDArray = self.estimator_.predict_proba(X)
return y_proba
@available_if(_estimator_has('decision_function')) # type: ignore[misc]
def decision_function(self, X: FloatArrayLike) -> FloatNDArray: # noqa: D102
check_is_fitted(self, 'estimator_')
y_score: FloatNDArray = self.estimator_.decision_function(X)
return y_score
@property
def classes_(self) -> NDArray[Any]: # noqa: D102
classes: NDArray[Any] = self.estimator_.classes_
return classes
@classes_.setter
def classes_(self, value: NDArray[Any]) -> None:
if estimator := getattr(self, 'estimator_', None):
estimator.classes_ = value
else:
raise AttributeError('The underlying estimator is not fitted yet.')
def _invert_dict(d: MutableMapping[K, V]) -> dict[V, K]:
"""Invert a dictionary, swapping keys and values."""
return {v: k for k, v in d.items()}