import warnings
from collections.abc import Callable, Sequence
from functools import partial
from typing import Any, ClassVar, Literal, Self, TypeVar, overload
import numpy as np
from numpy.typing import ArrayLike
from scipy.special import expit
from sklearn.base import clone
from sklearn.utils._param_validation import HasMethods
from sklearn.utils.validation import check_is_fitted
from ..._types import FloatArrayLike, FloatNDArray, IntNDArray, ParameterConstraint
from ...utils._sklearn_compat import validate_data # type: ignore[attr-defined]
try:
from xgboost import XGBClassifier
except ImportError:
XGBClassifier = TypeVar('XGBClassifier') # type: ignore[misc, assignment]
try:
from lightgbm import LGBMClassifier
except ImportError:
LGBMClassifier = TypeVar('LGBMClassifier') # type: ignore[misc, assignment]
try:
from catboost import CatBoostClassifier
except ImportError:
CatBoostClassifier = TypeVar('CatBoostClassifier') # type: ignore[misc, assignment]
from ..._common import Parameter
from ...metrics import Metric
from ...metrics._loss import cy_boost_grad_hess
from ..csclassifier import CostSensitiveClassifier
# The hessian of the objective is 0 at a score of exactly 0.5, which means that
# a model initialized at that score would not make any progress during optimization.
# A small offset is therefore added to the starting score so that the hessian is
# non-zero and the optimization algorithm gets kick-started.
_BASE_SCORE = 0.5 + 1e-2
class LGBMObjective:
    """
    AEC objective for lightgbm.

    The instance stores a precomputed gradient constant and, when called,
    hands it to ``cy_boost_grad_hess`` together with the labels and scores.
    """

    def __init__(self, gradient_const: FloatNDArray):
        self.gradient_const = gradient_const

    def __call__(self, y_true: FloatNDArray, y_score: FloatNDArray) -> tuple[FloatNDArray, FloatNDArray]:
        """
        Evaluate the gradient and hessian of the AEC objective.

        Parameters
        ----------
        y_true : np.ndarray
            Ground truth labels.
        y_score : np.ndarray
            Current model scores, forwarded unchanged to ``cy_boost_grad_hess``.

        Returns
        -------
        gradient : np.ndarray
            Gradient of the objective function.
        hessian : np.ndarray
            Hessian of the objective function.
        """
        grad_hess: tuple[FloatNDArray, FloatNDArray] = cy_boost_grad_hess(y_true, y_score, self.gradient_const)
        first_order, second_order = grad_hess
        return first_order, second_order
class LGBMMetricObjective:
    """
    Metric objective wrapper for lightgbm using dynamic gradient/hessian evaluation.

    Every call delegates to the metric's ``_gradient_boost_objective`` with the
    loss parameters captured at construction time.
    """

    def __init__(self, metric: Metric, **loss_params: FloatNDArray | float):
        self.loss_params = loss_params
        self.metric = metric

    def __call__(self, y_true: FloatNDArray, y_score: FloatNDArray) -> tuple[FloatNDArray, FloatNDArray]:
        """
        Compute the gradient and hessian of the metric objective.

        Parameters
        ----------
        y_true : np.ndarray
            Ground truth labels.
        y_score : np.ndarray
            Current model scores, forwarded unchanged to the metric.

        Returns
        -------
        gradient : np.ndarray
        hessian : np.ndarray
        """
        boost_objective = self.metric._gradient_boost_objective
        first_order, second_order = boost_objective(y_true, y_score, **self.loss_params)
        return first_order, second_order
[docs]
class CSBoostClassifier(CostSensitiveClassifier):
"""
Cost-sensitive gradient boosting classifier.
CSBoostClassifier supports :class:`xgboost:xgboost.XGBClassifier`, :class:`lightgbm:lightgbm.LGBMClassifier`
and :class:`catboost.CatBoostClassifier` as base estimators.
By default, it uses XGBoost classifier with default hyperparameters.
Read more in the :ref:`User Guide <csboost>`.
.. seealso::
:class:`~empulse.models.CSLogitClassifier` : Cost-sensitive logistic regression classifier.
:class:`~empulse.models.CSTreeClassifier` : Cost-sensitive decision tree classifier.
:class:`~empulse.models.CSForestClassifier` : Cost-sensitive random forest classifier.
Parameters
----------
estimator : :class:`xgboost:xgboost.XGBClassifier`, :class:`lightgbm:lightgbm.LGBMClassifier` \
or :class:`catboost.CatBoostClassifier`, optional
XGBoost, LightGBM or CatBoost classifier to be fit with desired hyperparameters.
If not provided, an XGBoost classifier with default hyperparameters is used.
tp_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of true positives. If ``float``, then all true positives have the same cost.
If array-like, then it is the cost of each true positive classification.
Is overwritten if another `tp_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
fp_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of false positives. If ``float``, then all false positives have the same cost.
If array-like, then it is the cost of each false positive classification.
Is overwritten if another `fp_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
tn_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of true negatives. If ``float``, then all true negatives have the same cost.
If array-like, then it is the cost of each true negative classification.
Is overwritten if another `tn_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
fn_cost : float or array-like, shape=(n_samples,), default=0.0
Cost of false negatives. If ``float``, then all false negatives have the same cost.
If array-like, then it is the cost of each false negative classification.
Is overwritten if another `fn_cost` is passed to the ``fit`` method.
.. note::
It is not recommended to pass instance-dependent costs to the ``__init__`` method.
Instead, pass them to the ``fit`` method.
loss : :class:`empulse.metrics.Metric`, default=None
Loss function to optimize. Metric parameters are passed as ``loss_params``
to the :meth:`~empulse.models.CSBoostClassifier.fit` method.
Attributes
----------
classes_ : numpy.ndarray, shape=(n_classes,)
Unique classes in the target.
estimator_ : :class:`xgboost:xgboost.XGBClassifier`, :class:`lightgbm:lightgbm.LGBMClassifier` \
or :class:`catboost.CatBoostClassifier`
Fitted boosting classifier: a configured clone of ``estimator``,
or an XGBoost classifier if no ``estimator`` was provided.
Examples
--------
.. code-block:: python
import numpy as np
from empulse.models import CSBoostClassifier
from sklearn.datasets import make_classification
X, y = make_classification()
fn_cost = np.random.rand(y.size) # instance-dependent cost
fp_cost = 5 # constant cost
model = CSBoostClassifier()
model.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)
y_proba = model.predict_proba(X)
Example with passing instance-dependent costs through cross-validation:
.. code-block:: python
import numpy as np
from empulse.models import CSBoostClassifier
from sklearn import set_config
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
set_config(enable_metadata_routing=True)
X, y = make_classification()
fn_cost = np.random.rand(y.size)
fp_cost = 5
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', CSBoostClassifier().set_fit_request(fn_cost=True, fp_cost=True))
])
cross_val_score(pipeline, X, y, params={'fn_cost': fn_cost, 'fp_cost': fp_cost})
Example with passing instance-dependent costs through a grid search:
.. code-block:: python
import numpy as np
from empulse.metrics import expected_cost_loss
from empulse.models import CSBoostClassifier
from sklearn import set_config
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier
set_config(enable_metadata_routing=True)
X, y = make_classification(n_samples=50)
fn_cost = np.random.rand(y.size)
fp_cost = 5
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', CSBoostClassifier(
XGBClassifier(n_jobs=2, n_estimators=10)
).set_fit_request(fn_cost=True, fp_cost=True))
])
param_grid = {
'model__estimator__learning_rate': np.logspace(-5, 0, 5),
}
scorer = make_scorer(
expected_cost_loss,
response_method='predict_proba',
greater_is_better=False,
normalize=True
)
scorer = scorer.set_score_request(fn_cost=True, fp_cost=True)
grid_search = GridSearchCV(pipeline, param_grid=param_grid, scoring=scorer)
grid_search.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)
References
----------
.. [1] Höppner, S., Baesens, B., Verbeke, W., & Verdonck, T. (2022).
Instance-dependent cost-sensitive learning for detecting transfer fraud.
European Journal of Operational Research, 297(1), 291-300.
"""
# Accept any object exposing ``fit`` and ``predict_proba`` (or None) as the estimator,
# on top of the constraints inherited from CostSensitiveClassifier.
_parameter_constraints: ClassVar[ParameterConstraint] = {
'estimator': [HasMethods(['fit', 'predict_proba']), None],
**CostSensitiveClassifier._parameter_constraints,
}
def __init__(
    self,
    estimator: XGBClassifier | LGBMClassifier | CatBoostClassifier | None = None,
    *,
    tp_cost: FloatArrayLike | float = 0.0,
    tn_cost: FloatArrayLike | float = 0.0,
    fn_cost: FloatArrayLike | float = 0.0,
    fp_cost: FloatArrayLike | float = 0.0,
    loss: Metric | None = None,
) -> None:
    # Keep the base estimator exactly as given; the cost parameters and the loss
    # are handed over to the parent class.
    self.estimator = estimator
    parent_kwargs = {
        'tp_cost': tp_cost,
        'tn_cost': tn_cost,
        'fp_cost': fp_cost,
        'fn_cost': fn_cost,
        'loss': loss,
    }
    super().__init__(**parent_kwargs)
[docs]
def fit(
    self,
    X: FloatArrayLike,
    y: ArrayLike,
    *,
    tp_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
    tn_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
    fn_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
    fp_cost: FloatArrayLike | float | Parameter = Parameter.UNCHANGED,
    fit_params: dict[str, Any] | None = None,
    **loss_params: Any,
) -> Self:
    """
    Fit the model.

    All arguments are forwarded unchanged to the parent class' ``fit``.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Training data.

    y : array-like of shape (n_samples,)
        Target values.

    tp_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
        Cost of true positives. If ``float``, then all true positives have the same cost.
        If array-like, then it is the cost of each true positive classification.

    fp_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
        Cost of false positives. If ``float``, then all false positives have the same cost.
        If array-like, then it is the cost of each false positive classification.

    tn_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
        Cost of true negatives. If ``float``, then all true negatives have the same cost.
        If array-like, then it is the cost of each true negative classification.

    fn_cost : float or array-like, shape=(n_samples,), default=$UNCHANGED$
        Cost of false negatives. If ``float``, then all false negatives have the same cost.
        If array-like, then it is the cost of each false negative classification.

    fit_params : dict, default=None
        Additional keyword arguments to pass to the estimator's fit method.

    loss_params : dict
        Additional keyword arguments to pass to the loss function if using a custom loss function.

    Returns
    -------
    self : CSBoostClassifier
        Fitted CSBoost model.
    """
    cost_overrides = {
        'tp_cost': tp_cost,
        'tn_cost': tn_cost,
        'fn_cost': fn_cost,
        'fp_cost': fp_cost,
    }
    super().fit(X, y, **cost_overrides, fit_params=fit_params, **loss_params)
    return self
def _fit(
    self,
    X: FloatNDArray,
    y: IntNDArray,
    loss: Metric,
    *,
    fit_params: dict[str, Any] | None = None,
    **loss_params: Any,
) -> Self:
    """
    Configure the underlying boosting estimator with the cost-sensitive objective and fit it.

    Parameters
    ----------
    X : numpy.ndarray
        Training data.
    y : numpy.ndarray
        Target values.
    loss : Metric
        Metric from which the boosting objective is derived.
    fit_params : dict, default=None
        Extra keyword arguments for the underlying estimator's ``fit``.
        The dictionary passed by the caller is never modified.
    loss_params : dict
        Parameters of the loss. A ``sample_weight`` entry is removed from the
        loss parameters and forwarded to the estimator's ``fit`` instead.

    Returns
    -------
    self : CSBoostClassifier

    Raises
    ------
    ValueError
        If sample weights are supplied while the estimator is a CatBoostClassifier.
    TypeError
        If the estimator is not an XGBClassifier, LGBMClassifier or CatBoostClassifier.
    ImportError
        If no estimator was given and XGBoost is not installed.
    """
    # Work on a private copy: 'sample_weight' may be injected below and must not
    # leak into the dictionary owned by the caller (it could be reused for another fit).
    fit_params = {} if fit_params is None else dict(fit_params)

    # allow sample weights still to be passed as kwargs to comply with sklearn interface
    if 'sample_weight' in loss_params:
        fit_params['sample_weight'] = loss_params.pop('sample_weight')

    if self.estimator is None:
        self._initialize_default_estimator(y=y, loss=loss, **loss_params)
    else:
        self._initialize_custom_estimator(y=y, loss=loss, **loss_params)

    # A missing optional package is represented by a TypeVar placeholder, which must
    # never reach ``isinstance`` as its second argument.
    if not isinstance(XGBClassifier, TypeVar) and isinstance(self.estimator_, XGBClassifier):
        self.estimator_.fit(X, y, **fit_params)
    elif not isinstance(LGBMClassifier, TypeVar) and isinstance(self.estimator_, LGBMClassifier):
        self.estimator_.fit(X, y, init_score=np.full(y.shape, _BASE_SCORE), **fit_params)
    elif not isinstance(CatBoostClassifier, TypeVar) and isinstance(self.estimator_, CatBoostClassifier):
        # The sample-weight slot is used to smuggle row indices to the custom
        # objective/metric, so real sample weights cannot be supported here.
        if 'sample_weight' in fit_params:
            raise ValueError('Sample weights are not allowed when training CatBoostClassifier.')
        indices = np.arange(X.shape[0])
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                message='Can\'t optimize method "calc_ders_range" because self argument is used',
                category=UserWarning,
            )
            warnings.filterwarnings(
                'ignore',
                message='Can\'t optimize method "evaluate" because self argument is used',
                category=UserWarning,
            )
            self.estimator_.fit(X, y, sample_weight=indices, baseline=np.full(y.shape, _BASE_SCORE), **fit_params)
    else:
        raise TypeError('Estimator must be an instance of XGBClassifier, LGBMClassifier, or CatBoostClassifier')
    return self
def _initialize_default_estimator(
    self,
    y: FloatNDArray,
    loss: Metric,
    **loss_params: Any,
) -> None:
    """
    Create the default XGBoost estimator and store it in ``self.estimator_``.

    Parameters
    ----------
    y : numpy.ndarray
        Target values, used to build the objective.
    loss : Metric
        Metric from which the objective is derived.
    loss_params : dict
        Parameters of the loss.

    Raises
    ------
    ImportError
        If the XGBoost package is not installed.
    """
    # XGBClassifier is replaced by a TypeVar placeholder when the import failed.
    xgboost_missing = isinstance(XGBClassifier, TypeVar)
    if xgboost_missing:
        raise ImportError(  # noqa: TRY004
            f'XGBoost package is required to use {type(self).__name__}. '
            'Install optional dependencies through `pip install empulse[optional]` or '
            '`pip install xgboost`'
        )
    self.estimator_ = XGBClassifier(
        objective=self._get_objective('xgboost', y, loss=loss, **loss_params),
        base_score=_BASE_SCORE,
    )
def _initialize_custom_estimator(
    self,
    y: FloatNDArray,
    loss: Metric,
    **loss_params: Any,
) -> None:
    """
    Clone the user-supplied estimator, attach the cost-sensitive objective and store it in ``self.estimator_``.

    Parameters
    ----------
    y : numpy.ndarray
        Target values, used to build the objective.
    loss : Metric
        Metric from which the objective is derived.
    loss_params : dict
        Parameters of the loss.

    Raises
    ------
    TypeError
        If the estimator is not an XGBClassifier, LGBMClassifier or CatBoostClassifier.
    """
    base = self.estimator

    # Each optional package is a TypeVar placeholder when it is not installed,
    # so that case is ruled out before the isinstance check on the estimator.
    if not isinstance(XGBClassifier, TypeVar) and isinstance(base, XGBClassifier):
        xgb_objective = self._get_objective('xgboost', y=y, loss=loss, **loss_params)
        self.estimator_ = clone(base).set_params(objective=xgb_objective, base_score=_BASE_SCORE)
        return

    if not isinstance(LGBMClassifier, TypeVar) and isinstance(base, LGBMClassifier):
        lgbm_objective = self._get_objective('lightgbm', y=y, loss=loss, **loss_params)
        self.estimator_ = clone(base).set_params(objective=lgbm_objective)
        return

    if not isinstance(CatBoostClassifier, TypeVar) and isinstance(base, CatBoostClassifier):
        catboost_pair = self._get_objective('catboost', y=y, loss=loss, **loss_params)
        loss_function, eval_metric = catboost_pair
        self.estimator_ = clone(base).set_params(loss_function=loss_function, eval_metric=eval_metric)
        return

    raise TypeError('Estimator must be an instance of XGBClassifier, LGBMClassifier, or CatBoostClassifier')
@staticmethod
def _flatten_loss_params(y: FloatNDArray, loss_params: dict[str, Any]) -> dict[str, Any]:
    """Return ``loss_params`` with every value normalized to shape ``y.shape`` / ``(n_samples,)``."""
    flattened: dict[str, Any] = {}
    for name, param in loss_params.items():
        values = np.asarray(param)
        # Scalars (including 0-d arrays) are broadcast to one value per sample;
        # any other array-like (list, tuple, ndarray, ...) is flattened to 1-D.
        flattened[name] = np.full(y.shape, param) if values.ndim == 0 else values.reshape(-1)
    return flattened

@overload
def _get_objective(
    self,
    framework: Literal['xgboost'],
    y: FloatNDArray,
    loss: Metric,
    **loss_params: Any,
) -> Callable[..., Any]: ...

@overload
def _get_objective(
    self,
    framework: Literal['lightgbm'],
    y: FloatNDArray,
    loss: Metric,
    **loss_params: Any,
) -> LGBMObjective | LGBMMetricObjective: ...

@overload
def _get_objective(
    self,
    framework: Literal['catboost'],
    y: FloatNDArray,
    loss: Metric,
    **loss_params: Any,
) -> tuple['CatBoostObjective', 'CatBoostMetric']: ...

def _get_objective(
    self,
    framework: Literal['xgboost', 'lightgbm', 'catboost'],
    y: FloatNDArray,
    loss: Metric,
    **loss_params: Any,
) -> Callable[..., Any] | LGBMObjective | LGBMMetricObjective | tuple['CatBoostObjective', 'CatBoostMetric']:
    """
    Build the framework-specific objective for the given loss.

    Parameters
    ----------
    framework : {'xgboost', 'lightgbm', 'catboost'}
        Boosting library the objective is built for.
    y : numpy.ndarray
        Target values.
    loss : Metric
        Metric providing ``_gradient_boost_objective`` / ``_prepare_boost_objective``.
    loss_params : dict
        Parameters of the loss (scalars or array-likes).

    Returns
    -------
    objective
        For ``'xgboost'`` a callable, for ``'lightgbm'`` an :class:`LGBMObjective` or
        :class:`LGBMMetricObjective`, and for ``'catboost'`` a
        ``(CatBoostObjective, CatBoostMetric)`` tuple.
    """
    # MaxProfit for boosting requires dynamic thresholding from current round predictions,
    # so we evaluate gradients/hessians directly from the metric each iteration.
    if loss.strategy.name == 'max profit':
        if framework == 'xgboost':
            return partial(loss._gradient_boost_objective, **loss_params)
        if framework == 'lightgbm':
            return LGBMMetricObjective(loss, **loss_params)
        per_sample_params = self._flatten_loss_params(y, loss_params)
        return CatBoostObjective(loss, **per_sample_params), CatBoostMetric(loss, **per_sample_params)

    # Every remaining framework uses the same precomputed gradient constant.
    grad_const = loss._prepare_boost_objective(y, **loss_params).reshape(-1)
    if framework == 'xgboost':
        return partial(cy_boost_grad_hess, grad_const=grad_const)
    if framework == 'lightgbm':
        return LGBMObjective(grad_const)

    # CatBoost indexes the loss params per sample, so normalize them to (n_samples,).
    per_sample_params = self._flatten_loss_params(y, loss_params)
    return CatBoostObjective(grad_const), CatBoostMetric(loss, **per_sample_params)
[docs]
def predict_proba(self, X: ArrayLike) -> FloatNDArray:
    """
    Predict class probabilities for X.

    Parameters
    ----------
    X : 2D numpy.ndarray, shape=(n_samples, n_features)

    Returns
    -------
    y_pred : 2D numpy.ndarray, shape=(n_samples, n_classes)
        Predicted class probabilities.
    """
    check_is_fitted(self)
    X = validate_data(self, X, reset=False)

    # When lightgbm is not installed, ``LGBMClassifier`` is a TypeVar placeholder (never None).
    # Passing that placeholder to ``isinstance`` raises TypeError, so rule it out first.
    if not isinstance(LGBMClassifier, TypeVar) and isinstance(self.estimator_, LGBMClassifier):
        # Raw scores are requested from LightGBM and mapped to probabilities
        # with the logistic function, then expanded to two columns.
        raw_score = self.estimator_.predict_proba(X, raw_score=True)
        positive_proba: FloatNDArray = expit(raw_score)
        return np.column_stack([1 - positive_proba, positive_proba])

    y_proba: FloatNDArray = self.estimator_.predict_proba(X)
    return y_proba
class CatBoostObjective:
"""AEC objective for catboost."""
def __init__(self, metric_or_gradient_const: Metric | FloatNDArray, **loss_params: FloatNDArray | float):
self.metric = metric_or_gradient_const if isinstance(metric_or_gradient_const, Metric) else None
self.gradient_const = metric_or_gradient_const if isinstance(metric_or_gradient_const, np.ndarray) else None
self.loss_params = loss_params
def calc_ders_range(
self, predictions: Sequence[float], targets: FloatNDArray, weights: FloatNDArray
) -> list[tuple[float, float]]:
"""
Compute first and second derivative of the loss function with respect to the predicted value for each object.
Parameters
----------
predictions : indexed container of floats
Current predictions for each object.
targets : indexed container of floats
Target values you provided with the dataset.
weights : float, optional (default=None)
Instance weight. Here instance weights are used to pass the indices of the instances, not actual weights.
Returns
-------
der1 : list-like object of float
der2 : list-like object of float
"""
weights = weights.astype(int)
predictions = np.array(predictions, dtype=np.float64)
if self.metric is not None:
# Use weights as a proxy to index instance-dependent parameters.
loss_params = {
name: value[weights] if isinstance(value, np.ndarray) else value
for (name, value) in self.loss_params.items()
}
gradient, hessian = self.metric._gradient_boost_objective(targets, predictions, **loss_params)
else:
gradient_const = self.gradient_const[weights] # type: ignore[index]
gradient, hessian = cy_boost_grad_hess(targets, predictions, gradient_const)
# convert from two arrays to one list of tuples
gradient_f = np.asarray(gradient, dtype=np.float32)
hessian_f = np.asarray(hessian, dtype=np.float32)
return list(zip(-gradient_f, -hessian_f, strict=False))
class CatBoostMetric:
    """
    AEC metric for catboost.

    Wraps a metric callable together with its loss parameters so it can be
    evaluated on the objects selected by the index-carrying ``weights``.
    """

    def __init__(self, metric: Callable[..., float], **loss_params: FloatNDArray | float):
        self.loss_params = loss_params
        self.metric = metric

    def is_max_optimal(self) -> bool:
        """Return whether greater values of the metric are better (always ``False`` here)."""
        return False

    def evaluate(
        self, predictions: Sequence[float], targets: Sequence[float], weights: FloatNDArray
    ) -> tuple[float, float]:
        """
        Evaluate metric value.

        Parameters
        ----------
        predictions : indexed container(s) of float
            Approx (raw score) values; they are passed through the logistic function
            before the metric is computed.
        targets : one dimensional indexed container of float
            Vectors of true labels.
        weights : numpy.ndarray
            Here instance weights are used to pass the indices of the instances, not actual weights.

        Returns
        -------
        weighted error : float
            Value returned by the wrapped metric.
        total weight : float
            Always ``1``.
        """
        sample_idx = weights.astype(int)

        # Use weights as a proxy to index the costs
        selected_params = {}
        for name, value in self.loss_params.items():
            selected_params[name] = value[sample_idx] if isinstance(value, np.ndarray) else value

        y_proba = expit(predictions)
        return self.metric(targets, y_proba, **selected_params), 1

    def get_final_error(self, error: float, weight: float) -> float:
        """
        Return final value of metric based on error and weight.

        Parameters
        ----------
        error : float
            Sum of errors in all instances.
        weight : float
            Sum of weights of all instances (not used).

        Returns
        -------
        metric value : float
            The ``error`` argument, returned unchanged.
        """
        return error