198 changes: 198 additions & 0 deletions week2/friday/you_do_4/features/FeatureExtractor.py
@@ -0,0 +1,198 @@
import pandas as pd
from pandas import DataFrame
from typing import Dict, List, Literal
import numpy as np

class FeatureExtractor:
def __init__(
self,
past_knowledge: DataFrame,
        cyclical_feature_names: Dict[str, int],  # column name -> cycle length (period)
freq: str = "D",
lag_size: int = 30,
window_size: int = 30,
):
        # Assumes past_knowledge carries a DatetimeIndex named "datetime";
        # sort_values resolves "datetime" to that index level.
        self.PAST_KNOWLEDGE = past_knowledge.sort_values(by="datetime")
self.cyclical_feature_names = cyclical_feature_names
self.lag_size = lag_size
self.window_size = window_size
self.freq = freq

def transform(self, dates_to_predict: pd.DatetimeIndex) -> DataFrame:
df = self._get_all_ranges(dates_to_predict)
self.full_df = df.join(self.PAST_KNOWLEDGE, how="left")

return (
df.pipe(self._start_pipeline)
.pipe(self._add_lag_features)
.pipe(self._add_rolling_window_features)
.pipe(self._add_exponential_moving_features)
.pipe(self._drop_columns_with_same_values)
.pipe(self._expand_datetime)
.pipe(self._add_fourier_features)
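            # Downcast integer columns (including isocalendar's uint32 weeks)
            # and float columns to 32-bit to reduce memory use.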
.pipe(
lambda df: df.astype(
{
col: "int32"
for col in df.select_dtypes(["int", "uint32"]).columns
}
)
)
.pipe(
lambda df: df.astype(
{col: "float32" for col in df.select_dtypes("float").columns}
)
)
.bfill()
.loc[dates_to_predict, :]
)

def _start_pipeline(self, df: pd.DataFrame) -> pd.DataFrame:
return df.copy().sort_index()

def _get_all_ranges(self, dates_to_predict: pd.DatetimeIndex) -> pd.DataFrame:
start_date = min(dates_to_predict.min(), self.PAST_KNOWLEDGE.index.min())
end_date = max(dates_to_predict.max(), self.PAST_KNOWLEDGE.index.max())
complete_date_range = pd.date_range(
start=start_date, end=end_date, freq=self.freq
)
return pd.DataFrame(index=complete_date_range)

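    # Lag features: append shifted copies of each numeric/object column
    # (t-1 through t-lag_size) so a model can see recent history.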
def _add_lag_features(
self,
df: DataFrame,
fillna_with: Literal["ffill", "bfill"] | None = "bfill",
) -> DataFrame:
columns_to_use = self.PAST_KNOWLEDGE.select_dtypes(
include=["number", "object"]
).columns.tolist()

created_features = [
self.full_df[col].shift(i).rename(f"{col}_lag_{i}")
for i in range(1, self.lag_size + 1)
for col in columns_to_use
]

lags_df = pd.concat(created_features, axis=1)

df = df.join(
lags_df,
how="left",
)

if fillna_with == "ffill":
df = df.ffill()
elif fillna_with == "bfill":
df = df.bfill()

return df

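    # Rolling-window features: aggregate statistics (mean, std, min, max,
    # median, var) over every window size from 2 to window_size.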
def _add_rolling_window_features(
self,
df: DataFrame,
fillna_with: Literal["ffill", "bfill"] | None = "ffill",
) -> DataFrame:
columns_to_use = self.PAST_KNOWLEDGE.select_dtypes(
include=["float"]
).columns.tolist()

metrics = ["mean", "std", "min", "max", "median", "var"]

created_features = [
(
self.full_df[col]
.rolling(window=size, min_periods=1)
.agg(metrics)
.rename(columns=lambda metric: f"{col}_rw{size}_{metric}")
)
for size in range(2, self.window_size + 1)
for col in columns_to_use
]

window_df = pd.concat(created_features, axis=1)

df = df.join(
window_df,
how="left",
)

if fillna_with == "ffill":
df = df.ffill()
elif fillna_with == "bfill":
df = df.bfill()

return df

    def _drop_columns_with_same_values(
        self, df: DataFrame, threshold: float = 0.9
    ) -> DataFrame:
        # Drop near-constant columns: the most frequent value (NaN included)
        # covers at least `threshold` of the rows.
        to_drop = [
            col
            for col in df.columns
            if df[col].value_counts(normalize=True, dropna=False).iloc[0] >= threshold
        ]
        return df.drop(columns=to_drop)

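    # Exponentially weighted features: EWM mean/std/var for every span
    # from 2 to up_to, weighting recent observations more heavily.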
def _add_exponential_moving_features(
self, df: pd.DataFrame, up_to: int = 30
) -> pd.DataFrame:
columns_to_use = self.PAST_KNOWLEDGE.select_dtypes(
include=["float"]
).columns.tolist()

metrics = ["mean", "std", "var"]

created_features = [
(
self.full_df[col]
.ewm(span=span, adjust=False)
.agg(metrics)
.rename(columns=lambda metric: f"{col}_em_{span}_{metric}")
)
for span in range(2, up_to + 1)
for col in columns_to_use
]

exponential_moving_df = pd.concat(created_features, axis=1)

df = df.join(
exponential_moving_df,
how="left",
)
return df

def _expand_datetime(self, df: DataFrame) -> DataFrame:
return df.assign(
**{
"year": lambda a_df: a_df.index.year,
"month": lambda a_df: a_df.index.month,
"day": lambda a_df: a_df.index.day,
"hour": lambda a_df: a_df.index.hour,
"day_of_year": lambda a_df: a_df.index.dayofyear,
"week_of_year": lambda a_df: a_df.index.isocalendar().week,
"quarter": lambda a_df: a_df.index.quarter,
# "season": lambda a_df: a_df.index.month % 12 // 3 + 1,
"is_weekend": lambda a_df: np.vectorize({True: 1, False: 0}.get)(
a_df.index.weekday >= 5
),
}
)

    def _add_fourier_features(self, df: pd.DataFrame, num_terms: int = 7) -> DataFrame:
        # Encode each cyclical column as sin/cos pairs at harmonics 1..num_terms.
        for col, max_val in self.cyclical_feature_names.items():
            source = self._get_column_source(df, col)

            for i in range(1, num_terms + 1):
                angle = 2 * np.pi * i * source[col] / max_val

                df[f"fourier_sin_{col}_{i}"] = np.sin(angle)
                df[f"fourier_cos_{col}_{i}"] = np.cos(angle)

        return df

    def _get_column_source(self, df: DataFrame, col: str) -> DataFrame:
        if col in df.columns:
            source = df
        elif col in self.PAST_KNOWLEDGE.columns:
            source = self.PAST_KNOWLEDGE
        else:
            raise KeyError(f"{col} not found in either df or past knowledge.")
        return source
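
A minimal usage sketch for the class above; `sales.csv`, its columns, and the chosen periods are hypothetical, not part of this diff:

# Hypothetical usage of FeatureExtractor; file and column names are assumptions.
import pandas as pd

past = pd.read_csv("sales.csv", parse_dates=["datetime"], index_col="datetime")

extractor = FeatureExtractor(
    past_knowledge=past,
    # cyclical column -> its period; both columns are created by _expand_datetime
    cyclical_feature_names={"month": 12, "day_of_year": 365},
    freq="D",
    lag_size=30,
    window_size=30,
)

future_dates = pd.date_range("2024-01-01", "2024-01-31", freq="D")
X_future = extractor.transform(future_dates)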
175 changes: 175 additions & 0 deletions week2/friday/you_do_4/features/TargetEncoder.py
@@ -0,0 +1,175 @@
import pandas as pd
from sklearn.model_selection import KFold, train_test_split
from typing import Any, List, Union, Literal, Optional, Protocol, runtime_checkable
from sktime.split.base import BaseSplitter


@runtime_checkable
class FoldProtocol(Protocol):
    def split(self, X, y=None, groups=None) -> Any: ...


class TargetEncoder:
def __init__(
self,
random_state: Optional[int] = None,
columns: Union[List[str], str, Literal["auto"]] = "auto",
smoothing: Union[float, int] = 20,
cv: Union[int, FoldProtocol] = 5,
drop_after_transform: bool = True,
fill_zeros_with_mean: bool = True,
fillna_with_mean: bool = True,
        epsilon: Union[float, int] = 1e-10,
stats_to_include: List[str] = [
"mean",
"median",
"var",
"std",
"skew",
"min",
"max",
],
):
self.SEED = random_state
self.smoothing = smoothing
self.columns = columns
self.drop_after_transform = drop_after_transform
self.fillna_with_mean = fillna_with_mean
self.fill_zeros_with_mean = fill_zeros_with_mean
self.cv = cv
self.EPSILON = epsilon
self.mappings = {}
self.stats_to_include = stats_to_include

def _get_column_names_to_process(
self,
X: Optional[pd.DataFrame] = None,
) -> List[str]:
if isinstance(self.columns, str) and self.columns == "auto":
if X is None:
raise ValueError("X must be provided when columns='auto'")

LOWER_COUNT_LIMIT = 10
categorical_columns = X.select_dtypes(include="object").columns
high_cardinality_columns = [
col
for col in categorical_columns
if X[col].nunique() >= LOWER_COUNT_LIMIT
]
return high_cardinality_columns

        elif isinstance(self.columns, str):
            return [self.columns]
        elif isinstance(self.columns, list):
            return self.columns
        raise TypeError(
            "columns must be 'auto', a single column name, or a list of column names"
        )

def _get_fold(self) -> FoldProtocol:
if isinstance(self.cv, int):
return KFold(n_splits=self.cv, shuffle=True, random_state=self.SEED)
elif isinstance(self.cv, FoldProtocol):
return self.cv
raise ValueError(
"cv must be an integer or an object implementing a 'split' method"
)

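    # Fit: compute smoothed per-category statistics on each CV fold's training
    # part, then average the fold encodings weighted by category counts.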
def fit(self, X: pd.DataFrame, y: pd.DataFrame) -> "TargetEncoder":
X = X.copy()
y = y.copy()
self.columns = self._get_column_names_to_process(X)
self.TARGET = y.columns[0]
fold = self._get_fold()

for column in self.columns:
splitter = self._get_splitter(X, y, fold)

if column not in X.columns:
raise ValueError(f"Column '{column}' not found in X")

encodings = [
self._generate_encodings(X.iloc[train_idx], y.iloc[train_idx], column)
for train_idx, _ in splitter
]

self.mappings[column] = self._calculate_weighted_means_of_encodings(
encodings, column
)

return self

def _get_splitter(
self,
X: Optional[pd.DataFrame],
y: pd.DataFrame,
fold: FoldProtocol,
):
return fold.split(y) if isinstance(fold, BaseSplitter) else fold.split(X, y)

def _generate_encodings(
self, X_train_fold: pd.DataFrame, y_train_fold, column: str
):
        dataset = pd.concat([X_train_fold, y_train_fold], axis=1)
        global_stats = dataset[self.TARGET].agg(self.stats_to_include)

        grouped_target = dataset.groupby(column)[self.TARGET]
        count_by_category = grouped_target.count().rename("count")

        # Smoothed target encoding per category:
        # (n * group_stat + smoothing * global_stat) / (n + smoothing),
        # where n is the per-category count and epsilon guards the denominator.
        numerator = grouped_target.agg(self.stats_to_include).mul(
            count_by_category, axis=0
        ) + (self.smoothing * global_stats)
        denominator = count_by_category + self.smoothing + self.EPSILON

        result = numerator.div(denominator, axis=0)

        return pd.concat([result, count_by_category], axis=1)

def _calculate_weighted_means_of_encodings(
self, encodings: List[pd.DataFrame], column: str
) -> dict:
combined_encodings = pd.concat(encodings)
group = combined_encodings.groupby(column)

return {
stat: group.apply(lambda x: (x[stat] * x["count"]).sum() / x["count"].sum())
for stat in self.stats_to_include
}

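    # Transform: map each configured column to its encoded statistics; unseen
    # categories (NaN) and exact zeros optionally fall back to the mean encoding.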
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
X = X.copy()

for column in self.columns:
column_mappings = self.mappings[column]

for stat in self.stats_to_include:
generated_column = f"te_{column}_{stat}"

X[generated_column] = (
X[column].map(column_mappings[stat]).astype("float32")
)

if self.fillna_with_mean:
mean_value = column_mappings[stat].mean().astype("float32")
X[generated_column] = X[generated_column].fillna(mean_value)

if self.fill_zeros_with_mean:
mean_value = column_mappings[stat].mean().astype("float32")
X.loc[X[generated_column] == 0, generated_column] = mean_value

if self.drop_after_transform:
X.drop(columns=self.columns, inplace=True)

return X

def fit_transform(self, X: pd.DataFrame, y: pd.DataFrame) -> pd.DataFrame:
self.fit(X, y)
return self.transform(X)


def get_data(address: str, target: str, SEED: int):
auto_df = pd.read_csv(address)

X = auto_df.loc[:, auto_df.columns != target]
y = auto_df.loc[:, auto_df.columns == target]

return train_test_split(X, y, test_size=0.2, random_state=SEED)
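
A minimal usage sketch for the encoder and helper above; `auto.csv` and the `price` target are hypothetical, not part of this diff:

# Hypothetical usage of TargetEncoder together with get_data.
X_train, X_test, y_train, y_test = get_data("auto.csv", target="price", SEED=42)

encoder = TargetEncoder(random_state=42, smoothing=20, cv=5)
X_train_enc = encoder.fit_transform(X_train, y_train)  # learns per-category stats
X_test_enc = encoder.transform(X_test)  # reuses the mappings fitted on train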