Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions numerai_tools/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from typing import List, Union, Optional

import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder # type: ignore

from numerai_tools.scoring import tie_kept_rank

# Default value written into each bin by quantile_bin (one entry per bin).
DEFAULT_BINS = (0.0, 0.25, 0.5, 0.75, 1.0)
# Default quantile thresholds quantile_bin uses to separate the bins
# (one fewer entry than DEFAULT_BINS).
DEFAULT_QUANTILES = (0.05, 0.25, 0.75, 0.95)


def one_hot_encode(
    df: pd.DataFrame, columns: List[str], dtype: type = np.float64
) -> pd.DataFrame:
    """Replace the given columns of a dataframe with their one-hot encoding.

    A column holding x_i discrete values (categories, bucket values, etc.)
    is swapped for x_i indicator columns: each one is 1 on the rows that carry
    the associated value and 0 everywhere else.

    Arguments:
        df: pd.DataFrame - the data containing the columns to encode
        columns: List[str] - names of the columns to replace w/ one-hot encoding
        dtype: type = np.float64 - datatype of the generated indicator columns

    Returns:
        pd.DataFrame - the data with each listed column replaced by its
            indicator columns; the indicator columns are appended on the right
    """
    result = df
    for name in columns:
        enc = OneHotEncoder(dtype=dtype)
        # fit_transform returns a sparse matrix, so densify it before framing
        dense = enc.fit_transform(result[[name]]).toarray()
        indicators = pd.DataFrame(
            dense,
            columns=enc.get_feature_names_out(),
            index=result.index,
        )
        # attach the indicators first, then remove the column they replace
        result = result.join(indicators).drop(columns=name)
    return result


def balanced_rank_transform(
    df: pd.DataFrame,
    cols: List[str],
    rank_group: Optional[str] = None,
    rank_filter: Optional[str] = None,
) -> pd.DataFrame:
    """
    Rank the selected columns with ``tie_kept_rank``, optionally restricted to
    a subset of rows and optionally ranked separately within groups.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing the data to be ranked.
    cols : list of str
        Names of the columns to rank.
    rank_group : str, optional
        Column to group by before ranking. If None, all (remaining) rows are
        ranked together.
    rank_filter : str, optional
        Boolean column used to select rows before ranking; only rows where it
        is True are kept. If None, every row is kept.

    Returns
    -------
    pd.DataFrame
        The ranked ``cols`` for the rows that passed the filter.
    """
    # keep only the rows flagged by the filter column, when one is given
    subset = df if rank_filter is None else df.loc[df[rank_filter]]

    if rank_group is None:
        ranked = tie_kept_rank(subset[cols])
    else:
        # rank each group on its own; group_keys=False keeps the row index flat
        grouped = subset.groupby(rank_group, group_keys=False)
        ranked = grouped.apply(lambda group: tie_kept_rank(group[cols]))
    return ranked[cols]


def quantile_bin(
    data: Union[pd.Series, pd.DataFrame],
    bins: tuple[float, ...] = DEFAULT_BINS,
    quantiles: tuple[float, ...] = DEFAULT_QUANTILES,
) -> pd.DataFrame:
    """
    Bin a Series or DataFrame into discrete quantile-based bins.

    Each column is binned independently against its own quantile thresholds:
    values at or below the first threshold get ``bins[0]``, values in
    ``(threshold[i - 1], threshold[i]]`` get ``bins[i]``, and values at or
    above the last threshold get ``bins[-1]``. A value exactly equal to the
    last threshold therefore lands in the top bin. Missing values stay missing.

    Columns whose non-missing values are all identical (or that have no
    non-missing values) are filled entirely with the lowest bin, ``bins[0]``.

    Parameters
    ----------
    data : pd.Series or pd.DataFrame
        Data to bin. A Series is converted to a one-column DataFrame whose
        column is named "value".
    bins : tuple of float
        Values to assign to each bin.
    quantiles : tuple of float
        Quantile thresholds to use for binning (len = number of bins - 1)

    Returns
    -------
    pd.DataFrame
        Binned values as floats, same shape, index and columns as the
        (frame-converted) input.

    Raises
    ------
    AssertionError
        If ``bins`` or ``quantiles`` is empty, or if ``quantiles`` is not
        exactly one element shorter than ``bins``.
    """
    assert len(bins), "Invalid bins! Must not be empty."
    assert len(quantiles), "Invalid quantiles! Must not be empty."
    assert len(quantiles) == (
        len(bins) - 1
    ), "Invalid quantiles! Length must be 1 less than bins."

    if isinstance(data, pd.Series):
        data = data.to_frame(name="value")

    binned = data.copy()
    for col in binned.columns:
        raw = binned[col].astype(float)

        # handle all-identical values: everything goes to the lowest bin
        if raw.nunique() <= 1:
            binned[col] = float(bins[0])
            continue

        # calculate quantile thresholds; read them positionally so the lookup
        # does not depend on float labels in the quantile index
        thresholds = raw.quantile(list(quantiles)).to_numpy()

        # Every comparison below is made against the untouched `raw` values
        # and written into a separate series. Writing the bin values back into
        # the series being compared lets an already-assigned bin value match a
        # later condition (e.g. on all-negative data every assigned bin value
        # is >= the last threshold, so the whole column collapses into the
        # top bin).
        labels = raw.copy()
        labels.loc[raw <= thresholds[0]] = bins[0]
        for i in range(1, len(bins) - 1):
            in_band = (raw > thresholds[i - 1]) & (raw <= thresholds[i])
            labels.loc[in_band] = bins[i]
        # applied last, so a value equal to the top threshold ends in the top bin
        labels.loc[raw >= thresholds[-1]] = bins[-1]

        binned[col] = labels.astype(float)

    return binned
106 changes: 106 additions & 0 deletions numerai_tools/indexing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from typing import List, Tuple, cast, Any

import numpy as np
import pandas as pd

# leaving this here for backwards compatibility
from numerai_tools.typing import S1, S2


# sometimes when we match up the target/prediction indices,
# changes in the stock universe cause some stocks to enter / leave;
# this is the largest fraction of ids that may be dropped by the matching,
# which ensures we don't filter too much
DEFAULT_MAX_FILTERED_INDEX_RATIO = 0.2


def filter_sort_index(
    s1: S1, s2: S2, max_filtered_ratio: float = DEFAULT_MAX_FILTERED_INDEX_RATIO
) -> Tuple[S1, S2]:
    """Filters the indices of the given series to match each other,
    then sorts the indices, then checks that we didn't filter too many indices
    before returning the filtered and sorted series.

    The shared ids are the intersection of the two indices after dropna()
    has been applied to each input.

    Arguments:
        s1: Union[pd.DataFrame, pd.Series] - the first dataset to filter and sort
        s2: Union[pd.DataFrame, pd.Series] - the second dataset to filter and sort
        max_filtered_ratio: float - the largest fraction of either input's
            rows that may be missing from the shared ids

    Returns:
        Tuple[
            Union[pd.DataFrame, pd.Series],
            Union[pd.DataFrame, pd.Series],
        ] - the filtered and sorted datasets

    Raises:
        AssertionError - if the shared ids make up less than
            (1 - max_filtered_ratio) of s1 or of s2; an empty input is
            treated as having no overlap
    """
    ids = s1.dropna().index.intersection(s2.dropna().index)
    min_ratio = 1 - max_filtered_ratio
    # round after scaling so the message never shows float noise
    # (rounding first and multiplying after can print e.g. 28.999999999999996)
    min_percent = round(min_ratio * 100, 2)

    # ensure we didn't filter too many ids;
    # an empty input counts as zero overlap instead of dividing by zero
    s1_ratio = len(ids) / len(s1) if len(s1) else 0.0
    assert s1_ratio >= min_ratio, (
        "s1 does not have enough overlapping ids with s2,"
        f" must have >= {min_percent}% overlapping ids"
    )
    s2_ratio = len(ids) / len(s2) if len(s2) else 0.0
    assert s2_ratio >= min_ratio, (
        "s2 does not have enough overlapping ids with s1,"
        f" must have >= {min_percent}% overlapping ids"
    )
    return cast(S1, s1.loc[ids].sort_index()), cast(S2, s2.loc[ids].sort_index())


def filter_sort_index_many(
    inputs: List[Any],
    max_filtered_ratio: float = DEFAULT_MAX_FILTERED_INDEX_RATIO,
) -> List[Any]:
    """Filters the indices of the given list of series to match each other,
    then sorts the indices, then checks that we didn't filter too many indices
    before returning the filtered and sorted series.

    The shared ids are the intersection of every input's index after dropna()
    has been applied to that input.

    Arguments:
        inputs: List[Union[pd.DataFrame, pd.Series]] - the list of datasets to filter and sort
        max_filtered_ratio: float - the largest fraction of any input's rows
            that may be removed by the filtering

    Returns:
        List[Union[pd.DataFrame, pd.Series]] - the filtered and sorted datasets

    Raises:
        AssertionError - if inputs is empty, or if any filtered dataset keeps
            less than (1 - max_filtered_ratio) of its original rows; an empty
            dataset is treated as having no overlap
    """
    assert len(inputs) > 0, "List must contain at least one element"
    ids = inputs[0].dropna().index
    for other in inputs[1:]:
        ids = ids.intersection(other.dropna().index)
    result = [item.loc[ids].sort_index() for item in inputs]

    min_ratio = 1 - max_filtered_ratio
    # round after scaling so the message never shows float noise
    # (rounding first and multiplying after can print e.g. 28.999999999999996)
    min_percent = round(min_ratio * 100, 2)

    # ensure we didn't filter too many ids;
    # an empty input counts as zero overlap instead of dividing by zero
    for i, (original, filtered) in enumerate(zip(inputs, result)):
        kept_ratio = len(filtered) / len(original) if len(original) else 0.0
        assert kept_ratio >= min_ratio, (
            f"inputs[{i}] does not have enough overlapping ids with the others,"
            f" must have >= {min_percent}% overlapping ids"
        )
    return result


def filter_sort_top_bottom(
    s: pd.Series, top_bottom: int
) -> Tuple[pd.Series, pd.Series]:
    """Filters the series according to the top n and bottom n values
    then sorts the index and returns two filtered and sorted series
    for the top and bottom values respectively.

    Values are ordered with a stable sort, so ties keep their original
    relative order. Missing values are ordered last and therefore count
    towards the top values. If top_bottom exceeds the length of the series,
    both results contain the whole series.

    Arguments:
        s: pd.Series - the data to filter and sort
        top_bottom: int - the number of top n and bottom n values to keep;
            0 yields two empty series

    Returns:
        Tuple[pd.Series, pd.Series] - the filtered and sorted top and bottom series respectively
    """
    # sort_values orders missing values last; Series.argsort instead marks
    # them with -1, which is not a usable position for iloc
    ordered = s.sort_values(kind="stable")
    bot = ordered.iloc[:top_bottom]
    # a slice of [-0:] would select every row, so 0 needs its own empty slice
    top = ordered.iloc[-top_bottom:] if top_bottom else ordered.iloc[:0]
    return top.sort_index(), bot.sort_index()


def filter_sort_top_bottom_concat(s: pd.Series, top_bottom: int) -> pd.Series:
    """Variant of filter_sort_top_bottom that returns a single series:
    the top and bottom selections are concatenated and the result is
    sorted by index.

    Arguments:
        s: pd.Series - the data to filter and sort
        top_bottom: int - the number of top n and bottom n values to keep

    Returns:
        pd.Series - the top and bottom values combined into one index-sorted series
    """
    top_part, bottom_part = filter_sort_top_bottom(s, top_bottom)
    combined = pd.concat([top_part, bottom_part])
    return combined.sort_index()
Loading