Merged
Changes from 1 commit
Commits
170 commits
2b8d89e
[SPARK-2523] [SQL] Hadoop table scan bug fixing
chenghao-intel Jul 28, 2014
255b56f
[SPARK-2479][MLlib] Comparing floating-point numbers using relative e…
Jul 28, 2014
a7a9d14
[SPARK-2410][SQL] Merging Hive Thrift/JDBC server (with Maven profile…
liancheng Jul 28, 2014
39ab87b
Use commons-lang3 in SignalLogger rather than commons-lang
aarondav Jul 28, 2014
16ef4d1
Excess judgment
watermen Jul 29, 2014
ccd5ab5
[SPARK-2580] [PySpark] keep silent in worker if JVM close the socket
davies Jul 29, 2014
92ef026
[SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle
davies Jul 29, 2014
96ba04b
[SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.
rxin Jul 29, 2014
20424da
[SPARK-2174][MLLIB] treeReduce and treeAggregate
mengxr Jul 29, 2014
fc4d057
Minor indentation and comment typo fixes.
staple Jul 29, 2014
800ecff
[STREAMING] SPARK-1729. Make Flume pull data from source, rather than…
harishreedharan Jul 29, 2014
0c5c6a6
[SQL]change some test lists
adrian-wang Jul 29, 2014
e364348
[SPARK-2730][SQL] When retrieving a value from a Map, GetItem evaluat…
yhuai Jul 29, 2014
f0d880e
[SPARK-2674] [SQL] [PySpark] support datetime type for SchemaRDD
davies Jul 29, 2014
dc96536
[SPARK-2082] stratified sampling in PairRDDFunctions that guarantees …
dorx Jul 29, 2014
c7db274
[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst…
concretevitamin Jul 29, 2014
2c35666
MAINTENANCE: Automated closing of pull requests.
pwendell Jul 30, 2014
39b8193
[SPARK-2716][SQL] Don't check resolved for having filters.
marmbrus Jul 30, 2014
86534d0
[SPARK-2631][SQL] Use SQLConf to configure in-memory columnar caching
marmbrus Jul 30, 2014
22649b6
[SPARK-2305] [PySpark] Update Py4J to version 0.8.2.1
JoshRosen Jul 30, 2014
8446746
[SPARK-2054][SQL] Code Generation for Expression Evaluation
marmbrus Jul 30, 2014
2e6efca
[SPARK-2568] RangePartitioner should run only one job if data is bala…
mengxr Jul 30, 2014
077f633
[SQL] Handle null values in debug()
marmbrus Jul 30, 2014
4ce92cc
[SPARK-2260] Fix standalone-cluster mode, which was broken
andrewor14 Jul 30, 2014
7003c16
[SPARK-2179][SQL] Public API for DataTypes and Schema
yhuai Jul 30, 2014
7c5fc28
SPARK-2543: Allow user to set maximum Kryo buffer size
koertkuipers Jul 30, 2014
ee07541
SPARK-2748 [MLLIB] [GRAPHX] Loss of precision for small arguments to …
srowen Jul 30, 2014
774142f
[SPARK-2521] Broadcast RDD object (instead of sending it along with e…
rxin Jul 30, 2014
3bc3f18
[SPARK-2747] git diff --dirstat can miss sql changes and not run Hive…
rxin Jul 30, 2014
e3d85b7
Avoid numerical instability
naftaliharris Jul 30, 2014
fc47bb6
[SPARK-2544][MLLIB] Improve ALS algorithm resource usage
witgo Jul 30, 2014
ff511ba
[SPARK-2746] Set SBT_MAVEN_PROFILES only when it is not set explicitl…
rxin Jul 30, 2014
f2eb84f
Wrap FWDIR in quotes.
rxin Jul 30, 2014
95cf203
Wrap FWDIR in quotes in dev/check-license.
rxin Jul 30, 2014
0feb349
More wrapping FWDIR in quotes.
rxin Jul 30, 2014
2248891
[SQL] Fix compiling of catalyst docs.
marmbrus Jul 30, 2014
437dc8c
dev/check-license wrap folders in quotes.
rxin Jul 30, 2014
94d1f46
[SPARK-2024] Add saveAsSequenceFile to PySpark
kanzhang Jul 30, 2014
7c7ce54
Wrap JAR_DL in dev/check-license.
rxin Jul 30, 2014
1097327
Set AMPLAB_JENKINS_BUILD_PROFILE.
rxin Jul 30, 2014
2f4b170
Properly pass SBT_MAVEN_PROFILES into sbt.
rxin Jul 30, 2014
6ab96a6
SPARK-2749 [BUILD]. Spark SQL Java tests aren't compiling in Jenkins'…
srowen Jul 30, 2014
2ac37db
SPARK-2741 - Publish version of spark assembly which does not contain…
Jul 31, 2014
88a519d
[SPARK-2734][SQL] Remove tables from cache when DROP TABLE is run.
marmbrus Jul 31, 2014
e9b275b
SPARK-2341 [MLLIB] loadLibSVMFile doesn't handle regression datasets
srowen Jul 31, 2014
da50176
Update DecisionTreeRunner.scala
strat0sphere Jul 31, 2014
e966284
SPARK-2045 Sort-based shuffle
mateiz Jul 31, 2014
894d48f
[SPARK-2758] UnionRDD's UnionPartition should not reference parent RDDs
rxin Jul 31, 2014
118c1c4
Required AM memory is "amMem", not "args.amMemory"
maji2014 Jul 31, 2014
a7c305b
[SPARK-2340] Resolve event logging and History Server paths properly
andrewor14 Jul 31, 2014
4fb2593
[SPARK-2737] Add retag() method for changing RDDs' ClassTags.
JoshRosen Jul 31, 2014
5a110da
[SPARK-2497] Included checks for module symbols too.
ScrapCodes Jul 31, 2014
669e3f0
automatically set master according to `spark.master` in `spark-defaul…
CrazyJvm Jul 31, 2014
92ca910
[SPARK-2762] SparkILoop leaks memory in multi-repl configurations
thunterdb Jul 31, 2014
3072b96
[SPARK-2743][SQL] Resolve original attributes in ParquetTableScan
marmbrus Jul 31, 2014
72cfb13
[SPARK-2397][SQL] Deprecate LocalHiveContext
marmbrus Jul 31, 2014
f193312
SPARK-2028: Expose mapPartitionsWithInputSplit in HadoopRDD
aarondav Jul 31, 2014
f68105d
SPARK-2664. Deal with `--conf` options in spark-submit that relate to…
sryza Jul 31, 2014
4dbabb3
SPARK-2749 [BUILD] Part 2. Fix a follow-on scalastyle error
srowen Jul 31, 2014
e5749a1
SPARK-2646. log4j initialization not quite compatible with log4j 2.x
srowen Jul 31, 2014
dc0865b
[SPARK-2511][MLLIB] add HashingTF and IDF
mengxr Jul 31, 2014
49b3612
[SPARK-2523] [SQL] Hadoop table scan bug fixing (fix failing Jenkins …
yhuai Jul 31, 2014
e021362
Improvements to merge_spark_pr.py
JoshRosen Jul 31, 2014
cc82050
Docs: monitoring, streaming programming guide
kennyballou Jul 31, 2014
492a195
SPARK-2740: allow user to specify ascending and numPartitions for sor…
Jul 31, 2014
ef4ff00
SPARK-2282: Reuse Socket for sending accumulator updates to Pyspark
aarondav Jul 31, 2014
8f51491
[SPARK-2531 & SPARK-2436] [SQL] Optimize the BuildSide when planning …
concretevitamin Aug 1, 2014
d843014
[SPARK-2724] Python version of RandomRDDGenerators
dorx Aug 1, 2014
b124de5
[SPARK-2756] [mllib] Decision tree bug fixes
jkbradley Aug 1, 2014
9632719
[SPARK-2779] [SQL] asInstanceOf[Map[...]] should use scala.collection…
yhuai Aug 1, 2014
9998efa
SPARK-2766: ScalaReflectionSuite throw an llegalArgumentException i…
witgo Aug 1, 2014
b190083
[SPARK-2777][MLLIB] change ALS factors storage level to MEMORY_AND_DISK
mengxr Aug 1, 2014
c475540
[SPARK-2782][mllib] Bug fix for getRanks in SpearmanCorrelation
dorx Aug 1, 2014
2cdc3e5
[SPARK-2702][Core] Upgrade Tachyon dependency to 0.5.0
haoyuan Aug 1, 2014
1499101
SPARK-2632, SPARK-2576. Fixed by only importing what is necessary dur…
ScrapCodes Aug 1, 2014
cb9e7d5
SPARK-2738. Remove redundant imports in BlockManagerSuite
sryza Aug 1, 2014
8ff4417
[SPARK-2670] FetchFailedException should be thrown when local fetch h…
sarutak Aug 1, 2014
72e3369
SPARK-983. Support external sorting in sortByKey()
mateiz Aug 1, 2014
f1957e1
SPARK-2134: Report metrics before application finishes
Aug 1, 2014
284771e
[Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-…
advancedxy Aug 1, 2014
a32f0fb
[SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and …
jerryshao Aug 1, 2014
82d209d
SPARK-2768 [MLLIB] Add product, user recommend method to MatrixFactor…
srowen Aug 1, 2014
0dacb1a
[SPARK-1997] update breeze to version 0.8.1
witgo Aug 1, 2014
5328c0a
[HOTFIX] downgrade breeze version to 0.7
mengxr Aug 1, 2014
8d338f6
SPARK-2099. Report progress while task is running.
sryza Aug 1, 2014
c41fdf0
[SPARK-2179][SQL] A minor refactoring Java data type APIs (2179 follo…
yhuai Aug 1, 2014
4415722
[SQL][SPARK-2212]Hash Outer Join
chenghao-intel Aug 1, 2014
580c701
[SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder
chutium Aug 1, 2014
c0b47ba
[SPARK-2767] [SQL] SparkSQL CLI doens't output error message if query…
chenghao-intel Aug 1, 2014
c82fe47
[SQL] Documentation: Explain cacheTable command
CrazyJvm Aug 1, 2014
eb5bdca
[SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited …
staple Aug 1, 2014
baf9ce1
[SPARK-2490] Change recursive visiting on RDD dependencies to iterati…
viirya Aug 1, 2014
f5d9bea
SPARK-1612: Fix potential resource leaks
zsxwing Aug 1, 2014
b270309
[SPARK-2379] Fix the bug that streaming's receiver may fall into a de…
joyyoj Aug 1, 2014
78f2af5
SPARK-2791: Fix committing, reverting and state tracking in shuffle f…
aarondav Aug 1, 2014
d88e695
[SPARK-2786][mllib] Python correlations
dorx Aug 1, 2014
7058a53
[SPARK-2796] [mllib] DecisionTree bug fix: ordered categorical features
jkbradley Aug 1, 2014
880eabe
[SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD
davies Aug 2, 2014
3822f33
[SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).
yhuai Aug 2, 2014
0da07da
[SPARK-2116] Load spark-defaults.conf from SPARK_CONF_DIR if set
chu11 Aug 2, 2014
a38d3c9
[SPARK-2800]: Exclude scalastyle-output.xml Apache RAT checks
witgo Aug 2, 2014
e8e0fd6
[SPARK-2764] Simplify daemon.py process structure
JoshRosen Aug 2, 2014
f6a1899
Streaming mllib [SPARK-2438][MLLIB]
freeman-lab Aug 2, 2014
c281189
[SPARK-2550][MLLIB][APACHE SPARK] Support regularization and intercep…
miccagiann Aug 2, 2014
e25ec06
[SPARK-1580][MLLIB] Estimate ALS communication and computation costs.
tmyklebu Aug 2, 2014
fda4759
[SPARK-2801][MLlib]: DistributionGenerator renamed to RandomDataGener…
brkyvz Aug 2, 2014
4bc3bb2
StatCounter on NumPy arrays [PYSPARK][SPARK-2012]
freeman-lab Aug 2, 2014
adc8303
[SPARK-1470][SPARK-1842] Use the scala-logging wrapper instead of the…
witgo Aug 2, 2014
dab3796
Revert "[SPARK-1470][SPARK-1842] Use the scala-logging wrapper instea…
pwendell Aug 2, 2014
d934801
[SPARK-2316] Avoid O(blocks) operations in listeners
andrewor14 Aug 2, 2014
148af60
[SPARK-2454] Do not ship spark home to Workers
andrewor14 Aug 2, 2014
08c095b
[SPARK-1812] sql/catalyst - Provide explicit type information
avati Aug 2, 2014
25cad6a
HOTFIX: Fixing test error in maven for flume-sink.
pwendell Aug 2, 2014
44460ba
HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.
pwendell Aug 2, 2014
87738bf
MAINTENANCE: Automated closing of pull requests.
pwendell Aug 2, 2014
e09e18b
[HOTFIX] Do not throw NPE if spark.test.home is not set
andrewor14 Aug 2, 2014
3f67382
[SPARK-2478] [mllib] DecisionTree Python API
jkbradley Aug 2, 2014
67bd8e3
[SQL] Set outputPartitioning of BroadcastHashJoin correctly.
yhuai Aug 2, 2014
91f9504
[SPARK-1981] Add AWS Kinesis streaming support
cfregly Aug 2, 2014
4c47711
SPARK-2804: Remove scalalogging-slf4j dependency
witgo Aug 2, 2014
158ad0b
[SPARK-2097][SQL] UDF Support
marmbrus Aug 2, 2014
198df11
[SPARK-2785][SQL] Remove assertions that throw when users try unsuppo…
marmbrus Aug 2, 2014
866cf1f
[SPARK-2729][SQL] Added test case for SPARK-2729
liancheng Aug 3, 2014
d210022
[SPARK-2797] [SQL] SchemaRDDs don't support unpersist()
yhuai Aug 3, 2014
1a80437
[SPARK-2739][SQL] Rename registerAsTable to registerTempTable
marmbrus Aug 3, 2014
33f167d
SPARK-2602 [BUILD] Tests steal focus under Java 6
srowen Aug 3, 2014
9cf429a
SPARK-2414 [BUILD] Add LICENSE entry for jquery
srowen Aug 3, 2014
3dc55fd
[Minor] Fixes on top of #1679
andrewor14 Aug 3, 2014
f8cd143
SPARK-2712 - Add a small note to maven doc that mvn package must happ…
javadba Aug 3, 2014
a0bcbc1
SPARK-2246: Add user-data option to EC2 scripts
Aug 3, 2014
2998e38
[SPARK-2197] [mllib] Java DecisionTree bug fix and easy-of-use
jkbradley Aug 3, 2014
236dfac
[SPARK-2784][SQL] Deprecate hql() method in favor of a config option,…
marmbrus Aug 3, 2014
ac33cbb
[SPARK-2814][SQL] HiveThriftServer2 throws NPE when executing native …
liancheng Aug 3, 2014
e139e2b
[SPARK-2783][SQL] Basic support for analyze in HiveContext
yhuai Aug 3, 2014
55349f9
[SPARK-1740] [PySpark] kill the python worker
davies Aug 3, 2014
6ba6c3e
[SPARK-2810] upgrade to scala-maven-plugin 3.2.0
avati Aug 4, 2014
5507dd8
Fix some bugs with spaces in directory name.
sarahgerweck Aug 4, 2014
ae58aea
SPARK-2272 [MLlib] Feature scaling which standardizes the range of in…
Aug 4, 2014
e053c55
[MLlib] [SPARK-2510]Word2Vec: Distributed Representation of Words
Aug 4, 2014
59f84a9
[SPARK-1687] [PySpark] pickable namedtuple
davies Aug 4, 2014
8e7d5ba
SPARK-2792. Fix reading too much or too little data from each stream …
mateiz Aug 4, 2014
9fd82db
[SPARK-1687] [PySpark] fix unit tests related to pickable namedtuple
davies Aug 4, 2014
05bf4e4
[SPARK-2323] Exception in accumulator update should not crash DAGSche…
rxin Aug 5, 2014
066765d
SPARK-2685. Update ExternalAppendOnlyMap to avoid buffer.remove()
mateiz Aug 5, 2014
4fde28c
SPARK-2711. Create a ShuffleMemoryManager to track memory for all spi…
mateiz Aug 5, 2014
a646a36
[SPARK-2857] Correct properties to set Master / Worker ports
andrewor14 Aug 5, 2014
9862c61
[SPARK-1779] Throw an exception if memory fractions are not between 0…
Aug 5, 2014
184048f
[SPARK-2856] Decrease initial buffer size for Kryo to 64KB.
rxin Aug 5, 2014
e87075d
[SPARK-1022][Streaming] Add Kafka real unit test
jerryshao Aug 5, 2014
2c0f705
SPARK-1528 - spark on yarn, add support for accessing remote HDFS
tgravescs Aug 5, 2014
1c5555a
SPARK-1890 and SPARK-1891- add admin and modify acls
tgravescs Aug 5, 2014
6e821e3
[SPARK-2860][SQL] Fix coercion of CASE WHEN.
marmbrus Aug 5, 2014
ac3440f
[SPARK-2859] Update url of Kryo project in related docs
gchen Aug 5, 2014
74f82c7
SPARK-2380: Support displaying accumulator values in the web UI
pwendell Aug 5, 2014
41e0a21
SPARK-1680: use configs for specifying environment variables on YARN
tgravescs Aug 5, 2014
cc491f6
[SPARK-2864][MLLIB] fix random seed in word2vec; move model to local
mengxr Aug 5, 2014
acff9a7
[SPARK-2503] Lower shuffle output buffer (spark.shuffle.file.buffer.k…
rxin Aug 5, 2014
1aad911
[SPARK-2550][MLLIB][APACHE SPARK] Support regularization and intercep…
miccagiann Aug 5, 2014
2643e66
SPARK-2869 - Fix tiny bug in JdbcRdd for closing jdbc connection
Aug 6, 2014
d94f599
[sql] rename project name in pom.xml of hive-thriftserver module
scwf Aug 6, 2014
d0ae3f3
[SPARK-2650][SQL] Try to partially fix SPARK-2650 by adjusting initia…
liancheng Aug 6, 2014
69ec678
[SPARK-2854][SQL] Finalize _acceptable_types in pyspark.sql
yhuai Aug 6, 2014
1d70c4f
[SPARK-2866][SQL] Support attributes in ORDER BY that aren't in SELECT
marmbrus Aug 6, 2014
82624e2
[SPARK-2806] core - upgrade to json4s-jackson 3.2.10
avati Aug 6, 2014
b70bae4
[SQL] Tighten the visibility of various SQLConf methods and renamed s…
rxin Aug 6, 2014
5a826c0
[SQL] Fix logging warn -> debug
marmbrus Aug 6, 2014
63bdb1f
SPARK-2294: fix locality inversion bug in TaskManager
CodingCat Aug 6, 2014
c7b5201
[MLlib] Use this.type as return type in k-means' builder pattern
Aug 6, 2014
ee7f308
[SPARK-1022][Streaming][HOTFIX] Fixed zookeeper dependency of Kafka
tdas Aug 6, 2014
09f7e45
[SPARK-2157] Enable tight firewall rules for Spark
andrewor14 Aug 6, 2014
Streaming mllib [SPARK-2438][MLLIB]
This PR implements a streaming linear regression analysis, in which a linear regression model is trained online as new data arrive. The design is based on discussions with tdas and mengxr, in which we determined how to add this functionality in a general way, with minimal changes to existing libraries.

__Summary of additions:__

_StreamingLinearAlgorithm_
- An abstract class for fitting generalized linear models online to streaming data, including training on (and updating) a model, and making predictions.

_StreamingLinearRegressionWithSGD_
- Class and companion object for running streaming linear regression

_StreamingLinearRegressionTestSuite_
- Unit tests

_StreamingLinearRegression_
- Example use case: fitting a model online to data from one stream, and making predictions on other data

__Notes__
- If this looks good, I can use the StreamingLinearAlgorithm class to easily implement other analyses that follow the same logic (Ridge, Lasso, Logistic, SVM).
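
To make that last point concrete, here is a rough sketch of what a Ridge variant might look like. It is purely hypothetical and not part of this PR; it assumes RidgeRegressionWithSGD's constructor and createModel are widened to private[mllib], as this PR does for LinearRegressionWithSGD:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.{RidgeRegressionModel, RidgeRegressionWithSGD, StreamingLinearAlgorithm}

// Hypothetical sketch: a streaming Ridge regression, mirroring
// StreamingLinearRegressionWithSGD. Not part of this PR.
class StreamingRidgeRegressionWithSGD(
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double,
    private var initialWeights: Vector)
  extends StreamingLinearAlgorithm[
    RidgeRegressionModel, RidgeRegressionWithSGD] with Serializable {

  // Assumes the (stepSize, numIterations, regParam, miniBatchFraction)
  // constructor is reachable from this package.
  val algorithm = new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction)

  var model = algorithm.createModel(initialWeights, 0.0)
}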

Author: Jeremy Freeman <the.freeman.lab@gmail.com>
Author: freeman <the.freeman.lab@gmail.com>

Closes apache#1361 from freeman-lab/streaming-mllib and squashes the following commits:

775ea29 [Jeremy Freeman] Throw error if user doesn't initialize weights
4086fee [Jeremy Freeman] Fixed current weight formatting
8b95b27 [Jeremy Freeman] Restored broadcasting
29f27ec [Jeremy Freeman] Formatting
8711c41 [Jeremy Freeman] Used return to avoid indentation
777b596 [Jeremy Freeman] Restored treeAggregate
74cf440 [Jeremy Freeman] Removed static methods
d28cf9a [Jeremy Freeman] Added usage notes
c3326e7 [Jeremy Freeman] Improved documentation
9541a41 [Jeremy Freeman] Merge remote-tracking branch 'upstream/master' into streaming-mllib
66eba5e [Jeremy Freeman] Fixed line lengths
2fe0720 [Jeremy Freeman] Minor cleanup
7d51378 [Jeremy Freeman] Moved streaming loader to MLUtils
b9b69f6 [Jeremy Freeman] Added setter methods
c3f8b5a [Jeremy Freeman] Modified logging
00aafdc [Jeremy Freeman] Add modifiers
14b801e [Jeremy Freeman] Name changes
c7d38a3 [Jeremy Freeman] Move check for empty data to GradientDescent
4b0a5d3 [Jeremy Freeman] Cleaned up tests
74188d6 [Jeremy Freeman] Eliminate dependency on commons
50dd237 [Jeremy Freeman] Removed experimental tag
6bfe1e6 [Jeremy Freeman] Fixed imports
a2a63ad [freeman] Makes convergence test more robust
86220bc [freeman] Streaming linear regression unit tests
fb4683a [freeman] Minor changes for scalastyle consistency
fd31e03 [freeman] Changed logging behavior
453974e [freeman] Fixed indentation
c4b1143 [freeman] Streaming linear regression
604f4d7 [freeman] Expanded private class to include mllib
d99aa85 [freeman] Helper methods for streaming MLlib apps
0898add [freeman] Added dependency on streaming
freeman-lab authored and mengxr committed Aug 2, 2014
commit f6a1899306c5ad766fea122d3ab4b83436d9f6fd
73 changes: 73 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Train a linear regression model on one stream of data and make predictions
* on another stream, where the data streams arrive as text files
* into two different directories.
*
* The rows of the text files must be labeled data points in the form
* `(y,[x1,x2,x3,...,xn])`,
* where n is the number of features. n must be the same for train and test.
*
* Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>
*
* To run on your local machine using the two directories `trainingDir` and `testDir`,
* with updates every 5 seconds, and 2 features per data point, call:
* $ bin/run-example \
* org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2
*
* As you add text files to `trainingDir` the model will continuously update.
* Anytime you add text files to `testDir`, you'll see predictions from the current model.
*
*/
object StreamingLinearRegression {

  def main(args: Array[String]) {

    if (args.length != 4) {
      System.err.println(
        "Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
      System.exit(1)
    }

    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

    val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
    val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))

    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))

    model.trainOn(trainingData)
    model.predictOn(testData).print()

    ssc.start()
    ssc.awaitTermination()

  }

}
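
For illustration only (the values are made up), lines in the training and test files follow the LabeledPoint text format parsed by LabeledPointParser, e.g. for n = 2 features:

(1.5,[1.0,0.5])
(0.2,[0.1,0.3])
(2.7,[1.8,0.6])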
5 changes: 5 additions & 0 deletions mllib/pom.xml
@@ -40,6 +40,11 @@
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
9 changes: 9 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -162,6 +162,14 @@ object GradientDescent extends Logging {
    val numExamples = data.count()
    val miniBatchSize = numExamples * miniBatchFraction

    // if no data, return initial weights to avoid NaNs
    if (numExamples == 0) {

      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
      return (initialWeights, stochasticLossHistory.toArray)

    }

    // Initialize weights as a column vector
    var weights = Vectors.dense(initialWeights.toArray)
    val n = weights.size
@@ -202,5 +210,6 @@ object GradientDescent extends Logging {
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights, stochasticLossHistory.toArray)

  }
}
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -49,7 +49,7 @@ class LinearRegressionModel (
 * its corresponding right hand side label y.
 * See also the documentation for the precise formulation.
 */
-class LinearRegressionWithSGD private (
+class LinearRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var miniBatchFraction: Double)
@@ -68,7 +68,7 @@
   */
  def this() = this(1.0, 100, 1.0)

-  override protected def createModel(weights: Vector, intercept: Double) = {
+  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LinearRegressionModel(weights, intercept)
  }
}
106 changes: 106 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.streaming.dstream.DStream

/**
* :: DeveloperApi ::
* StreamingLinearAlgorithm implements methods for continuously
* training a generalized linear model on streaming data,
* and using it for prediction on (possibly different) streaming data.
*
* This class takes as type parameters a GeneralizedLinearModel
* and a GeneralizedLinearAlgorithm, making it easy to construct
* streaming versions of any analysis that uses a GLM.
* Initial weights must be set before calling trainOn or predictOn.
* Only weights will be updated, not an intercept. If the model needs
* an intercept, it should be manually appended to the input data.
*
* For example usage, see `StreamingLinearRegressionWithSGD`.
*
* NOTE(Freeman): In some use cases, the order in which trainOn and predictOn
* are called in an application will affect the results. When called on
* the same DStream, if trainOn is called before predictOn, then when new data
* arrive the model will update and the prediction will be based on the new
* model; whereas if predictOn is called first, the prediction will use the
* model from the previous update.
*
* NOTE(Freeman): It is ok to call predictOn repeatedly on multiple streams; this
* will generate predictions for each one, all using the current model.
* It is also ok to call trainOn on different streams; this will update
* the model using each of the different sources, in sequence.
*
*/
@DeveloperApi
abstract class StreamingLinearAlgorithm[
    M <: GeneralizedLinearModel,
    A <: GeneralizedLinearAlgorithm[M]] extends Logging {

  /** The model to be updated and used for prediction. */
  protected var model: M

  /** The algorithm to use for updating. */
  protected val algorithm: A

  /** Return the latest model. */
  def latestModel(): M = {
    model
  }

  /**
   * Update the model by training on batches of data from a DStream.
   * This operation registers a DStream for training the model,
   * and updates the model based on every subsequent
   * batch of data from the stream.
   *
   * @param data DStream containing labeled data
   */
  def trainOn(data: DStream[LabeledPoint]) {
    if (Option(model.weights) == None) {
      logError("Initial weights must be set before starting training")
      throw new IllegalArgumentException
    }
    data.foreachRDD { (rdd, time) =>
      model = algorithm.run(rdd, model.weights)
      logInfo("Model updated at time %s".format(time.toString))
      val display = model.weights.size match {
        case x if x > 100 => model.weights.toArray.take(100).mkString("[", ",", "...")
        case _ => model.weights.toArray.mkString("[", ",", "]")
      }
      logInfo("Current model: weights, %s".format(display))
    }
  }

  /**
   * Use the model to make predictions on batches of data from a DStream
   *
   * @param data DStream containing labeled data
   * @return DStream containing predictions
   */
  def predictOn(data: DStream[LabeledPoint]): DStream[Double] = {
    if (Option(model.weights) == None) {
      logError("Initial weights must be set before starting prediction")
      throw new IllegalArgumentException
    }
    data.map(x => model.predict(x.features))
  }

}
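
A small sketch of the ordering note in the class docs above; the stream name and the wiring are assumptions for illustration, not code from this PR:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical wiring: `labeled` is an assumed DStream[LabeledPoint],
// e.g. from MLUtils.loadStreamingLabeledPoints.
def wireUp(labeled: DStream[LabeledPoint]): Unit = {
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.dense(0.0, 0.0))

  // trainOn registered first: each batch updates the model before that
  // batch's predictions are computed, so predictions use the new model.
  model.trainOn(labeled)
  model.predictOn(labeled).print()

  // Swapping the two calls would make each batch's predictions come from
  // the model as of the previous update.
}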
88 changes: 88 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{Vector, Vectors}

/**
* Train or predict a linear regression model on streaming data. Training uses
* Stochastic Gradient Descent to update the model based on each new batch of
* incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
*
* Each batch of data is assumed to be an RDD of LabeledPoints.
* The number of data points per batch can vary, but the number
* of features must be constant. An initial weight
* vector must be provided.
*
* Use a builder pattern to construct a streaming linear regression
* analysis in an application, like:
*
 *   val model = new StreamingLinearRegressionWithSGD()
 *     .setStepSize(0.5)
 *     .setNumIterations(10)
 *     .setInitialWeights(Vectors.dense(...))
 *     .trainOn(DStream)
*
*/
@Experimental
class StreamingLinearRegressionWithSGD (
    private var stepSize: Double,
    private var numIterations: Int,
    private var miniBatchFraction: Double,
    private var initialWeights: Vector)
  extends StreamingLinearAlgorithm[
    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {

  /**
   * Construct a StreamingLinearRegression object with default parameters:
   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0}.
   * Initial weights must be set before using trainOn or predictOn
   * (see `StreamingLinearAlgorithm`)
   */
  def this() = this(0.1, 50, 1.0, null)

  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)

  var model = algorithm.createModel(initialWeights, 0.0)

  /** Set the step size for gradient descent. Default: 0.1. */
  def setStepSize(stepSize: Double): this.type = {
    this.algorithm.optimizer.setStepSize(stepSize)
    this
  }

  /** Set the number of iterations of gradient descent to run per update. Default: 50. */
  def setNumIterations(numIterations: Int): this.type = {
    this.algorithm.optimizer.setNumIterations(numIterations)
    this
  }

  /** Set the fraction of each batch to use for updates. Default: 1.0. */
  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
    this.algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
    this
  }

  /** Set the initial weights. Default: [0.0, 0.0]. */
  def setInitialWeights(initialWeights: Vector): this.type = {
    this.model = algorithm.createModel(initialWeights, 0.0)
    this
  }

}
15 changes: 15 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -30,6 +30,8 @@ import org.apache.spark.util.random.BernoulliSampler
import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

/**
* Helper methods to load, save and pre-process data used in ML Lib.
@@ -192,6 +194,19 @@
  def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
    loadLabeledPoints(sc, dir, sc.defaultMinPartitions)

  /**
   * Loads streaming labeled points from a stream of text files
   * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
   * See `StreamingContext.textFileStream` for more details on how to
   * generate a stream from files
   *
   * @param ssc Streaming context
   * @param dir Directory path in any Hadoop-supported file system URI
   * @return Labeled points stored as a DStream[LabeledPoint]
   */
  def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] =
    ssc.textFileStream(dir).map(LabeledPointParser.parse)

  /**
   * Load labeled data from a file. The data format used here is
   * <L>, <f1> <f2> ...
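
A minimal usage sketch for the new loader; the application name and directory are hypothetical, and files must appear atomically in the watched directory, per the usual StreamingContext.textFileStream semantics:

import org.apache.spark.SparkConf
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("LoaderSketch"), Seconds(10))
val points = MLUtils.loadStreamingLabeledPoints(ssc, "hdfs:///data/labeled") // hypothetical path
points.map(_.label).print() // inspect incoming labels, batch by batch
ssc.start()
ssc.awaitTermination()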