Merged
Changes from 1 commit
207 commits
300887b
[SPARK-3649] Remove GraphX custom serializers
ankurdave Nov 11, 2014
3c07b8f
[SPARK-4330][Doc] Link to proper URL for YARN overview
sarutak Nov 11, 2014
65083e9
[SPARK-4324] [PySpark] [MLlib] support numpy.array for all MLlib API
Nov 11, 2014
ef29a9a
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
rxin Nov 11, 2014
f8811a5
[SPARK-4295][External]Fix exception in SparkSinkSuite
Nov 11, 2014
c8850a3
[SPARK-2492][Streaming] kafkaReceiver minor changes to align with Kaf…
jerryshao Nov 11, 2014
6e03de3
[Streaming][Minor]Replace some 'if-else' in Clock
SaintBacchus Nov 11, 2014
deefd9d
SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent P…
ScrapCodes Nov 11, 2014
f820b56
SPARK-4305 [BUILD] yarn-alpha profile won't build due to network/yarn…
srowen Nov 11, 2014
7f37188
[SPARK-4282][YARN] Stopping flag in YarnClientSchedulerBackend should…
sarutak Nov 11, 2014
a878660
SPARK-2269 Refactor mesos scheduler resourceOffers and add unit test
tnachen Nov 11, 2014
2ddb141
[Release] Log build output for each distribution
Nov 12, 2014
daaca14
Support cross building for Scala 2.11
ScrapCodes Nov 12, 2014
2ef016b
[MLLIB] SPARK-4347: Reducing GradientBoostingSuite run time.
manishamde Nov 12, 2014
faeb41d
[SPARK-3936] Add aggregateMessages, which supersedes mapReduceTriplets
ankurdave Nov 12, 2014
84324fb
[SPARK-4355][MLLIB] fix OnlineSummarizer.merge when other.mean is zero
mengxr Nov 12, 2014
4b736db
[SPARK-3530][MLLIB] pipeline and parameters with examples
mengxr Nov 12, 2014
36ddeb7
[SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
soumitrak Nov 12, 2014
6e3c5a2
[Test] Better exception message from SparkSubmitSuite
Nov 12, 2014
aa43a8d
[SPARK-4281][Build] Package Yarn shuffle service into its own jar
Nov 12, 2014
0402be9
Internal cleanup for aggregateMessages
ankurdave Nov 12, 2014
c3afd32
[Release] Correct make-distribution.sh log path
Nov 12, 2014
a5ef581
[SPARK-3666] Extract interfaces for EdgeRDD and VertexRDD
ankurdave Nov 12, 2014
bd86118
[SPARK-4369] [MLLib] fix TreeModel.predict() with RDD
Nov 12, 2014
d7d54a4
[SPARK-2672] support compressed file in wholeTextFile
Nov 12, 2014
723a86b
[Release] Bring audit scripts up-to-date
andrewor14 Nov 13, 2014
23f5bdf
[SPARK-4373][MLLIB] fix MLlib maven tests
mengxr Nov 13, 2014
b9e1c2e
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
aarondav Nov 13, 2014
484fecb
[SPARK-4256] Make Binary Evaluation Metrics functions defined in case…
Nov 13, 2014
ce0333f
[SPARK-4348] [PySpark] [MLlib] rename random.py to rand.py
Nov 13, 2014
ca26a21
[SPARK-4378][MLLIB] make ALS more Java-friendly
mengxr Nov 13, 2014
a0fa1ba
[HOT FIX] make-distribution.sh fails if Yarn shuffle jar DNE
Nov 13, 2014
4b0c1ed
[SPARK-4326] fix unidoc
mengxr Nov 13, 2014
3221830
[SPARK-4372][MLLIB] Make LR and SVM's default parameters consistent i…
mengxr Nov 13, 2014
825709a
[SPARK-4310][WebUI] Sort 'Submitted' column in Stage page by time
zsxwing Nov 13, 2014
e421072
[SPARK-3722][Docs]minor improvement and fix in docs
WangTaoTheTonic Nov 14, 2014
77e845c
[SPARK-4394][SQL] Data Sources API Improvements
marmbrus Nov 14, 2014
abd5817
[SPARK-4398][PySpark] specialize sc.parallelize(xrange)
mengxr Nov 14, 2014
0c56a03
[Spark Core] SPARK-4380 Edit spilling log from MB to B
shenh062326 Nov 14, 2014
5c265cc
SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR
ash211 Nov 14, 2014
156cf33
[SPARK-4313][WebUI][Yarn] Fix link issue of the executor thread dump …
zsxwing Nov 14, 2014
c258db9
Update failed assert text to match code in SizeEstimatorSuite
hammer Nov 14, 2014
ade72c4
[SPARK-4239] [SQL] support view in HiveQl
adrian-wang Nov 14, 2014
bbd8f5b
[SPARK-4245][SQL] Fix containsNull of the result ArrayType of CreateA…
ueshin Nov 14, 2014
f5f757e
SPARK-4375. no longer require -Pscala-2.10
sryza Nov 14, 2014
0cbdb01
[SPARK-4333][SQL] Correctly log number of iterations in RuleExecutor
pzzs Nov 14, 2014
5930f64
[SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming K…
jerryshao Nov 14, 2014
a0300ea
[SPARK-4390][SQL] Handle NaN cast to decimal correctly
marmbrus Nov 14, 2014
e47c387
[SPARK-4391][SQL] Configure parquet filters using SQLConf
marmbrus Nov 14, 2014
f805025
[SQL] Minor cleanup of comments, errors and override.
marmbrus Nov 14, 2014
4b4b50c
[SQL] Don't shuffle code generated rows
marmbrus Nov 14, 2014
0c7b66b
[SPARK-4322][SQL] Enables struct fields as sub expressions of groupin…
liancheng Nov 14, 2014
f76b968
[SPARK-4386] Improve performance when writing Parquet files.
Nov 14, 2014
63ca3af
[SPARK-4365][SQL] Remove unnecessary filter call on records returned …
Nov 14, 2014
37482ce
[SPARK-4412][SQL] Fix Spark's control of Parquet logging.
Nov 14, 2014
ad42b28
SPARK-4214. With dynamic allocation, avoid outstanding requests for m…
sryza Nov 14, 2014
303a4e4
[SPARK-4404]SparkSubmitDriverBootstrapper should stop after its Spark…
WangTaoTheTonic Nov 15, 2014
7fe08b4
[SPARK-4415] [PySpark] JVM should exit after Python exit
Nov 15, 2014
dba1405
[SPARK-4379][Core] Change Exception to SparkException in checkpoint
zsxwing Nov 15, 2014
861223e
[SPARK-4363][Doc] Update the Broadcast example
zsxwing Nov 15, 2014
60969b0
[SPARK-4260] Httpbroadcast should set connection timeout.
sarutak Nov 15, 2014
cbddac2
Added contains(key) to Metadata
Nov 15, 2014
40eb8b6
[SPARK-2321] Several progress API improvements / refactorings
JoshRosen Nov 15, 2014
7d8e152
[SPARK-4419] Upgrade snappy-java to 1.1.1.6
JoshRosen Nov 16, 2014
84468b2
[SPARK-4426][SQL][Minor] The symbol of BitwiseOr is wrong, should not…
sarutak Nov 16, 2014
7850e0c
[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTa…
JoshRosen Nov 16, 2014
cb6bd83
[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, an…
liancheng Nov 16, 2014
45ce327
Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift se…
marmbrus Nov 16, 2014
5168c6c
[SPARK-4422][MLLIB]In some cases, Vectors.fromBreeze get wrong results.
witgo Nov 17, 2014
64c6b9b
[SPARK-4410][SQL] Add support for external sort
marmbrus Nov 17, 2014
5c92d47
SPARK-4445, Don't display storage level in toDebugString unless RDD i…
ScrapCodes Nov 17, 2014
e7690ed
SPARK-2811 upgrade algebird to 0.8.1
Nov 17, 2014
9ac2bb1
[SPARK-4444] Drop VD type parameter from EdgeRDD
ankurdave Nov 17, 2014
dbb9da5
Revert "[SPARK-4075] [Deploy] Jar url validation is not enough for Ja…
Nov 17, 2014
cec1116
[DOCS][SQL] Fix broken link to Row class scaladoc
andyk Nov 17, 2014
0f3ceb5
[SPARK-4180] [Core] Prevent creation of multiple active SparkContexts
JoshRosen Nov 17, 2014
5ce7dae
[SQL] Makes conjunction pushdown more aggressive for in-memory table
liancheng Nov 17, 2014
3a81a1c
[SPARK-4420][SQL] Change nullability of Cast from DoubleType/FloatTyp…
ueshin Nov 18, 2014
566c791
[SPARK-4425][SQL] Handle NaN or Infinity cast to Timestamp correctly.
ueshin Nov 18, 2014
69e858c
[SQL] Construct the MutableRow from an Array
chenghao-intel Nov 18, 2014
6b7f2f7
[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, an…
liancheng Nov 18, 2014
42389b1
[SPARK-4443][SQL] Fix statistics for external table in spark sql hive
scwf Nov 18, 2014
ef7c464
[SPARK-4448] [SQL] unwrap for the ConstantObjectInspector
chenghao-intel Nov 18, 2014
36b0956
[SPARK-4453][SPARK-4213][SQL] Simplifies Parquet filter generation code
liancheng Nov 18, 2014
c6e0c2a
SPARK-4466: Provide support for publishing Scala 2.11 artifacts to Maven
pwendell Nov 18, 2014
cedc3b5
ALS implicit: added missing parameter alpha in doc string
felixmaximilian Nov 18, 2014
8fbf72b
[SPARK-4435] [MLlib] [PySpark] improve classification
Nov 18, 2014
b54c6ab
[SPARK-4396] allow lookup by index in Python's Rating
mengxr Nov 18, 2014
90d72ec
[SQL] Support partitioned parquet tables that have the key in both th…
marmbrus Nov 18, 2014
bfebfd8
[SPARK-4075][SPARK-4434] Fix the URI validation logic for Application…
sarutak Nov 18, 2014
80f3177
[SPARK-4404] remove sys.exit() in shutdown hook
Nov 18, 2014
e34f38f
[SPARK-4017] show progress bar in console
Nov 18, 2014
010bc86
[SPARK-4463] Add (de)select all button for add'l metrics.
kayousterhout Nov 18, 2014
d2e2951
[SPARK-4306] [MLlib] Python API for LogisticRegressionWithLBFGS
Nov 18, 2014
4a377af
[SPARK-3721] [PySpark] broadcast objects larger than 2G
Nov 19, 2014
bb46046
[SPARK-4433] fix a racing condition in zipWithIndex
mengxr Nov 19, 2014
7f22fa8
[SPARK-4327] [PySpark] Python API for RDD.randomSplit()
Nov 19, 2014
423baea
[SPARK-4468][SQL] Fixes Parquet filter creation for inequality predic…
liancheng Nov 19, 2014
397d3aa
Bumping version to 1.3.0-SNAPSHOT.
Nov 19, 2014
67e9876
[SPARK-4441] Close Tachyon client when TachyonBlockManager is shutdown
Nov 19, 2014
165cec9
[Spark-4432]close InStream after the block is accessed
Nov 19, 2014
8327df6
MAINTENANCE: Automated closing of pull requests.
pwendell Nov 19, 2014
5f5ac2d
SPARK-4455 Exclude dependency on hbase-annotations module
tedyu Nov 19, 2014
d75579d
[SPARK-4467] fix elements read count for ExtrenalSorter
tsdeng Nov 19, 2014
eacc788
[SPARK-4470] Validate number of threads in local mode
kmaehashi Nov 19, 2014
22fc4e7
[SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead lo…
tdas Nov 19, 2014
3bf7cee
[SPARK-4481][Streaming][Doc] Fix the wrong description of updateFunc
zsxwing Nov 19, 2014
0df02ca
[HOT FIX] MiMa tests are broken
Nov 19, 2014
1c93841
SPARK-3962 Marked scope as provided for external projects.
ScrapCodes Nov 19, 2014
9b7bbce
[DOC][PySpark][Streaming] Fix docstring for sphinx
giwa Nov 19, 2014
f9adda9
[SPARK-4429][BUILD] Build for Scala 2.11 using sbt fails.
ueshin Nov 19, 2014
73c8ea8
[SPARK-4384] [PySpark] improve sort spilling
Nov 19, 2014
c3002c4
[SPARK-4294][Streaming] UnionDStream stream should express the requir…
watermen Nov 19, 2014
04d462f
[SPARK-4495] Fix memory leak in JobProgressListener
JoshRosen Nov 20, 2014
377b068
Updating GraphX programming guide and documentation
jegonzal Nov 20, 2014
9ccc53c
[SPARK-4478] Keep totalRegisteredExecutors up-to-date
coolfrood Nov 20, 2014
73fedf5
[Spark-4484] Treat maxResultSize as unlimited when set to 0; improve …
nishkamravi2 Nov 20, 2014
0eb4a7f
[SPARK-4480] Avoid many small spills in external data structures
Nov 20, 2014
e216ffa
[SPARK-4446] [SPARK CORE]
Nov 20, 2014
15cacc8
[SPARK-4486][MLLIB] Improve GradientBoosting APIs and doc
mengxr Nov 20, 2014
abf2918
[SPARK-3938][SQL] Names in-memory columnar RDD with corresponding tab…
liancheng Nov 20, 2014
b8e6886
[SPARK-4228][SQL] SchemaRDD to JSON
dwmclary Nov 20, 2014
1c53a5d
[SPARK-4439] [MLlib] add python api for random forest
Nov 20, 2014
98e9419
[SPARK-4513][SQL] Support relational operator '<=>' in Spark SQL
ravipesala Nov 20, 2014
2c2e7a4
[SPARK-4318][SQL] Fix empty sum distinct.
ueshin Nov 20, 2014
6aa0fc9
[SPARK-2918] [SQL] Support the CTAS in EXPLAIN command
chenghao-intel Nov 20, 2014
ad5f1f3
[SQL] fix function description mistake
Nov 20, 2014
d39f2e9
[SPARK-4477] [PySpark] remove numpy from RDDSampler
Nov 21, 2014
84d79ee
[SPARK-4244] [SQL] Support Hive Generic UDFs with constant object ins…
chenghao-intel Nov 21, 2014
02ec058
[SPARK-4413][SQL] Parquet support through datasource API
marmbrus Nov 21, 2014
8cd6eea
add Sphinx as a dependency of building docs
Nov 21, 2014
90a6a46
[SPARK-4522][SQL] Parse schema with missing metadata.
marmbrus Nov 21, 2014
b97070e
[Doc][GraphX] Remove Motivation section and did some minor update.
rxin Nov 21, 2014
28fdc6f
[Doc][GraphX] Remove unused png files.
rxin Nov 21, 2014
f1069b8
[SPARK-4472][Shell] Print "Spark context available as sc." only when …
zsxwing Nov 21, 2014
65b987c
[SPARK-4397][Core] Reorganize 'implicit's to improve the API convenience
zsxwing Nov 21, 2014
a81918c
SPARK-4532: Fix bug in detection of Hive in Spark 1.2
pwendell Nov 21, 2014
ce95bd8
[SPARK-4531] [MLlib] cache serialized java object
Nov 21, 2014
b5d17ef
[SPARK-4431][MLlib] Implement efficient foreachActive for dense and s…
Nov 22, 2014
9b2a3c6
[SPARK-4377] Fixed serialization issue by switching to akka provided …
ScrapCodes Nov 22, 2014
29372b6
SPARK-4457. Document how to build for Hadoop versions greater than 2.4
sryza Nov 24, 2014
a6d7b61
[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based…
liancheng Nov 24, 2014
d5834f0
[SQL] Fix comment in HiveShim
darabos Nov 24, 2014
b384119
[SQL] Fix path in HiveFromSpark
scwf Nov 24, 2014
dd1c9cb
[SPARK-4487][SQL] Fix attribute reference resolution error when using…
sarutak Nov 24, 2014
4a90276
[SPARK-4145] Web UI job pages
JoshRosen Nov 24, 2014
cb0e9b0
[SPARK-4518][SPARK-4519][Streaming] Refactored file stream to prevent…
tdas Nov 24, 2014
b660de7
[SPARK-4562] [MLlib] speedup vector
Nov 25, 2014
050616b
[SPARK-4578] fix asDict() with nested Row()
Nov 25, 2014
6cf5076
[SPARK-4548] []SPARK-4517] improve performance of python broadcast
Nov 25, 2014
d24d5bf
[SPARK-4266] [Web-UI] Reduce stage page load time.
kayousterhout Nov 25, 2014
b043c27
[SPARK-4525] Mesos should decline unused offers
pwendell Nov 25, 2014
a68d442
Revert "[SPARK-4525] Mesos should decline unused offers"
pwendell Nov 25, 2014
f0afb62
[SPARK-4525] Mesos should decline unused offers
jongyoul Nov 25, 2014
9ce2bf3
[SPARK-4582][MLLIB] get raw vectors for further processing in Word2Vec
Nov 25, 2014
723be60
[SQL] Compute timeTaken correctly
scwf Nov 25, 2014
0fe54cf
[DOC][Build] Wrong cmd for build spark with apache hadoop 2.4.X and h…
scwf Nov 25, 2014
89f9122
[SPARK-4596][MLLib] Refactorize Normalizer to make code cleaner
Nov 25, 2014
f515f94
[SPARK-4526][MLLIB]GradientDescent get a wrong gradient value accordi…
witgo Nov 25, 2014
a51118a
[SPARK-4535][Streaming] Fix the error in comments
watermen Nov 25, 2014
fef27b2
[SPARK-4381][Streaming]Add warning log when user set spark.master to …
jerryshao Nov 25, 2014
d240760
[SPARK-4344][DOCS] adding documentation on spark.yarn.user.classpath.…
arahuja Nov 25, 2014
69cd53e
[SPARK-4601][Streaming] Set correct call site for streaming jobs so t…
tdas Nov 25, 2014
bf1a6aa
[SPARK-4581][MLlib] Refactorize StandardScaler to improve the transfo…
Nov 25, 2014
8838ad7
[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDS…
tdas Nov 25, 2014
1b2ab1c
[SPARK-4592] Avoid duplicate worker registrations in standalone mode
Nov 25, 2014
9afcbe4
[SPARK-4546] Improve HistoryServer first time user experience
Nov 25, 2014
9bdf5da
Fix SPARK-4471: blockManagerIdFromJson function throws exception whil…
suyanNone Nov 25, 2014
7eba0fb
[Spark-4509] Revert EC2 tag-based cluster membership patch
mengxr Nov 26, 2014
c251fd7
[SPARK-4583] [mllib] LogLoss for GradientBoostedTrees fix + doc updates
jkbradley Nov 26, 2014
4d95526
[HOTFIX]: Adding back without-hive dist
pwendell Nov 26, 2014
b5fb141
[SPARK-4604][MLLIB] make MatrixFactorizationModel public
mengxr Nov 26, 2014
f5f2d27
[SPARK-4516] Cap default number of Netty threads at 8
aarondav Nov 26, 2014
346bc17
[SPARK-4516] Avoid allocating Netty PooledByteBufAllocators unnecessa…
aarondav Nov 26, 2014
e7f4d25
[SPARK-4612] Reduce task latency and increase scheduling throughput b…
tdas Nov 26, 2014
288ce58
Removing confusing TripletFields
jegonzal Nov 26, 2014
561d31d
[SPARK-4614][MLLIB] Slight API changes in Matrix and Matrices
mengxr Nov 26, 2014
5af53ad
[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on…
CodingCat Nov 27, 2014
c86e9bc
[Release] Automate generation of contributors list
Nov 27, 2014
5d7fe17
SPARK-4170 [CORE] Closure problems when running Scala app that "exten…
srowen Nov 27, 2014
84376d3
[SPARK-4626] Kill a task only if the executorId is (still) registered…
roxchkplusony Nov 27, 2014
120a350
[SPARK-4613][Core] Java API for JdbcRDD
liancheng Nov 28, 2014
ceb6281
[SPARK-4619][Storage]delete redundant time suffix
Nov 28, 2014
5b99bf2
[SPARK-4645][SQL] Disables asynchronous execution in Hive 0.13.1 Hive…
liancheng Nov 28, 2014
052e658
Delete unnecessary function
Nov 28, 2014
53ed7f1
[SPARK-4643] [Build] Remove unneeded staging repositories from build
adrian-wang Nov 28, 2014
e464f0a
[SPARK-4193][BUILD] Disable doclint in Java 8 to prevent from build e…
ueshin Nov 28, 2014
915f8ee
[SPARK-4584] [yarn] Remove security manager from Yarn AM.
Nov 28, 2014
48223d8
SPARK-1450 [EC2] Specify the default zone in the EC2 script help
srowen Nov 28, 2014
49fe879
[SPARK-4597] Use proper exception and reset variable in Utils.createT…
viirya Nov 29, 2014
047ff57
MAINTENANCE: Automated closing of pull requests.
pwendell Nov 29, 2014
317e114
[SPARK-3398] [SPARK-4325] [EC2] Use EC2 status checks.
nchammas Nov 29, 2014
95290bf
Include the key name when failing on an invalid value.
Nov 30, 2014
938dc14
[SPARK-4057] Use -agentlib instead of -Xdebug in sbt-launch-lib.bash …
sarutak Nov 30, 2014
c062224
[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T]
zsxwing Nov 30, 2014
4316a7b
SPARK-4507: PR merge script should support closing multiple JIRA tickets
hase1031 Nov 30, 2014
0fcd24c
[DOCS][BUILD] Add instruction to use change-version-to-2.11.sh in 'Bu…
ueshin Nov 30, 2014
048ecca
SPARK-2143 [WEB UI] Add Spark version to UI footer
srowen Nov 30, 2014
aea7a99
[SPARK-4623]Add the some error infomation if using spark-sql in yarn-…
SaintBacchus Dec 1, 2014
a217ec5
[SPARK-4656][Doc] Typo in Programming Guide markdown
Lewuathe Dec 1, 2014
2a4d389
[DOC] Fixes formatting typo in SQL programming guide
liancheng Dec 1, 2014
06dc1b1
MAINTENANCE: Automated closing of pull requests.
pwendell Dec 1, 2014
5e7a6dc
[SPARK-4632] version update
prabeesh Dec 1, 2014
97eb6d7
Fix wrong file name pattern in .gitignore
sarutak Dec 1, 2014
6384f42
SPARK-2192 [BUILD] Examples Data Not in Binary Distribution
srowen Dec 1, 2014
1d238f2
[SPARK-4664][Core] Throw an exception when spark.akka.frameSize > 2047
zsxwing Dec 1, 2014
30a86ac
[SPARK-4661][Core] Minor code and docs cleanup
zsxwing Dec 1, 2014
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have an open file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned up. This pull request creates a new LazyFileRegion that initializes the FileDescriptor only when we are sending data for the first time.

Author: Reynold Xin <[email protected]>

Closes apache#3172 from rxin/lazyFD and squashes the following commits:

0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion
d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager.
6ed369e [Reynold Xin] Code review feedback.
04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
rxin authored and aarondav committed Nov 11, 2014
commit ef29a9a9aa85468869eb67ca67b66c65f508d0ee
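In outline, the fix defers opening the FileChannel (and hence the file descriptor) until the first transfer. A minimal standalone sketch of that pattern — the class and names below are illustrative, not code from this PR:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Illustrative sketch of the lazy-open pattern this PR applies in
// LazyFileRegion: no file descriptor is created at construction time;
// the FileChannel is opened only when data is first transferred.
final class LazySegmentSketch {
  private final File file;
  private FileChannel channel;  // null until first use

  LazySegmentSketch(File file) {
    this.file = file;
  }

  long transferTo(WritableByteChannel target, long position, long count)
      throws IOException {
    if (channel == null) {
      // The file descriptor is allocated here, on the first transfer,
      // rather than when the object is constructed.
      channel = new FileInputStream(file).getChannel();
    }
    return channel.transferTo(position, count, target);
  }
}
```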
StandaloneWorkerShuffleService.scala
@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
private val blockHandler = new ExternalShuffleBlockHandler()
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
new TransportContext(transportConf, handler)
FileShuffleBlockManager.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.storage._
@@ -68,6 +69,8 @@ private[spark]
class FileShuffleBlockManager(conf: SparkConf)
extends ShuffleBlockManager with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)

private lazy val blockManager = SparkEnv.get.blockManager

// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +185,14 @@ class FileShuffleBlockManager(conf: SparkConf)
val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
if (segmentOpt.isDefined) {
val segment = segmentOpt.get
return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
return new FileSegmentManagedBuffer(
transportConf, segment.file, segment.offset, segment.length)
}
}
throw new IllegalStateException("Failed to find shuffle block: " + blockId)
} else {
val file = blockManager.diskBlockManager.getFile(blockId)
new FileSegmentManagedBuffer(file, 0, file.length)
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
}

IndexShuffleBlockManager.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

import org.apache.spark.SparkEnv
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._

/**
@@ -38,10 +39,12 @@ import org.apache.spark.storage._
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
class IndexShuffleBlockManager extends ShuffleBlockManager {
class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {

private lazy val blockManager = SparkEnv.get.blockManager

private val transportConf = SparkTransportConf.fromSparkConf(conf)

/**
* Mapping to a single shuffleBlockId with reduce ID 0.
* */
@@ -109,6 +112,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
nextOffset - offset)
SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

private val indexShuffleBlockManager = new IndexShuffleBlockManager()
private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

/**
ExternalShuffleServiceSuite.scala
@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf)
rpcHandler = new ExternalShuffleBlockHandler()
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

FileSegmentManagedBuffer.java
@@ -31,24 +31,19 @@

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportConf;

/**
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
// TODO: Make this configurable
private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

private final TransportConf conf;
private final File file;
private final long offset;
private final long length;

public FileSegmentManagedBuffer(File file, long offset, long length) {
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
this.conf = conf;
this.file = file;
this.offset = offset;
this.length = length;
@@ -65,7 +60,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public ManagedBuffer release() {

@Override
public Object convertToNetty() throws IOException {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
if (conf.lazyFileDescriptor()) {
return new LazyFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
}
}

public File getFile() { return file; }
LazyFileRegion.java (new file)
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.buffer;

import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Objects;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.JavaUtils;

/**
* A FileRegion implementation that only creates the file descriptor when the region is being
* transferred. This cannot be used with Epoll because there is no native support for it.
*
* This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
* should push this into Netty so the native Epoll transport can support this feature.
*/
public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {

private final File file;
private final long position;
private final long count;

private FileChannel channel;

private long numBytesTransferred = 0L;

/**
* @param file file to transfer.
* @param position start position for the transfer.
* @param count number of bytes to transfer starting from position.
*/
public LazyFileRegion(File file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}

@Override
protected void deallocate() {
JavaUtils.closeQuietly(channel);
}

@Override
public long position() {
return position;
}

@Override
public long transfered() {
return numBytesTransferred;
}

@Override
public long count() {
return count;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (channel == null) {
channel = new FileInputStream(file).getChannel();
}

long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
}

if (count == 0) {
return 0L;
}

long written = channel.transferTo(this.position + position, count, target);
if (written > 0) {
numBytesTransferred += written;
}
return written;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("file", file)
.add("position", position)
.add("count", count)
.toString();
}
}
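A hedged usage sketch for the new class — the file paths and the copy loop below are illustrative assumptions, not code from this PR; only the LazyFileRegion API (constructor, count(), transferTo(), release()) comes from the file above:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.spark.network.buffer.LazyFileRegion;

public final class LazyFileRegionDemo {
  public static void main(String[] args) throws IOException {
    // Hypothetical paths, for illustration only.
    File data = new File("/tmp/shuffle_0_0_0.data");

    // No file descriptor is opened here ...
    LazyFileRegion region = new LazyFileRegion(data, 0, data.length());
    try (FileOutputStream out = new FileOutputStream("/tmp/copy.data")) {
      long written = 0;
      // ... the channel is created inside the first transferTo() call.
      while (written < region.count()) {
        written += region.transferTo(out.getChannel(), written);
      }
    } finally {
      region.release();  // deallocate() closes the underlying channel
    }
  }
}
```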
TransportConf.java
@@ -75,4 +75,21 @@ public int connectionTimeoutMs() {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }

/**
* Minimum size of a block that we should start using memory map rather than reading in through
* normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
* memory mapping has high overhead for blocks close to or below the page size of the OS.
*/
public int memoryMapBytes() {
return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
}

/**
* Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
* created only when data is going to be transferred. This can reduce the number of open files.
*/
public boolean lazyFileDescriptor() {
return conf.getBoolean("spark.shuffle.io.lazyFD", true);
}
}
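Both new knobs surface as ordinary Spark properties once SparkTransportConf.fromSparkConf wires the SparkConf through (see the first hunk). A sketch of setting them, where only the property names and defaults are taken from the hunk above and the rest is illustrative:

```java
import org.apache.spark.SparkConf;

public final class TransportSettingsDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("transport-settings-demo")
        // Raise the memory-map threshold from the 2 MB default to 8 MB.
        .set("spark.storage.memoryMapThreshold", String.valueOf(8 * 1024 * 1024))
        // Revert to eagerly opened file descriptors (default is true = lazy).
        .set("spark.shuffle.io.lazyFD", "false");
  }
}
```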
ChunkFetchIntegrationSuite.java
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
static ManagedBuffer bufferChunk;
static ManagedBuffer fileChunk;

private TransportConf transportConf;

@BeforeClass
public static void setUp() throws Exception {
int bufSize = 100000;
@@ -80,17 +82,18 @@ public static void setUp() throws Exception {
new Random().nextBytes(fileContent);
fp.write(fileContent);
fp.close();
fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);

TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);

streamManager = new StreamManager() {
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
assertEquals(STREAM_ID, streamId);
if (chunkIndex == BUFFER_CHUNK_INDEX) {
return new NioManagedBuffer(buf);
} else if (chunkIndex == FILE_CHUNK_INDEX) {
return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
} else {
throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
}
ExternalShuffleBlockHandler.java
@@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final ExternalShuffleBlockManager blockManager;
private final OneForOneStreamManager streamManager;

public ExternalShuffleBlockHandler() {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
public ExternalShuffleBlockHandler(TransportConf conf) {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
}

/** Enables mocking out the StreamManager and BlockManager. */
ExternalShuffleBlockManager.java
@@ -37,6 +37,7 @@
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;

public ExternalShuffleBlockManager() {
private final TransportConf conf;

public ExternalShuffleBlockManager(TransportConf conf) {
// TODO: Give this thread a name.
this(Executors.newSingleThreadExecutor());
this(conf, Executors.newSingleThreadExecutor());
}

// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
ExternalShuffleBlockManager(Executor directoryCleaner) {
ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
}
@@ -167,7 +171,7 @@ private void deleteExecutorDirs(String[] dirs) {
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
}

/**
Expand All @@ -187,6 +191,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(
long offset = in.readLong();
long nextOffset = in.readLong();
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
offset,