Commit ba8e3db

MaxBenChrist authored and jneuff committed
Extract distribution module and add support for dask.distributed
* Extract the multiprocessing and distributed computing into its own module to make it more extendable
* Added more comments
* Use an instance instead of the class itself
* Add the decode function to the partial evaluation
* First (unfinished) local testing version of big_fresh using lambdas
* big_fresh is not its own project
* Temporarily removed the decode
* Fixed bugs (e.g. after merging)
* Draft for is_valid_ip_and_port method
* Use the ipaddress package to check if it is an IP
* Unit testing for is_valid_ip_and_port
* Add ipaddress to requirements
* Credit where it belongs!
* Added dask-requirements.txt
* Added cluster dask distributor
* Address is sufficient to connect to dask cluster
* We support cluster calculations now
* Need custom chunksize for dask on cluster
* Close cluster dask client
* Added unit test for partition
* Add unit tests for calculate best chunksize
* Added string manipulation and distribution to docs
* Refactored distribution module
* Added warning if no Distributor object is given
* py3 does not have .next method
* Corrected extract_features docstring
* Removed automatic construction of ClusterDaskDistributor
* Check IP address before passing it to ClusterDaskDistributor
* Made the Distributor class abstract
* Add __init__ for MapDistributor
* MapDistributor does not need n_workers argument
* Only use tqdm if distributor supports progressbar
* It is chunk_size, not chunksize
* Refactoring of distribution module
* Cleaned imports in extraction
* Do not test the abstract Distributor base class
* Still working on docstrings for distribution
* Test MultiprocessingDistributor
* MapDistributor should use chunk_size 1
* Set defaults for progress bar
* Fix MultiprocessingDistributorTestCase
* New page about distributed freshness
* Removed empty returns for close methods
* Correct references in docs
* Renamed Distributor to DistributorBaseClass
* Add minimal example for distributor
* Now the example is working
* Refactoring
* Polish doc page
* Pass over cluster page
* No need to make calculate_best_chunk_size private
* Another pass over cluster doc page
* Correct warning
* Add DistributorUsageTestCase
* Most todos done
* Move dask-requirements into requirements.txt
* Save the number of workers
* Dask distributors need to set partial as well
* Close distributors earlier
* Need to flatten dask results
* Add unit test for local dask distributor
* Use threads for local dask cluster, not processes
* Removed is_valid_ip_and_port
* Remove unnecessary imports
1 parent abc3021 commit ba8e3db

File tree

11 files changed: +686 −54 lines

README.md

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ The algorithm is described in the following paper
 4. it has a comprehensive documentation
 5. it is compatible with sklearn, pandas and numpy
 6. it allows anyone to easily add their favorite features
+7. it both runs on your local machine or even on a cluster

 ## Next steps

docs/api/tsfresh.utilities.rst

Lines changed: 15 additions & 0 deletions
@@ -22,3 +22,18 @@ profiling
     :undoc-members:
     :show-inheritance:
+
+string_manipulation
+-------------------
+
+.. automodule:: tsfresh.utilities.string_manipulation
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+distribution
+------------
+
+.. automodule:: tsfresh.utilities.distribution
+    :members:
+    :undoc-members:
+    :show-inheritance:

docs/index.rst

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ The following chapters will explain the tsfresh package in detail:
     Feature Filtering <text/feature_filtering>
     How to write custom Feature Calculators <text/how_to_add_custom_feature>
     Parallelization <text/parallelization>
+    tsfresh on a cluster <text/tsfresh_on_a_cluster>
     Time Series Forecasting <text/forecasting>
     FAQ <text/faq>
     Authors <authors>

docs/text/tsfresh_on_a_cluster.rst

Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
.. role:: python(code)
    :language: python

How to deploy tsfresh at scale
==============================

The high volume of time series data can demand an analysis at scale, so that the time series need to be processed on a group of computational units instead of a single machine.

Accordingly, it may be necessary to distribute the extraction of time series features to a cluster. Indeed, it is possible to extract features with *tsfresh* in a distributed fashion. This page will explain how to set up a distributed *tsfresh*.

The distributor class
'''''''''''''''''''''

To distribute the calculation of features, we use a certain object, the Distributor class (contained in the :mod:`tsfresh.utilities.distribution` module).

Essentially, a Distributor organizes the application of the feature calculators to data chunks. It maps the feature calculators to the data chunks and then reduces them, meaning that it combines the results of the individual mappings into one object, the feature matrix.

So, a Distributor will, in the following order,

1. calculate an optimal :python:`chunk_size`, based on the characteristics of the time series data at hand
   (by :func:`~tsfresh.utilities.distribution.DistributorBaseClass.calculate_best_chunk_size`)

2. split the time series data into chunks
   (by :func:`~tsfresh.utilities.distribution.DistributorBaseClass.partition`)

3. distribute the application of the feature calculators to the data chunks
   (by :func:`~tsfresh.utilities.distribution.DistributorBaseClass.distribute`)

4. combine the results into the feature matrix
   (by :func:`~tsfresh.utilities.distribution.DistributorBaseClass.map_reduce`)

5. close all connections, shut down all resources and clean up everything
   (by :func:`~tsfresh.utilities.distribution.DistributorBaseClass.close`)
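The five steps above can be sketched in plain Python. This is a conceptual illustration only, not the tsfresh implementation: the function name, signatures and the "five chunks per worker" heuristic are assumptions for the sketch.

```python
import math
from itertools import islice


def map_reduce_sketch(map_function, data, n_workers=4):
    """Conceptual sketch of the Distributor pipeline described above:
    chunk-size heuristic -> partition -> distribute (here: a plain,
    sequential map) -> reduce -> close (a no-op in this toy version)."""
    # 1. pick a chunk size (assumed heuristic: roughly 5 chunks per worker)
    chunk_size = math.ceil(len(data) / (5 * n_workers)) or 1
    # 2. split the data into chunks of that size
    it = iter(data)
    chunks = iter(lambda: list(islice(it, chunk_size)), [])
    # 3. apply the map function to every chunk (a real distributor
    #    would farm this out to worker processes or cluster nodes)
    mapped = (map_function(chunk) for chunk in chunks)
    # 4. combine (flatten) the per-chunk results into one result list
    result = [item for partial in mapped for item in partial]
    # 5. nothing to close in this sequential version
    return result


squares = map_reduce_sketch(lambda chunk: [x * x for x in chunk], list(range(10)))
# squares == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```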
So, how can you use such a Distributor to extract features with *tsfresh*? You will have to pass it as the :python:`distributor` argument to the :func:`~tsfresh.feature_extraction.extract_features` method.

The following example shows how to define the MultiprocessingDistributor, which will distribute the calculations to a local pool of worker processes:

.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features
    from tsfresh.utilities.distribution import MultiprocessingDistributor

    # download and load some time series data
    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

    # We construct a Distributor that will spawn the calculations
    # over four workers on the local machine
    Distributor = MultiprocessingDistributor(n_workers=4,
                                             disable_progressbar=False,
                                             progressbar_title="Feature Extraction")

    # Just pass the Distributor object to the feature extraction,
    # along with the other parameters
    X = extract_features(timeseries_container=df,
                         column_id='id', column_sort='time',
                         distributor=Distributor)

This example actually corresponds to the existing multiprocessing *tsfresh* API, where you just specify the number of jobs without the need to construct the Distributor:

.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features

    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

    X = extract_features(timeseries_container=df,
                         column_id='id', column_sort='time',
                         n_jobs=4)
Using dask to distribute the calculations
'''''''''''''''''''''''''''''''''''''''''

We provide a Distributor for the `dask framework <https://dask.pydata.org/en/latest/>`_, where *"Dask is a flexible parallel computing library for analytic computing."*

Dask is a great framework to distribute analytic calculations to a cluster. It scales up and down, meaning that you can even use it on a single machine. The only thing that you will need to run *tsfresh* on a Dask cluster is the IP address and port number of the `dask-scheduler <http://distributed.readthedocs.io/en/latest/setup.html>`_.

Let's say that your dask scheduler is running at ``192.168.0.1:8786``; then we can easily construct a :class:`~tsfresh.utilities.distribution.ClusterDaskDistributor` that connects to the scheduler and distributes the time series data and the calculations to the cluster:

.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features
    from tsfresh.utilities.distribution import ClusterDaskDistributor

    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

    Distributor = ClusterDaskDistributor(address="192.168.0.1:8786")

    X = extract_features(timeseries_container=df,
                         column_id='id', column_sort='time',
                         distributor=Distributor)

Compared to the :class:`~tsfresh.utilities.distribution.MultiprocessingDistributor` example from above, we only had to change one line to switch from one machine to a whole cluster. It is as easy as that. By changing the Distributor you can easily deploy your application to run on a cluster instead of your workstation.

You can also use a local DaskCluster on your local machine to emulate a Dask network. The following example shows how to set up a :class:`~tsfresh.utilities.distribution.LocalDaskDistributor` on a local cluster of 3 workers:

.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features
    from tsfresh.utilities.distribution import LocalDaskDistributor

    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

    Distributor = LocalDaskDistributor(n_workers=3)

    X = extract_features(timeseries_container=df,
                         column_id='id', column_sort='time',
                         distributor=Distributor)
Writing your own distributor
''''''''''''''''''''''''''''

If you want to use a framework other than Dask, you will have to write your own Distributor. To construct your custom Distributor, you will have to define an object that inherits from the abstract base class :class:`tsfresh.utilities.distribution.DistributorBaseClass`. The :mod:`tsfresh.utilities.distribution` module contains more information about what you will need to implement.
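A custom distributor typically only needs to override the method that fans the work out to its backend. The following standalone sketch illustrates the idea; the base class here is a stand-in written for this example, not the real ``DistributorBaseClass``, and the ``distribute`` signature is an assumption mirroring the description above.

```python
class DistributorSketch:
    """Stand-in for an abstract distributor base class:
    subclasses override distribute() to plug in their backend."""

    def distribute(self, func, partitioned_chunks, kwargs):
        raise NotImplementedError

    def close(self):
        # Release resources (connections, worker pools) if any.
        pass


class SequentialDistributor(DistributorSketch):
    """A trivial 'cluster' of one: applies func to each chunk in turn."""

    def distribute(self, func, partitioned_chunks, kwargs):
        return [func(chunk, **kwargs) for chunk in partitioned_chunks]


dist = SequentialDistributor()
results = dist.distribute(lambda chunk, factor: [x * factor for x in chunk],
                          [[1, 2], [3, 4]], {"factor": 10})
dist.close()
# results == [[10, 20], [30, 40]]
```

A framework-backed distributor would replace the list comprehension in ``distribute`` with submissions to its scheduler and gather the results.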

requirements.txt

Lines changed: 3 additions & 0 deletions
@@ -8,3 +8,6 @@ scikit-learn>=0.17.1
 future>=0.16.0
 six>=1.10.0
 tqdm>=4.10.0
+ipaddress
+dask==0.15.2
+distributed==1.18.3

tests/units/feature_extraction/test_extraction.py

Lines changed: 37 additions & 0 deletions
@@ -9,13 +9,17 @@
 import numpy as np
 import pandas as pd
 import six
+from mock import Mock

 from tests.fixtures import DataTestCase
 from tsfresh.feature_extraction.extraction import extract_features
 from tsfresh.feature_extraction.settings import ComprehensiveFCParameters

 import tempfile

+from tsfresh.utilities.distribution import DistributorBaseClass
+
 class ExtractionTestCase(DataTestCase):
     """The unit tests in this module make sure if the time series features are created properly"""

@@ -170,3 +174,36 @@ def test_extract_features(self):
         self.assertTrue(np.all(extracted_features.b__abs_energy == np.array([36619, 35483])))
         self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
         self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
+
+
+class DistributorUsageTestCase(DataTestCase):
+    def setUp(self):
+        # only calculate some features to reduce load on travis ci
+        self.name_to_param = {"maximum": None}
+
+    def test_assert_is_distributor(self):
+        df = self.create_test_data_sample()
+
+        self.assertRaises(ValueError, extract_features,
+                          timeseries_container=df, column_id="id", column_sort="sort", column_kind="kind",
+                          column_value="val", default_fc_parameters=self.name_to_param,
+                          distributor=object())
+
+        self.assertRaises(ValueError, extract_features,
+                          timeseries_container=df, column_id="id", column_sort="sort", column_kind="kind",
+                          column_value="val", default_fc_parameters=self.name_to_param,
+                          distributor=13)
+
+    def test_distributor_map_reduce_and_close_are_called(self):
+        df = self.create_test_data_sample()
+
+        mock = Mock(spec=DistributorBaseClass)
+        mock.close.return_value = None
+        mock.map_reduce.return_value = []
+
+        X = extract_features(timeseries_container=df, column_id="id", column_sort="sort", column_kind="kind",
+                             column_value="val", default_fc_parameters=self.name_to_param,
+                             distributor=mock)
+
+        self.assertTrue(mock.close.called)
+        self.assertTrue(mock.map_reduce.called)
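The mock-based test above works because ``Mock(spec=SomeClass)`` both passes ``isinstance`` checks against that class and records every call made on it. A minimal, self-contained sketch of the pattern, using the standard library ``unittest.mock`` (the diff imports the older ``mock`` backport) and a toy ``Base`` class standing in for ``DistributorBaseClass``:

```python
from unittest.mock import Mock


class Base:
    """Stand-in for an abstract base class such as DistributorBaseClass."""

    def map_reduce(self, *args, **kwargs):
        raise NotImplementedError

    def close(self):
        raise NotImplementedError


# A Mock built with spec=Base passes isinstance(mock, Base) checks
# and rejects attribute access that Base does not define.
mock = Mock(spec=Base)
mock.map_reduce.return_value = []

assert isinstance(mock, Base)          # spec makes the isinstance check succeed
assert mock.map_reduce("data") == []   # stubbed return value
mock.close()
assert mock.close.called               # the call was recorded
```

This lets a test verify that ``extract_features`` calls ``map_reduce`` and ``close`` without running any real distribution.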
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# This file as well as the whole tsfresh package are licenced under the MIT licence (see the LICENCE.txt)
# Maximilian Christ (maximilianchrist.com), Blue Yonder Gmbh, 2016

from unittest import TestCase
import numpy as np
import pandas as pd

from tsfresh import extract_features
from tsfresh.utilities.distribution import MultiprocessingDistributor, LocalDaskDistributor
from tests.fixtures import DataTestCase


class MultiprocessingDistributorTestCase(TestCase):

    def test_partion(self):

        distributor = MultiprocessingDistributor(n_workers=1)

        data = [1, 3, 10, -10, 343.0]
        distro = distributor.partition(data, 3)
        self.assertEqual(next(distro), [1, 3, 10])
        self.assertEqual(next(distro), [-10, 343.0])

        data = np.arange(10)
        distro = distributor.partition(data, 2)
        self.assertEqual(next(distro), [0, 1])
        self.assertEqual(next(distro), [2, 3])

    def test__calculate_best_chunk_size(self):

        distributor = MultiprocessingDistributor(n_workers=2)
        self.assertEqual(distributor.calculate_best_chunk_size(10), 1)
        self.assertEqual(distributor.calculate_best_chunk_size(11), 2)
        self.assertEqual(distributor.calculate_best_chunk_size(100), 10)
        self.assertEqual(distributor.calculate_best_chunk_size(101), 11)

        distributor = MultiprocessingDistributor(n_workers=3)
        self.assertEqual(distributor.calculate_best_chunk_size(10), 1)
        self.assertEqual(distributor.calculate_best_chunk_size(30), 2)
        self.assertEqual(distributor.calculate_best_chunk_size(31), 3)


class LocalDaskDistributorTestCase(DataTestCase):

    def test_local_dask_cluster_extraction(self):

        Distributor = LocalDaskDistributor(n_workers=1)

        df = self.create_test_data_sample()
        extracted_features = extract_features(df, column_id="id", column_sort="sort", column_kind="kind",
                                              column_value="val",
                                              distributor=Distributor)

        self.assertIsInstance(extracted_features, pd.DataFrame)
        self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
        self.assertTrue(np.all(extracted_features.a__sum_values == np.array([691, 1017])))
        self.assertTrue(np.all(extracted_features.a__abs_energy == np.array([32211, 63167])))
        self.assertTrue(np.all(extracted_features.b__sum_values == np.array([757, 695])))
        self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
        self.assertTrue(np.all(extracted_features.b__abs_energy == np.array([36619, 35483])))
        self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
        self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
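The assertions in the tests above pin down plausible behavior for ``partition`` and ``calculate_best_chunk_size``: chunks are lists of at most ``chunk_size`` items, and the chunk size works out to ``ceil(data_length / (5 * n_workers))``, i.e. roughly five chunks per worker. A standalone sketch inferred from those test values (not the actual tsfresh code; names are mirrored for illustration):

```python
import math
from itertools import islice


def partition(data, chunk_size):
    """Yield successive chunks of `data` as lists of length `chunk_size`
    (the last chunk may be shorter)."""
    iterable = iter(data)
    while True:
        chunk = list(islice(iterable, chunk_size))
        if not chunk:
            return
        yield chunk


def calculate_best_chunk_size(data_length, n_workers):
    """Heuristic chunk size inferred from the unit tests above:
    spread the data over roughly 5 chunks per worker."""
    return math.ceil(data_length / (5 * n_workers))


chunks = list(partition([1, 3, 10, -10, 343.0], 3))
# chunks == [[1, 3, 10], [-10, 343.0]], matching test_partion
```

Both functions reproduce every expected value in the test cases, e.g. ``calculate_best_chunk_size(101, 2) == 11`` and ``calculate_best_chunk_size(31, 3) == 3``.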

tests/units/utilities/test_string_manipilations.py

Lines changed: 1 addition & 1 deletion
@@ -22,4 +22,4 @@ def test_convert_to_output_format(self):
 
         out = convert_to_output_format({"list": ["a", "b", "c"]})
         expected_out = "list_['a', 'b', 'c']"
-        self.assertEqual(out, expected_out)
+        self.assertEqual(out, expected_out)
