Commits (58)
a79079b
fix bug in config dict
Apr 28, 2017
fa1acfb
Update base.py
rhiever Apr 28, 2017
5fab09a
Merge pull request #431 from weixuanfu2016/config_dict_patch
rhiever Apr 28, 2017
ed3bdf7
Update version for minor release
rhiever Apr 28, 2017
5e32488
use stopit to replace Interruptable_cross_val_score
May 1, 2017
39cff19
Update requirements.txt
rhiever May 1, 2017
eafc240
fix bugs and clean codes
May 1, 2017
e50814c
clean test codes
May 1, 2017
c10ba2e
add unit test
May 1, 2017
6bc031c
try backend="threading"
May 1, 2017
3019275
dask works in macOS
May 1, 2017
0b3680c
clean codes
May 1, 2017
91caa55
num_worker added
May 1, 2017
6b00655
use client
May 1, 2017
d9c1863
threading
May 1, 2017
4517abb
clean codes
May 1, 2017
94b4a37
clean codes
May 1, 2017
3ce128b
clean codes
May 1, 2017
02fd277
clean codes
May 1, 2017
af98b96
clean codes
May 1, 2017
9cafac7
return to joblib
May 2, 2017
5447fe0
key works
May 2, 2017
1f97655
fix issue in large dataset
May 2, 2017
23ca6d3
Merge remote-tracking branch 'upstream/development' into joblib_timeout
May 2, 2017
3ce4a30
add doc
May 2, 2017
6515732
clean codes
May 2, 2017
633e9e8
min to sec
May 2, 2017
ac77725
manual dump memmap
May 2, 2017
dd7df4e
clean codes
May 2, 2017
a6ff510
dask array test
May 2, 2017
dcf640e
jobs test
May 2, 2017
4d87038
add warning for large dataset
May 2, 2017
ec96ecd
add doc and installation
May 2, 2017
7978f7d
install in test
May 2, 2017
24c030f
pip install dask
May 2, 2017
0382753
pip install dask[complete]
May 2, 2017
ac3a086
clean codes
May 3, 2017
39ac993
better get
May 4, 2017
224a9bc
clean codes
May 4, 2017
c20d911
fix conflict
May 12, 2017
a4956d4
warning when verbosity > 2
May 12, 2017
7cea3bf
fix this compatibility issue
May 16, 2017
454f54a
add unit test
May 16, 2017
dc40489
fix ci
May 16, 2017
1fc2860
Merge pull request #451 from weixuanfu2016/mdr_dict_master_fix
rhiever May 18, 2017
18927b0
Version increment for hot patch release
rhiever May 18, 2017
568f55d
fix bug for PolynomialFeatures
May 19, 2017
37c1529
add unit test
May 19, 2017
179fdf1
Merge pull request #455 from weixuanfu2016/issue454
rhiever May 19, 2017
7b1eb27
Minor version increment for release
rhiever May 19, 2017
fd2f1c3
Update tests.py
rhiever May 19, 2017
c3b2167
Merge branch 'development' into joblib_timeout
rhiever May 23, 2017
cccf676
fix conflicts
May 23, 2017
211eed9
Merge branch 'development' into joblib_timeout
May 23, 2017
00fc6ff
add patch in master
May 23, 2017
1e0a8c4
add patch in tpot 0.7.5
May 23, 2017
d8e1904
clean codes
May 23, 2017
af01d55
add some small unit tests for increasing coverage
May 23, 2017
5 changes: 5 additions & 0 deletions ci/.travis_install.sh
@@ -53,6 +53,9 @@ fi

pip install update_checker
pip install tqdm
pip install toolz
pip install dask[complete]
pip install stopit

if [[ "$COVERAGE" == "true" ]]; then
pip install coverage coveralls
@@ -67,4 +70,6 @@ python -c "import deap; print('deap %s' % deap.__version__)"
python -c "import xgboost; print('xgboost %s ' % xgboost.__version__)"
python -c "import update_checker; print('update_checker %s' % update_checker.__version__)"
python -c "import tqdm; print('tqdm %s' % tqdm.__version__)"
python -c "import stopit; print('stopit %s' % stopit.__version__)"
python -c "import dask; print('dask %s' % dask.__version__)"
python setup.py build_ext --inplace
11 changes: 8 additions & 3 deletions docs_sources/installing.md
@@ -12,19 +12,24 @@ TPOT is built on top of several existing Python libraries, including:

* [tqdm](https://github.com/tqdm/tqdm)

* [stopit](https://github.com/glenfant/stopit)

* [dask](https://github.com/dask/dask)


Most of the necessary Python packages can be installed via the [Anaconda Python distribution](https://www.continuum.io/downloads), which we strongly recommend that you use. We also strongly recommend that you use Python 3 over Python 2 if you're given the choice.

NumPy, SciPy, and scikit-learn can be installed in Anaconda via the command:
NumPy, SciPy, scikit-learn, and dask can be installed in Anaconda via the commands:

```Shell
conda install numpy scipy scikit-learn
conda install dask -c conda-forge
```

DEAP, update_checker, and tqdm can be installed with `pip` via the command:
DEAP, update_checker, tqdm, and stopit can be installed with `pip` via the command:

```Shell
pip install deap update_checker tqdm
pip install deap update_checker tqdm stopit
```

**For Windows users**, the pywin32 module is required if Python is NOT installed via the [Anaconda Python distribution](https://www.continuum.io/downloads) and can be installed with `pip`:
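As a quick post-install sanity check, the two new dependencies can be verified from Python. This is a minimal sketch mirroring the version checks added to `ci/.travis_install.sh` above:

```Python
# Minimal sketch: confirm the new TPOT dependencies import cleanly.
# Mirrors the version checks in ci/.travis_install.sh.
import dask
import stopit

print('dask %s' % dask.__version__)
print('stopit %s' % stopit.__version__)
```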
2 changes: 2 additions & 0 deletions requirements.txt
@@ -5,3 +5,5 @@ scikit-learn==0.18.1
scipy==0.19.0
tqdm==4.11.2
update-checker==0.16
stopit==1.1.1
dask==0.14.1
30 changes: 29 additions & 1 deletion tests.py
@@ -24,7 +24,7 @@
from tpot.driver import positive_integer, float_range
from tpot.export_utils import export_pipeline, generate_import_code, _indent, generate_pipeline_code, get_by_name
from tpot.gp_types import Output_Array
from tpot.gp_deap import mutNodeReplacement
from tpot.gp_deap import mutNodeReplacement, _wrapped_cross_val_score

from tpot.operator_utils import TPOTOperatorClassFactory, set_sample_weight
from tpot.config_classifier import classifier_config_dict
@@ -147,6 +147,34 @@ def test_init_max_time_mins():
    assert tpot_obj.max_time_mins == 30


def test_timeout():
    """Assert that _wrapped_cross_val_score returns 'Timeout' when the time limit is exceeded."""
    tpot_obj = TPOTRegressor(scoring='neg_mean_squared_error')
    # a deliberately expensive pipeline for the test
    pipeline_string = (
        "ExtraTreesRegressor("
        "GradientBoostingRegressor(input_matrix, GradientBoostingRegressor__alpha=0.8,"
        "GradientBoostingRegressor__learning_rate=0.1,GradientBoostingRegressor__loss=huber,"
        "GradientBoostingRegressor__max_depth=5, GradientBoostingRegressor__max_features=0.5,"
        "GradientBoostingRegressor__min_samples_leaf=5, GradientBoostingRegressor__min_samples_split=5,"
        "GradientBoostingRegressor__n_estimators=100, GradientBoostingRegressor__subsample=0.25),"
        "ExtraTreesRegressor__bootstrap=True, ExtraTreesRegressor__max_features=0.5,"
        "ExtraTreesRegressor__min_samples_leaf=5, ExtraTreesRegressor__min_samples_split=5, "
        "ExtraTreesRegressor__n_estimators=100)"
    )
    tpot_obj._optimized_pipeline = creator.Individual.from_string(pipeline_string, tpot_obj._pset)
    tpot_obj._fitted_pipeline = tpot_obj._toolbox.compile(expr=tpot_obj._optimized_pipeline)
    # run _wrapped_cross_val_score with cv=20 so that it cannot finish within 1 second
    return_value = _wrapped_cross_val_score(tpot_obj._fitted_pipeline,
                                            training_features_r,
                                            training_classes_r,
                                            cv=20,
                                            scoring_function='neg_mean_squared_error',
                                            sample_weight=None,
                                            timeout=1)
    assert return_value == "Timeout"


def test_get_params():
"""Assert that get_params returns the exact dictionary of parameters used by TPOT."""
kwargs = {
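The behavior exercised by `test_timeout` comes from stopit's `threading_timeoutable` decorator, which this PR adopts in `tpot/gp_deap.py` (see that diff below). Here is a minimal, self-contained sketch of the mechanism, independent of TPOT; `slow_eval` is a hypothetical stand-in for an expensive cross-validation:

```Python
# Minimal sketch of the stopit timeout pattern: the decorator adds a
# `timeout` keyword and returns `default` if the body does not finish in time.
import time
from stopit import threading_timeoutable

@threading_timeoutable(default="Timeout")
def slow_eval(seconds):
    for _ in range(seconds * 10):
        time.sleep(0.1)  # short sleeps let the timeout interrupt promptly
    return 0.0

print(slow_eval(5, timeout=1))   # "Timeout": interrupted after about 1 second
print(slow_eval(1, timeout=10))  # 0.0: finished within the limit
```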
80 changes: 41 additions & 39 deletions tpot/base.py
@@ -27,6 +27,7 @@
from functools import partial
from datetime import datetime
from multiprocessing import cpu_count
from dask import compute, delayed, multiprocessing

import numpy as np
import deap
@@ -35,12 +36,11 @@
from copy import copy

from sklearn.base import BaseEstimator
from sklearn.externals.joblib import Parallel, delayed
from sklearn.pipeline import make_pipeline, make_union
from sklearn.preprocessing import FunctionTransformer
from sklearn.ensemble import VotingClassifier
from sklearn.metrics.scorer import make_scorer
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.utils import check_X_y

from update_checker import update_check

@@ -191,6 +191,7 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
        self.generations = generations
        self.max_time_mins = max_time_mins
        self.max_eval_time_mins = max_eval_time_mins
        self.max_eval_time_seconds = max(int(self.max_eval_time_mins * 60), 1)

        # Set offspring_size equal to population_size by default
        if offspring_size:
@@ -260,17 +261,7 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
        self.scoring_function = scoring

        self.cv = cv
        # If the OS is windows, reset cpu number to 1 since the OS did not have multiprocessing module
        if sys.platform.startswith('win') and n_jobs != 1:
            print(
                'Warning: Although parallelization is currently supported in '
                'TPOT for Windows, pressing Ctrl+C will freeze the optimization '
                'process without saving the best pipeline! Thus, Please DO NOT '
                'press Ctrl+C during the optimization procss if n_jobs is not '
                'equal to 1. For quick test in Windows, please set n_jobs to 1 '
                'for saving the best pipeline in the middle of the optimization '
                'process via Ctrl+C.'
            )

        if n_jobs == -1:
            self.n_jobs = cpu_count()
        else:
@@ -396,22 +387,25 @@ def fit(self, features, classes, sample_weight=None):
        None

        """
        features = features.astype(np.float64)

        # Check that the input data is formatted correctly for scikit-learn
        if self.classification:
            clf = DecisionTreeClassifier(max_depth=5)
        else:
            clf = DecisionTreeRegressor(max_depth=5)

        # Check that the input data is formatted correctly for scikit-learn and convert classes to np.float64
        try:
            clf = clf.fit(features, classes)
            features, classes = check_X_y(features, classes)
        except Exception:
            raise ValueError('Error: Input data is not in a valid format. '
                             'Please confirm that the input data is scikit-learn compatible. '
                             'For example, the features must be a 2-D array and target labels '
                             'must be a 1-D array.')

        if (features.shape[0] > 10000 or features.shape[1] > 100) and self.n_jobs != 1:
Review comment: Warnings should only print if verbosity > 2.

Author: Makes sense. I fixed it.
            print(
                'Warning: Although parallelization is currently supported in TPOT, '
                'a known freezing issue in joblib has been reported with large datasets. '
                'With a large dataset, parallelization may freeze or crash the optimization '
                'process, and max_eval_time_mins cannot reliably interrupt it! Please set '
                'n_jobs to 1 if freezing or crashing occurs. Note that scikit-learn also '
                'uses joblib in several estimators, so freezing may still happen even '
                'with n_jobs=1.'
            )

        # Set the seed for the GP run
        if self.random_state is not None:
            random.seed(self.random_state)  # deap uses random
@@ -771,24 +765,32 @@ def _evaluate_individuals(self, individuals, features, classes, sample_weight=No

        # evaluate pipelines
        resulting_score_list = []
        # chunk size for pbar update
        for chunk_idx in range(0, len(sklearn_pipeline_list), self.n_jobs * 4):
            jobs = []
            for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + self.n_jobs * 4]:
                job = delayed(_wrapped_cross_val_score)(
                    sklearn_pipeline,
                    features,
                    classes,
                    self.cv,
                    self.scoring_function,
                    sample_weight,
                    self.max_eval_time_mins
                )
                jobs.append(job)
            parallel = Parallel(n_jobs=self.n_jobs, verbose=0, pre_dispatch='2*n_jobs')
            tmp_result_score = parallel(jobs)
            # update pbar
            for val in tmp_result_score:
            if self.n_jobs == 1:
                tmp_scores = [_wrapped_cross_val_score(
                    sklearn_pipeline,
                    features,
                    classes,
                    self.cv,
                    self.scoring_function,
                    sample_weight,
                    timeout=self.max_eval_time_seconds
                ) for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + self.n_jobs * 4]]
            else:
                jobs = []
                for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + self.n_jobs * 4]:
                    job = delayed(_wrapped_cross_val_score)(
                        sklearn_pipeline,
                        features,
                        classes,
                        self.cv,
                        self.scoring_function,
                        sample_weight,
                        timeout=self.max_eval_time_seconds
                    )
                    jobs.append(job)
                tmp_scores = compute(*jobs, get=multiprocessing.get, num_workers=self.n_jobs)
Review comment: There might be some benefit to checking in with Dask's globals before hard-wiring dask.multiprocessing.get. You might try the following instead:

import dask
if n_jobs == 1:
    get = dask.async.get_sync
elif 'get' in dask.context._globals:
    get = dask.context._globals['get']
else:
    get = multiprocessing.get

compute(..., get=get)

This would allow users to step in with other schedulers, like the distributed scheduler. Also, if you are only using a single core, it's best to avoid the multiprocessing scheduler, which has some non-trivial data movement overhead.

Author: Thanks for your suggestions.
            for val in tmp_scores:
                if not self._pbar.disable:
                    self._pbar.update(1)
                if val == 'Timeout':
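As context for the `n_jobs != 1` branch above, here is a minimal, self-contained sketch (not TPOT code) of the same delayed/compute pattern, using the old-style `get=` keyword from dask 0.14.1 as pinned in `requirements.txt`; `score_one` is a hypothetical stand-in for `_wrapped_cross_val_score`:

```Python
# Hedged sketch of the chunked dask pattern in _evaluate_individuals:
# wrap each evaluation in dask.delayed, then compute the whole batch
# on the multiprocessing scheduler with a bounded worker count.
from dask import compute, delayed, multiprocessing

def score_one(pipeline_params):
    # placeholder for an expensive cross-validation
    return sum(pipeline_params)

if __name__ == '__main__':
    pipelines = [[1, 2], [3, 4], [5, 6], [7, 8]]
    jobs = [delayed(score_one)(p) for p in pipelines]
    scores = compute(*jobs, get=multiprocessing.get, num_workers=2)
    print(scores)  # one score per pipeline, in submission order
```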
71 changes: 21 additions & 50 deletions tpot/gp_deap.py
@@ -32,7 +32,7 @@
from sklearn.base import clone
from collections import defaultdict
import warnings
import threading
from stopit import threading_timeoutable, TimeoutException

# Limit loops to generate a different individual by crossover/mutation
MAX_MUT_LOOPS = 50
@@ -321,54 +321,25 @@ def mutNodeReplacement(individual, pset):
    individual[slice_] = new_subtree
    return individual,


class Interruptable_cross_val_score(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self)
        self.args = args
        self.kwargs = kwargs
        self.result = -float('inf')
        self._stopevent = threading.Event()
        self.daemon = True

    def stop(self):
        self._stopevent.set()
        threading.Thread.join(self)

    def run(self):
        # Note: changed name of the thread to "MainThread" to avoid such warning from joblib (maybe bugs)
        # Note: Need attention if using parallel execution model of scikit-learn
        threading.current_thread().name = 'MainThread'
        try:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                self.result = cross_val_score(*self.args, **self.kwargs)
        except Exception as e:
            pass


@threading_timeoutable(default="Timeout")
def _wrapped_cross_val_score(sklearn_pipeline, features, classes,
                             cv, scoring_function, sample_weight, max_eval_time_mins):
    max_time_seconds = max(int(max_eval_time_mins * 60), 1)
                             cv, scoring_function, sample_weight):
    sample_weight_dict = set_sample_weight(sklearn_pipeline.steps, sample_weight)
    # build a job for cross_val_score
    tmp_it = Interruptable_cross_val_score(
        clone(sklearn_pipeline),
        features,
        classes,
        scoring=scoring_function,
        cv=cv,
        n_jobs=1,
        verbose=0,
        fit_params=sample_weight_dict
    )
    tmp_it.start()
    tmp_it.join(max_time_seconds)

    if tmp_it.isAlive():
        resulting_score = 'Timeout'
    else:
        resulting_score = np.mean(tmp_it.result)

    tmp_it.stop()
    return resulting_score
    try:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            CV_score = cross_val_score(
                clone(sklearn_pipeline),
                features,
                classes,
                scoring=scoring_function,
                cv=cv,
                n_jobs=1,
                verbose=0,
                fit_params=sample_weight_dict
            )
        return np.mean(CV_score)
    except TimeoutException:
        return "Timeout"
    except Exception:
        return -float('inf')
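To summarize the reworked wrapper, here is a hedged usage sketch (the pipeline and data below are placeholders, not TPOT internals): the decorator injects the `timeout` keyword, and callers receive either a mean CV score, the string "Timeout", or negative infinity if the pipeline raises.

```Python
# Hedged usage sketch: the three possible outcomes of the stopit-wrapped scorer.
from sklearn.datasets import make_regression
from sklearn.pipeline import make_pipeline
from sklearn.tree import DecisionTreeRegressor

from tpot.gp_deap import _wrapped_cross_val_score

X, y = make_regression(n_samples=200, n_features=5, random_state=0)
pipeline = make_pipeline(DecisionTreeRegressor(max_depth=3))

score = _wrapped_cross_val_score(
    pipeline, X, y,
    cv=5,
    scoring_function='neg_mean_squared_error',
    sample_weight=None,
    timeout=60,  # keyword injected by @threading_timeoutable
)
if score == "Timeout":
    pass  # evaluation exceeded the time limit
elif score == -float('inf'):
    pass  # the pipeline raised during fitting or scoring
```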