import warnings
from tabensemb.utils import *
from tabensemb.data import (
AbstractProcessor,
AbstractFeatureSelector,
AbstractTransformer,
AbstractScaler,
AbstractAugmenter,
)
from tabensemb.data import DataModule
import inspect
from sklearn.model_selection import KFold
from sklearn.feature_selection import VarianceThreshold
from sklearn.preprocessing import StandardScaler as skStandardScaler
from typing import Type
from .utils import get_corr_sets, OrdinalEncoder
[docs]
class SampleDataAugmenter(AbstractAugmenter):
    """
    A toy augmenter that exists only to exercise the data augmentation machinery. The rows it produces are not a
    reasonable augmentation and should not be used for real training.
    """

    def _get_augmented(
        self, data: pd.DataFrame, datamodule: DataModule
    ) -> pd.DataFrame:
        """
        Return a copy of the rows of ``data`` labelled by its last two index entries.

        Parameters
        ----------
        data
            The dataset to draw the "augmented" rows from.
        datamodule
            Not used by this implementation.

        Returns
        -------
        pd.DataFrame
            An independent copy of the selected rows (all columns).
        """
        # The selection is label-based: the last two index labels are looked up with ``.loc``.
        tail_labels = data.index[-2:]
        return data.loc[tail_labels, :].copy()
[docs]
class FeatureValueSelector(AbstractProcessor):
    """
    Keep only the data whose given feature has the specified value.

    Parameters
    ----------
    feature: str
        The feature that will be filtered.
    value: float
        The selected feature value.

    Notes
    -----
    The ``FeatureValueSelector`` does not change anything in the upcoming dataset, so values in the upcoming set may
    fall outside the range you expect. A typical symptom is "IndexError: index out of range in self" raised from
    ``torch.embedding`` because of categorical features.
    """

    def _required_kwargs(self):
        """
        Return the names of the required keyword arguments, ``feature`` and ``value``.
        """
        required = ["feature", "value"]
        return required
[docs]
class IQRRemover(AbstractProcessor):
    """
    Remove outliers using the IQR (interquartile range) strategy.

    A value is treated as an outlier when it lies outside the range
    [25-percentile - 1.5 * IQR, 75-percentile + 1.5 * IQR], where IQR = 75-percentile - 25-percentile.
    """
[docs]
class StdRemover(AbstractProcessor):
    """
    Remove outliers using the three-sigma strategy: values that fall outside the 3-sigma range are treated as
    outliers.
    """
[docs]
class NaNFeatureRemover(AbstractFeatureSelector):
    """
    Drop every feature whose column holds no valid (non-missing) value.
    """

    def _get_feature_names_out(self, data, datamodule):
        """
        Return the features that have at least one non-missing value in ``data``.

        Parameters
        ----------
        data
            The dataset to inspect. Only the columns in ``datamodule.all_feature_names`` are checked.
        datamodule
            Provides ``all_feature_names``, the candidate features.

        Returns
        -------
        list
            The retained feature names, in the order of ``datamodule.all_feature_names``.
        """
        feature_names = datamodule.all_feature_names
        # One flag per feature column: True when every entry of that column is missing.
        entirely_missing = pd.isna(data[feature_names]).values.all(axis=0)
        return [
            feature
            for feature, is_empty in zip(feature_names, entirely_missing)
            if not is_empty
        ]
[docs]
class RFEFeatureSelector(AbstractFeatureSelector):
    """
    Select features using recursive feature elimination, adapted from the implementation of RFECV in sklearn.

    Available arguments:

    n_estimators: int
        The number of trees used in random forests.
    step: int
        The number of eliminated features at each step.
    min_features_to_select: int
        The minimum number of features.
    method: str
        The method of calculating importance. "auto" for default impurity-based method implemented in
        RandomForestRegressor, and "shap" for SHAP value (which may slow down the program but is more accurate).
    verbose: int
        Forwarded to ``ExtendRFECV`` as its ``verbose`` argument.
    """

    def _defaults(self):
        """
        Return the default values of the available arguments.
        """
        return dict(
            n_estimators=100, step=1, verbose=0, min_features_to_select=1, method="auto"
        )

    def _get_feature_names_out(self, data, datamodule):
        """
        Run cross-validated recursive feature elimination and return the retained feature names.

        The elimination is scored with ``neg_root_mean_squared_error`` over a 5-fold ``KFold`` split. Only the first
        label in ``datamodule.label_name`` is used as the target; a warning is issued when there are several.

        Parameters
        ----------
        data
            The dataset to fit on. NOTE: its column labels are converted to ``str`` in place.
        datamodule
            Provides ``all_feature_names``, ``label_name`` and ``get_base_predictor``.

        Returns
        -------
        list
            The feature names reported by the fitted selector.
        """
        from tabensemb.utils.processors.rfecv import ExtendRFECV

        method = self.kwargs["method"]
        if method == "shap":
            # shap is only needed for SHAP-based importance, so the default path does not import it.
            import shap

            def importance_getter(estimator, data):
                # A private generator seeded with 0 draws the same sample as seeding the global NumPy RNG with 0,
                # without disturbing the global random state of the caller.
                rng = np.random.RandomState(0)
                sampled_rows = rng.choice(
                    np.arange(data.shape[0]),
                    size=min(100, data.shape[0]),
                    replace=False,
                )
                # The sampled values are row positions, so positional indexing is required; label-based ``.loc``
                # would fail or pick other rows whenever the index is not the default 0..n-1 range.
                selected_data = data.iloc[sampled_rows, :]
                return np.mean(
                    np.abs(shap.Explainer(estimator)(selected_data).values),
                    axis=0,
                )

        else:
            # Any other value (e.g. "auto") is handed to the selector unchanged.
            importance_getter = method

        cv = KFold(5)
        rfecv = ExtendRFECV(
            # RFECV does not support categorical encoding. The estimator should have `coef_` or `feature_importances_`
            # so pipeline is not valid if importance_getter=="auto". shap can not handle a pipeline either.
            estimator=datamodule.get_base_predictor(
                categorical=False,
                n_estimators=self.kwargs["n_estimators"],
                n_jobs=-1,
                random_state=0,
            ),
            step=self.kwargs["step"],
            cv=cv,
            scoring="neg_root_mean_squared_error",
            min_features_to_select=self.kwargs["min_features_to_select"],
            n_jobs=-1,
            verbose=self.kwargs["verbose"],
            importance_getter=importance_getter,
        )
        if len(datamodule.label_name) > 1:
            warnings.warn(
                f"Multi-target task is not supported by {self.__class__.__name__}. Only the first label is used."
            )
        # NOTE(review): this renames the columns of the caller's frame in place; kept as-is because callers may
        # rely on it.
        data.columns = [str(x) for x in data.columns]
        rfecv.fit(
            data[datamodule.all_feature_names],
            data[datamodule.label_name[0]].values.flatten(),
        )
        return list(rfecv.get_feature_names_out())
[docs]
class VarianceFeatureSelector(AbstractFeatureSelector):
    """
    Remove features that almost (by a certain fraction) contain an identical value.

    Parameters
    ----------
    thres: float
        If more than thres * 100 percent of values are the same, the feature is removed.
    """

    def _defaults(self):
        """
        Return the default arguments: ``thres=0.8``.
        """
        return dict(thres=0.8)

    def _get_feature_names_out(self, data, datamodule):
        """
        Fit a ``VarianceThreshold`` on the feature columns and return the names it keeps.

        Parameters
        ----------
        data
            The dataset holding the feature and label columns.
        datamodule
            Provides ``all_feature_names`` and ``label_name``.

        Returns
        -------
        list
            The feature names reported by the fitted selector.
        """
        fraction = self.kwargs["thres"]
        # fraction * (1 - fraction) is the variance of a Bernoulli variable with probability ``fraction``.
        selector = VarianceThreshold(threshold=(fraction * (1 - fraction)))
        features = data[datamodule.all_feature_names]
        # The target is passed along but ignored by the selector.
        if len(datamodule.label_name) == 1:
            target = data[datamodule.label_name].values.flatten()
        else:
            target = data[datamodule.label_name].values
        selector.fit(features, target)
        return list(selector.get_feature_names_out())
[docs]
class CorrFeatureSelector(AbstractFeatureSelector):
    """
    Select features that are not correlated (in the sense of Pearson correlation). Within each set of correlated
    features, the features are ranked by SHAP using the base predictor and only the one with the highest importance
    is kept.

    Parameters
    ----------
    thres:
        The threshold of the Pearson correlation coefficient.
    n_estimators:
        The number of trees used in random forests.
    """

    def _defaults(self):
        """
        Return the default arguments: ``thres=0.8`` and ``n_estimators=100``.
        """
        return dict(thres=0.8, n_estimators=100)

    def _get_feature_names_out(self, data, datamodule):
        """
        Return the uncorrelated continuous features, the top-ranked feature of each correlated set, and all
        categorical features.

        The ranking of every correlated set is printed to stdout.

        Parameters
        ----------
        data
            The dataset used to fit the base predictor and to compute SHAP values (on at most 100 randomly chosen
            rows).
        datamodule
            Provides the correlation matrix, the feature/label names and the base predictor.

        Returns
        -------
        list
            The retained feature names.
        """
        import shap

        abs_corr = datamodule.cal_corr(imputed=False, features_only=True).abs()
        above_threshold = np.where(abs_corr > self.kwargs["thres"])
        corr_feature, corr_sets = get_corr_sets(
            above_threshold, datamodule.cont_feature_names
        )

        forest = datamodule.get_base_predictor(
            categorical=False,
            n_estimators=self.kwargs["n_estimators"],
            n_jobs=-1,
            random_state=0,
        )
        feature_frame = data[datamodule.all_feature_names]
        if len(datamodule.label_name) == 1:
            target = data[datamodule.label_name].values.flatten()
        else:
            target = data[datamodule.label_name].values
        forest.fit(feature_frame, target)

        explainer = shap.Explainer(forest)
        # Explain a random subset of rows, chosen by index label without replacement.
        sampled_labels = np.random.choice(
            np.array(data.index), size=min([100, len(data)]), replace=False
        )
        shap_values = explainer(data.loc[sampled_labels, datamodule.all_feature_names])

        # Continuous features that belong to no correlated set are kept unconditionally.
        retain_features = list(
            np.setdiff1d(datamodule.cont_feature_names, corr_feature)
        )
        # Mean absolute SHAP value per feature, ordered like ``all_feature_names``.
        attr = np.mean(np.abs(shap_values.values), axis=0)
        print("Correlated features (Ranked by SHAP):")
        for corr_set in corr_sets:
            set_shap = [attr[datamodule.all_feature_names.index(x)] for x in corr_set]
            top_feature = corr_set[set_shap.index(np.max(set_shap))]
            retain_features.append(top_feature)
            # Report the set from the most to the least important feature.
            descending = np.array(set_shap).argsort()[::-1]
            ranking = {corr_set[pos]: set_shap[pos] for pos in descending}
            print(pretty(ranking))
        retain_features += datamodule.cat_feature_names
        return retain_features
[docs]
class StandardScaler(AbstractScaler):
    """
    A standard scaler implemented using ``StandardScaler`` from sklearn.
    """
[docs]
class CategoricalOrdinalEncoder(AbstractTransformer):
    """
    An encoder for categorical features that maps string values to unique integer values.
    See :class:`~tabensemb.data.utils.OrdinalEncoder` for details.
    """

    def __init__(self, **kwargs):
        """
        Forward all keyword arguments to the parent class and start with no recorded feature mapping.
        """
        super().__init__(**kwargs)
        self.record_feature_mapping = None

    def var_slip(self, feature_name, x):
        """
        Return ``x`` unchanged; ``feature_name`` is not used.
        """
        return x
# Registry from class name to class for every class visible in this module's namespace (including imported ones)
# that is a subclass of AbstractProcessor.
clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
processor_mapping = {}
for name, cls in clsmembers:
    if not issubclass(cls, AbstractProcessor):
        continue
    processor_mapping[name] = cls
[docs]
def get_data_processor(name: str) -> Type[AbstractProcessor]:
    """
    Look up a data processor class by its name in ``processor_mapping``.

    Parameters
    ----------
    name
        The class name of the requested processor.

    Returns
    -------
    Type[AbstractProcessor]
        The class registered under ``name``.

    Raises
    ------
    Exception
        If ``name`` is not registered, or the registered object is not a subclass of ``AbstractProcessor``.
    """
    if name not in processor_mapping:
        raise Exception(f"Data processor {name} not implemented.")
    processor_cls = processor_mapping[name]
    if not issubclass(processor_cls, AbstractProcessor):
        raise Exception(f"{name} is not the subclass of AbstractProcessor.")
    return processor_cls