Source code for empulse.models.cy_proftree.proftree

from functools import partial
from numbers import Integral, Real
from typing import Any, ClassVar, Self

import numpy as np
from sklearn.utils._param_validation import Interval, RealNotInt
from sklearn.utils.validation import check_is_fitted

from ..._types import FloatArrayLike, FloatNDArray, IntNDArray, ParameterConstraint
from ...metrics import MaxProfit, Metric
from ...metrics.metric.common import Direction
from ...utils._sklearn_compat import validate_data  # type: ignore[attr-defined]
from ..csclassifier import CostSensitiveClassifier, MetricStrategyFactory
from .evolutionary_tree import EvolutionaryTree

MAX_INT = 2147483647


[docs] class ProfTreeClassifier(CostSensitiveClassifier): """ Profit-driven evolutionary decision tree classifier. The ProfTree classifier is a decision tree classifier that is trained using a genetic algorithm. The genetic algorithm is used to evolve a population of trees over multiple generations. The fitness of each tree is evaluated using a fitness function, which is used to select the best trees for crossover and mutation. Parameters ---------- tp_cost : float or array-like, shape=(n_samples,), default=0.0 Cost of true positives. If ``float``, then all true positives have the same cost. If array-like, then it is the cost of each true positive classification. Is overwritten if another `tp_cost` is passed to the ``fit`` method. .. note:: It is not recommended to pass instance-dependent costs to the ``__init__`` method. Instead, pass them to the ``fit`` method. fp_cost : float or array-like, shape=(n_samples,), default=0.0 Cost of false positives. If ``float``, then all false positives have the same cost. If array-like, then it is the cost of each false positive classification. Is overwritten if another `fp_cost` is passed to the ``fit`` method. .. note:: It is not recommended to pass instance-dependent costs to the ``__init__`` method. Instead, pass them to the ``fit`` method. fp_cost : float or array-like, shape=(n_samples,), default=0.0 Cost of false positives. If ``float``, then all false positives have the same cost. If array-like, then it is the cost of each false positive classification. Is overwritten if another `fp_cost` is passed to the ``fit`` method. .. note:: It is not recommended to pass instance-dependent costs to the ``__init__`` method. Instead, pass them to the ``fit`` method. fn_cost : float or array-like, shape=(n_samples,), default=0.0 Cost of false negatives. If ``float``, then all false negatives have the same cost. If array-like, then it is the cost of each false negative classification. Is overwritten if another `fn_cost` is passed to the ``fit`` method. .. note:: It is not recommended to pass instance-dependent costs to the ``__init__`` method. Instead, pass them to the ``fit`` method. loss : Metric or None Fitness function for the genetic algorithm to maximize. If ``None``, the :func:`~empulse.metrics.max_profit_score` is used. alpha : float, default=0.0 Complexity penalty for the fitness function. A way to control overfitting. When ``alpha`` is 0.0, the fitness function is not penalized for the amount of nodes in the tree. When ``alpha`` is greater than 0.0, the fitness function is penalized for the amount of nodes in the tree. patience : int, default=100 Number of iterations to wait for improvement before stopping early. tolerance : float, default=1e-4 Minimum relative improvement in fitness required to consider a solution better. max_iter : int, default=1000 Maximum number of iterations / number of generations the GA is run. max_depth : int or None, default=10 Maximum depth of the tree. Computation time scales exponentially with depth, be careful with higher values. min_samples_split : int or float, default=20 The minimum number of samples required to split an internal node: - If int, then consider `min_samples_split` as the minimum number. - If float, then `min_samples_split` is a fraction and `ceil(min_samples_split * n_samples)` are the minimum number of samples for each split. min_samples_leaf : int or float, default=7 The minimum number of samples required to be at a leaf node. A split point at any depth will only be considered if it leaves at least ``min_samples_leaf`` training samples in each of the left and right branches. This may have the effect of smoothing the model, especially in regression. - If int, then consider `min_samples_leaf` as the minimum number. - If float, then `min_samples_leaf` is a fraction and `ceil(min_samples_leaf * n_samples)` are the minimum number of samples for each node. population_size : int or None, default=None Number of decision trees in the population. If ``None``, population_size is set to ``10 * n_features``. Will be at least 2 trees. crossover_rate : float, default=0.2 Probability of crossover. Must be in [0, 1]. Variation operator is mutually exclusive with variation operators (``crossover_rate``, ``grow_rate``, ``prune_rate``, ``mutate_split_rate``, ``mutate_value_rate``). All probabilities must sum to 1. grow_rate : float, default=0.2 Probability to add a randomly generated split rule to a leaf node. Must be in [0, 1]. Variation operator is mutually exclusive with variation operators (``crossover_rate``, ``grow_rate``, ``prune_rate``, ``mutate_split_rate``, ``mutate_value_rate``). All probabilities must sum to 1. prune_rate : float, default=0.2 Probability to remove a randomly selected split rule from an internal node with two leaf nodes as children. Must be in [0, 1]. Variation operator is mutually exclusive with variation operators (``crossover_rate``, ``grow_rate``, ``prune_rate``, ``mutate_split_rate``, ``mutate_value_rate``). All probabilities must sum to 1. mutate_split_rate : float, default=0.2 Probability to change the feature and feature value of a random split in the tree. Must be in [0, 1]. Variation operator is mutually exclusive with variation operators (``crossover_rate``, ``grow_rate``, ``prune_rate``, ``mutate_split_rate``, ``mutate_value_rate``). All probabilities must sum to 1. mutate_value_rate : float, default=0.2 Probability to change only the feature value of a random split in the tree. Must be in [0, 1]. Variation operator is mutually exclusive with variation operators (``crossover_rate``, ``grow_rate``, ``prune_rate``, ``mutate_split_rate``, ``mutate_value_rate``). All probabilities must sum to 1. n_jobs : int, default=1 Number of jobs to run in parallel. random_state : np.random.RandomState, int or None, default=None Controls the randomness of the estimator. To obtain a deterministic behaviour during fitting, ``random_state`` has to be fixed to an integer. See :term:`Sklearn Glossary <random_state>` for details. """ _parameter_constraints: ClassVar[ParameterConstraint] = { **CostSensitiveClassifier._parameter_constraints, 'alpha': [Interval(Real, 0, None, closed='both')], 'patience': [Interval(Integral, 1, None, closed='left')], 'tolerance': [Interval(Real, 0, None, closed='right')], 'max_depth': [Interval(Integral, 1, None, closed='left'), None], 'min_samples_split': [ Interval(Integral, 2, None, closed='left'), Interval(RealNotInt, 0.0, 1.0, closed='right'), ], 'min_samples_leaf': [ Interval(Integral, 1, None, closed='left'), Interval(RealNotInt, 0.0, 1.0, closed='neither'), ], 'max_iter': [Interval(Integral, 1, None, closed='left')], 'population_size': [Interval(Integral, 2, None, closed='left'), None], 'crossover_rate': [Interval(Real, 0, 1, closed='both')], 'grow_rate': [Interval(Real, 0, 1, closed='both')], 'prune_rate': [Interval(Real, 0, 1, closed='both')], 'mutate_split_rate': [Interval(Real, 0, 1, closed='both')], 'mutate_value_rate': [Interval(Real, 0, 1, closed='both')], 'n_jobs': [Interval(Integral, 1, None, closed='left')], 'random_state': ['random_state'], } _default_metric_strategy: ClassVar[MetricStrategyFactory] = MaxProfit def __init__( self, *, tp_cost: FloatArrayLike | float = 0.0, tn_cost: FloatArrayLike | float = 0.0, fn_cost: FloatArrayLike | float = 0.0, fp_cost: FloatArrayLike | float = 0.0, loss: Metric | None = None, alpha: float = 0.0, patience: int = 100, tolerance: float = 1e-4, max_depth: int | None = 10, min_samples_split: float = 20, min_samples_leaf: float = 7, max_iter: int = 1000, population_size: int | None = None, crossover_rate: float = 0.2, grow_rate: float = 0.2, prune_rate: float = 0.2, mutate_split_rate: float = 0.2, mutate_value_rate: float = 0.2, n_jobs: int = 1, random_state: np.random.RandomState | int | None = None, ): self.alpha = alpha self.patience = patience self.tolerance = tolerance self.max_depth = max_depth self.min_samples_split = min_samples_split self.min_samples_leaf = min_samples_leaf self.population_size = population_size self.max_iter = max_iter self.crossover_rate = crossover_rate self.grow_rate = grow_rate self.prune_rate = prune_rate self.mutate_split_rate = mutate_split_rate self.mutate_value_rate = mutate_value_rate self.random_state = random_state self.n_jobs = n_jobs super().__init__(tp_cost=tp_cost, tn_cost=tn_cost, fp_cost=fp_cost, fn_cost=fn_cost, loss=loss) def _fit(self, X: FloatNDArray, y: IntNDArray, loss: Metric, **loss_params: Any) -> Self: """ Fit a tree to a training set. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data. y : array-like of shape (n_samples,) Target values. loss : Metric Loss to be optimized. loss_params : Any Additional parameter to be passed to the loss function. Returns ------- Node: The fitted tree. """ n_samples = X.shape[0] population_size = 10 * X.shape[1] if self.population_size is None else self.population_size if self.min_samples_split < 1: min_samples_split = np.ceil(self.min_samples_split * n_samples) else: min_samples_split = self.min_samples_split min_samples_split = max(min_samples_split, 2) if self.min_samples_leaf < 1: min_samples_leaf = np.ceil(self.min_samples_leaf * n_samples) else: min_samples_leaf = self.min_samples_leaf min_samples_leaf = max(min_samples_leaf, 1) total_probability = ( self.crossover_rate + self.grow_rate + self.prune_rate + self.mutate_split_rate + self.mutate_value_rate ) crossover_rate = self.crossover_rate / total_probability grow_rate = self.grow_rate / total_probability prune_rate = self.prune_rate / total_probability mutate_split_rate = self.mutate_split_rate / total_probability mutate_value_rate = self.mutate_value_rate / total_probability if isinstance(self.random_state, np.random.RandomState): random_state = int(self.random_state.randint(low=0, high=2**31 - 1)) else: random_state = int(self.random_state) if self.random_state is not None else -1 self.tree_ = EvolutionaryTree() if self.loss is None: tp_cost = loss_params.get('tp_cost', 0) tn_cost = loss_params.get('tn_cost', 0) fn_cost = loss_params.get('fn_cost', 0) fp_cost = loss_params.get('fp_cost', 0) self.tree_.fit_max_profit( X=X.astype(np.float32), y=y.astype(np.int32), tp_benefit=-float(np.mean(tp_cost)), tn_benefit=-float(np.mean(tn_cost)), fp_cost=float(np.mean(fp_cost)), fn_cost=float(np.mean(fn_cost)), pop_size=int(population_size), crossover_rate=float(crossover_rate), grow_rate=float(grow_rate), prune_rate=float(prune_rate), mutate_split_rate=float(mutate_split_rate), mutate_value_rate=float(mutate_value_rate), max_depth=int(self.max_depth) if self.max_depth is not None else MAX_INT, min_samples_split=int(min_samples_split), min_samples_leaf=int(min_samples_leaf), alpha=float(self.alpha), max_generations=int(self.max_iter), patience=int(self.patience), tol=float(self.tolerance), random_state=random_state, ) elif isinstance(self.loss, Metric): if isinstance(self.loss.strategy, MaxProfit) and self.loss._is_deterministic: fp_cost, fn_cost, tp_cost, tn_cost = self.loss._evaluate_costs(**loss_params) tp_benefit = -float(np.mean(tp_cost)) tn_benefit = -float(np.mean(tn_cost)) fp_cost = float(np.mean(fp_cost)) fn_cost = float(np.mean(fn_cost)) self.tree_.fit_max_profit( X=X.astype(np.float32), y=y.astype(np.int32), tp_benefit=tp_benefit, tn_benefit=tn_benefit, fp_cost=fp_cost, fn_cost=fn_cost, pop_size=int(population_size), crossover_rate=float(crossover_rate), grow_rate=float(grow_rate), prune_rate=float(prune_rate), mutate_split_rate=float(mutate_split_rate), mutate_value_rate=float(mutate_value_rate), max_depth=int(self.max_depth) if self.max_depth is not None else MAX_INT, min_samples_split=int(min_samples_split), min_samples_leaf=int(min_samples_leaf), alpha=float(self.alpha), max_generations=int(self.max_iter), patience=int(self.patience), tol=float(self.tolerance), random_state=random_state, ) else: if self.loss.direction is Direction.MAXIMIZE: fitness_fn = lambda *args, **kwargs: -self.loss(*args, **kwargs) else: fitness_fn = self.loss fitness_fn = partial(fitness_fn, **loss_params) y_proba = np.random.default_rng().random(y.size, dtype=np.float32) try: # catch issue with the loss function before it goes into C world fitness_fn(y.astype(np.int32), y_proba) except (TypeError, ValueError) as e: raise ValueError( f'The loss function {self.loss} threw an error when evaluating the function.' ) from e self.tree_.fit_custom( X=X.astype(np.float32), y=y.astype(np.int32), pop_size=int(population_size), crossover_rate=float(crossover_rate), grow_rate=float(grow_rate), prune_rate=float(prune_rate), mutate_split_rate=float(mutate_split_rate), mutate_value_rate=float(mutate_value_rate), max_depth=int(self.max_depth) if self.max_depth is not None else MAX_INT, min_samples_split=int(min_samples_split), min_samples_leaf=int(min_samples_leaf), alpha=float(self.alpha), max_generations=int(self.max_iter), patience=int(self.patience), tol=float(self.tolerance), fitness_function=fitness_fn, random_state=random_state, ) else: raise ValueError(f'Unknown loss function: {self.loss}.') self.n_iter_ = self.tree_.n_generations return self
[docs] def predict_proba(self, X: FloatArrayLike) -> FloatNDArray: """ Predict class probabilities of the input samples X. The predicted class probability is the fraction of samples of the same class in a leaf. Parameters ---------- X : array-like of shape (n_samples, n_features) The input samples. Internally, it will be converted to ``dtype=np.float32``. Returns ------- proba : ndarray of shape (n_samples, n_classes) The class probabilities of the input samples. The order of the classes corresponds to that in the attribute :term:`classes_`. """ check_is_fitted(self) X = validate_data(self, X, reset=False).astype(np.float32) y_proba = self.tree_.predict_proba(X) y_proba = np.vstack((1 - y_proba, y_proba)).T return y_proba