
Commit ae70684

add stackoverflow
1 parent 48eaf9a commit ae70684

File tree

11 files changed: +1482 −157 lines changed

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

from typing import Any, Dict, List

import pandas as pd
from sklearn.model_selection import train_test_split

# ---------------------------------------------------------------------------- #
# Create tasks                                                                  #
# ---------------------------------------------------------------------------- #
@task
def load_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


@task(
    target="{date:%a_%b_%d_%Y_%H-%M-%S}/{task_name}_output",
    result=LocalResult(dir="data/processed"),
)
def get_classes(data: pd.DataFrame, target_col: str) -> List[str]:
    """Task for getting the classes from the Iris data set."""
    return sorted(data[target_col].unique())


@task
def encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:
    """Task for encoding the categorical columns in the Iris data set."""
    return pd.get_dummies(data, columns=[target_col], prefix="", prefix_sep="")


@task(
    log_stdout=True,
    target="{date:%a_%b_%d_%Y_%H-%M-%S}/{task_name}_output",
    result=LocalResult(dir="data/processed"),
)
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Task for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    """
    print(f"Splitting data into training and test sets with ratio {test_data_ratio}")

    X, y = data.drop(columns=classes), data[classes]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_data_ratio)

    # When returning many variables, it is good practice to give them names:
    return dict(
        train_x=X_train,
        train_y=y_train,
        test_x=X_test,
        test_y=y_test,
    )


# ---------------------------------------------------------------------------- #
# Create a flow                                                                 #
# ---------------------------------------------------------------------------- #

with Flow("data-engineer") as flow:

    # Define parameters
    target_col = "species"
    test_data_ratio = Parameter("test_data_ratio", default=0.2)

    # Define tasks
    data = load_data(path="data/raw/iris.csv")
    classes = get_classes(data=data, target_col=target_col)
    categorical_columns = encode_categorical_columns(data=data, target_col=target_col)
    train_test_dict = split_data(
        data=categorical_columns, test_data_ratio=test_data_ratio, classes=classes
    )

# flow.visualize()
flow.run()
# flow.register(project_name="Iris Project")
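
Note: in Prefect 1.x, the target/result pair on get_classes and split_data only persists checkpoint files under data/processed when checkpointing is enabled. A minimal sketch of turning it on for a local run (the setting can equally be exported in the shell before launching the script):

import os

# Prefect 1.x skips persisting task results unless checkpointing is on;
# set this before the flow module is imported and run.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"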
Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

import numpy as np
import pandas as pd

import mlfoundry as mlf


# Note: this task is defined here but never called in the flow below.
@task
def setup_mlf():
    mlf_api = mlf.set_tracking_uri()
    return mlf_api.create_run(project_name="Iris-project")


# ---------------------------------------------------------------------------- #
# Create tasks                                                                  #
# ---------------------------------------------------------------------------- #
@task(log_stdout=True)
def train_model(
    train_x: pd.DataFrame, train_y: pd.DataFrame, num_train_iter: int, learning_rate: float
) -> np.ndarray:
    """Task for training a simple multi-class logistic regression model.
    The number of training iterations and the learning rate are supplied
    as Prefect parameters; the data and the parameters are provided to
    this function at execution time.
    """
    num_iter = num_train_iter
    lr = learning_rate
    X = train_x.to_numpy()
    Y = train_y.to_numpy()

    # Add bias to the features
    bias = np.ones((X.shape[0], 1))
    X = np.concatenate((bias, X), axis=1)

    weights = []
    # Train one binary model for each class in Y (one-vs-rest)
    for k in range(Y.shape[1]):
        # Initialise weights
        theta = np.zeros(X.shape[1])
        y = Y[:, k]
        for _ in range(num_iter):
            z = np.dot(X, theta)
            h = _sigmoid(z)
            gradient = np.dot(X.T, (h - y)) / y.size
            theta -= lr * gradient
        # Save the weights for each model
        weights.append(theta)

    print("Finished training the model.")

    # Return a joint multi-class model with weights for all classes
    return np.vstack(weights).transpose()


def _sigmoid(z):
    """A helper sigmoid function used by the training and the scoring tasks."""
    return 1 / (1 + np.exp(-z))


@task
def predict(model: np.ndarray, test_x: pd.DataFrame) -> np.ndarray:
    """Task for making predictions given a pre-trained model and a test set."""
    X = test_x.to_numpy()

    # Add bias to the features
    bias = np.ones((X.shape[0], 1))
    X = np.concatenate((bias, X), axis=1)

    # Predict "probabilities" for each class
    result = _sigmoid(np.dot(X, model))

    # Return the index of the class with max probability for all samples
    return np.argmax(result, axis=1)


@task(log_stdout=True)
def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:
    """Task for reporting the accuracy of the predictions performed by the
    previous task. Notice that this function has no outputs, except logging.
    """
    # Get true class index
    target = np.argmax(test_y.to_numpy(), axis=1)
    # Calculate accuracy of predictions
    accuracy = np.sum(predictions == target) / target.shape[0]
    # Log the accuracy of the model
    print(f"Model accuracy on test set: {round(accuracy * 100, 2)}")


# ---------------------------------------------------------------------------- #
# Create a flow                                                                 #
# ---------------------------------------------------------------------------- #

with Flow("data-science") as flow:

    # Load the dictionary that the data-engineer flow checkpointed to disk.
    # Note: the directory name is the timestamp of one specific earlier run.
    train_test_dict = LocalResult(dir="data/processed/Mon_Dec_20_2021_20:55:20").read(
        location="split_data_output"
    ).value

    # Load data
    train_x = train_test_dict["train_x"]
    train_y = train_test_dict["train_y"]
    test_x = train_test_dict["test_x"]
    test_y = train_test_dict["test_y"]

    # Define parameters
    num_train_iter = Parameter("num_train_iter", default=10000)
    learning_rate = Parameter("learning_rate", default=0.01)

    # Define tasks
    model = train_model(train_x, train_y, num_train_iter, learning_rate)
    predictions = predict(model, test_x)
    report_accuracy(predictions, test_y)


flow.run()
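
The hard-coded Mon_Dec_20_2021_20:55:20 directory pins this flow to the checkpoint of one specific earlier data-engineer run. A minimal sketch of picking up the most recent checkpoint instead, assuming the timestamped run directories are the only entries under data/processed:

from pathlib import Path

from prefect.engine.results import LocalResult

# Choose the newest timestamped subdirectory by modification time.
latest_dir = max(Path("data/processed").iterdir(), key=lambda p: p.stat().st_mtime)
train_test_dict = LocalResult(dir=str(latest_dir)).read(location="split_data_output").value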
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

data_engineering_flow = StartFlowRun(
    flow_name="data-engineer",
    project_name="Iris Project",
    wait=True,
    parameters={"test_data_ratio": 0.3},
)
data_science_flow = StartFlowRun(
    flow_name="data-science", project_name="Iris Project", wait=True
)

with Flow("main-flow") as flow:
    result = data_science_flow(upstream_tasks=[data_engineering_flow])

flow.run()
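
StartFlowRun looks the two subflows up by name on a Prefect 1.x backend, so both must already be registered under the Iris Project before this orchestrator flow runs. A minimal sketch, using hypothetical module names for wherever the two flows are defined:

# Hypothetical imports; adjust to the actual module paths of the two flows.
from data_engineering import flow as data_engineering
from data_science import flow as data_science

data_engineering.register(project_name="Iris Project")
data_science.register(project_name="Iris Project")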
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1),
)

data_engineering_flow = StartFlowRun(flow_name="data-engineer", project_name="Iris Project")
data_science_flow = StartFlowRun(flow_name="data-science", project_name="Iris Project")

with Flow("main-flow", schedule=schedule) as flow:
    data_science = data_science_flow(upstream_tasks=[data_engineering_flow])

# flow.register(project_name="Iris Project")
flow.run()
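
A one-minute interval is convenient for testing; for a production cadence, Prefect 1.x also provides a cron-style schedule. A minimal sketch (the cron expression is only an example):

from prefect.schedules import CronSchedule

# Run the main flow every day at 09:00 UTC instead of every minute.
schedule = CronSchedule("0 9 * * *")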
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
artifact_location: file:///home/khuyen/Data-science/data_science_tools/mlfoundry_example/mlf/mlruns/0
experiment_id: '0'
lifecycle_stage: active
name: Default
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
artifact_uri: file:///home/khuyen/Data-science/data_science_tools/mlfoundry_example/mlf/mlruns/1/70fac48397b2487bb1c3418812b1ef5e/artifacts
end_time: null
entry_point_name: ''
experiment_id: '1'
lifecycle_stage: active
name: ''
run_id: 70fac48397b2487bb1c3418812b1ef5e
run_uuid: 70fac48397b2487bb1c3418812b1ef5e
source_name: ''
source_type: 4
source_version: ''
start_time: 1641322638766
status: 1
tags: []
user_id: unknown
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
run_2022-01-04_18:57:18_utc_1
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
artifact_location: file:///home/khuyen/Data-science/data_science_tools/mlfoundry_example/mlf/mlruns/1
experiment_id: '1'
lifecycle_stage: active
name: test-project

statistics/stackoverflow_survey/analyze_salary.ipynb

Lines changed: 622 additions & 157 deletions
Large diffs are not rendered by default.
