+import pickle
+
 import hydra
 import pandas as pd
 from hydra.utils import to_absolute_path as abspath
-from nltk.tokenize import TweetTokenizer
 from prefect import flow, task
-from sklearn.ensemble import RandomForestClassifier
-from sklearn.feature_extraction.text import TfidfVectorizer
+from sklearn.model_selection import train_test_split

+pd.options.mode.chained_assignment = None  # silence SettingWithCopyWarning from in-place column edits
 # ---------------------------------------------------------------------------- #
 #                                 Create tasks                                  #
 # ---------------------------------------------------------------------------- #


 @task
 def get_data(data_path: str):
-    train = pd.read_csv(abspath(data_path.train))
-    test = pd.read_csv(abspath(data_path.test))
-    return {"train": train, "test": test}
+    return pd.read_csv(abspath(data_path))


-@task
-def get_all_data(data: dict):
-    return pd.concat([data["train"], data["test"]])
+def fill_na_description(data: pd.DataFrame):
+    data["Description"] = data["Description"].fillna("")
+    return data
+
+
+def get_desc_length(data: pd.DataFrame):
+    # Character count of each description
+    data["desc_length"] = data["Description"].apply(lambda x: len(x))
+    return data
+
+
+def get_desc_words(data: pd.DataFrame):
+    # Word count of each description
+    data["desc_words"] = data["Description"].apply(lambda x: len(x.split()))
+    return data
+
+
+def get_average_word_length(data: pd.DataFrame):
+    # Average characters per word in the description
+    data["average_word_length"] = data["desc_length"] / data["desc_words"]
+    return data


 @task
-def get_vectorizer(data: pd.DataFrame):
-    tokenizer = TweetTokenizer()
-    vectorizer = TfidfVectorizer(ngram_range=(1, 2), tokenizer=tokenizer.tokenize)
-    vectorizer.fit(data["Description"].fillna("").values)
-    return vectorizer
+def get_description_features(data: pd.DataFrame):
+    return (
+        data.pipe(fill_na_description)
+        .pipe(get_desc_length)
+        .pipe(get_desc_words)
+        .pipe(get_average_word_length)
+    )
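The new `get_description_features` task chains the plain helper functions with `DataFrame.pipe`, so each step receives the frame returned by the one before it. A minimal sketch of the equivalence on toy data (the helpers are the ones defined above):

    import pandas as pd

    df = pd.DataFrame({"Description": ["a tiny dog", None]})
    # df.pipe(f).pipe(g) is just g(f(df))
    out = get_average_word_length(
        get_desc_words(get_desc_length(fill_na_description(df)))
    )
    print(out[["desc_length", "desc_words", "average_word_length"]])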


 @task
-def encode_description(vectorizer: TfidfVectorizer, data: pd.DataFrame):
-    X_train = vectorizer.transform(data["Description"].fillna(""))
-    print(X_train)
-    print(type(X_train))
-    return X_train
+def filter_cols(use_cols: list, data: pd.DataFrame):
+    return data[use_cols]


 @task
-def get_adoption_speed(data: pd.DataFrame):
-    return data["AdoptionSpeed"]
+def encode_cat_cols(cat_cols: list, data: pd.DataFrame):
+    cat_cols = list(cat_cols)
+    data[cat_cols] = data[cat_cols].astype(str)
+    for col in cat_cols:
+        # factorize returns (codes, uniques); get_indexer then maps each
+        # value to its position in uniques, i.e. an integer label per category
+        _, uniques = pd.factorize(data[col])
+        data[col] = uniques.get_indexer(data[col])
+    return data
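For reference, `pd.factorize` on a Series returns integer codes plus an Index of the unique values, and `Index.get_indexer` maps each value to its position in that Index (unseen values map to -1). A small illustration with made-up categories:

    import pandas as pd

    s = pd.Series(["dog", "cat", "dog", "rabbit"])
    codes, uniques = pd.factorize(s)
    print(codes)                   # [0 1 0 2]
    print(uniques.get_indexer(s))  # [0 1 0 2], the same integer labels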


 @task
-def get_classifier(data: pd.DataFrame, adoption_speed: pd.Series, n_estimators: int):
-    clf = RandomForestClassifier(n_estimators=n_estimators)
-    clf.fit(data, adoption_speed)
+def split_data(data: pd.DataFrame):
+    X = data.drop(columns=["AdoptionSpeed"])
+    y = data["AdoptionSpeed"]
+    X_train, X_test, y_train, y_test = train_test_split(
+        X,
+        y,
+        test_size=0.3,
+        random_state=0,
+    )
+    return {"X_train": X_train, "X_test": X_test, "y_train": y_train, "y_test": y_test}
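`split_data` holds out 30% of the rows for evaluation, with `random_state=0` pinning the shuffle so reruns produce identical splits. A quick sanity check of the proportions on toy data:

    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.DataFrame({"x": range(10), "AdoptionSpeed": [0, 1] * 5})
    X_train, X_test, y_train, y_test = train_test_split(
        df.drop(columns=["AdoptionSpeed"]),
        df["AdoptionSpeed"],
        test_size=0.3,
        random_state=0,
    )
    print(len(X_train), len(X_test))  # 7 3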


-@flow
-def get_description_features(config, all_data, data: dict):
-    vectorizer = get_vectorizer(all_data)
-    X_train = encode_description(vectorizer, data["train"])
-    y_train = get_adoption_speed
+@task
+def save_data(data: dict, save_dir: str):
+    for name, value in data.items():
+        save_path = abspath(f"{save_dir}/{name}")
+        with open(save_path, "wb") as f:
+            pickle.dump(value, f)
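`save_data` writes each split to a pickle named after its dict key (`X_train`, `X_test`, `y_train`, `y_test`) under `save_dir`. A downstream stage could reload one like this; note that `data/processed` is a guess, since the actual value of `config.data.processed` is not shown in this diff:

    import pickle
    from hydra.utils import to_absolute_path as abspath

    # "data/processed" is an assumed value for config.data.processed
    with open(abspath("data/processed/X_train"), "rb") as f:
        X_train = pickle.load(f)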


 @hydra.main(config_path="../config", config_name="process", version_base=None)
 @flow
 def process_data(config):
-    data = get_data(config.data.raw)
-    all_data = get_all_data(data)
-    get_description_features(config, all_data, data)
+    data = get_data(config.data.raw.path)
+    processed = get_description_features(data)
+    filtered = filter_cols(config.use_cols, processed)
+    encoded = encode_cat_cols(config.cat_cols, filtered)
+    split = split_data(encoded)
+    save_data(split, config.data.processed)
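The flow pulls every path and column list from the Hydra config (`config/process.yaml`, per the decorator), which is not part of this diff. The sketch below shows the shape the code implies, built with OmegaConf; the paths and column names are illustrative assumptions only:

    from omegaconf import OmegaConf

    # Hypothetical config -- the key structure follows the accesses above
    # (config.data.raw.path, config.data.processed, config.use_cols,
    # config.cat_cols); values here are placeholders, not the repo's config.
    config = OmegaConf.create(
        {
            "data": {
                "raw": {"path": "data/raw/train.csv"},
                "processed": "data/processed",
            },
            "use_cols": [
                "Type", "Age", "Gender",
                "desc_length", "desc_words", "average_word_length",
                "AdoptionSpeed",
            ],
            "cat_cols": ["Type", "Gender"],
        }
    )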


 # ---------------------------------------------------------------------------- #