"""Source code for ``empulse.models.cost_sensitive.cslogit``: cost-sensitive logistic regression."""

import warnings
from collections.abc import Callable
from typing import Any, ClassVar, Self

import numpy as np
from scipy.optimize import OptimizeResult, minimize
from sklearn.exceptions import ConvergenceWarning

from ..._types import FloatArrayLike, FloatNDArray, IntNDArray, ParameterConstraint
from ...metrics import Metric
from .._base import BaseLogitClassifier, OptimizeFn

# Callable taking three float arrays and returning a ``(float, float array)`` pair.
# NOTE(review): presumably ``(loss, gradient)`` going by the alias name -- confirm against callers;
# the alias is not referenced in the visible part of this module.
GradientLossFn = Callable[[FloatNDArray, FloatNDArray, FloatNDArray], tuple[float, FloatNDArray]]
# Callable with an unconstrained argument list returning either a bare float, or a tuple that starts
# with a float followed by one or two float arrays.
# NOTE(review): not referenced in the visible part of this module.
ObjectiveFn = Callable[..., float | tuple[float, FloatNDArray] | tuple[float, FloatNDArray, FloatNDArray]]


class CSLogitClassifier(BaseLogitClassifier):
    """
    Cost-sensitive logistic regression classifier.

    Read more in the :ref:`User Guide <cslogit>`.

    .. seealso::

        :class:`~empulse.models.CSBoostClassifier` : Cost-sensitive gradient boosting classifier.

        :class:`~empulse.models.CSTreeClassifier` : Cost-sensitive decision tree classifier.

        :class:`~empulse.models.CSForestClassifier` : Cost-sensitive random forest classifier.

    Parameters
    ----------
    tp_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of true positives. If ``float``, then all true positives have the same cost.
        If array-like, then it is the cost of each true positive classification.
        Is overwritten if another `tp_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    fp_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of false positives. If ``float``, then all false positives have the same cost.
        If array-like, then it is the cost of each false positive classification.
        Is overwritten if another `fp_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    tn_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of true negatives. If ``float``, then all true negatives have the same cost.
        If array-like, then it is the cost of each true negative classification.
        Is overwritten if another `tn_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    fn_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of false negatives. If ``float``, then all false negatives have the same cost.
        If array-like, then it is the cost of each false negative classification.
        Is overwritten if another `fn_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    loss : :class:`empulse.metrics.Metric`, default=None
        Loss function which should be optimized.

        - If :class:`~empulse.metrics.Metric`, metric parameters are passed as ``loss_params``
          to the :meth:`~empulse.models.CSLogitClassifier.fit` method.

    C : float, default=1.0
        Inverse of regularization strength; must be a positive ``float``.
        Like in support vector machines, smaller values specify stronger regularization.

    fit_intercept : bool, default=True
        Specifies if a constant (a.k.a. bias or intercept) should be added to the decision function.

    soft_threshold : bool, default=False
        If ``True``, apply soft-thresholding to the regression coefficients.

    l1_ratio : float, default=1.0
        The ElasticNet mixing parameter, with ``0 <= l1_ratio <= 1``.

        - For ``l1_ratio = 0`` the penalty is a L2 penalty.
        - For ``l1_ratio = 1`` it is a L1 penalty.
        - For ``0 < l1_ratio < 1``, the penalty is a combination of L1 and L2.

    optimize_fn : Callable, optional
        Optimization algorithm. Should be a Callable with signature ``optimize(objective, X)``.
        See :ref:`proflogit` for more information.

    optimizer_params : dict[str, Any], optional
        Additional keyword arguments passed to `optimize_fn`.

    Attributes
    ----------
    classes_ : numpy.ndarray
        Unique classes in the target found during fit.

    result_ : :class:`scipy:scipy.optimize.OptimizeResult`
        Optimization result.

    coef_ : numpy.ndarray, shape=(n_features,)
        Coefficients of the logit model.

    intercept_ : float
        Intercept of the logit model.
        Only available when ``fit_intercept=True``.

    Examples
    --------

    .. code-block:: python

        import numpy as np
        from empulse.models import CSLogitClassifier
        from sklearn.datasets import make_classification

        X, y = make_classification()
        fn_cost = np.random.rand(y.size)  # instance-dependent cost
        fp_cost = 5  # constant cost

        model = CSLogitClassifier(C=0.1)
        model.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)
        y_proba = model.predict_proba(X)

    Example with passing instance-dependent costs through cross-validation:

    .. code-block:: python

        import numpy as np
        from empulse.models import CSLogitClassifier
        from sklearn import set_config
        from sklearn.datasets import make_classification
        from sklearn.model_selection import cross_val_score
        from sklearn.pipeline import Pipeline
        from sklearn.preprocessing import StandardScaler

        set_config(enable_metadata_routing=True)

        X, y = make_classification()
        fn_cost = np.random.rand(y.size)
        fp_cost = 5

        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', CSLogitClassifier(C=0.1).set_fit_request(fn_cost=True, fp_cost=True)),
        ])

        cross_val_score(pipeline, X, y, params={'fn_cost': fn_cost, 'fp_cost': fp_cost})

    Example with passing instance-dependent costs through a grid search:

    .. code-block:: python

        import numpy as np
        from empulse.metrics import expected_cost_loss
        from empulse.models import CSLogitClassifier
        from sklearn import set_config
        from sklearn.datasets import make_classification
        from sklearn.model_selection import GridSearchCV
        from sklearn.metrics import make_scorer
        from sklearn.pipeline import Pipeline
        from sklearn.preprocessing import StandardScaler

        set_config(enable_metadata_routing=True)

        X, y = make_classification(n_samples=50)
        fn_cost = np.random.rand(y.size)
        fp_cost = 5

        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', CSLogitClassifier().set_fit_request(fn_cost=True, fp_cost=True)),
        ])
        param_grid = {'model__C': np.logspace(-5, 2, 5)}
        scorer = make_scorer(
            expected_cost_loss,
            response_method='predict_proba',
            greater_is_better=False,
            normalize=True,
        )
        scorer = scorer.set_score_request(fn_cost=True, fp_cost=True)

        grid_search = GridSearchCV(pipeline, param_grid=param_grid, scoring=scorer)
        grid_search.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)

    References
    ----------
    .. [1] Höppner, S., Baesens, B., Verbeke, W., & Verdonck, T. (2022).
        Instance-dependent cost-sensitive learning for detecting transfer fraud.
        European Journal of Operational Research, 297(1), 291-300.
    """

    # No constraints of its own: the class reuses a copy of the base-class constraint mapping.
    _parameter_constraints: ClassVar[ParameterConstraint] = {
        **BaseLogitClassifier._parameter_constraints,
    }

    def __init__(
        self,
        *,
        tp_cost: FloatArrayLike | float = 0.0,
        tn_cost: FloatArrayLike | float = 0.0,
        fn_cost: FloatArrayLike | float = 0.0,
        fp_cost: FloatArrayLike | float = 0.0,
        loss: Metric | None = None,
        C: float = 1.0,
        fit_intercept: bool = True,
        soft_threshold: bool = False,
        l1_ratio: float = 1.0,
        optimize_fn: OptimizeFn | None = None,
        optimizer_params: dict[str, Any] | None = None,
    ):
        # Every argument is forwarded unchanged to the base-class constructor.
        super().__init__(
            tp_cost=tp_cost,
            tn_cost=tn_cost,
            fn_cost=fn_cost,
            fp_cost=fp_cost,
            loss=loss,
            C=C,
            l1_ratio=l1_ratio,
            fit_intercept=fit_intercept,
            soft_threshold=soft_threshold,
            optimize_fn=optimize_fn,
            optimizer_params=optimizer_params,
        )

    def _get_metric_loss(self) -> Metric | None:
        """Return ``self.loss`` when it is a :class:`Metric` instance, otherwise ``None``."""
        return self.loss if isinstance(self.loss, Metric) else None

    def _fit_estimator(self, X: FloatNDArray, y: IntNDArray, loss: Metric, **loss_params: Any) -> Self:
        """
        Minimize the logit objective built from ``loss`` and store the fitted weights.

        Parameters
        ----------
        X : FloatNDArray
            Feature matrix handed to the objective and to the optimizer.
        y : IntNDArray
            Target labels handed to the objective as ``y_true``.
        loss : Metric
            Metric whose ``_logit_objective`` builds the function to minimize.
        **loss_params : Any
            Extra keyword arguments forwarded to ``loss._logit_objective``.

        Returns
        -------
        Self
            The estimator itself, with ``result_``, ``coef_`` and (only when ``fit_intercept`` is
            true) ``intercept_`` set.
        """
        extra_optimizer_kwargs = self.optimizer_params or {}
        objective = loss._logit_objective(
            features=X,
            y_true=y,
            C=self.C,
            l1_ratio=self.l1_ratio,
            soft_threshold=self.soft_threshold,
            fit_intercept=self.fit_intercept,
            **loss_params,
        )

        # Use the module's L-BFGS-B routine unless the user supplied their own optimizer.
        solver: Callable[..., OptimizeResult] = (
            self.optimize_fn if self.optimize_fn is not None else _optimize_jacobian
        )  # type: ignore[no-redef]

        self.result_ = solver(objective=objective, X=X, **extra_optimizer_kwargs)

        if self.fit_intercept:
            # With an intercept, the first entry of the solution is the bias term.
            self.coef_ = self.result_.x[1:]
            self.intercept_ = self.result_.x[0]
        else:
            self.coef_ = self.result_.x

        return self
def _optimize_jacobian(
    objective: Callable[[FloatNDArray], tuple[float, FloatNDArray]],
    X: FloatNDArray,
    max_iter: int = 1000,
    tolerance: float = 1e-4,
    **kwargs: Any,
) -> OptimizeResult:
    """
    Minimize ``objective`` with scipy's L-BFGS-B, starting from an all-zero weight vector.

    Parameters
    ----------
    objective : Callable
        Function of the weight vector returning ``(value, gradient)``; it is passed to
        :func:`scipy.optimize.minimize` with ``jac=True``.
    X : FloatNDArray
        Only ``X.shape[1]`` and ``X.dtype`` are read, to size and type the initial weights.
    max_iter : int, default=1000
        Default for the solver's ``maxiter`` option.
    tolerance : float, default=1e-4
        Default for the solver's ``gtol`` option.
    **kwargs : Any
        Additional keyword arguments forwarded to :func:`scipy.optimize.minimize`.
        An ``options`` dict may be supplied; its entries are merged over the defaults built
        here (``maxiter``, ``maxls``, ``gtol``, ``ftol``) and take precedence on conflict.

    Returns
    -------
    OptimizeResult
        The result returned by :func:`scipy.optimize.minimize`.

    Warns
    -----
    ConvergenceWarning
        If the solver reports a non-zero status.
    """
    initial_weights = np.zeros(X.shape[1], order='F', dtype=X.dtype)

    solver_options: dict[str, Any] = {
        'maxiter': max_iter,
        'maxls': 50,
        'gtol': tolerance,
        'ftol': 64 * np.finfo(float).eps,
    }
    # ``options`` is pulled out of kwargs before forwarding: passing it alongside the
    # hard-coded ``options=`` below would raise "got multiple values for keyword argument".
    # ``kwargs`` is this call's own dict, so popping does not touch the caller's mapping.
    user_options = kwargs.pop('options', None)
    if user_options:
        solver_options.update(user_options)

    result = minimize(
        objective,
        initial_weights,
        method='L-BFGS-B',
        jac=True,
        options=solver_options,
        **kwargs,
    )
    _check_optimize_result(result)
    return result


def _check_optimize_result(result: OptimizeResult) -> None:
    """
    Check the OptimizeResult for successful convergence.

    Emits a ``ConvergenceWarning`` when ``result.status`` is non-zero; returns nothing.

    Parameters
    ----------
    result : OptimizeResult
        Result of the scipy.optimize.minimize function.
    """
    # Any non-zero status is treated as a convergence failure.
    if result.status != 0:
        try:
            # The message is already decoded in scipy>=1.6.0
            result_message = result.message.decode('latin1')
        except AttributeError:
            result_message = result.message
        warning_msg = (
            f'L-BFGS failed to converge (status={result.status}):\n{result_message}.\n\n'
            'Increase the number of iterations (max_iter) '
            'or scale the data as shown in:\n'
            '    https://scikit-learn.org/stable/modules/'
            'preprocessing.html'
        )
        warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2)