import warnings
from collections.abc import Callable
from typing import Any, ClassVar, Self
import numpy as np
from scipy.optimize import OptimizeResult, minimize
from sklearn.exceptions import ConvergenceWarning
from ..._types import FloatArrayLike, FloatNDArray, IntNDArray, ParameterConstraint
from ...metrics import Metric
from .._base import BaseLogitClassifier, OptimizeFn
# Callable that receives three float arrays and returns a ``(float, array)`` pair
# (presumably the loss value and its gradient -- confirm against the callers).
GradientLossFn = Callable[
    [FloatNDArray, FloatNDArray, FloatNDArray],
    tuple[float, FloatNDArray],
]
# Callable with an unconstrained argument list that returns either a bare float,
# a ``(float, array)`` pair or a ``(float, array, array)`` triple
# (presumably value / gradient / hessian -- confirm against the callers).
ObjectiveFn = Callable[
    ...,
    float | tuple[float, FloatNDArray] | tuple[float, FloatNDArray, FloatNDArray],
]
[docs]
class CSLogitClassifier(BaseLogitClassifier):
    """
    Cost-sensitive logistic regression classifier.

    Read more in the :ref:`User Guide <cslogit>`.

    .. seealso::

        :class:`~empulse.models.CSBoostClassifier` : Cost-sensitive gradient boosting classifier.

        :class:`~empulse.models.CSTreeClassifier` : Cost-sensitive decision tree classifier.

        :class:`~empulse.models.CSForestClassifier` : Cost-sensitive random forest classifier.

    Parameters
    ----------
    tp_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of true positives. If ``float``, then all true positives have the same cost.
        If array-like, then it is the cost of each true positive classification.
        Is overwritten if another `tp_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    fp_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of false positives. If ``float``, then all false positives have the same cost.
        If array-like, then it is the cost of each false positive classification.
        Is overwritten if another `fp_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    tn_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of true negatives. If ``float``, then all true negatives have the same cost.
        If array-like, then it is the cost of each true negative classification.
        Is overwritten if another `tn_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    fn_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of false negatives. If ``float``, then all false negatives have the same cost.
        If array-like, then it is the cost of each false negative classification.
        Is overwritten if another `fn_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    loss : :class:`empulse.metrics.Metric`, default=None
        Loss function which should be optimized.

        - If :class:`~empulse.metrics.Metric`, metric parameters are passed as ``loss_params``
          to the :meth:`~empulse.models.CSLogitClassifier.fit` method.

    C : float, default=1.0
        Inverse of regularization strength; must be a positive ``float``.
        Like in support vector machines, smaller values specify stronger regularization.

    fit_intercept : bool, default=True
        Specifies if a constant (a.k.a. bias or intercept) should be added to the decision function.

    soft_threshold : bool, default=False
        If ``True``, apply soft-thresholding to the regression coefficients.

    l1_ratio : float, default=1.0
        The ElasticNet mixing parameter, with ``0 <= l1_ratio <= 1``.

        - For ``l1_ratio = 0`` the penalty is a L2 penalty.
        - For ``l1_ratio = 1`` it is a L1 penalty.
        - For ``0 < l1_ratio < 1``, the penalty is a combination of L1 and L2.

    optimize_fn : Callable, optional
        Optimization algorithm. Should be a Callable with signature ``optimize(objective, X)``.
        See :ref:`proflogit` for more information.

    optimizer_params : dict[str, Any], optional
        Additional keyword arguments passed to `optimize_fn`.

    Attributes
    ----------
    classes_ : numpy.ndarray
        Unique classes in the target found during fit.

    result_ : :class:`scipy:scipy.optimize.OptimizeResult`
        Optimization result.

    coef_ : numpy.ndarray, shape=(n_features,)
        Coefficients of the logit model.

    intercept_ : float
        Intercept of the logit model.
        Only available when ``fit_intercept=True``.

    Examples
    --------
    .. code-block:: python

        import numpy as np
        from empulse.models import CSLogitClassifier
        from sklearn.datasets import make_classification

        X, y = make_classification()
        fn_cost = np.random.rand(y.size)  # instance-dependent cost
        fp_cost = 5  # constant cost

        model = CSLogitClassifier(C=0.1)
        model.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)
        y_proba = model.predict_proba(X)

    Example with passing instance-dependent costs through cross-validation:

    .. code-block:: python

        import numpy as np
        from empulse.models import CSLogitClassifier
        from sklearn import set_config
        from sklearn.datasets import make_classification
        from sklearn.model_selection import cross_val_score
        from sklearn.pipeline import Pipeline
        from sklearn.preprocessing import StandardScaler

        set_config(enable_metadata_routing=True)

        X, y = make_classification()
        fn_cost = np.random.rand(y.size)
        fp_cost = 5

        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', CSLogitClassifier(C=0.1).set_fit_request(fn_cost=True, fp_cost=True)),
        ])

        cross_val_score(pipeline, X, y, params={'fn_cost': fn_cost, 'fp_cost': fp_cost})

    Example with passing instance-dependent costs through a grid search:

    .. code-block:: python

        import numpy as np
        from empulse.metrics import expected_cost_loss
        from empulse.models import CSLogitClassifier
        from sklearn import set_config
        from sklearn.datasets import make_classification
        from sklearn.model_selection import GridSearchCV
        from sklearn.metrics import make_scorer
        from sklearn.pipeline import Pipeline
        from sklearn.preprocessing import StandardScaler

        set_config(enable_metadata_routing=True)

        X, y = make_classification(n_samples=50)
        fn_cost = np.random.rand(y.size)
        fp_cost = 5

        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', CSLogitClassifier().set_fit_request(fn_cost=True, fp_cost=True)),
        ])
        param_grid = {'model__C': np.logspace(-5, 2, 5)}
        scorer = make_scorer(
            expected_cost_loss,
            response_method='predict_proba',
            greater_is_better=False,
            normalize=True,
        )
        scorer = scorer.set_score_request(fn_cost=True, fp_cost=True)

        grid_search = GridSearchCV(pipeline, param_grid=param_grid, scoring=scorer)
        grid_search.fit(X, y, fn_cost=fn_cost, fp_cost=fp_cost)

    References
    ----------
    .. [1] Höppner, S., Baesens, B., Verbeke, W., & Verdonck, T. (2022).
        Instance-dependent cost-sensitive learning for detecting transfer fraud.
        European Journal of Operational Research, 297(1), 291-300.
    """

    # No constraints of its own: start from a copy of the base-class table.
    _parameter_constraints: ClassVar[ParameterConstraint] = dict(BaseLogitClassifier._parameter_constraints)

    def __init__(
        self,
        *,
        tp_cost: FloatArrayLike | float = 0.0,
        tn_cost: FloatArrayLike | float = 0.0,
        fn_cost: FloatArrayLike | float = 0.0,
        fp_cost: FloatArrayLike | float = 0.0,
        loss: Metric | None = None,
        C: float = 1.0,
        fit_intercept: bool = True,
        soft_threshold: bool = False,
        l1_ratio: float = 1.0,
        optimize_fn: OptimizeFn | None = None,
        optimizer_params: dict[str, Any] | None = None,
    ):
        # Every hyper-parameter is handed to the base class unchanged.
        super().__init__(
            loss=loss,
            tp_cost=tp_cost,
            tn_cost=tn_cost,
            fn_cost=fn_cost,
            fp_cost=fp_cost,
            C=C,
            l1_ratio=l1_ratio,
            soft_threshold=soft_threshold,
            fit_intercept=fit_intercept,
            optimize_fn=optimize_fn,
            optimizer_params=optimizer_params,
        )

    def _get_metric_loss(self) -> Metric | None:
        """Return ``self.loss`` when it is a :class:`Metric` instance, otherwise ``None``."""
        return self.loss if isinstance(self.loss, Metric) else None

    def _fit_estimator(self, X: FloatNDArray, y: IntNDArray, loss: Metric, **loss_params: Any) -> Self:
        """
        Build the logit objective from ``loss`` and minimize it.

        Parameters
        ----------
        X : numpy.ndarray
            Features handed to both the objective builder and the optimizer.
        y : numpy.ndarray
            Target labels handed to the objective builder.
        loss : Metric
            Metric whose ``_logit_objective`` method creates the objective function.
        **loss_params
            Extra keyword arguments forwarded to ``loss._logit_objective``.

        Returns
        -------
        Self
            The estimator itself, with ``result_``, ``coef_`` and (only when
            ``fit_intercept`` is true) ``intercept_`` set.
        """
        extra_solver_kwargs = self.optimizer_params or {}
        objective = loss._logit_objective(
            features=X,
            y_true=y,
            C=self.C,
            l1_ratio=self.l1_ratio,
            soft_threshold=self.soft_threshold,
            fit_intercept=self.fit_intercept,
            **loss_params,
        )

        # Use the module's L-BFGS-B routine unless the user supplied an optimizer.
        solver: Callable[..., OptimizeResult]
        if self.optimize_fn is None:
            solver = _optimize_jacobian
        else:
            solver = self.optimize_fn

        self.result_ = solver(objective=objective, X=X, **extra_solver_kwargs)

        if self.fit_intercept:
            # With an intercept, the first entry of the solution is the intercept
            # and the remaining entries are the coefficients.
            self.coef_ = self.result_.x[1:]
            self.intercept_ = self.result_.x[0]
        else:
            self.coef_ = self.result_.x
        return self
def _optimize_jacobian(
    objective: Callable[[FloatNDArray], tuple[float, FloatNDArray]],
    X: FloatNDArray,
    max_iter: int = 1000,
    tolerance: float = 1e-4,
    **kwargs: Any,
) -> OptimizeResult:
    """
    Minimize ``objective`` with scipy's L-BFGS-B solver, starting from all-zero weights.

    Parameters
    ----------
    objective : Callable
        Function of the weight vector returning a ``(value, gradient)`` tuple;
        ``jac=True`` tells scipy that the gradient comes with the value.
    X : numpy.ndarray
        Only ``X.shape[1]`` and ``X.dtype`` are used, to size and type the starting point.
    max_iter : int, default=1000
        Forwarded to the solver as the ``maxiter`` option.
    tolerance : float, default=1e-4
        Forwarded to the solver as the ``gtol`` option.
    **kwargs
        Additional keyword arguments forwarded to :func:`scipy.optimize.minimize`.
        An ``options`` dict is merged on top of the defaults built here
        (user-supplied entries win) instead of clashing with them.

    Returns
    -------
    OptimizeResult
        The result returned by :func:`scipy.optimize.minimize`. A convergence
        check is run on it before it is returned.
    """
    initial_weights = np.zeros(X.shape[1], order='F', dtype=X.dtype)

    # ``kwargs`` is this call's own dict, so popping does not touch the caller's mapping.
    # Passing ``options`` straight through would raise a duplicate-keyword TypeError.
    user_options = kwargs.pop('options', None) or {}
    solver_options = {
        'maxiter': max_iter,
        'maxls': 50,
        'gtol': tolerance,
        'ftol': 64 * np.finfo(float).eps,
        **user_options,
    }

    result = minimize(
        objective,
        initial_weights,
        method='L-BFGS-B',
        jac=True,
        options=solver_options,
        **kwargs,
    )
    _check_optimize_result(result)
    return result
def _check_optimize_result(result: OptimizeResult) -> None:
    """
    Emit a ``ConvergenceWarning`` when the optimizer reports a non-zero status.

    Parameters
    ----------
    result : OptimizeResult
        Result of the scipy.optimize.minimize function.
    """
    if result.status == 0:
        # Status 0 is treated as success: nothing to report.
        return

    solver_message = result.message
    try:
        # The message is already decoded in scipy>=1.6.0; a ``str`` has no
        # ``decode`` method, in which case it is used unchanged.
        solver_message = solver_message.decode('latin1')
    except AttributeError:
        pass

    headline = f'L-BFGS failed to converge (status={result.status}):\n{solver_message}.\n\n'
    advice = (
        'Increase the number of iterations (max_iter) or scale the data as shown in:\n'
        ' https://scikit-learn.org/stable/modules/preprocessing.html'
    )
    warnings.warn(headline + advice, ConvergenceWarning, stacklevel=2)