diff --git a/ci/.travis_install.sh b/ci/.travis_install.sh
index 267d8e601..88c0d5073 100755
--- a/ci/.travis_install.sh
+++ b/ci/.travis_install.sh
@@ -53,6 +53,11 @@ fi
 
 pip install update_checker
 pip install tqdm
+pip install toolz
+pip install dask[complete]
+pip install stopit
+pip install scikit-mdr
+pip install skrebate
 
 if [[ "$COVERAGE" == "true" ]]; then
     pip install coverage coveralls
@@ -67,4 +72,6 @@ python -c "import deap; print('deap %s' % deap.__version__)"
 python -c "import xgboost; print('xgboost %s ' % xgboost.__version__)"
 python -c "import update_checker; print('update_checker %s' % update_checker.__version__)"
 python -c "import tqdm; print('tqdm %s' % tqdm.__version__)"
+python -c "import stopit; print('stopit %s' % stopit.__version__)"
+python -c "import dask; print('dask %s' % dask.__version__)"
 python setup.py build_ext --inplace
diff --git a/docs_sources/installing.md b/docs_sources/installing.md
index ab58d3a8d..d5bcf092e 100644
--- a/docs_sources/installing.md
+++ b/docs_sources/installing.md
@@ -12,19 +12,24 @@ TPOT is built on top of several existing Python libraries, including:
 
 * [tqdm](https://github.com/tqdm/tqdm)
 
+* [stopit](https://github.com/glenfant/stopit)
+
+* [dask](https://github.com/dask/dask)
+
 Most of the necessary Python packages can be installed via the [Anaconda Python distribution](https://www.continuum.io/downloads), which we strongly recommend that you use. We also strongly recommend that you use of Python 3 over Python 2 if you're given the choice.
 
-NumPy, SciPy, and scikit-learn can be installed in Anaconda via the command:
+NumPy, SciPy, scikit-learn, and dask can be installed in Anaconda via the command:
 
 ```Shell
 conda install numpy scipy scikit-learn
+conda install dask -c conda-forge
 ```
 
-DEAP, update_checker, and tqdm can be installed with `pip` via the command:
+DEAP, update_checker, tqdm, and stopit can be installed with `pip` via the command:
 
 ```Shell
-pip install deap update_checker tqdm
+pip install deap update_checker tqdm stopit
 ```
 
 **For the Windows users**, the pywin32 module is required if Python is NOT installed via the [Anaconda Python distribution](https://www.continuum.io/downloads) and can be installed with `pip`:
diff --git a/requirements.txt b/requirements.txt
index da0b5ab1b..13972fabc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,3 +5,5 @@ scikit-learn==0.18.1
 scipy==0.19.0
 tqdm==4.11.2
 update-checker==0.16
+stopit==1.1.1
+dask==0.14.1
diff --git a/tests.py b/tests.py
index b00e2d519..655ce524c 100644
--- a/tests.py
+++ b/tests.py
@@ -25,7 +25,7 @@
 from tpot.driver import positive_integer, float_range, _get_arg_parser, _print_args, main, _read_data_file
 from tpot.export_utils import export_pipeline, generate_import_code, _indent, generate_pipeline_code, get_by_name
 from tpot.gp_types import Output_Array
-from tpot.gp_deap import mutNodeReplacement
+from tpot.gp_deap import mutNodeReplacement, _wrapped_cross_val_score
 from tpot.metrics import balanced_accuracy
 from tpot.operator_utils import TPOTOperatorClassFactory, set_sample_weight
 
@@ -40,8 +40,9 @@
 import random
 import subprocess
 import sys
+from multiprocessing import cpu_count
 
-from sklearn.datasets import load_digits, load_boston
+from sklearn.datasets import load_digits, load_boston, make_classification
 from sklearn.model_selection import train_test_split, cross_val_score
 from sklearn.linear_model import LogisticRegression, Lasso
 from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
@@ -247,6 +248,13 @@ def test_init_default_scoring():
     assert tpot_obj.scoring_function == 'accuracy'
 
 
+def test_init_default_scoring_2():
+    """Assert that TPOT initializes with the correct customized scoring function."""
+
+    tpot_obj = TPOTClassifier(scoring=balanced_accuracy)
+    assert tpot_obj.scoring_function == 'balanced_accuracy'
+
+
 def test_invaild_score_warning():
     """Assert that the TPOT intitializes raises a ValueError when the scoring metrics is not available in SCORERS."""
     # Mis-spelled scorer
@@ -277,6 +285,14 @@ def test_invaild_subsample_ratio_warning():
     TPOTClassifier(subsample=0.1)
 
 
+def test_invaild_mut_rate_plus_xo_rate():
+    """Assert that TPOT raises a ValueError when the sum of mutation and crossover rates is larger than 1."""
+    # Invalid ratio
+    assert_raises(ValueError, TPOTClassifier, mutation_rate=0.8, crossover_rate=0.8)
+    # Valid ratio
+    TPOTClassifier(mutation_rate=0.8, crossover_rate=0.1)
+
+
 def test_init_max_time_mins():
     """Assert that the TPOT init stores max run time and sets generations to 1000000."""
     tpot_obj = TPOTClassifier(max_time_mins=30, generations=1000)
@@ -285,6 +301,43 @@ def test_init_max_time_mins():
 
     assert tpot_obj.max_time_mins == 30
 
 
+def test_init_n_jobs():
+    """Assert that the TPOT init stores the requested number of processes."""
+    tpot_obj = TPOTClassifier(n_jobs=2)
+    assert tpot_obj.n_jobs == 2
+
+    tpot_obj = TPOTClassifier(n_jobs=-1)
+    assert tpot_obj.n_jobs == cpu_count()
+
+
+def test_timeout():
+    """Assert that _wrapped_cross_val_score returns 'Timeout' when the time limit is exceeded."""
+    tpot_obj = TPOTRegressor(scoring='neg_mean_squared_error')
+    # a complex pipeline for the test
+    pipeline_string = (
+        "ExtraTreesRegressor("
+        "GradientBoostingRegressor(input_matrix, GradientBoostingRegressor__alpha=0.8,"
+        "GradientBoostingRegressor__learning_rate=0.1,GradientBoostingRegressor__loss=huber,"
+        "GradientBoostingRegressor__max_depth=5, GradientBoostingRegressor__max_features=0.5,"
+        "GradientBoostingRegressor__min_samples_leaf=5, GradientBoostingRegressor__min_samples_split=5,"
+        "GradientBoostingRegressor__n_estimators=100, GradientBoostingRegressor__subsample=0.25),"
+        "ExtraTreesRegressor__bootstrap=True, ExtraTreesRegressor__max_features=0.5,"
+        "ExtraTreesRegressor__min_samples_leaf=5, ExtraTreesRegressor__min_samples_split=5, "
+        "ExtraTreesRegressor__n_estimators=100)"
+    )
+    tpot_obj._optimized_pipeline = creator.Individual.from_string(pipeline_string, tpot_obj._pset)
+    tpot_obj._fitted_pipeline = tpot_obj._toolbox.compile(expr=tpot_obj._optimized_pipeline)
+    # test _wrapped_cross_val_score with cv=20 so that it is impossible to finish in 1 second
+    return_value = _wrapped_cross_val_score(tpot_obj._fitted_pipeline,
+                                            training_features_r,
+                                            training_classes_r,
+                                            cv=20,
+                                            scoring_function='neg_mean_squared_error',
+                                            sample_weight=None,
+                                            timeout=1)
+    assert return_value == "Timeout"
+
+
 def test_balanced_accuracy():
     """Assert that the balanced_accuracy in TPOT returns correct accuracy."""
     y_true = np.array([1,1,1,1,1,2,2,2,2,2,2,2,3,3,3,3,3,4,4,4])
@@ -331,6 +384,11 @@ def test_set_params_2():
     assert tpot_obj.generations == 3
 
 
+def test_TPOTBase():
+    """Assert that the TPOTBase class raises a RuntimeError when used directly."""
+    assert_raises(RuntimeError, TPOTBase)
+
+
 def test_conf_dict():
     """Assert that TPOT uses the pre-configured dictionary of operators when config_dict is 'TPOT light' or 'TPOT MDR'."""
     tpot_obj = TPOTClassifier(config_dict='TPOT light')
@@ -468,6 +526,7 @@ def test_score_3():
 
     assert np.allclose(known_score, score)
 
+
 def test_sample_weight_func():
     """Assert that the TPOTRegressor score function outputs a known score for a fixed pipeline with sample weights."""
     tpot_obj = TPOTRegressor(scoring='neg_mean_squared_error')
@@ -585,6 +644,7 @@ def test_predict_proba2():
             float_range(result[i][j])
 
+
 def test_warm_start():
     """Assert that the TPOT warm_start flag stores the pop and pareto_front from the first run."""
     tpot_obj = TPOTClassifier(random_state=42, population_size=1, offspring_size=2, generations=1, verbosity=0, warm_start=True)
@@ -747,8 +807,18 @@ def test_imputer3():
     assert_not_equal(imputed_features[0][0], float('nan'))
 
 
-def test_tpot_operator_factory_class():
-    """Assert that the TPOT operators class factory."""
+def test_fit3():
+    """Assert that the TPOT fit function provides an optimized pipeline when config_dict is 'TPOT MDR'."""
+    X, y = make_classification(n_samples=50, n_features=10, random_state=42, n_classes=2)  # binary classification problem
+    tpot_obj = TPOTClassifier(random_state=42, population_size=1, offspring_size=2, generations=1, verbosity=0, config_dict='TPOT MDR')
+    tpot_obj.fit(X, y)
+
+    assert isinstance(tpot_obj._optimized_pipeline, creator.Individual)
+    assert tpot_obj._start_datetime is not None
+
+
+def test_TPOTOperatorClassFactory():
+    """Assert that the TPOT operator class factory builds operators from a configuration dictionary."""
     test_config_dict = {
         'sklearn.svm.LinearSVC': {
             'penalty': ["l1", "l2"],
@@ -870,6 +940,33 @@ def test_generate_import_code():
     assert expected_code == generate_import_code(pipeline, tpot_obj.operators)
 
 
+def test_PolynomialFeatures_exception():
+    """Assert that TPOT allows only one PolynomialFeatures operator in a pipeline."""
+    tpot_obj = TPOTClassifier()
+    tpot_obj._pbar = tqdm(total=1, disable=True)
+    # pipeline with one PolynomialFeatures operator
+    pipeline_string_1 = ('LogisticRegression(PolynomialFeatures'
+                         '(input_matrix, PolynomialFeatures__degree=2, PolynomialFeatures__include_bias=DEFAULT, '
+                         'PolynomialFeatures__interaction_only=False), LogisticRegression__C=10.0, '
+                         'LogisticRegression__dual=DEFAULT, LogisticRegression__penalty=DEFAULT)')
+
+    # pipeline with two PolynomialFeatures operators
+    pipeline_string_2 = ('LogisticRegression(PolynomialFeatures'
+                         '(PolynomialFeatures(input_matrix, PolynomialFeatures__degree=2, '
+                         'PolynomialFeatures__include_bias=DEFAULT, PolynomialFeatures__interaction_only=False), '
+                         'PolynomialFeatures__degree=2, PolynomialFeatures__include_bias=DEFAULT, '
+                         'PolynomialFeatures__interaction_only=False), LogisticRegression__C=10.0, '
+                         'LogisticRegression__dual=DEFAULT, LogisticRegression__penalty=DEFAULT)')
+
+    # make a list for _evaluate_individuals
+    pipelines = []
+    pipelines.append(creator.Individual.from_string(pipeline_string_1, tpot_obj._pset))
+    pipelines.append(creator.Individual.from_string(pipeline_string_2, tpot_obj._pset))
+    fitness_scores = tpot_obj._evaluate_individuals(pipelines, training_features, training_classes)
+    known_scores = [(2, 0.98068077235290885), (5000.0, -float('inf'))]
+    assert np.allclose(known_scores, fitness_scores)
+
+
 def test_mutNodeReplacement():
     """Assert that mutNodeReplacement() returns the correct type of mutation node in a fixed pipeline."""
     tpot_obj = TPOTClassifier()
diff --git a/tpot/_version.py b/tpot/_version.py
index 69ad68df0..c302f0390 100644
--- a/tpot/_version.py
+++ b/tpot/_version.py
@@ -19,4 +19,4 @@
 """
 
-__version__ = '0.7.3'
+__version__ = '0.7.5'
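Aside: `test_TPOTOperatorClassFactory` above feeds a custom configuration dictionary to the operator factory; the same structure is what end users pass as `config_dict` (documented in the `tpot/base.py` changes below). A minimal usage sketch — the hyperparameter grid here is illustrative, not the exact one from the test:

```Python
from tpot import TPOTClassifier

# Illustrative custom configuration: keys are estimator import paths,
# values map each hyperparameter to the grid TPOT may sample from.
custom_config = {
    'sklearn.svm.LinearSVC': {
        'penalty': ['l1', 'l2'],
        'dual': [True, False],
        'C': [1e-4, 1e-3, 1e-2, 1e-1, 1.0],
    },
}

# TPOT then restricts its search to the operators and parameters listed above.
tpot = TPOTClassifier(config_dict=custom_config, generations=5,
                      population_size=20, verbosity=2)
```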
diff --git a/tpot/base.py b/tpot/base.py
index a0ba34e7c..8b83979e3 100644
--- a/tpot/base.py
+++ b/tpot/base.py
@@ -28,6 +28,7 @@
 from functools import partial
 from datetime import datetime
 from multiprocessing import cpu_count
+from dask import compute, delayed, multiprocessing, async, context
 
 import numpy as np
 import deap
@@ -37,7 +38,6 @@
 
 from sklearn.base import BaseEstimator
 from sklearn.utils import check_X_y
-from sklearn.externals.joblib import Parallel, delayed
 from sklearn.pipeline import make_pipeline, make_union
 from sklearn.preprocessing import FunctionTransformer, Imputer
 from sklearn.model_selection import train_test_split
@@ -149,7 +149,7 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
             Random number generator seed for TPOT. Use this to make sure
             that TPOT will give you the same results each time you run it
             against the same data set with that seed.
-        config_dict: a Python dictionary or string (default: None)
+        config_dict: Python dictionary or string (default: None)
             Python dictionary:
                 A dictionary customizing the operators and parameters that
                 TPOT uses in the optimization process.
@@ -197,6 +197,7 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
         self.generations = generations
         self.max_time_mins = max_time_mins
         self.max_eval_time_mins = max_eval_time_mins
+        self.max_eval_time_seconds = max(int(self.max_eval_time_mins * 60), 1)  # per-pipeline evaluation budget for stopit, at least 1 second
 
         # Set offspring_size equal to population_size by default
         if offspring_size:
@@ -282,6 +283,7 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
             'for saving the best pipeline in the middle of the optimization '
             'process via Ctrl+C.'
         )
+
         if n_jobs == -1:
             self.n_jobs = cpu_count()
         else:
@@ -429,16 +431,27 @@ def fit(self, features, classes, sample_weight=None):
 
         self._check_dataset(features, classes)
 
         # Randomly collect a subsample of training samples for pipeline optimization process.
         if self.subsample < 1.0:
             features, _, classes, _ = train_test_split(features, classes, train_size=self.subsample, random_state=self.random_state)
-            # Raise a warning message if the training size is less than 1500 when subsample is not default value
-            if features.shape[0] < 1500:
+            # Raise a warning message if the training size is less than 1500 when subsample is not the default value
+            if self.verbosity > 2 and features.shape[0] < 1500:
                 print(
-                    'Warning: Although subsample can accelerate pipeline optimization process, '
-                    'too small training sample size may cause unpredictable effect on maximizing '
-                    'score in pipeline optimization process. Increasing subsample ratio may get '
-                    'a more reasonable outcome from optimization process in TPOT.'
+                    'Warning: Although subsample can accelerate the pipeline optimization process, '
+                    'too small a training sample size may have an unpredictable effect on maximizing '
+                    'the score in the pipeline optimization process. Increasing the subsample ratio may '
+                    'produce a more reasonable outcome from the optimization process in TPOT.'
                 )
 
+        # Warn that parallel evaluation of a large dataset can trigger a known joblib freezing issue
+        if self.verbosity > 2 and (features.shape[0] > 10000 or features.shape[1] > 100) and self.n_jobs != 1:
+            print(
+                'Warning: Although parallelization is currently supported in TPOT, '
+                'a known freezing issue in joblib has been reported with large datasets. '
+                'Parallelization with a large dataset may freeze or crash the optimization '
+                'process unless max_eval_time_mins imposes time controls! Please set n_jobs to 1 '
+                'if freezing or crashing happens. Note that scikit-learn also uses joblib in '
+                'multiple estimators, so freezing may still happen even with n_jobs=1.'
+            )
+
         # Set the seed for the GP run
@@ -822,13 +835,13 @@ def _evaluate_individuals(self, individuals, features, classes, sample_weight=No
             # This is a fairly hacky way to prevent TPOT from getting stuck on bad pipelines and should be improved in a future release
             individual = individuals[indidx]
             individual_str = str(individual)
-            if individual_str.count('PolynomialFeatures') > 1:
+            sklearn_pipeline_str = generate_pipeline_code(expr_to_tree(individual, self._pset), self.operators)
+            if sklearn_pipeline_str.count('PolynomialFeatures') > 1:
                 if self.verbosity > 2:
                     self._pbar.write('Invalid pipeline encountered. Skipping its evaluation.')
                 fitnesses_dict[indidx] = (5000., -float('inf'))
                 if not self._pbar.disable:
                     self._pbar.update(1)
-
             # Check if the individual was evaluated before
             elif individual_str in self._evaluated_individuals:
                 # Get fitness score from previous evaluation
@@ -843,6 +856,7 @@ def _evaluate_individuals(self, individuals, features, classes, sample_weight=No
 
                 # Transform the tree expression into an sklearn pipeline
                 sklearn_pipeline = self._toolbox.compile(expr=individual)
+
                 # Fix random state when the operator allows and build sample weight dictionary
                 self._set_param_recursive(sklearn_pipeline.steps, 'random_state', 42)
 
@@ -861,24 +875,27 @@ def _evaluate_individuals(self, individuals, features, classes, sample_weight=No
 
-        # evalurate pipeline
+        # evaluate pipelines
         resulting_score_list = []
-        # chunk size for pbar update
+        if self.n_jobs == 1:
+            get = async.get_sync
+        elif 'get' in context._globals:
+            get = context._globals['get']
+        else:
+            get = multiprocessing.get
         for chunk_idx in range(0, len(sklearn_pipeline_list), self.n_jobs * 4):
             jobs = []
             for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + self.n_jobs * 4]:
                 job = delayed(_wrapped_cross_val_score)(
                     sklearn_pipeline,
                     features,
                     classes,
                     self.cv,
                     self.scoring_function,
                     sample_weight,
-                    self.max_eval_time_mins
+                    timeout=self.max_eval_time_seconds
                 )
                 jobs.append(job)
-            parallel = Parallel(n_jobs=self.n_jobs, verbose=0, pre_dispatch='2*n_jobs')
-            tmp_result_score = parallel(jobs)
-            # update pbar
-            for val in tmp_result_score:
+            tmp_scores = compute(*jobs, get=get, num_workers=self.n_jobs)
+            for val in tmp_scores:
                 if not self._pbar.disable:
                     self._pbar.update(1)
                 if val == 'Timeout':
@@ -890,6 +907,7 @@ def _evaluate_individuals(self, individuals, features, classes, sample_weight=No
 
         for resulting_score, operator_count, individual_str, test_idx in zip(resulting_score_list, operator_count_list, eval_individuals_str, test_idx_list):
+
             if type(resulting_score) in [float, np.float64, np.float32]:
                 self._evaluated_individuals[individual_str] = (max(1, operator_count), resulting_score)
                 fitnesses_dict[test_idx] = self._evaluated_individuals[individual_str]
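The `_evaluate_individuals` hunk above replaces joblib's `Parallel`/`delayed` with dask: each `_wrapped_cross_val_score` call is wrapped in `dask.delayed`, a chunk of jobs is run through `dask.compute`, and the scheduler (`get`) is picked from `n_jobs` or from whatever the user installed via `dask.set_options`. A minimal sketch of the pattern, assuming the dask 0.14.x layout pinned in requirements.txt (`dask.async` was later renamed `dask.local`, and `async` became a reserved word in Python 3.7, so this runs only on the Python/dask versions of that era); `slow_eval` is a hypothetical stand-in for `_wrapped_cross_val_score`:

```Python
import time
from dask import compute, delayed, multiprocessing, async, context  # dask 0.14.x layout

def slow_eval(x):
    # Stand-in for _wrapped_cross_val_score: any expensive evaluation.
    time.sleep(0.1)
    return x * x

n_jobs = 4
# Same scheduler choice as tpot/base.py: synchronous when n_jobs == 1, a scheduler
# installed via dask.set_options(get=...) when present, otherwise multiprocessing.
if n_jobs == 1:
    get = async.get_sync
elif 'get' in context._globals:
    get = context._globals['get']
else:
    get = multiprocessing.get

jobs = [delayed(slow_eval)(i) for i in range(8)]      # lazy tasks; nothing runs yet
scores = compute(*jobs, get=get, num_workers=n_jobs)  # evaluate the chunk in parallel
print(scores)                                         # (0, 1, 4, 9, 16, 25, 36, 49)
```

Note that the jobs must stay wrapped in `delayed(...)`; calling `_wrapped_cross_val_score` eagerly would evaluate every pipeline serially before `compute` ever ran, which is why the hunk above keeps the `delayed(_wrapped_cross_val_score)(...)` form.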
diff --git a/tpot/gp_deap.py b/tpot/gp_deap.py
index ef3ac4385..6e04269be 100644
--- a/tpot/gp_deap.py
+++ b/tpot/gp_deap.py
@@ -32,7 +32,7 @@
 from sklearn.base import clone
 from collections import defaultdict
 import warnings
-import threading
+from stopit import threading_timeoutable, TimeoutException
 
 # Limit loops to generate a different individual by crossover/mutation
 MAX_MUT_LOOPS = 50
@@ -321,54 +321,25 @@ def mutNodeReplacement(individual, pset):
         individual[slice_] = new_subtree
     return individual,
 
-
-class Interruptable_cross_val_score(threading.Thread):
-    def __init__(self, *args, **kwargs):
-        threading.Thread.__init__(self)
-        self.args = args
-        self.kwargs = kwargs
-        self.result = -float('inf')
-        self._stopevent = threading.Event()
-        self.daemon = True
-
-    def stop(self):
-        self._stopevent.set()
-        threading.Thread.join(self)
-
-    def run(self):
-        # Note: changed name of the thread to "MainThread" to avoid such warning from joblib (maybe bugs)
-        # Note: Need attention if using parallel execution model of scikit-learn
-        threading.current_thread().name = 'MainThread'
-        try:
-            with warnings.catch_warnings():
-                warnings.simplefilter('ignore')
-                self.result = cross_val_score(*self.args, **self.kwargs)
-        except Exception as e:
-            pass
-
-
+@threading_timeoutable(default="Timeout")
 def _wrapped_cross_val_score(sklearn_pipeline, features, classes,
-                             cv, scoring_function, sample_weight, max_eval_time_mins):
-    max_time_seconds = max(int(max_eval_time_mins * 60), 1)
+                             cv, scoring_function, sample_weight):
     sample_weight_dict = set_sample_weight(sklearn_pipeline.steps, sample_weight)
 
-    # build a job for cross_val_score
-    tmp_it = Interruptable_cross_val_score(
-        clone(sklearn_pipeline),
-        features,
-        classes,
-        scoring=scoring_function,
-        cv=cv,
-        n_jobs=1,
-        verbose=0,
-        fit_params=sample_weight_dict
-    )
-    tmp_it.start()
-    tmp_it.join(max_time_seconds)
-
-    if tmp_it.isAlive():
-        resulting_score = 'Timeout'
-    else:
-        resulting_score = np.mean(tmp_it.result)
-
-    tmp_it.stop()
-    return resulting_score
+    try:
+        with warnings.catch_warnings():
+            warnings.simplefilter('ignore')
+            cv_scores = cross_val_score(
+                clone(sklearn_pipeline),
+                features,
+                classes,
+                scoring=scoring_function,
+                cv=cv,
+                n_jobs=1,
+                verbose=0,
+                fit_params=sample_weight_dict
+            )
+        return np.mean(cv_scores)
+    except TimeoutException:
+        return "Timeout"
+    except Exception:
+        # Any pipeline that fails to fit or score gets the worst possible score
+        return -float('inf')
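For reference, stopit's `threading_timeoutable` decorator injects a `timeout` keyword into the wrapped function and returns the given `default` when the limit is hit — which is why `_evaluate_individuals` can now call `_wrapped_cross_val_score(..., timeout=self.max_eval_time_seconds)` and simply compare the result against `'Timeout'`, with no hand-rolled thread class. A minimal sketch, assuming stopit 1.1.x as pinned above:

```Python
import time
from stopit import threading_timeoutable

@threading_timeoutable(default="Timeout")
def busy_work(seconds):
    # stopit delivers the timeout as an asynchronous exception at a Python
    # bytecode boundary, so the work must keep executing Python code (a busy
    # loop here) to be interrupted promptly.
    end = time.time() + seconds
    while time.time() < end:
        pass
    return "finished"

print(busy_work(0.1, timeout=1))  # "finished" -- completes within the limit
print(busy_work(10, timeout=1))   # "Timeout"  -- interrupted by the decorator
```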