Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactored the train / test data splitting quite a bit and prepare da…
…ta for sample weights during fit
  • Loading branch information
somefreestring committed Feb 12, 2020
commit f82412be31529f41879ec27d9317dfaec87eaf89
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ class FeaturesAndLabels(object):
def __init__(self,
features: List[str],
labels: _LABELS,
label_type:Type = int,
label_type: Type = int, # FIXME default to None!
gross_loss: Callable[[str, pd.DataFrame], Union[pd.Series, pd.DataFrame]] = None,
targets: Callable[[str, pd.DataFrame], Union[pd.Series, pd.DataFrame]] = None,
feature_lags: Iterable[int] = None,
feature_rescaling: Dict[Tuple[str, ...], Tuple[int, ...]] = None, # fiXme lets provide a rescaler ..
feature_rescaling: Dict[Tuple[str, ...], Tuple[int, ...]] = None, # TODO lets provide a rescaler ..
lag_smoothing: Dict[int, Callable[[pd.Series], pd.Series]] = None,
pre_processor: Callable[[pd.DataFrame, Dict], pd.DataFrame] = lambda x: x,
**kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pandas_ml_utils.model.features_and_labels.target_encoder import TargetLabelEncoder, \
MultipleTargetEncodingWrapper, IdentityEncoder
from pandas_ml_utils.utils.classes import ReScaler
from pandas_ml_utils.utils.functions import log_with_time, call_callable_dynamic_args
from pandas_ml_utils.utils.functions import log_with_time, call_callable_dynamic_args, unique_top_level_columns

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,6 +69,14 @@ def call_dynamic(func, *args):
self._df = call_dynamic(features_and_labels.pre_processor, df)
self.__call_dynamic = call_dynamic

@property
def df(self):
return self._df

@property
def min_required_samples(self):
return len(self._df) - len(self.features_df) + 1

def prediction_to_frame(self,
prediction: np.ndarray,
index: pd.Index = None,
Expand Down Expand Up @@ -109,45 +117,35 @@ def prediction_to_frame(self,
# finally we can return our nice and shiny df
return df

@property
def df(self):
return self._df

@property
def features(self) -> Tuple[pd.DataFrame, np.ndarray]:
df = self.features_df
x = self._fix_shape(df)

_log.info(f"features shape: {x.shape}")
return df, x

@property
def min_required_samples(self):
return len(self._df) - len(self.features_df) + 1
# FIXME actually we would only need the index not the whole data frame
# return df.index.tolist(), x
return df, x

@property
def features_labels(self) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray]:
def features_labels_weights_df(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
# engineer features and labels
df_features = self.features_df
df_labels = self.labels_df
df = self.features_df.join(df_labels, how='inner').dropna()
index_intersect = df_features.index.intersection(df_labels.index)

# select only joining index values
df_features = df_features.loc[df.index]
df_labels = df_labels.loc[df.index]

# features eventually are in RNN shape which is [row, time_step, feature]
x = self._fix_shape(df_features)

# labels are straight forward but eventually need to be type corrected
y = df_labels.values.astype(self._label_type)
_log.info(f" features shape: {x.shape}, labels shape: {y.shape}")
df_features = df_features.loc[index_intersect]
df_labels = df_labels.loc[index_intersect]
# TODO add proper label weights
df_weights = pd.DataFrame(np.ones(df_labels.shape), index=df_labels.index)

# sanity check
if not len(x) == len(y) == len(df):
raise ValueError(f"unbalanced length of features and labels {len(x), len(y), len(df)}")
if not len(df_features) == len(df_labels):
raise ValueError(f"unbalanced length of features and labels {len(df_features), len(df_labels)}")

return df, x, y
return df_features, df_labels, df_weights

@property
@lru_cache(maxsize=1)
Expand Down Expand Up @@ -203,6 +201,9 @@ def features_df(self) -> pd.DataFrame:
dff[col] = tmp[col]

_log.info(f" make features ... done in {pc() - start_pc: .2f} sec!")

# finally patch the "values" property for features data frame and return
dff.__class__ = _RNNShapedValuesDataFrame
return dff

@property
Expand All @@ -227,7 +228,7 @@ def label_names(self, level_above=None) -> List[Union[Tuple[str, ...],str]]:
def labels_df(self) -> pd.DataFrame:
# here we can do all sorts of tricks and encodings ...
df = self._encoder(self._df[self._labels_columns], **self._features_and_labels.kwargs).dropna().copy()
return df
return df if self._label_type is None else df.astype(self._label_type)

@property
def source_df(self):
Expand Down Expand Up @@ -285,7 +286,8 @@ def target_df(self):
return df

def _fix_shape(self, df_features):
# features eventually are in RNN shape which is [row, time_step, feature]
# features eventually are in [feature, row, time_step]
# but need to be in RNN shape which is [row, time_step, feature]
feature_arr = df_features.values if self._features_and_labels.feature_lags is None else \
np.array([df_features[cols].values for cols in self.feature_names], ndmin=3).swapaxes(0, 1).swapaxes(1, 2)

Expand All @@ -296,3 +298,38 @@ def _fix_shape(self, df_features):

def __str__(self):
return f'min required data = {self.min_required_samples}'


class _RNNShapedValuesDataFrame(pd.DataFrame):

class Loc():
def __init__(self, df):
self.df = df

def __getitem__(self, item):
res = self.df.loc[item]
res.__class__ = _RNNShapedValuesDataFrame
return res

@property
def loc(self):
return _RNNShapedValuesDataFrame.Loc(super(pd.DataFrame, self))

@property
def values(self):
top_level_columns = unique_top_level_columns(self)

# we need to do a sneaky trick here to get a proper "super" object as super() does not work as expected
# so we simply rename with an empty dict
df = self.rename({})

# features eventually are in [feature, row, time_step]
# but need to be in RNN shape which is [row, time_step, feature]
feature_arr = df.values if top_level_columns is None else \
np.array([df[feature].values for feature in top_level_columns],
ndmin=3).swapaxes(0, 1).swapaxes(1, 2)

if len(feature_arr) <= 0:
_log.warning("empty feature array!")

return feature_arr
97 changes: 65 additions & 32 deletions pandas_ml_utils/model/fitting/fitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
import logging
from time import perf_counter
from typing import Callable, Tuple, Dict, TYPE_CHECKING
from sklearn.utils.testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning

import numpy as np
import pandas as pd
from sklearn.exceptions import ConvergenceWarning
from sklearn.model_selection import train_test_split as sk_train_test_split
from sklearn.utils.testing import ignore_warnings

from pandas_ml_utils.model.features_and_labels.features_and_labels_extractor import FeatureTargetLabelExtractor
from pandas_ml_utils.model.fitting.fit import Fit
from pandas_ml_utils.model.models import Model
from pandas_ml_utils.summary.summary import Summary
from pandas_ml_utils.model.fitting.train_test_data import make_training_data, make_forecast_data
from pandas_ml_utils.utils.functions import log_with_time
from pandas_ml_utils.model.models import Model

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,15 +48,13 @@ def fit(df: pd.DataFrame,
trails = None
model = model_provider()
features_and_labels = FeatureTargetLabelExtractor(df, model.features_and_labels, **model.kwargs)
_log.info(f"create model ({features_and_labels})")

# make training and test data sets
x_train, x_test, y_train, y_test, index_train, index_test = \
make_training_data(features_and_labels,
test_size,
youngest_size,
seed=test_validate_split_seed)

_log.info(f"create model ({features_and_labels})")
features, labels, weights = features_and_labels.features_labels_weights_df
train_ix, test_ix = train_test_split(features.index, test_size, youngest_size, seed=test_validate_split_seed)
train, test = ((features.loc[train_ix].values, labels.loc[train_ix].values, weights.loc[train_ix].values),
(features.loc[test_ix].values, labels.loc[test_ix].values, weights.loc[test_ix].values))

# eventually perform a hyper parameter optimization first
start_performance_count = log_with_time(lambda: _log.info("fit model"))
Expand All @@ -76,16 +74,17 @@ def fit(df: pd.DataFrame,
constants,
model_provider,
cross_validation,
x_train, y_train, index_train,
x_test, y_test, index_test)
train,
test)

# finally train the model with eventually tuned hyper parameters
__train_loop(model, cross_validation, x_train, y_train, index_train, x_test, y_test, index_test)
__train_loop(model, cross_validation, train, test)
_log.info(f"fitting model done in {perf_counter() - start_performance_count: .2f} sec!")

# assemble result objects
df_train = features_and_labels.prediction_to_frame(model.predict(x_train), index=index_train, inclusive_labels=True)
df_test = features_and_labels.prediction_to_frame(model.predict(x_test), index=index_test, inclusive_labels=True) if x_test is not None else None
df_train = features_and_labels.prediction_to_frame(model.predict(train[0]), index=train_ix, inclusive_labels=True)
df_test = features_and_labels.prediction_to_frame(model.predict(test[0]), index=test_ix, inclusive_labels=True) \
if len(test_ix) > 0 else None

# update minimum required samples
model.features_and_labels._min_required_samples = features_and_labels.min_required_samples
Expand All @@ -94,10 +93,9 @@ def fit(df: pd.DataFrame,
return Fit(model, model.summary_provider(df_train), model.summary_provider(df_test), trails)


def __train_loop(model, cross_validation, x_train, y_train, index_train, x_test, y_test, index_test):
# convert pandas indices to numpy arrays
index_train = np.array(index_train)
index_test = np.array(index_test)
def __train_loop(model, cross_validation, train, test):
x_train, y_train, w_train = train[0], train[1], train[2]
x_test, y_test, w_test = test[0], test[1], test[2]

# apply cross validation
if cross_validation is not None and isinstance(cross_validation, Tuple) and callable(cross_validation[1]):
Expand All @@ -106,15 +104,16 @@ def __train_loop(model, cross_validation, x_train, y_train, index_train, x_test
# cross validation, make sure we re-shuffle every fold_epoch
for f, (train_idx, test_idx) in enumerate(cross_validation[1](x_train, y_train)):
_log.info(f'fit fold {f}')
loss = model.fit(x_train[train_idx], y_train[train_idx], x_train[test_idx], y_train[test_idx],
index_train[train_idx], index_train[test_idx])
loss = model.fit(x_train[train_idx], y_train[train_idx],
x_train[test_idx], y_train[test_idx],
w_train[train_idx], w_train[test_idx])

losses.append(loss)

return np.array(losses).mean()
else:
# fit without cross validation
return model.fit(x_train, y_train, x_test, y_test, index_train, index_test)
return model.fit(x_train, y_train, x_test, y_test, w_train, w_test)


@ignore_warnings(category=ConvergenceWarning)
Expand All @@ -123,16 +122,16 @@ def __hyper_opt(hyper_parameter_space,
constants,
model_provider,
cross_validation,
x_train, y_train, index_train,
x_test, y_test, index_test):
train,
test):
from hyperopt import fmin, tpe, Trials

keys = list(hyper_parameter_space.keys())

def f(args):
sampled_parameters = {k: args[i] for i, k in enumerate(keys)}
model = model_provider(**sampled_parameters, **constants)
loss = __train_loop(model, cross_validation, x_train, y_train, index_train, x_test, y_test, index_test)
loss = __train_loop(model, cross_validation, train, test)
if loss is None:
raise ValueError("Can not hyper tune if model loss is None")

Expand Down Expand Up @@ -160,23 +159,57 @@ def predict(df: pd.DataFrame, model: Model, tail: int = None) -> pd.DataFrame:
_log.warning("could not determine the minimum required data from the model")

features_and_labels = FeatureTargetLabelExtractor(df, model.features_and_labels, **model.kwargs)
dff, x = make_forecast_data(features_and_labels)
x = features_and_labels.features_df
y_hat = model.predict(x.values)

y_hat = model.predict(x)
return features_and_labels.prediction_to_frame(y_hat, index=dff.index, inclusive_labels=False)
return features_and_labels.prediction_to_frame(y_hat, index=x.index, inclusive_labels=False)


def backtest(df: pd.DataFrame, model: Model, summary_provider: Callable[[pd.DataFrame], Summary] = Summary) -> Summary:
features_and_labels = FeatureTargetLabelExtractor(df, model.features_and_labels, **model.kwargs)

# make training and test data sets
x, _, _, _, index, _ = make_training_data(features_and_labels, 0)
y_hat = model.predict(x)
x = features_and_labels.features_df
y_hat = model.predict(x.values)

df_backtest = features_and_labels.prediction_to_frame(y_hat, index=index, inclusive_labels=True, inclusive_source=True)
df_backtest = features_and_labels.prediction_to_frame(y_hat, index=x.index, inclusive_labels=True, inclusive_source=True)
return (summary_provider or model.summary_provider)(df_backtest)


def features_and_label_extractor(df: pd.DataFrame, model: Model) -> FeatureTargetLabelExtractor:
return FeatureTargetLabelExtractor(df, model.features_and_labels, **model.kwargs)


def train_test_split(index: pd.Index,
test_size: float = 0.4,
youngest_size: float = None,
seed: int = 42) -> Tuple[pd.Index, pd.Index]:

# convert data frame index to numpy array
index = index.values

if test_size <= 0:
train, test = index, index[:0]
elif seed == 'youngest':
i = int(len(index) - len(index) * test_size)
train, test = index[:i], index[i:]
else:
random_sample_test_size = test_size if youngest_size is None else test_size * (1 - youngest_size)
random_sample_train_index_size = int(len(index) - len(index) * (test_size - random_sample_test_size))

if random_sample_train_index_size < len(index):
_log.warning(f"keeping youngest {len(index) - random_sample_train_index_size} elements in test set")

# cut the youngest data and use residual to randomize train/test data
index_train, index_test = \
sk_train_test_split(index[:random_sample_train_index_size],
test_size=random_sample_test_size, random_state=seed)

# then concatenate (add back) the youngest data to the random test data
index_test = np.hstack([index_test, index[random_sample_train_index_size:]]) # index is 1D

train, test = index_train, index_test
else:
train, test = sk_train_test_split(index, test_size=random_sample_test_size, random_state=seed)

return pd.Index(train), pd.Index(test)
Loading