Source code for empulse.metrics.churn.cost

from collections.abc import Callable, Sequence
from functools import partial, update_wrapper
from typing import Literal, overload

import numpy as np
from scipy.special import expit

from ..._types import FloatArrayLike, FloatNDArray
from ._validation import _validate_input_cost_loss_churn


@overload
def make_objective_churn(
    model: Literal['catboost'],
    *,
    accept_rate: float = 0.3,
    clv: float | FloatNDArray = 200,
    incentive_fraction: float | FloatNDArray = 0.05,
    contact_cost: float = 15,
) -> tuple['AECObjectiveChurn', 'AECMetricChurn']: ...


@overload
def make_objective_churn(
    model: Literal['xgboost', 'lightgbm'],
    *,
    accept_rate: float = 0.3,
    clv: float | FloatNDArray = 200,
    incentive_fraction: float | FloatNDArray = 0.05,
    contact_cost: float = 15,
) -> Callable[[FloatNDArray, FloatNDArray], tuple[FloatNDArray, FloatNDArray]]: ...


[docs] def make_objective_churn( model: Literal['xgboost', 'lightgbm', 'catboost'], *, accept_rate: float = 0.3, clv: float | FloatNDArray = 200, incentive_fraction: float | FloatNDArray = 0.05, contact_cost: float = 15, ) -> ( Callable[[FloatNDArray, FloatNDArray], tuple[FloatNDArray, FloatNDArray]] | tuple['AECObjectiveChurn', 'AECMetricChurn'] ): """ Create an objective function for the Expected Cost measure for customer churn. The objective function presumes a situation where identified churners are contacted and offered an incentive to remain customers. Only a fraction of churners accepts the incentive offer. For detailed information, consult the paper [1]_. Read more in the :ref:`User Guide <cost_functions>`. .. seealso:: :class:`~empulse.models.B2BoostClassifier` : Uses the instance-specific cost function as objective function. Parameters ---------- model : {'xgboost', 'lightgbm', 'catboost'} The model for which the objective function is created. - 'xgboost' : :class:`xgboost:xgboost.XGBClassifier` - 'lightgbm' : :class:`lightgbm:lightgbm.LGBMClassifier` - 'catboost' : :class:`catboost.CatBoostClassifier` accept_rate : float, default=0.3 Probability of a customer responding to the retention offer (``0 < accept_rate < 1``). clv : float or 1D array, shape=(n_samples), default=200 If ``float``: constant customer lifetime value per retained customer (``clv > incentive_cost``). If ``array``: individualized customer lifetime value of each customer when retained (``mean(clv) > incentive_cost``). incentive_fraction : float, default=0.05 Cost of incentive offered to a customer, as a fraction of customer lifetime value (``0 < incentive_fraction < 1``). contact_cost : float, default=1 Constant cost of contact (``contact_cost > 0``). Returns ------- objective : Callable A custom objective function for :class:`xgboost:xgboost.XGBClassifier`. Examples -------- .. code-block:: python from xgboost import XGBClassifier from empulse.metrics import make_objective_churn objective = make_objective_churn(model='xgboost') clf = XGBClassifier(objective=objective, n_estimators=100, max_depth=3) Notes ----- The instance-specific cost function for customer churn is defined as [1]_: .. math:: C(s_i) = y_i[s_i(f-\\gamma (1-\\delta )CLV_i] + (1-y_i)[s_i(\\delta CLV_i + f)] The measure requires that the churn class is encoded as 0, and it is NOT interchangeable. However, this implementation assumes the standard notation ('churn': 1, 'no churn': 0). References ---------- .. [1] Janssens, B., Bogaert, M., Bagué, A., & Van den Poel, D. (2022). B2Boost: Instance-dependent profit-driven modelling of B2B churn. Annals of Operations Research, 1-27. """ if model == 'xgboost': objective: Callable[[FloatNDArray, FloatNDArray], tuple[FloatNDArray, FloatNDArray]] = partial( _objective, accept_rate=accept_rate, clv=clv, incentive_fraction=incentive_fraction, contact_cost=contact_cost, ) update_wrapper(objective, _objective) elif model == 'lightgbm': def objective(y_true: FloatNDArray, y_score: FloatNDArray) -> tuple[FloatNDArray, FloatNDArray]: """ Create an objective function for the churn AEC measure. Parameters ---------- y_true : np.ndarray Ground truth labels. y_score : np.ndarray Predicted values. Returns ------- gradient : np.ndarray Gradient of the objective function. hessian : np.ndarray Hessian of the objective function. """ return _objective( y_true, y_score, accept_rate=accept_rate, clv=clv, incentive_fraction=incentive_fraction, contact_cost=contact_cost, ) elif model == 'catboost': return ( AECObjectiveChurn( accept_rate=accept_rate, clv=clv, incentive_fraction=incentive_fraction, contact_cost=contact_cost, ), AECMetricChurn( accept_rate=accept_rate, clv=clv, incentive_fraction=incentive_fraction, contact_cost=contact_cost, ), ) else: raise ValueError(f"Expected model to be 'xgboost', 'lightgbm' or 'catboost', got {model} instead.") return objective
def _objective( y_true: FloatNDArray, y_score: FloatNDArray, accept_rate: float = 0.3, clv: float | FloatNDArray = 200, incentive_fraction: float | FloatNDArray = 0.05, contact_cost: float = 1, ) -> tuple[FloatNDArray, FloatNDArray]: """ Objective function for XGBoost to maximize the profit of a churn model. Parameters ---------- y_true : np.ndarray Ground truth labels (0 or 1). y_score : np.ndarray Predicted scores. Returns ------- gradient : np.ndarray Gradient of the objective function. hessian : np.ndarray Hessian of the objective function. """ y_proba = expit(y_score) incentive_cost = incentive_fraction * clv profits = ( contact_cost + incentive_cost + y_true * (accept_rate * incentive_cost - incentive_cost - clv * accept_rate) ) gradient = y_proba * (1 - y_proba) * profits hessian = np.abs((1 - 2 * y_proba) * gradient) return gradient, hessian class AECObjectiveChurn: """AEC churn objective for catboost.""" def __init__( self, accept_rate: float = 0.3, clv: float | FloatNDArray = 200, incentive_fraction: float | FloatNDArray = 0.05, contact_cost: float = 1, ): self.accept_rate = accept_rate self.clv = clv self.incentive_fraction = incentive_fraction self.contact_cost = contact_cost def calc_ders_range( self, predictions: Sequence[float], targets: FloatNDArray, weights: FloatNDArray ) -> list[tuple[float, float]]: """ Compute first and second derivative of the loss function with respect to the predicted value for each object. Parameters ---------- predictions : indexed container of floats Current predictions for each object. targets : indexed container of floats Target values you provided with the dataset. weights : float, optional (default=None) Instance weight. Returns ------- der1 : list-like object of float der2 : list-like object of float """ # Use weights as a proxy to index the costs weights = weights.astype(int) clv = self.clv[weights] if isinstance(self.clv, np.ndarray) else self.clv y_proba = expit(predictions) incentive_cost = self.incentive_fraction * clv profits = ( self.contact_cost + incentive_cost + targets * (self.accept_rate * incentive_cost - incentive_cost - clv * self.accept_rate) ) gradient = y_proba * (1 - y_proba) * profits hessian = np.abs((1 - 2 * y_proba) * gradient) return list(zip(-gradient, -hessian, strict=False)) class AECMetricChurn: """AEC churn metric for catboost.""" def __init__( self, accept_rate: float = 0.3, clv: float | FloatNDArray = 200, incentive_fraction: float | FloatNDArray = 0.05, contact_cost: float = 1, ): self.accept_rate = accept_rate self.clv = clv self.incentive_fraction = incentive_fraction self.contact_cost = contact_cost def is_max_optimal(self) -> bool: """Return whether great values of metric are better.""" return False def evaluate( self, predictions: Sequence[float], targets: FloatNDArray, weights: FloatNDArray ) -> tuple[float, float]: """ Evaluate metric value. Parameters ---------- predictions : list of indexed containers (containers with only __len__ and __getitem__ defined) of float Vectors of approx labels. targets : one dimensional indexed container of float Vectors of true labels. weights : one dimensional indexed container of float, optional (default=None) Weight for each instance. Returns ------- weighted error : float total weight : float """ # Use weights as a proxy to index the costs weights = weights.astype(int) clv = self.clv[weights] if isinstance(self.clv, np.ndarray) else self.clv y_proba = expit(predictions) return expected_cost_loss_churn( targets, y_proba, accept_rate=self.accept_rate, clv=clv, incentive_fraction=self.incentive_fraction, contact_cost=self.contact_cost, normalize=True, check_input=False, ), 1 def get_final_error(self, error: float, weight: float) -> float: """ Return final value of metric based on error and weight. Parameters ---------- error : float Sum of errors in all instances. weight : float Sum of weights of all instances. Returns ------- metric value : float """ return error
[docs] def expected_cost_loss_churn( y_true: FloatArrayLike, y_proba: FloatArrayLike, *, accept_rate: float = 0.3, clv: float | FloatArrayLike = 200, incentive_fraction: float | FloatArrayLike = 0.05, contact_cost: float = 1, normalize: bool = False, check_input: bool = True, ) -> float: """ Expected cost of a classifier for customer churn. The cost function presumes a situation where identified churners are contacted and offered an incentive to remain customers. Only a fraction of churners accepts the incentive offer. For detailed information, consult the paper [1]_. .. seealso:: :class:`~empulse.models.B2BoostClassifier` : Uses the instance-specific cost function as objective function. Parameters ---------- y_true : 1D array-like, shape=(n_samples,) Binary target values ('churn': 1, 'no churn': 0). y_proba : 1D array-like, shape=(n_samples,) Target probabilities, should lie between 0 and 1. accept_rate : float, default=0.3 Probability of a customer responding to the retention offer (``0 < accept_rate < 1``). clv : float or 1D array-like, shape=(n_samples), default=200 If ``float``: constant customer lifetime value per retained customer (``clv` > incentive_cost``). If ``array``: individualized customer lifetime value of each customer when retained (``mean(clv) > incentive_cost``). incentive_fraction : float, default=0.05 Cost of incentive offered to a customer, as a fraction of customer lifetime value (``0 < incentive_fraction < 1``). contact_cost : float, default=1 Constant cost of contact (``contact_cost > 0``). normalize : bool, default=False Normalize the cost by the number of samples. If ``True``, return the average expected cost for customer churn. check_input : bool, default=True Perform input validation. Turning off improves performance, useful when using this metric as a loss function. Returns ------- empc_cost : float Instance-specific cost function according to the EMPC measure. Notes ----- The instance-specific cost function for customer churn is defined as [1]_: .. math:: C(s_i) = y_i[s_i(f-\\gamma (1-\\delta )CLV_i] + (1-y_i)[s_i(\\delta CLV_i + f)] The measure requires that the churn class is encoded as 0, and it is NOT interchangeable. However, this implementation assumes the standard notation ('churn': 1, 'no churn': 0). References ---------- .. [1] Janssens, B., Bogaert, M., Bagué, A., & Van den Poel, D. (2022). B2Boost: Instance-dependent profit-driven modelling of B2B churn. Annals of Operations Research, 1-27. """ # noqa: D401 if check_input: y_true, y_proba, clv, incentive_fraction = _validate_input_cost_loss_churn( y_true, y_proba, clv=clv, accept_rate=accept_rate, incentive_fraction=incentive_fraction, contact_cost=contact_cost, ) else: y_true = np.asarray(y_true) y_proba = np.asarray(y_proba) clv = np.asarray(clv) incentive_fraction = np.asarray(incentive_fraction) incentive_cost = incentive_fraction * clv profits = y_proba * ( contact_cost + incentive_cost + y_true * (accept_rate * incentive_cost - incentive_cost - clv * accept_rate) ) if normalize: return float(np.mean(profits)) return float(np.sum(profits))