from collections.abc import Callable
from functools import partial
from typing import Any, Self
import numpy as np
import sympy
from ...._types import FloatNDArray, IntNDArray
from ..._loss import cy_logit_loss_gradient
from ..common import (
BoostGradientConst,
Direction,
LogitConsts,
MetricFn,
RateFn,
ThresholdFn,
_check_parameters,
_safe_lambdify,
_safe_run_lambda,
_safe_run_lambda_array,
)
from .metric_strategy import MetricStrategy
def replace_random_var_with_mean(
tp_benefit: sympy.Expr,
tn_benefit: sympy.Expr,
fp_cost: sympy.Expr,
fn_cost: sympy.Expr,
) -> tuple[sympy.Expr, sympy.Expr, sympy.Expr, sympy.Expr]:
"""Replace random variables in the expressions with their means."""
all_symbols = tp_benefit.free_symbols | tn_benefit.free_symbols | fp_cost.free_symbols | fn_cost.free_symbols
# Mapping of distributions to their fixed mean expressions
fixed_means: dict[
sympy.stats.crv_types.SingleContinuousDistribution, Callable[[tuple[sympy.Expr, ...]], sympy.Expr]
] = {
sympy.stats.crv_types.ArcsinDistribution: lambda params: (params[0] + params[1]) / 2,
sympy.stats.crv_types.BetaPrimeDistribution: lambda params: params[0] / (params[1] - 1),
sympy.stats.crv_types.StudentTDistribution: lambda params: 0,
sympy.stats.crv_types.FDistributionDistribution: lambda params: params[1] / (params[1] - 2),
sympy.stats.crv_types.GammaInverseDistribution: lambda params: params[1] / (params[0] - 1),
sympy.stats.crv_types.LogNormalDistribution: lambda params: sympy.exp(params[0] + params[1] ** 2 / 2),
sympy.stats.crv_types.LomaxDistribution: lambda params: params[1] / (params[0] - 1),
sympy.stats.crv_types.ParetoDistribution: lambda params: (params[1] * params[0]) / (params[1] - 1),
sympy.stats.crv_types.PowerFunctionDistribution: (
lambda params: params[1] + params[0] * (params[2] - params[1]) / (params[0] + 1)
),
}
# Identify random symbols and replace each by its expectation
random_symbols = [symbol for symbol in all_symbols if sympy.stats.rv.is_random(symbol)]
if not random_symbols:
return tp_benefit, tn_benefit, fp_cost, fn_cost
subs_map = {}
for symbol in random_symbols:
dist_type = type(symbol.pspace.distribution)
# Check if we have a fixed substitution for this distribution
if dist_type in fixed_means:
# Extract parameters from the distribution
params = symbol.pspace.distribution.args
subs_map[symbol] = fixed_means[dist_type](params)
else:
try:
mean_expr = sympy.stats.E(symbol)
# Try to lambdify to verify it's computable
_ = sympy.lambdify([], mean_expr, modules=['scipy', 'numpy'])
subs_map[symbol] = mean_expr
except (NotImplementedError, TypeError) as e:
raise NotImplementedError(
f"Cannot compute or evaluate expectation for random variable '{symbol}'. "
f"The distribution '{dist_type.__name__}' may not support "
f'mean computation or lambdification in SymPy.'
) from e
tp_benefit = tp_benefit.subs(subs_map)
tn_benefit = tn_benefit.subs(subs_map)
fp_cost = fp_cost.subs(subs_map)
fn_cost = fn_cost.subs(subs_map)
return tp_benefit, tn_benefit, fp_cost, fn_cost
[docs]
class Cost(MetricStrategy):
"""Strategy for the Expected Cost metric."""
def __init__(self) -> None:
super().__init__(name='cost', direction=Direction.MINIMIZE)
[docs]
def build(
self,
tp_benefit: sympy.Expr,
tn_benefit: sympy.Expr,
fp_cost: sympy.Expr,
fn_cost: sympy.Expr,
) -> Self:
"""Build the metric strategy."""
tp_benefit, tn_benefit, fp_cost, fn_cost = replace_random_var_with_mean(
tp_benefit, tn_benefit, fp_cost, fn_cost
)
self._score_function: MetricFn = CostLoss(
tp_benefit=tp_benefit, tn_benefit=tn_benefit, fp_cost=fp_cost, fn_cost=fn_cost
)
self._optimal_threshold: ThresholdFn = CostOptimalThreshold(
tp_benefit=tp_benefit,
tn_benefit=tn_benefit,
fp_cost=fp_cost,
fn_cost=fn_cost,
)
self._optimal_rate: RateFn = CostOptimalRate(
tp_benefit=tp_benefit,
tn_benefit=tn_benefit,
fp_cost=fp_cost,
fn_cost=fn_cost,
)
self._prepare_logit_objective: LogitConsts = CostLogitConsts(
tp_benefit=tp_benefit, tn_benefit=tn_benefit, fp_cost=fp_cost, fn_cost=fn_cost
)
self._prepare_boost_objective: BoostGradientConst = CostBoostGradientConst(
tp_benefit=tp_benefit,
tn_benefit=tn_benefit,
fp_cost=fp_cost,
fn_cost=fn_cost,
)
return self
[docs]
def score(self, y_true: IntNDArray, y_score: FloatNDArray, **parameters: FloatNDArray | float) -> float:
"""
Compute the metric expected cost loss.
Parameters
----------
y_true: array-like of shape (n_samples,)
The ground truth labels.
y_score: array-like of shape (n_samples,)
The predicted labels, probabilities, or decision scores (based on the chosen metric).
parameters: float or array-like of shape (n_samples,)
The parameter values for the costs and benefits defined in the metric.
If any parameter is a stochastic variable, you should pass values for their distribution parameters.
You can set the parameter values for either the symbol names or their aliases.
- If ``float``, the same value is used for all samples (class-dependent).
- If ``array-like``, the values are used for each sample (instance-dependent).
Returns
-------
score: float
The expected cost loss.
"""
return self._score_function(y_true, y_score, **parameters)
[docs]
def optimal_threshold(
self, y_true: IntNDArray, y_score: FloatNDArray, **parameters: FloatNDArray | float
) -> float | FloatNDArray:
"""
Compute the classification threshold(s) to optimize the metric value.
i.e., the score threshold at which an observation should be classified as positive to optimize the metric.
For instance-dependent costs and benefits, this will return an array of thresholds, one for each sample.
For class-dependent costs and benefits, this will return a single threshold value.
Parameters
----------
y_true: array-like of shape (n_samples,)
The ground truth labels.
y_score: array-like of shape (n_samples,)
The predicted labels, probabilities, or decision scores (based on the chosen metric).
parameters: float or array-like of shape (n_samples,)
The parameter values for the costs and benefits defined in the metric.
If any parameter is a stochastic variable, you should pass values for their distribution parameters.
You can set the parameter values for either the symbol names or their aliases.
- If ``float``, the same value is used for all samples (class-dependent).
- If ``array-like``, the values are used for each sample (instance-dependent).
Returns
-------
optimal_threshold: float | FloatNDArray
The optimal classification threshold(s).
"""
return self._optimal_threshold(y_true, y_score, **parameters)
[docs]
def optimal_rate(self, y_true: IntNDArray, y_score: FloatNDArray, **parameters: FloatNDArray | float) -> float:
"""
Compute the predicted positive rate to optimize the metric value.
Parameters
----------
y_true: array-like of shape (n_samples,)
The ground truth labels.
y_score: array-like of shape (n_samples,)
The predicted labels, probabilities, or decision scores (based on the chosen metric).
parameters: float or array-like of shape (n_samples,)
The parameter values for the costs and benefits defined in the metric.
If any parameter is a stochastic variable, you should pass values for their distribution parameters.
You can set the parameter values for either the symbol names or their aliases.
- If ``float``, the same value is used for all samples (class-dependent).
- If ``array-like``, the values are used for each sample (instance-dependent).
Returns
-------
optimal_rate: float
The optimal predicted positive rate.
"""
return self._optimal_rate(y_true, y_score, **parameters)
[docs]
def prepare_logit_objective(
self, features: FloatNDArray, y_true: FloatNDArray, **parameters: FloatNDArray | float
) -> tuple[FloatNDArray, FloatNDArray, FloatNDArray]:
"""
Compute the constant term of the loss and gradient of the metric wrt logistic regression coefficients.
Parameters
----------
features : NDArray of shape (n_samples, n_features)
The features of the samples.
y_true : NDArray of shape (n_samples,)
The ground truth labels.
parameters : float or NDArray of shape (n_samples,)
The parameter values for the costs and benefits defined in the metric.
If any parameter is a stochastic variable, you should pass values for their distribution parameters.
You can set the parameter values for either the symbol names or their aliases.
- If ``float``, the same value is used for all samples (class-dependent).
- If ``array-like``, the values are used for each sample (instance-dependent).
Returns
-------
gradient_const : NDArray of shape (n_samples, n_features)
The constant term of the gradient.
loss_const1 : NDArray of shape (n_features,)
The first constant term of the loss function.
loss_const2 : NDArray of shape (n_features,)
The second constant term of the loss function.
"""
if y_true.ndim == 1:
y_true = np.expand_dims(y_true, axis=1)
return self._prepare_logit_objective.prepare(features, y_true, **parameters)
[docs]
def logit_objective(
self,
features: FloatNDArray,
y_true: FloatNDArray,
C: float,
l1_ratio: float,
soft_threshold: bool,
fit_intercept: bool,
**parameters: FloatNDArray | float,
) -> Callable[[FloatNDArray], tuple[float, FloatNDArray]]:
"""
Build a function which computes the metric value and the gradient of the metric w.r.t logistic coefficients.
Parameters
----------
features : NDArray of shape (n_samples, n_features)
The features of the samples.
y_true : NDArray of shape (n_samples,)
The ground truth labels.
C : float
Regularization strength parameter. Smaller values specify stronger regularization.
l1_ratio : float
The Elastic-Net mixing parameter, with range 0 <= l1_ratio <= 1.
l1_ratio=0 corresponds to L2 penalty, l1_ratio=1 to L1 penalty.
soft_threshold : bool
Indicator of whether soft thresholding is applied during optimization.
fit_intercept : bool
Specifies if an intercept should be included in the model.
parameters : float or NDArray of shape (n_samples,)
The parameter values for the costs and benefits defined in the metric.
If any parameter is a stochastic variable, you should pass values for their distribution parameters.
You can set the parameter values for either the symbol names or their aliases.
- If ``float``, the same value is used for all samples (class-dependent).
- If ``array-like``, the values are used for each sample (instance-dependent).
Returns
-------
logistic_objective : Callable[[NDArray], tuple[float, NDArray]]
A function that takes logistic regression weights as input and returns the metric value and its gradient.
The function signature is:
``logistic_objective(weights) -> (value, gradient)``
"""
grad_const, loss_const1, loss_const2 = self.prepare_logit_objective(features, y_true, **parameters)
loss_const1 = (
loss_const1.reshape(-1)
if isinstance(loss_const1, np.ndarray)
else np.full(len(y_true), loss_const1, dtype=np.float64)
)
loss_const2 = (
loss_const2.reshape(-1)
if isinstance(loss_const2, np.ndarray)
else np.full(len(y_true), loss_const2, dtype=np.float64)
)
return partial(
cy_logit_loss_gradient,
grad_const=grad_const,
loss_const1=loss_const1,
loss_const2=loss_const2,
features=features,
C=C,
l1_ratio=l1_ratio,
soft_threshold=soft_threshold,
fit_intercept=fit_intercept,
)
[docs]
def prepare_boost_objective(self, y_true: FloatNDArray, **parameters: FloatNDArray | float) -> FloatNDArray:
"""
Compute the gradient's constant term of the metric wrt gradient boost.
Parameters
----------
y_true : NDArray of shape (n_samples,)
The ground truth labels.
parameters : float or NDArray of shape (n_samples,)
The parameter values for the costs and benefits defined in the metric.
If any parameter is a stochastic variable, you should pass values for their distribution parameters.
You can set the parameter values for either the symbol names or their aliases.
- If ``float``, the same value is used for all samples (class-dependent).
- If ``array-like``, the values are used for each sample (instance-dependent).
Returns
-------
gradient_const : NDArray of shape (n_samples, n_features)
The constant term of the gradient.
"""
if y_true.ndim == 1:
y_true = np.expand_dims(y_true, axis=1)
return self._prepare_boost_objective(y_true, **parameters)
[docs]
def to_latex(
self,
tp_benefit: sympy.Expr,
tn_benefit: sympy.Expr,
fp_cost: sympy.Expr,
fn_cost: sympy.Expr,
) -> str:
"""Return the LaTeX representation of the metric."""
return _cost_loss_to_latex(tp_benefit, tn_benefit, fp_cost, fn_cost)
class CostLoss:
"""Class to compute the metric for binary classification."""
def __init__(self, tp_benefit: sympy.Expr, tn_benefit: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr):
self.cost_equation = _build_cost_equation(
tp_cost=-tp_benefit, tn_cost=-tn_benefit, fp_cost=fp_cost, fn_cost=fn_cost
)
self.cost_function = _safe_lambdify(self.cost_equation)
def __call__(self, y_true: IntNDArray, y_score: FloatNDArray, **kwargs: Any) -> float:
"""Compute the cost loss."""
_check_parameters(self.cost_equation.free_symbols - set(sympy.symbols('y s')), kwargs)
return float(np.mean(_safe_run_lambda(self.cost_function, self.cost_equation, y=y_true, s=y_score, **kwargs)))
def _build_cost_equation(
tp_cost: sympy.Expr, tn_cost: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr
) -> sympy.Expr:
y, s = sympy.symbols('y s')
cost_function = y * (s * tp_cost + (1 - s) * fn_cost) + (1 - y) * ((1 - s) * tn_cost + s * fp_cost)
return cost_function
class CostLogitConsts:
"""Class to compute the constants of the cost metric for logistic regression."""
def __init__(self, tp_benefit: sympy.Expr, tn_benefit: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr):
y, x = sympy.symbols('y x')
self.gradient_const = x * (y * (-tp_benefit - fn_cost) + (1 - y) * (fp_cost + tn_benefit))
self.gradient_fn = _safe_lambdify(self.gradient_const)
self.loss_const1 = y * -tp_benefit + (1 - y) * fp_cost
self.loss_const1_fn = _safe_lambdify(self.loss_const1)
self.loss_const2 = y * fn_cost - (1 - y) * tn_benefit
self.loss_const2_fn = _safe_lambdify(self.loss_const2)
def prepare(
self, x: FloatNDArray, y_true: FloatNDArray, **kwargs: Any
) -> tuple[FloatNDArray, FloatNDArray, FloatNDArray]:
"""Prepare the constant terms for the logistic regression objective."""
gradient_const_value = _safe_run_lambda_array(
self.gradient_fn, self.gradient_const, shape=x.shape, y=y_true, x=x, **kwargs
)
loss_const1_value = _safe_run_lambda_array(
self.loss_const1_fn, self.loss_const1, shape=y_true.shape[0], y=y_true, **kwargs
)
loss_const2_value = _safe_run_lambda_array(
self.loss_const2_fn, self.loss_const2, shape=y_true.shape[0], y=y_true, **kwargs
)
return gradient_const_value, loss_const1_value, loss_const2_value
class CostBoostGradientConst:
"""Class to compute the gradient constants of the cost metric for gradient boosting."""
def __init__(self, tp_benefit: sympy.Expr, tn_benefit: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr):
y = sympy.symbols('y')
self.gradient_const_eq = y * (-tp_benefit - fn_cost) + (1 - y) * (fp_cost + tn_benefit)
self.gradient_const_fn = _safe_lambdify(self.gradient_const_eq)
def __call__(self, y_true: FloatNDArray, **kwargs: Any) -> FloatNDArray:
"""Compute the gradient constants."""
_check_parameters(self.gradient_const_eq.free_symbols - {sympy.symbols('y')}, kwargs)
gradient_const_value = _safe_run_lambda_array(
self.gradient_const_fn, self.gradient_const_eq, shape=y_true.shape[0], y=y_true, **kwargs
)
return gradient_const_value
class CostOptimalThreshold:
"""Class to compute the optimal threshold for the cost metric."""
def __init__(self, tp_benefit: sympy.Expr, tn_benefit: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr):
self.denominator_expression = fp_cost + tn_benefit + fn_cost + tp_benefit
self.numerator_expression = fp_cost + tn_benefit
self.calculate_denominator = _safe_lambdify(self.denominator_expression)
self.calculate_numerator = _safe_lambdify(self.numerator_expression)
def __call__(self, y_true: IntNDArray, y_score: FloatNDArray, **parameters: Any) -> FloatNDArray | float:
"""Compute the optimal threshold(s). `y_true` and `y_score` are unused and kept for API compatibility."""
_check_parameters(self.denominator_expression.free_symbols, parameters)
denominator = _safe_run_lambda(self.calculate_denominator, self.denominator_expression, **parameters)
numerator = _safe_run_lambda(self.calculate_numerator, self.numerator_expression, **parameters)
eps = float(np.finfo(np.float64).eps)
if np.isscalar(denominator):
if denominator == 0:
denominator = eps
else:
denom_arr = np.asarray(denominator, dtype=np.float64)
denominator = np.where(denom_arr == 0, eps, denom_arr)
optimal: FloatNDArray | float = numerator / denominator # type: ignore[operator, assignment]
return float(optimal) if np.isscalar(optimal) else optimal # type: ignore[arg-type]
class CostOptimalRate:
"""Class to compute the optimal predicted positive rate for the cost metric."""
def __init__(self, tp_benefit: sympy.Expr, tn_benefit: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr):
self.denominator_expression = fp_cost + tn_benefit + fn_cost + tp_benefit
self.numerator_expression = fp_cost + tn_benefit
self.calculate_denominator = _safe_lambdify(self.denominator_expression)
self.calculate_numerator = _safe_lambdify(self.numerator_expression)
def __call__(self, y_true: IntNDArray, y_score: FloatNDArray, **parameters: Any) -> float:
"""Compute the optimal predicted positive rate."""
_check_parameters(self.denominator_expression.free_symbols, parameters)
denominator = _safe_run_lambda(self.calculate_denominator, self.denominator_expression, **parameters)
numerator = _safe_run_lambda(self.calculate_numerator, self.numerator_expression, **parameters)
# Robust division to avoid divide-by-zero
eps = float(np.finfo(np.float64).eps)
if np.isscalar(denominator):
denom_safe: FloatNDArray | float = denominator if denominator != 0 else eps # type: ignore[assignment]
else:
denom_arr = np.asarray(denominator, dtype=np.float64)
denom_safe = np.where(denom_arr == 0, eps, denom_arr) # type: ignore[assignment]
t_star: FloatNDArray | float = numerator / denom_safe # type: ignore[operator, assignment]
scores = np.asarray(y_score)
if scores.ndim > 1:
scores = scores.reshape(-1)
if np.isscalar(t_star):
rate = float(np.mean(scores >= float(t_star))) # type: ignore[arg-type]
else:
t_arr = np.asarray(t_star, dtype=np.float64).reshape(-1)
rate = float(np.mean(scores >= t_arr))
return rate
def _cost_loss_to_latex(
tp_benefit: sympy.Expr, tn_benefit: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr
) -> str:
from sympy.printing.latex import latex
i, N = sympy.symbols('i N') # noqa: N806
cost_function = (1 / N) * sympy.Sum(
_format_cost_function(tp_cost=-tp_benefit, tn_cost=-tn_benefit, fp_cost=fp_cost, fn_cost=fn_cost), (i, 0, N)
)
for symbol in cost_function.free_symbols:
if symbol != N:
cost_function = cost_function.subs(symbol, str(symbol) + '_i')
output = latex(cost_function, mode='plain', order=None)
return f'$\\displaystyle {output}$'
def _format_cost_function(
tp_cost: sympy.Expr, tn_cost: sympy.Expr, fp_cost: sympy.Expr, fn_cost: sympy.Expr
) -> sympy.Expr:
y, s = sympy.symbols('y s')
cost_function = y * (s * tp_cost + (1 - s) * fn_cost) + (1 - y) * ((1 - s) * tn_cost + s * fp_cost)
return cost_function