Source code for tabensemb.data.dataderiver
from tabensemb.utils import *
from tabensemb.data import AbstractDeriver
import inspect
from typing import Type
from .utils import get_corr_sets
[docs]
class RelativeDeriver(AbstractDeriver):
"""
Dividing a feature by another to derive a new feature. Required arguments are:
absolute_col: str
The feature that needs to be divided.
relative2_col: str
The feature that acts as the denominator.
"""
[docs]
def _required_cols(self):
return ["absolute_col", "relative2_col"]
[docs]
def _required_kwargs(self):
return []
[docs]
def _defaults(self):
return dict(stacked=True, intermediate=False, is_continuous=True)
[docs]
def _derive(self, df, datamodule):
absolute_col = self.kwargs["absolute_col"]
relative2_col = self.kwargs["relative2_col"]
relative = df[absolute_col] / df[relative2_col]
relative = relative.values.reshape(-1, 1)
return relative
[docs]
class SampleWeightDeriver(AbstractDeriver):
"""
Derive weight for each sample in the dataset.
"""
[docs]
def __init__(self, **kwargs):
super(SampleWeightDeriver, self).__init__(**kwargs)
self.percentile_dict = {}
self.unique_vals = {}
self.feature_weight = {}
self.denominator = None
[docs]
def _required_cols(self):
return []
[docs]
def _required_kwargs(self):
return []
[docs]
def _defaults(self):
return dict(stacked=False, intermediate=False, is_continuous=True)
[docs]
def _derive(self, df, datamodule):
if datamodule.training:
self.percentile_dict = {}
self.unique_vals = {}
self.feature_weight = {}
self.denominator = None
train_idx = datamodule.train_indices
cont_feature_names = datamodule.cont_feature_names
cat_feature_names = datamodule.cat_feature_names
weight = pd.DataFrame(
index=df.index, columns=["weight"], data=np.ones((len(df), 1))
)
for feature in cont_feature_names:
if feature == self.kwargs["derived_name"]:
continue
# We can only calculate distributions based on known data, i.e. the training set.
if datamodule.training:
Q1 = np.percentile(
df.loc[train_idx, feature].dropna(axis=0), 25, method="midpoint"
)
Q3 = np.percentile(
df.loc[train_idx, feature].dropna(axis=0), 75, method="midpoint"
)
self.percentile_dict[feature] = (Q1, Q3)
else:
Q1, Q3 = self.percentile_dict[feature]
IQR = Q3 - Q1
if IQR == 0:
continue
upper = df.index[np.where(df[feature] >= (Q3 + 1.5 * IQR))[0]]
lower = df.index[np.where(df[feature] <= (Q1 - 1.5 * IQR))[0]]
idx = np.union1d(upper, lower)
if len(idx) == 0:
continue
if datamodule.training:
train_upper = train_idx[
np.where(df.loc[train_idx, feature] >= (Q3 + 1.5 * IQR))[0]
]
train_lower = train_idx[
np.where(df.loc[train_idx, feature] <= (Q1 - 1.5 * IQR))[0]
]
train_outlier = np.union1d(train_upper, train_lower)
p_outlier = len(train_outlier) / len(train_idx)
feature_weight = -np.log10(p_outlier + 1e-8)
self.feature_weight[feature] = feature_weight
elif feature in self.feature_weight.keys():
feature_weight = self.feature_weight[feature]
else:
continue
weight.loc[idx, "weight"] = weight.loc[idx, "weight"] * (
1.0 + 0.1 * feature_weight
)
for feature in cat_feature_names:
if datamodule.training:
all_cnts = df[feature].value_counts()
unique_values = np.array(all_cnts.index)
train_cnts = df.loc[train_idx, feature].value_counts()
fitted_train_cnts = np.array(
[
train_cnts[x] if x in train_cnts.index else 0.0
for x in unique_values
]
)
p_unique_values = fitted_train_cnts / len(train_idx)
feature_weight = np.abs(
np.log10(p_unique_values + 1e-8)
- np.log10(max(p_unique_values) + 1e-8)
)
self.unique_vals[feature] = unique_values
self.feature_weight[feature] = feature_weight
elif feature in self.unique_vals.keys():
unique_values = self.unique_vals[feature]
feature_weight = self.feature_weight[feature]
else:
continue
for value, w in zip(unique_values, feature_weight):
where_value = df.index[np.where(df[feature] == value)[0]]
weight.loc[where_value, "weight"] = weight.loc[
where_value, "weight"
] * (1.0 + 0.1 * w)
if datamodule.training:
self.denominator = 1 / np.sum(weight.values) * len(df)
weight = weight.values * self.denominator
return weight
[docs]
class UnscaledDataDeriver(AbstractDeriver):
"""
Record unscaled data in DataModule.derived_data so that :class:`~tabensemb.model.base.TorchModel` can access it.
"""
[docs]
def _required_cols(self):
return []
[docs]
def _required_kwargs(self):
return []
[docs]
def _defaults(self):
return dict(stacked=False, intermediate=False, is_continuous=True)
[docs]
def _derive(self, df, datamodule):
if self.kwargs["stacked"]:
raise Exception(
f"{self.__class__.__name__} can not derive stacked features (behavior when "
f"``datamodule._force_features=True`` is not defined)."
)
return df[datamodule.cont_feature_names].values
deriver_mapping = {}
clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
for name, cls in clsmembers:
if issubclass(cls, AbstractDeriver):
deriver_mapping[name] = cls
[docs]
def get_data_deriver(name: str) -> Type[AbstractDeriver]:
if name not in deriver_mapping.keys():
raise Exception(f"Data deriver {name} not implemented.")
elif not issubclass(deriver_mapping[name], AbstractDeriver):
raise Exception(f"{name} is not the subclass of AbstractDeriver.")
else:
return deriver_mapping[name]