Source code for reflame.base_flnn

#!/usr/bin/env python
# Created by "Thieu" at 13:43, 13/09/2023 ----------%
#       Email: nguyenthieu2102@gmail.com            %                                                    
#       Github: https://github.com/thieu1995        %                         
# --------------------------------------------------%

import pickle
from pathlib import Path
import numpy as np

import pandas as pd
from permetrics import RegressionMetric, ClassificationMetric
from sklearn.base import BaseEstimator
from mealpy import get_optimizer_by_name, Optimizer, get_all_optimizers, FloatVar
from reflame.utils import activation, validator, expand_util
from reflame.utils.evaluator import get_all_regression_metrics, get_all_classification_metrics


[docs]class FLNN: """This class defines the general Functional Link Neural Network (FLNN) model Parameters ---------- size_input : int, default=5 The number of input features size_output : int, default=1 The number of output labels expand_name : str, default="chebyshev" The expand function that will be used. The supported expand functions are: {"chebyshev", "legendre", "gegenbauer", "laguerre", "hermite", "power", "trigonometric"} n_funcs : int, default=4 The first `n_funcs` in expand functions list will be used. Valid value from 1 to 10. act_name : str, default='none' Activation function for the hidden layer. The supported activation functions are: {"none", "relu", "prelu", "gelu", "elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink", "hard_shrink"} """ def __init__(self, size_input=5, size_output=1, expand_name="chebyshev", n_funcs=4, act_name='elu'): self.input_nodes = size_input * n_funcs self.output_nodes = size_output self.size_w = self.input_nodes * self.output_nodes self.size_b = size_output self.expand_name = expand_name self.expand_func = getattr(expand_util, f"expand_{self.expand_name}") self.n_funcs = n_funcs self.act_name = act_name self.act_func = getattr(activation, self.act_name) self.weights = { "w": np.random.rand(self.input_nodes, self.output_nodes), "b": np.random.rand(self.output_nodes, ), }
[docs] def transform_X(self, X): return self.expand_func(X, self.n_funcs)
[docs] def fit(self, X, y): """Fit the model to data matrix X and target(s) y. Parameters ---------- X : ndarray or sparse matrix of shape (n_samples, n_features) The input data. y : ndarray of shape (n_samples,) or (n_samples, n_outputs) The target values (class labels in classification, real numbers in regression). Returns ------- self : object Returns a trained FLNN model. """ # H = self.act_func(np.dot(X, self.weights["w1"]) + self.weights["b"]) # self.weights["w2"] = np.linalg.pinv(H) @ y return self
[docs] def predict(self, X): """Predict using the Extreme Learning Machine model. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) The input data. Returns ------- y : ndarray of shape (n_samples, n_outputs) The predicted values. """ X = self.transform_X(X) y_pred = self.act_func(np.dot(X, self.weights["w"]) + self.weights["b"]) return y_pred
[docs] def get_weights(self): return self.weights
[docs] def set_weights(self, weights): self.weights = weights
[docs] def get_weights_size(self): return np.sum([item.size for item in self.weights.values()])
[docs] def update_weights_from_solution(self, solution): w = np.reshape(solution[:self.size_w], (self.input_nodes, self.output_nodes)) b = np.reshape(solution[self.size_w:], self.output_nodes) self.set_weights({"w": w, "b": b})
[docs]class BaseFlnn(BaseEstimator): """ Defines the most general class for FLNN network that inherits the BaseEstimator class of Scikit-Learn library. Parameters ---------- expand_name : str, default="chebyshev" The expand function that will be used. The supported expand functions are: {"chebyshev", "legendre", "gegenbauer", "laguerre", "hermite", "power", "trigonometric"} n_funcs : int, default=4 The first `n_funcs` in expand functions list will be used. Valid value from 1 to 10. act_name : str, default='none' Activation function for the hidden layer. The supported activation functions are: {"none", "relu", "prelu", "gelu", "elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink", "hard_shrink"} """ SUPPORTED_CLS_METRICS = get_all_classification_metrics() SUPPORTED_REG_METRICS = get_all_regression_metrics() CLS_OBJ_LOSSES = None def __init__(self, expand_name="chebyshev", n_funcs=4, act_name="none"): super().__init__() self.expand_name = expand_name self.n_funcs = n_funcs self.act_name = act_name self.weights = {} self.network, self.obj_scaler, self.loss_train = None, None, None self.n_labels, self.obj_scaler = None, None @staticmethod def _check_method(method=None, list_supported_methods=None): if type(method) is str: return validator.check_str("method", method, list_supported_methods) else: raise ValueError(f"method should be a string and belongs to {list_supported_methods}")
[docs] def create_network(self, X, y): return None, None
[docs] def fit(self, X, y): self.network, self.obj_scaler = self.create_network(X, y) y_scaled = self.obj_scaler.transform(y) self.network.fit(X, y_scaled) return self
[docs] def predict(self, X, return_prob=False): """ Inherit the predict function from BaseFlnn class, with 1 more parameter `return_prob`. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) The input data. return_prob : bool, default=False It is used for classification problem: - If True, the returned results are the probability for each sample - If False, the returned results are the predicted labels """ pred = self.network.predict(X) if return_prob: return pred return self.obj_scaler.inverse_transform(pred)
def __evaluate_reg(self, y_true, y_pred, list_metrics=("MSE", "MAE")): rm = RegressionMetric(y_true=y_true, y_pred=y_pred, decimal=8) return rm.get_metrics_by_list_names(list_metrics) def __evaluate_cls(self, y_true, y_pred, list_metrics=("AS", "RS")): cm = ClassificationMetric(y_true, y_pred, decimal=8) return cm.get_metrics_by_list_names(list_metrics) def __score_reg(self, X, y, method="RMSE"): """Return the metric of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. method : str, default="RMSE" You can get all metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- result : float The result of selected metric """ method = self._check_method(method, list(self.SUPPORTED_REG_METRICS.keys())) y_pred = self.network.predict(X) return RegressionMetric(y, y_pred, decimal=6).get_metric_by_name(method)[method] def __scores_reg(self, X, y, list_methods=("MSE", "MAE")): """Return the list of metrics of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. list_methods : list, default=("MSE", "MAE") You can get all metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- results : dict The results of the list metrics """ y_pred = self.network.predict(X) rm = RegressionMetric(y_true=y, y_pred=y_pred, decimal=6) return rm.get_metrics_by_list_names(list_methods) def __score_cls(self, X, y, method="AS"): """ Return the metric on the given test data and labels. In multi-label classification, this is the subset accuracy which is a harsh metric since you require for each sample that each label set be correctly predicted. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True labels for `X`. method : str, default="AS" You can get all metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- result : float The result of selected metric """ method = self._check_method(method, list(self.SUPPORTED_CLS_METRICS.keys())) return_prob = False if self.n_labels > 2: if method in self.CLS_OBJ_LOSSES: return_prob = True y_pred = self.predict(X, return_prob=return_prob) cm = ClassificationMetric(y_true=y, y_pred=y_pred, decimal=6) return cm.get_metric_by_name(method)[method] def __scores_cls(self, X, y, list_methods=("AS", "RS")): """ Return the list of metrics on the given test data and labels. In multi-label classification, this is the subset accuracy which is a harsh metric since you require for each sample that each label set be correctly predicted. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True labels for `X`. list_methods : list, default=("AS", "RS") You can get all metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- results : dict The results of the list metrics """ list_errors = list(set(list_methods) & set(self.CLS_OBJ_LOSSES)) list_scores = list((set(self.SUPPORTED_CLS_METRICS.keys()) - set(self.CLS_OBJ_LOSSES)) & set(list_methods)) t1 = {} if len(list_errors) > 0: return_prob = False if self.n_labels > 2: return_prob = True y_pred = self.predict(X, return_prob=return_prob) cm = ClassificationMetric(y, y_pred, decimal=6) t1 = cm.get_metrics_by_list_names(list_errors) y_pred = self.predict(X, return_prob=False) cm = ClassificationMetric(y, y_pred, decimal=6) t2 = cm.get_metrics_by_list_names(list_scores) return {**t2, **t1}
[docs] def evaluate(self, y_true, y_pred, list_metrics=None): """Return the list of performance metrics of the prediction. Parameters ---------- y_true : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. y_pred : array-like of shape (n_samples,) or (n_samples, n_outputs) Predicted values for `X`. list_metrics : list You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- results : dict The results of the list metrics """ pass
[docs] def score(self, X, y, method=None): """Return the metric of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. method : str, default="RMSE" You can get all metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- result : float The result of selected metric """ pass
[docs] def scores(self, X, y, list_methods=None): """Return the list of metrics of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. list_methods : list, default=("MSE", "MAE") You can get all metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- results : dict The results of the list metrics """ pass
[docs] def save_loss_train(self, save_path="history", filename="loss.csv"): """ Save the loss (convergence) during the training process to csv file. Parameters ---------- save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".csv" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) if self.loss_train is None: print(f"{self.__class__.__name__} model doesn't have training loss!") else: data = {"epoch": list(range(1, len(self.loss_train) + 1)), "loss": self.loss_train} pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_metrics(self, y_true, y_pred, list_metrics=("RMSE", "MAE"), save_path="history", filename="metrics.csv"): """ Save evaluation metrics to csv file Parameters ---------- y_true : ground truth data y_pred : predicted output list_metrics : list of evaluation metrics save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".csv" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) results = self.evaluate(y_true, y_pred, list_metrics) df = pd.DataFrame.from_dict(results, orient='index').T df.to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_y_predicted(self, X, y_true, save_path="history", filename="y_predicted.csv"): """ Save the predicted results to csv file Parameters ---------- X : The features data, nd.ndarray y_true : The ground truth data save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".csv" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) y_pred = self.predict(X, return_prob=False) data = {"y_true": np.squeeze(np.asarray(y_true)), "y_pred": np.squeeze(np.asarray(y_pred))} pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_model(self, save_path="history", filename="model.pkl"): """ Save model to pickle file Parameters ---------- save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".pkl" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) if filename[-4:] != ".pkl": filename += ".pkl" pickle.dump(self, open(f"{save_path}/{filename}", 'wb'))
[docs] @staticmethod def load_model(load_path="history", filename="model.pkl"): if filename[-4:] != ".pkl": filename += ".pkl" return pickle.load(open(f"{load_path}/{filename}", 'rb'))
[docs]class BaseMhaFlnn(BaseFlnn): """ Defines the most general class for Metaheuristic-based FLNN model that inherits the BaseFlnn class Parameters ---------- expand_name : str, default="chebyshev" The expand function that will be used. The supported expand functions are: {"chebyshev", "legendre", "gegenbauer", "laguerre", "hermite", "power", "trigonometric"} n_funcs : int, default=4 The first `n_funcs` in expand functions list will be used. Valid value from 1 to 10. act_name : str, default='none' Activation function for the hidden layer. The supported activation functions are: {"none", "relu", "prelu", "gelu", "elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink", "hard_shrink"} obj_name : None or str, default=None The name of objective for the problem, also depend on the problem is classification and regression. optimizer : str or instance of Optimizer class (from Mealpy library), default = "BaseGA" The Metaheuristic Algorithm that use to solve the feature selection problem. Current supported list, please check it here: https://github.com/thieu1995/mealpy. If a custom optimizer is passed, make sure it is an instance of `Optimizer` class. optimizer_paras : None or dict of parameter, default=None The parameter for the `optimizer` object. If `None`, the default parameters of optimizer is used (defined in https://github.com/thieu1995/mealpy.) If `dict` is passed, make sure it has at least `epoch` and `pop_size` parameters. verbose : bool, default=True Whether to print progress messages to stdout. """ SUPPORTED_OPTIMIZERS = list(get_all_optimizers().keys()) SUPPORTED_CLS_OBJECTIVES = get_all_classification_metrics() SUPPORTED_REG_OBJECTIVES = get_all_regression_metrics() def __init__(self, expand_name="chebyshev", n_funcs=4, act_name="none", obj_name=None, optimizer="BaseGA", optimizer_paras=None, verbose=True): super().__init__(expand_name=expand_name, n_funcs=n_funcs, act_name=act_name) self.obj_name = obj_name self.optimizer_paras = optimizer_paras self.optimizer = self._set_optimizer(optimizer, optimizer_paras) self.verbose = verbose self.network, self.obj_scaler = None, None self.obj_weights = None def _set_optimizer(self, optimizer=None, optimizer_paras=None): if type(optimizer) is str: opt_class = get_optimizer_by_name(optimizer) if type(optimizer_paras) is dict: return opt_class(**optimizer_paras) else: return opt_class(epoch=500, pop_size=50) elif isinstance(optimizer, Optimizer): if type(optimizer_paras) is dict: return optimizer.set_parameters(optimizer_paras) return optimizer else: raise TypeError(f"optimizer needs to set as a string and supported by Mealpy library.") def _get_history_loss(self, optimizer=None): list_global_best = optimizer.history.list_global_best # 2D array / matrix 2D global_obj_list = np.array([agent.target.fitness for agent in list_global_best]) # Make each obj_list as an element in array for drawing return global_obj_list
[docs] def objective_function(self, solution=None): pass
[docs] def fit(self, X, y, lb=(-1.0, ), ub=(1.0, ), save_population=False): self.network, self.obj_scaler = self.create_network(X, y) y_scaled = self.obj_scaler.transform(y) self.X_temp, self.y_temp = X, y_scaled problem_size = self.network.get_weights_size() if type(lb) in (list, tuple, np.ndarray) and type(ub) in (list, tuple, np.ndarray): if len(lb) == len(ub): if len(lb) == 1: lb = np.array(lb * problem_size, dtype=float) ub = np.array(ub * problem_size, dtype=float) elif len(lb) != problem_size: raise ValueError(f"Invalid lb and ub. Their length should be equal to 1 or problem_size.") else: raise ValueError(f"Invalid lb and ub. They should have the same length.") elif type(lb) in (int, float) and type(ub) in (int, float): lb = (float(lb), ) * problem_size ub = (float(ub), ) * problem_size else: raise ValueError(f"Invalid lb and ub. They should be a number of list/tuple/np.ndarray with size equal to problem_size") log_to = "console" if self.verbose else "None" if self.obj_name is None: raise ValueError("obj_name can't be None") else: if self.obj_name in self.SUPPORTED_REG_OBJECTIVES.keys(): minmax = self.SUPPORTED_REG_OBJECTIVES[self.obj_name] elif self.obj_name in self.SUPPORTED_CLS_OBJECTIVES.keys(): minmax = self.SUPPORTED_CLS_OBJECTIVES[self.obj_name] else: raise ValueError("obj_name is not supported. Please check the library: permetrics to see the supported objective function.") problem = { "obj_func": self.objective_function, "bounds": FloatVar(lb=lb, ub=ub), "minmax": minmax, "log_to": log_to, "save_population": save_population, "obj_weights": self.obj_weights } self.optimizer.solve(problem) self.network.update_weights_from_solution(self.optimizer.g_best.solution) self.loss_train = self._get_history_loss(optimizer=self.optimizer) return self