from collections.abc import Callable
from functools import partial
from itertools import islice
from numbers import Integral
from typing import Any, ClassVar, Self
import numpy as np
from scipy.optimize import OptimizeResult
from scipy.special import expit
from empulse.optimizers import Generation
from .._types import FloatArrayLike, FloatNDArray, IntNDArray, ParameterConstraint
from ..metrics import MaxProfit, Metric
from ..metrics.metric.common import Direction
from ._base import BaseLogitClassifier, OptimizeFn
from .csclassifier import MetricStrategyFactory
[docs]
class ProfLogitClassifier(BaseLogitClassifier):
    """
    Profit-driven logistic regression classifier.

    Maximizing empirical cost-sensitive/value-driven metric
    by optimizing the regression coefficients of the logistic model through a Real-coded Genetic Algorithm (RGA).

    Read more in the :ref:`User Guide <proflogit>`.

    Parameters
    ----------
    tp_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of true positives. If ``float``, then all true positives have the same cost.
        If array-like, then it is the cost of each true positive classification.
        Is overwritten if another `tp_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    fp_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of false positives. If ``float``, then all false positives have the same cost.
        If array-like, then it is the cost of each false positive classification.
        Is overwritten if another `fp_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    tn_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of true negatives. If ``float``, then all true negatives have the same cost.
        If array-like, then it is the cost of each true negative classification.
        Is overwritten if another `tn_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    fn_cost : float or array-like, shape=(n_samples,), default=0.0
        Cost of false negatives. If ``float``, then all false negatives have the same cost.
        If array-like, then it is the cost of each false negative classification.
        Is overwritten if another `fn_cost` is passed to the ``fit`` method.

        .. note::
            It is not recommended to pass instance-dependent costs to the ``__init__`` method.
            Instead, pass them to the ``fit`` method.

    loss : :class:`empulse.metrics.Metric` or None, default=None
        Loss function to optimize.

        If :class:`~empulse.metrics.Metric`, metric parameters are passed as ``loss_params``
        to the :meth:`~empulse.models.ProfLogitClassifier.fit` method.

        If ``None``, the loss is set to the Maximum Profit score.

    C : float, default=1.0
        Inverse of regularization strength; must be a positive ``float``.
        Like in support vector machines, smaller values specify stronger regularization.

    fit_intercept : bool, default=True
        Specifies if a constant (a.k.a. bias or intercept) should be added to the decision function.

    soft_threshold : bool, default=False
        If ``True``, apply soft-thresholding to the regression coefficients.

    l1_ratio : float, default=1.0
        The ElasticNet mixing parameter, with ``0 <= l1_ratio <= 1``.

        For ``l1_ratio = 0`` the penalty is a L2 penalty.
        For ``l1_ratio = 1`` it is a L1 penalty.
        For ``0 < l1_ratio < 1``, the penalty is a combination of L1 and L2.

    optimize_fn : Callable, optional
        Optimization algorithm. Should be a Callable with signature ``optimize(objective, X)``.
        See :ref:`proflogit` for more information.

    optimizer_params : dict[str, Any], optional
        Additional keyword arguments passed to `optimize_fn`.

        By default, the optimizer is a Real-coded Genetic Algorithm (RGA) with the following parameters:

        - ``max_iter`` : int, default=1000
            Maximum number of iterations.
        - ``patience`` : int, default=250
            Number of iterations with no improvement to wait before stopping the optimization.
        - ``tolerance`` : float, default=1e-4
            Relative tolerance to declare convergence.
        - ``bounds`` : tuple[float, float], default=(-5, 5)
            Lower and upper bounds for the regression coefficients.
        - all other parameters are passed to the :class:`~empulse.optimizers.Generation` initializer.

    n_jobs : int, optional
        Number of parallel jobs to run.
        ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
        ``-1`` means using all processors.

    Attributes
    ----------
    classes_ : numpy.ndarray
        Unique classes in the target found during fit.

    result_ : :class:`scipy:scipy.optimize.OptimizeResult`
        Optimization result.

    coef_ : numpy.ndarray
        Coefficients of the logit model.

    intercept_ : float
        Intercept of the logit model.
        Only available when ``fit_intercept=True``.

    Examples
    --------
    .. code-block:: python

        from empulse.models import ProfLogitClassifier
        from sklearn.datasets import make_classification

        X, y = make_classification(n_features=4)

        model = ProfLogitClassifier(C=0.1, l1_ratio=0.5, optimizer_params={'max_iter': 10})
        model.fit(X, y, tp_cost=-200, fp_cost=10)

    References
    ----------
    .. [1] Stripling, E., vanden Broucke, S., Antonio, K., Baesens, B. and
        Snoeck, M. (2017). Profit Maximizing Logistic Model for
        Customer Churn Prediction Using Genetic Algorithms.
        Swarm and Evolutionary Computation.
    .. [2] Stripling, E., vanden Broucke, S., Antonio, K., Baesens, B. and
        Snoeck, M. (2015). Profit Maximizing Logistic Regression Modeling for
        Customer Churn Prediction. IEEE International Conference on
        Data Science and Advanced Analytics (DSAA) (pp. 1–10). Paris, France.
    """

    # Extends the base-class constraints with the two parameters specific to this estimator.
    _parameter_constraints: ClassVar[ParameterConstraint] = {
        **BaseLogitClassifier._parameter_constraints,
        'loss': [Metric, None],
        'n_jobs': [None, Integral],
    }
    _default_metric_strategy: ClassVar[MetricStrategyFactory] = MaxProfit

    def __init__(
        self,
        *,
        tp_cost: FloatArrayLike | float = 0.0,
        tn_cost: FloatArrayLike | float = 0.0,
        fn_cost: FloatArrayLike | float = 0.0,
        fp_cost: FloatArrayLike | float = 0.0,
        loss: Metric | None = None,
        C: float = 1.0,
        fit_intercept: bool = True,
        soft_threshold: bool = False,
        l1_ratio: float = 1.0,
        optimize_fn: OptimizeFn | None = None,
        optimizer_params: dict[str, Any] | None = None,
        n_jobs: int | None = None,
    ):
        super().__init__(
            tp_cost=tp_cost,
            tn_cost=tn_cost,
            fn_cost=fn_cost,
            fp_cost=fp_cost,
            C=C,
            fit_intercept=fit_intercept,
            soft_threshold=soft_threshold,
            l1_ratio=l1_ratio,
            loss=loss,
            optimize_fn=optimize_fn,
            optimizer_params=optimizer_params,
        )
        # NOTE(review): n_jobs is stored here but is not read by any method visible in this class;
        # confirm whether the base class or the optimizer is expected to consume it.
        self.n_jobs = n_jobs

    def _fit_estimator(self, X: FloatNDArray, y: IntNDArray, loss: Metric, **loss_params: Any) -> Self:
        """
        Fit the model weights by running the optimizer on the regularized objective.

        Parameters
        ----------
        X : FloatNDArray
            Feature matrix handed to the objective and to the optimizer.
            NOTE(review): when ``fit_intercept=True`` the first optimized weight is stored as the intercept,
            so ``X`` presumably already carries an intercept column added by the caller - confirm.
        y : IntNDArray
            Target labels passed to the loss as first argument.
        loss : Metric
            Metric to optimize. A metric whose direction is ``Direction.MINIMIZE`` is negated,
            because the objective is treated as a maximization problem.
        **loss_params : Any
            Extra keyword arguments bound to every call of the loss.

        Returns
        -------
        Self
            The estimator itself, with ``result_``, ``coef_`` and (if ``fit_intercept``) ``intercept_`` set.
        """
        # Work on a copy so the user-supplied dictionary is never aliased.
        optimizer_params = {} if self.optimizer_params is None else self.optimizer_params.copy()
        optimize_fn: OptimizeFn = _optimize if self.optimize_fn is None else self.optimize_fn
        optimize_fn = partial(optimize_fn, **optimizer_params)

        loss_fn: Callable[..., Any]
        if loss.direction == Direction.MINIMIZE:
            # Flip the sign so that a smaller loss yields a larger objective value.
            def negated_loss(*args: Any, **kwargs: Any) -> Any:
                return -loss(*args, **kwargs)

            loss_fn = negated_loss
        else:
            loss_fn = loss

        objective = partial(
            _objective,
            X=X,
            y=y,
            loss_fn=partial(loss_fn, **loss_params),
            C=self.C,
            l1_ratio=self.l1_ratio,
            soft_threshold=self.soft_threshold,
            fit_intercept=self.fit_intercept,
        )
        self.result_ = optimize_fn(objective, X)

        if self.fit_intercept:
            # The first optimized weight is the intercept, the remaining ones are the coefficients.
            self.intercept_ = self.result_.x[0]
            self.coef_ = self.result_.x[1:]
        else:
            self.coef_ = self.result_.x
        return self

    def _get_metric_loss(self) -> Metric | None:
        """Return ``self.loss`` when it is a :class:`Metric` instance, otherwise ``None``."""
        if isinstance(self.loss, Metric):
            return self.loss
        return None
def _objective(
weights: FloatNDArray,
X: FloatNDArray,
y: IntNDArray,
loss_fn: Callable[[FloatNDArray, FloatNDArray], float],
C: float,
l1_ratio: float,
soft_threshold: bool,
fit_intercept: bool,
) -> float:
"""ProfLogit's objective function (maximization problem)."""
# b is the vector holding the regression coefficients (no intercept)
b = weights.copy()[1:] if fit_intercept else weights
if soft_threshold:
threshold = l1_ratio / C
b = np.sign(b) * np.maximum(np.abs(b) - threshold, 0)
logits = np.dot(X, weights)
y_pred = expit(logits) # Invert logit transformation
loss = loss_fn(y, y_pred)
regularization_term = 0.5 * (1 - l1_ratio) * np.sum(b**2) + l1_ratio * np.sum(np.abs(b))
penalty = regularization_term / C
return float(loss - penalty)
def _optimize(
    objective: Callable[[FloatNDArray], float],
    X: FloatNDArray,
    max_iter: int = 1000,
    tolerance: float = 1e-4,
    patience: int = 250,
    bounds: tuple[float | int, float | int] = (-5, 5),
    **kwargs: Any,
) -> OptimizeResult:
    """
    Run the default optimizer (a :class:`Generation` instance) with early stopping.

    Parameters
    ----------
    objective : Callable
        Function of the weight vector handed to ``Generation.optimize``.
    X : FloatNDArray
        Feature matrix; only ``X.shape[1]`` is used, to create one ``bounds`` pair per weight.
    max_iter : int, default=1000
        Maximum number of optimizer iterations consumed.
    tolerance : float, default=1e-4
        An iteration counts as stagnant when its relative improvement is below this value.
    patience : int, default=250
        Number of consecutive stagnant iterations after which the search stops as converged.
    bounds : tuple, default=(-5, 5)
        Lower and upper bound applied to every weight.
    **kwargs : Any
        Forwarded to the :class:`Generation` initializer.

    Returns
    -------
    OptimizeResult
        ``rga.result`` with ``message`` and ``success`` set according to how the loop ended.
    """
    rga = Generation(**kwargs)
    previous_score = np.inf
    iter_stagnant = 0
    bounds_per_instance = [bounds] * X.shape[1]

    for _ in islice(rga.optimize(objective, bounds_per_instance), max_iter):
        score = rga.result.fun
        if np.isfinite(previous_score):
            improvement = score - previous_score
            # Normalize by the magnitude of the previous score: dividing by the signed value would
            # flip the sign for negative scores and report a genuine improvement as stagnation.
            scale = abs(previous_score)
            # A previous score of exactly zero has no meaningful relative scale;
            # fall back to the absolute improvement instead of dividing by zero.
            relative_improvement = improvement / scale if scale > 0 else improvement
        else:
            # No usable reference score yet (first iteration): never count it as stagnant.
            relative_improvement = np.inf
        previous_score = score

        if relative_improvement < tolerance:
            if (iter_stagnant := iter_stagnant + 1) >= patience:
                rga.result.message = 'Converged.'  # type: ignore[attr-defined]
                rga.result.success = True  # type: ignore[attr-defined]
                break
        else:
            iter_stagnant = 0
    else:
        # Reached only when the loop ended without ``break``, i.e. without meeting the patience criterion.
        rga.result.message = 'Maximum number of iterations reached.'  # type: ignore[attr-defined]
        rga.result.success = False  # type: ignore[attr-defined]
    return rga.result