297 commits
39ccaba
[SPARK-3861][SQL] Avoid rebuilding hash tables for broadcast joins on…
rxin Oct 13, 2014
49bbdcb
[Spark] RDD take() method: overestimate too much
yingjieMiao Oct 13, 2014
46db277
[SPARK-3892][SQL] remove redundant type name
adrian-wang Oct 13, 2014
2ac40da
[SPARK-3407][SQL]Add Date type support
adrian-wang Oct 13, 2014
56102dc
[SPARK-2066][SQL] Adds checks for non-aggregate attributes with aggre…
liancheng Oct 13, 2014
d3cdf91
[SPARK-3529] [SQL] Delete the temp files after test exit
chenghao-intel Oct 13, 2014
73da9c2
[SPARK-3771][SQL] AppendingParquetOutputFormat should use reflection …
ueshin Oct 13, 2014
e10d71e
[SPARK-3559][SQL] Remove unnecessary columns from List of needed Colu…
gvramana Oct 13, 2014
371321c
[SQL] Add type checking debugging functions
marmbrus Oct 13, 2014
e6e3770
SPARK-3807: SparkSql does not work for tables created using custom serde
chiragaggarwal Oct 13, 2014
9d9ca91
[SQL]Small bug in unresolved.scala
Ishiihara Oct 13, 2014
9eb49d4
[SPARK-3809][SQL] Fixes test suites in hive-thriftserver
liancheng Oct 13, 2014
4d26aca
[SPARK-3912][Streaming] Fixed flakyFlumeStreamSuite
tdas Oct 14, 2014
186b497
[SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standal…
aarondav Oct 14, 2014
9b6de6f
SPARK-3178 setting SPARK_WORKER_MEMORY to a value without a label (m…
bbejeck Oct 14, 2014
7ced88b
[SPARK-3946] gitignore in /python includes wrong directory
tsudukim Oct 14, 2014
24b818b
[SPARK-3944][Core] Using Option[String] where value of String can be …
Shiti Oct 14, 2014
56096db
SPARK-3803 [MLLIB] ArrayIndexOutOfBoundsException found in executing …
srowen Oct 14, 2014
7b4f39f
[SPARK-3869] ./bin/spark-class miss Java version with _JAVA_OPTIONS set
cocoatomo Oct 14, 2014
66af8e2
[SPARK-3943] Some scripts bin\*.cmd pollutes environment variables in…
tsudukim Oct 15, 2014
18ab6bd
SPARK-1307 [DOCS] Don't use term 'standalone' to refer to a Spark App…
srowen Oct 15, 2014
293a0b5
[SPARK-2098] All Spark processes should support spark-defaults.conf, …
witgo Oct 15, 2014
044583a
[Core] Upgrading ScalaStyle version to 0.5 and removing SparkSpaceAft…
prudhvi953 Oct 16, 2014
4c589ca
[SPARK-3944][Core] Code re-factored as suggested
Shiti Oct 16, 2014
091d32c
[SPARK-3971] [MLLib] [PySpark] hotfix: Customized pickler should work…
davies Oct 16, 2014
99e416b
[SQL] Fixes the race condition that may cause test failure
liancheng Oct 16, 2014
2fe0ba9
SPARK-3874: Provide stable TaskContext API
ScrapCodes Oct 17, 2014
7f7b50e
[SPARK-3923] Increase Akka heartbeat pause above heartbeat interval
aarondav Oct 17, 2014
be2ec4a
[SQL]typo in HiveFromSpark
Oct 17, 2014
642b246
[SPARK-3941][CORE] _remainingmem should not increase twice when updat…
liyezhang556520 Oct 17, 2014
e7f4ea8
[SPARK-3890][Docs]remove redundant spark.executor.memory in doc
WangTaoTheTonic Oct 17, 2014
56fd34a
[SPARK-3741] Add afterExecute for handleConnectExecutor
zsxwing Oct 17, 2014
dedace8
[SPARK-3067] JobProgressPage could not show Fair Scheduler Pools sect…
YanTangZhai Oct 17, 2014
e678b9f
[SPARK-3973] Print call site information for broadcasts
shivaram Oct 17, 2014
c351862
[SPARK-3935][Core] log the number of records that has been written
Oct 17, 2014
803e7f0
[SPARK-3979] [yarn] Use fs's default replication.
Oct 17, 2014
adcb7d3
[SPARK-3855][SQL] Preserve the result attribute of python UDFs though…
marmbrus Oct 17, 2014
23f6171
[SPARK-3985] [Examples] fix file path using os.path.join
adrian-wang Oct 17, 2014
477c648
[SPARK-3934] [SPARK-3918] [mllib] Bug fixes for RandomForest, Decisi…
jkbradley Oct 17, 2014
f406a83
SPARK-3926 [CORE] Result of JavaRDD.collectAsMap() is not Serializable
srowen Oct 18, 2014
05db2da
[SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming P…
davies Oct 19, 2014
7e63bb4
[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)
JoshRosen Oct 19, 2014
d1966f3
[SPARK-3902] [SPARK-3590] Stabilize AsynRDDActions and add Java API
JoshRosen Oct 20, 2014
c7aeecd
[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle
jerryshao Oct 20, 2014
51afde9
[SPARK-4010][Web UI]Spark UI returns 500 in yarn-client mode
witgo Oct 20, 2014
ea054e1
[SPARK-3986][SQL] Fix package names to fit their directory names.
ueshin Oct 20, 2014
4afe9a4
[SPARK-3736] Workers reconnect when disassociated from the master.
mccheah Oct 20, 2014
eadc4c5
[SPARK-3207][MLLIB]Choose splits for continuous features in DecisionT…
chouqin Oct 20, 2014
1b3ce61
[SPARK-3906][SQL] Adds multiple join support for SQLContext
liancheng Oct 20, 2014
e9c1afa
[SPARK-3800][SQL] Clean aliases from grouping expressions
marmbrus Oct 20, 2014
364d52b
[SPARK-3966][SQL] Fix nullabilities of Cast related to DateType.
ueshin Oct 20, 2014
fce1d41
[SPARK-3945]Properties of hive-site.xml is invalid in running the Thr…
luogankun Oct 20, 2014
7586e2e
[SPARK-3969][SQL] Optimizer should have a super class as an interface.
ueshin Oct 21, 2014
0fe1c09
[SPARK-3940][SQL] Avoid console printing error messages three times
wangxiaojing Oct 21, 2014
342b57d
Update Building Spark link.
rxin Oct 21, 2014
5a8f64f
[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
JoshRosen Oct 21, 2014
8570816
[SPARK-4023] [MLlib] [PySpark] convert rdd into RDD of Vector
Oct 21, 2014
2aeb84b
replace awaitTransformation with awaitTermination in scaladoc/javadoc
holdenk Oct 21, 2014
c262cd5
[SPARK-4035] Fix a wrong format specifier
zsxwing Oct 21, 2014
61ca774
[SPARK-4020] Do not rely on timeouts to remove failed block managers
andrewor14 Oct 21, 2014
1a623b2
SPARK-3770: Make userFeatures accessible from python
Oct 21, 2014
5fdaf52
[SPARK-3994] Use standard Aggregator code path for countByKey and cou…
aarondav Oct 21, 2014
814a9cd
SPARK-3568 [mllib] add ranking metrics
coderxiang Oct 21, 2014
856b081
[SQL]redundant methods for broadcast
scwf Oct 21, 2014
6bb56fa
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
sryza Oct 22, 2014
bae4ca3
Update JavaCustomReceiver.java
Oct 22, 2014
f05e09b
use isRunningLocally rather than runningLocally
CrazyJvm Oct 22, 2014
97cf19f
Fix for sampling error in NumPy v1.9 [SPARK-3995][PYSPARK]
freeman-lab Oct 22, 2014
813effc
[SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress…
JoshRosen Oct 22, 2014
137d942
[SPARK-3877][YARN] Throw an exception when application is not success…
zsxwing Oct 22, 2014
c5882c6
[SPARK-3812] [BUILD] Adapt maven build to publish effective pom.
ScrapCodes Oct 23, 2014
d6a3025
[BUILD] Fixed resolver for scalastyle plugin and upgrade sbt version.
ScrapCodes Oct 23, 2014
f799700
[SPARK-4055][MLlib] Inconsistent spelling 'MLlib' and 'MLLib'
sarutak Oct 23, 2014
6b48522
[SPARK-4006] In long running contexts, we encountered the situation o…
tsliwowicz Oct 23, 2014
293672c
specify unidocGenjavadocVersion of 0.8
holdenk Oct 23, 2014
222fa47
Revert "[SPARK-3812] [BUILD] Adapt maven build to publish effective p…
pwendell Oct 23, 2014
83b7a1c
[SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could le…
JoshRosen Oct 23, 2014
e595c8d
[SPARK-3993] [PySpark] fix bug while reuse worker after take()
davies Oct 24, 2014
a29c9bd
[SPARK-4000][BUILD] Sends archived unit tests logs to Jenkins master
liancheng Oct 24, 2014
0aea228
SPARK-3812 Build changes to publish effective pom.
ScrapCodes Oct 24, 2014
809c785
[SPARK-2652] [PySpark] donot use KyroSerializer as default serializer
Oct 24, 2014
d2987e8
[SPARK-3900][YARN] ApplicationMaster's shutdown hook fails and Illega…
sarutak Oct 24, 2014
d60a9d4
[SPARK-4051] [SQL] [PySpark] Convert Row into dictionary
Oct 24, 2014
0e88661
[SPARK-4050][SQL] Fix caching of temporary tables with projections.
marmbrus Oct 24, 2014
7c89a8f
[SPARK-2706][SQL] Enable Spark to support Hive 0.13
zhzhan Oct 24, 2014
6a40a76
[SPARK-4026][Streaming] Write ahead log management
harishreedharan Oct 24, 2014
7aacb7b
[SPARK-2713] Executors of same application in same host should only d…
li-zhihui Oct 24, 2014
30ea286
[SPARK-4076] Parameter expansion in spark-config is wrong
sarutak Oct 24, 2014
098f83c
[SPARK-4075] [Deploy] Jar url validation is not enough for Jar file
sarutak Oct 24, 2014
b563987
[SPARK-4013] Do not create multiple actor systems on each executor
andrewor14 Oct 24, 2014
f80dcf2
[SPARK-4067] refactor ExecutorUncaughtExceptionHandler
Oct 24, 2014
07e439b
[GraphX] Modify option name according to example doc in SynthBenchmark
GraceH Oct 24, 2014
3a906c6
[HOTFIX][SQL] Remove sleep on reset() failure.
marmbrus Oct 24, 2014
6c98c29
[SPARK-4080] Only throw IOException from [write|read][Object|External]
JoshRosen Oct 24, 2014
898b22a
[SPARK-4056] Upgrade snappy-java to 1.1.1.5
JoshRosen Oct 25, 2014
3a845d3
[SQL] Update Hive test harness for Hive 12 and 13
marmbrus Oct 25, 2014
9530316
[SPARK-2321] Stable pull-based progress / status API
JoshRosen Oct 25, 2014
e41786c
[SPARK-4088] [PySpark] Python worker should exit after socket is clos…
Oct 25, 2014
2e52e4f
Revert "[SPARK-4056] Upgrade snappy-java to 1.1.1.5"
JoshRosen Oct 26, 2014
c683444
[SPARK-4071] Unroll fails silently if BlockManager is small
Oct 26, 2014
df7974b
SPARK-3359 [DOCS] sbt/sbt unidoc doesn't work with Java 8
srowen Oct 26, 2014
b759540
Update RoaringBitmap to 0.4.3
lemire Oct 26, 2014
bf589fc
[SPARK-3616] Add basic Selenium tests to WebUISuite
JoshRosen Oct 26, 2014
677852c
Just fixing comment that shows usage
AtlasPilotPuppy Oct 26, 2014
0af7e51
[SPARK-3925][SQL] Do not consider the ordering of qualifiers during c…
viirya Oct 26, 2014
879a165
[HOTFIX][SQL] Temporarily turn off hive-server tests.
marmbrus Oct 26, 2014
2838bf8
[SPARK-3537][SPARK-3914][SQL] Refines in-memory columnar table statis…
liancheng Oct 26, 2014
89e8a5d
[SPARK-3997][Build]scalastyle should output the error location
witgo Oct 26, 2014
dc51f4d
[SQL][DOC] Wrong package name "scala.math.sql" in sql-programming-gui…
sarutak Oct 26, 2014
d518bc2
[SPARK-3953][SQL][Minor] Confusable variable name.
sarutak Oct 26, 2014
0530842
[SPARK-4052][SQL] Use scala.collection.Map for pattern matching inste…
yhuai Oct 26, 2014
0481aaa
[SPARK-4068][SQL] NPE in jsonRDD schema inference
yhuai Oct 26, 2014
974d7b2
[SPARK-3483][SQL] Special chars in column names
ravipesala Oct 26, 2014
ace41e8
[SPARK-3959][SPARK-3960][SQL] SqlParser fails to parse literal -92233…
sarutak Oct 26, 2014
3a9d66c
[SPARK-4061][SQL] We cannot use EOL character in the operand of LIKE …
sarutak Oct 26, 2014
f4e8c28
[SPARK-4042][SQL] Append columns ids and names before broadcast
scwf Oct 26, 2014
6377ada
[SPARK-3970] Remove duplicate removal of local dirs
viirya Oct 27, 2014
9aa340a
[SPARK-4030] Make destroy public for broadcast variables
shivaram Oct 27, 2014
c9e05ca
[SPARK-4032] Deprecate YARN alpha support in Spark 1.2
ScrapCodes Oct 27, 2014
dea302d
SPARK-2621. Update task InputMetrics incrementally
sryza Oct 27, 2014
1d7bcc8
[SQL] Fixes caching related JoinSuite failure
liancheng Oct 27, 2014
bfa614b
SPARK-4022 [CORE] [MLLIB] Replace colt dependency (LGPL) with commons…
srowen Oct 27, 2014
7e3a1ad
[MLlib] SPARK-3987: add test case on objective value for NNLS
coderxiang Oct 28, 2014
418ad83
[SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in constant fol…
chenghao-intel Oct 28, 2014
698a7ea
[SPARK-3816][SQL] Add table properties from storage handler to output…
alexoss68 Oct 28, 2014
89af6df
[SPARK-4041][SQL] Attributes names in table scan should converted to …
scwf Oct 28, 2014
27470d3
[SQL] Correct a variable name in JavaApplySchemaSuite.applySchemaToJSON
yhuai Oct 28, 2014
0c34fa5
[SPARK-3907][SQL] Add truncate table support
wangxiaojing Oct 28, 2014
7c0c26c
[SPARK-4064]NioBlockTransferService.fetchBlocks may cause spark to hang.
witgo Oct 28, 2014
4ceb048
fix broken links in README.md
ryan-williams Oct 28, 2014
46c6341
[SPARK-4107] Fix incorrect handling of read() and skip() return values
JoshRosen Oct 28, 2014
fae095b
[SPARK-3961] [MLlib] [PySpark] Python API for mllib.feature
Oct 28, 2014
47346cd
[SPARK-4116][YARN]Delete the abandoned log4j-spark-container.properties
WangTaoTheTonic Oct 28, 2014
e8813be
[SPARK-4095][YARN][Minor]extract val isLaunchingDriver in ClientBase
WangTaoTheTonic Oct 28, 2014
0ac52e3
[SPARK-4098][YARN]use appUIAddress instead of appUIHostPort in yarn-c…
WangTaoTheTonic Oct 28, 2014
7768a80
[SPARK-4031] Make torrent broadcast read blocks on use.
shivaram Oct 28, 2014
44d8b45
[SPARK-4110] Wrong comments about default settings in spark-daemon.sh
sarutak Oct 28, 2014
1ea3e3d
[SPARK-4096][YARN]let ApplicationMaster accept executor memory argume…
WangTaoTheTonic Oct 28, 2014
247c529
[SPARK-3657] yarn alpha YarnRMClientImpl throws NPE appMasterRequest.…
sarutak Oct 28, 2014
4d52cec
[SPARK-4089][Doc][Minor] The version number of Spark in _config.yaml …
sarutak Oct 28, 2014
2f254da
[SPARK-4065] Add check for IPython on Windows
msjgriffiths Oct 28, 2014
6c1b981
[SPARK-4058] [PySpark] Log file name is hard coded even though there …
sarutak Oct 28, 2014
5807cb4
[SPARK-3814][SQL] Support for Bitwise AND(&), OR(|) ,XOR(^), NOT(~) i…
ravipesala Oct 28, 2014
47a40f6
[SPARK-3988][SQL] add public API for date type
adrian-wang Oct 28, 2014
abcafcf
[Spark 3922] Refactor spark-core to use Utils.UTF_8
zsxwing Oct 28, 2014
4b55482
[SPARK-3343] [SQL] Add serde support for CTAS
chenghao-intel Oct 28, 2014
84e5da8
[SPARK-4084] Reuse sort key in Sorter
mengxr Oct 28, 2014
1536d70
[SPARK-4008] Fix "kryo with fold" in KryoSerializerSuite
zsxwing Oct 29, 2014
b5e79bf
[SPARK-3904] [SQL] add constant objectinspector support for udfs
chenghao-intel Oct 29, 2014
8c0bfd0
[SPARK-4133] [SQL] [PySpark] type conversionfor python udf
Oct 29, 2014
1559495
[FIX] disable benchmark code
mengxr Oct 29, 2014
51ce997
[SPARK-4129][MLlib] Performance tuning in MultivariateOnlineSummarizer
Oct 29, 2014
dff0155
[SPARK-3453] Netty-based BlockTransferService, extracted from Spark core
rxin Oct 29, 2014
3535467
[SPARK-4003] [SQL] add 3 types for java SQL context
adrian-wang Oct 29, 2014
1df05a4
[SPARK-3822] Executor scaling mechanism for Yarn
Oct 29, 2014
e7fd804
[SPARK-4097] Fix the race condition of 'thread'
zsxwing Oct 29, 2014
8d59b37
[SPARK-3795] Heuristics for dynamically scaling executors
andrewor14 Oct 30, 2014
1234258
[SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by…
tdas Oct 30, 2014
cd739bd
[SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.librar…
witgo Oct 30, 2014
6db3157
[SPARK-4102] Remove unused ShuffleReader.stop() method.
kayousterhout Oct 30, 2014
c7ad085
[SPARK-4130][MLlib] Fixing libSVM parser bug with extra whitespace
jegonzal Oct 30, 2014
d932719
SPARK-4111 [MLlib] add regression metrics
Oct 30, 2014
234de92
[SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract th…
tdas Oct 30, 2014
fb1fbca
[SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received …
tdas Oct 30, 2014
9142c9b
[SPARK-4078] New FsPermission instance w/o FsPermission.createImmutab…
GraceH Oct 30, 2014
24c5129
[SPARK-3319] [SPARK-3338] Resolve Spark submit config paths
andrewor14 Oct 30, 2014
26f092d
[SPARK-4138][SPARK-4139] Improve dynamic allocation settings
Oct 30, 2014
5231a3f
[Minor] A few typos in comments and log messages
andrewor14 Oct 30, 2014
9334d69
[SPARK-4155] Consolidate usages of <driver>
Oct 30, 2014
849b43e
Minor style hot fix after #2711
Oct 30, 2014
d345057
[SPARK-4153][WebUI] Update the sort keys for HistoryPage
zsxwing Oct 30, 2014
2f54543
[SPARK-3661] Respect spark.*.memory in cluster mode
Oct 30, 2014
68cb69d
SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use pa…
srowen Oct 30, 2014
9b6ebe3
[SPARK-4120][SQL] Join of multiple tables with syntax like SELECT .. …
ravipesala Oct 31, 2014
2e35e24
[SPARK-3968][SQL] Use parquet-mr filter2 api
Oct 31, 2014
26d31d1
Revert "SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should no…
Oct 31, 2014
0734d09
HOTFIX: Clean up build in network module.
pwendell Oct 31, 2014
872fc66
[SPARK-4124] [MLlib] [PySpark] simplify serialization in MLlib Python…
Oct 31, 2014
ad3bd0d
[SPARK-3250] Implement Gap Sampling optimization for random sampling
erikerlandson Oct 31, 2014
d31517a
[SPARK-4108][SQL] Fixed usage of deprecated in sql/catalyst/types/dat…
AtlasPilotPuppy Oct 31, 2014
58a6077
[SPARK-4143] [SQL] Move inner class DeferredObjectAdapter to top level
chenghao-intel Oct 31, 2014
acd4ac7
SPARK-3837. Warn when YARN kills containers for exceeding memory limits
sryza Oct 31, 2014
adb6415
[SPARK-4016] Allow user to show/hide UI metrics.
kayousterhout Oct 31, 2014
7c41d13
[SPARK-3826][SQL]enable hive-thriftserver to support hive-0.13.1
scwf Oct 31, 2014
fa712b3
[SPARK-4077][SQL] Spark SQL return wrong values for valid string time…
gvramana Oct 31, 2014
ea465af
[SPARK-4154][SQL] Query does not work if it has "not between " in Spa…
ravipesala Oct 31, 2014
23468e7
[SPARK-2220][SQL] Fixes remaining Hive commands
liancheng Oct 31, 2014
a68ecf3
[SPARK-4141] Hide Accumulators column on stage page when no accumulat…
mmm Oct 31, 2014
f1e7361
[SPARK-4150][PySpark] return self in rdd.setName
mengxr Oct 31, 2014
55ab777
[SPARK-3870] EOL character enforcement
sarutak Oct 31, 2014
087e31a
[HOT FIX] Yarn stable tests don't compile
Oct 31, 2014
23f73f5
SPARK-4175. Exception on stage page
sryza Nov 1, 2014
62d01d2
[MLLIB] SPARK-2329 Add multi-label evaluation metrics
avulanov Nov 1, 2014
e07fb6a
[SPARK-3838][examples][mllib][python] Word2Vec example in python
AtlasPilotPuppy Nov 1, 2014
8602195
[MLLIB] SPARK-1547: Add Gradient Boosting to MLlib
manishamde Nov 1, 2014
98c556e
Streaming KMeans [MLLIB][SPARK-3254]
freeman-lab Nov 1, 2014
680fd87
Upgrading to roaring 0.4.5 (bug fix release)
lemire Nov 1, 2014
f4e0b28
[SPARK-4142][GraphX] Default numEdgePartitions
jegonzal Nov 1, 2014
ee29ef3
[SPARK-4115][GraphX] Add overrided count for edge counting of EdgeRDD.
luluorta Nov 1, 2014
7136719
[SPARK-2759][CORE] Generic Binary File Support in Spark
kmader Nov 1, 2014
59e626c
[SPARK-4183] Enable NettyBlockTransferService by default
aarondav Nov 1, 2014
1d4f355
[SPARK-3569][SQL] Add metadata field to StructField
mengxr Nov 1, 2014
f55218a
[SPARK-3796] Create external service which can serve shuffle files
aarondav Nov 1, 2014
ad0fde1
[SPARK-4037][SQL] Removes the SessionState instance created in HiveTh…
liancheng Nov 1, 2014
7894de2
Revert "[SPARK-4183] Enable NettyBlockTransferService by default"
pwendell Nov 1, 2014
d8176b1
[SPARK-4121] Set commons-math3 version based on hadoop profiles, inst…
mengxr Nov 1, 2014
56f2c61
[SPARK-3161][MLLIB] Adding a node Id caching mechanism for training d…
Nov 1, 2014
23f966f
[SPARK-3930] [SPARK-3933] Support fixed-precision decimal in SQL, and…
mateiz Nov 2, 2014
105c5a3
Adding UserDefinedType to SQL, not done yet.
jkbradley Oct 3, 2014
0eaeb81
Still working on UDTs
jkbradley Oct 6, 2014
19b2f60
still working on UDTs
jkbradley Oct 6, 2014
982c035
still working on UDTs
jkbradley Oct 7, 2014
53de70f
more udts...
jkbradley Oct 7, 2014
8bebf24
commented out convertRowToScala for debugging
jkbradley Oct 7, 2014
273ac96
basic UDT is working, but deserialization has yet to be done
jkbradley Oct 8, 2014
39f8707
removed old udt suite
jkbradley Oct 8, 2014
04303c9
udts
jkbradley Oct 9, 2014
50f9726
udts
jkbradley Oct 9, 2014
893ee4c
udt finallly working
jkbradley Oct 9, 2014
964b32e
some cleanups
jkbradley Oct 9, 2014
fea04af
more cleanups
jkbradley Oct 9, 2014
b226b9e
Changing UDT to annotation
jkbradley Oct 10, 2014
3579035
udt annotation now working
jkbradley Oct 10, 2014
2f40c02
renamed UDT types
jkbradley Oct 10, 2014
e1f7b9c
blah
jkbradley Oct 10, 2014
34a5831
Added MLlib dependency on SQL.
jkbradley Oct 10, 2014
cd60cb4
Trying to get other SQL tests to run
jkbradley Oct 21, 2014
dff99d6
Added UDTs for Vectors in MLlib, plus DatasetExample using the UDTs
jkbradley Oct 22, 2014
85872f6
Allow schema calculation to be lazy, but ensure its available on exec…
marmbrus Oct 23, 2014
f025035
Cleanups before PR. Added new tests
jkbradley Oct 24, 2014
51e5282
fixed 1 test
jkbradley Oct 24, 2014
63626a4
Updated ScalaReflectionsSuite per @marmbrus suggestions
jkbradley Oct 24, 2014
759af7a
Added more doc to UserDefineType
jkbradley Oct 27, 2014
db16139
Added more doc for UserDefinedType. Removed unused code in Suite
jkbradley Oct 28, 2014
cfbc321
support UDT in parquet
mengxr Oct 28, 2014
3143ac3
remove unnecessary changes
mengxr Oct 28, 2014
87264a5
remove debug code
mengxr Oct 28, 2014
4500d8a
update example code
mengxr Oct 28, 2014
b028675
allow any type in UDT
mengxr Oct 28, 2014
7f29656
Moved udt case to top of all matches. Small cleanups
jkbradley Oct 28, 2014
8b242ea
Fixed merge error after last merge. Note: Last merge commit also rem…
jkbradley Oct 29, 2014
8de957c
Modified UserDefinedType to store Java class of user type so that reg…
jkbradley Oct 30, 2014
fa86b20
Removed Java UserDefinedType, and made UDTs private[spark] for now
jkbradley Oct 31, 2014
20630bc
fixed scalastyle
jkbradley Oct 31, 2014
6fddc1c
Made MyLabeledPoint into a Java Bean
jkbradley Oct 31, 2014
a571bb6
Removed old UDT code (registry and Java UDTs). Cleaned up other code…
jkbradley Oct 31, 2014
d063380
Cleaned up Java UDT Suite, and added warning about element ordering w…
jkbradley Oct 31, 2014
30ce5b2
updates based on code review
jkbradley Nov 2, 2014
5817b2b
style edits
jkbradley Nov 2, 2014
e13cd8a
Removed Vector UDTs
jkbradley Nov 2, 2014
[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:

- Remove all state from the global TorrentBroadcast object.  This state
  consisted mainly of configuration options, like the block size and
  compression codec, and was read by the blockify / unblockify methods.
  Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
  size was always determined by the first SparkConf that TorrentBroadcast was
  initialized with; as a result, unit tests could not properly test
  TorrentBroadcast with different block sizes.

  Instead, blockifyObject and unBlockifyObject now accept compression codecs
  and blockSizes as arguments.  These arguments are supplied at the call sites
  inside of TorrentBroadcast instances.  Each TorrentBroadcast instance
  determines these values from SparkEnv's SparkConf.  I was careful to ensure
  that we do not accidentally serialize CompressionCodec or SparkConf objects
  as part of the TorrentBroadcast object.  (The first sketch after this list
  shows the pitfall in miniature.)

- Remove special-case handling of local-mode in TorrentBroadcast.  I don't
  think that broadcast implementations should know about whether we're running
  in local mode.  If we want to optimize the performance of broadcast in local
  mode, then we should detect this at a higher level and use a dummy
  LocalBroadcastFactory implementation instead; the second sketch after this
  list shows what such a factory could look like.

  Removing this code fixes a subtle error condition: in the old local mode
  code, a failure to find the broadcast in the local BlockManager would lead
  to an attempt to deblockify zero blocks, which could lead to confusing
  deserialization or decompression errors when we attempted to decompress
  an empty byte array.  This should never have happened, though: a failure to
  find the block in local mode is evidence of some other error.  The changes
  here will make it easier to debug those errors if they ever happen.

- Add a check that throws an exception when attempting to deblockify an
  empty array.

- Use ScalaCheck to add a test to check that TorrentBroadcast's
  blockifyObject and unBlockifyObject methods are inverses; the third sketch
  after this list gives an equivalent formulation.

- Misc. cleanup and logging improvements.
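
First sketch: the `lazy val` pitfall in miniature. `PinnedBlockifier` and
`StatelessBlockifier` are hypothetical names, not Spark API; the point is that
a `lazy val` in a singleton is evaluated exactly once, so whichever SparkConf
arrives first fixes the block size for the life of the JVM, while the
parameter-passing form can be exercised with any block size:

```scala
import org.apache.spark.SparkConf

// Miniature of the old design: a singleton caches the first conf it sees.
object PinnedBlockifier {
  private var conf: SparkConf = null
  def initialize(c: SparkConf): Unit = { conf = c }
  // Evaluated once, on first use; later initialize() calls (e.g. from other
  // unit tests in the same JVM) can never change the block size again.
  private lazy val blockSize: Int = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
  def blockify(bytes: Array[Byte]): Array[Array[Byte]] =
    bytes.grouped(blockSize).toArray
}

// Miniature of the new design: no global state, the block size is an argument,
// so each caller (and each test) decides for itself.
object StatelessBlockifier {
  def blockify(bytes: Array[Byte], blockSize: Int): Array[Array[Byte]] =
    bytes.grouped(blockSize).toArray
}
```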
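
Second sketch: nothing in this PR implements the dummy factory, so the
following is only a guess at what a LocalBroadcastFactory might look like,
assuming the Broadcast and BroadcastFactory interfaces as they stand in this
change:

```scala
import scala.reflect.ClassTag

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.broadcast.{Broadcast, BroadcastFactory}

// Hypothetical: in a single JVM, the "broadcast" can simply hold the value.
private class LocalBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) {
  override protected def getValue(): T = obj
  override protected def doUnpersist(blocking: Boolean) { }
  override protected def doDestroy(blocking: Boolean) { }
}

private class LocalBroadcastFactory extends BroadcastFactory {
  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }
  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
    new LocalBroadcast[T](value_, id)
  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { }
  override def stop() { }
}
```

The higher level (e.g. the BroadcastManager) would select this factory when
running in local mode, so TorrentBroadcast itself never needs an isLocal flag.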
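
Third sketch: the test that actually landed (in BroadcastSuite.scala below)
drives the round trip with a seeded Random loop; an equivalent ScalaCheck
property, assuming a scalacheck test dependency and a file in the
org.apache.spark.broadcast package (the TorrentBroadcast object is private to
that package), would be roughly:

```scala
package org.apache.spark.broadcast

import org.scalacheck.{Arbitrary, Gen, Properties}
import org.scalacheck.Prop.forAll

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.TorrentBroadcast.{blockifyObject, unBlockifyObject}
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.serializer.JavaSerializer

object BlockifyRoundTrip extends Properties("TorrentBroadcast") {
  private val conf = new SparkConf()
  private val serializer = new JavaSerializer(conf)
  private val codec = Some(new SnappyCompressionCodec(conf))

  // Non-empty byte arrays up to 10 KB, so several block counts get exercised.
  private val payloads: Gen[Array[Byte]] =
    Gen.choose(1, 10 * 1024).flatMap { n =>
      Gen.listOfN(n, Arbitrary.arbitrary[Byte]).map(_.toArray)
    }

  property("unBlockifyObject inverts blockifyObject") = forAll(payloads) { data =>
    val blocks = blockifyObject(data, 1024, serializer, codec)
    unBlockifyObject[Array[Byte]](blocks, serializer, codec).sameElements(data)
  }
}
```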

Author: Josh Rosen <[email protected]>

Closes apache#2844 from JoshRosen/torrentbroadcast-bugfix and squashes the following commits:

1e8268d [Josh Rosen] Address Reynold's review comments
2a9fdfd [Josh Rosen] Address Reynold's review comments.
c3b08f9 [Josh Rosen] Update TorrentBroadcast tests to reflect removal of special local-mode optimizations.
5c22782 [Josh Rosen] Store broadcast variable's value in the driver.
33fc754 [Josh Rosen] Change blockify/unblockifyObject to accept serializer as argument.
618a872 [Josh Rosen] [SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
JoshRosen committed Oct 21, 2014
commit 5a8f64f33632fbf89d16cade2e0e66c5ed60760b
136 changes: 65 additions & 71 deletions core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -26,6 +26,7 @@ import scala.util.Random

import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.io.ByteArrayChunkOutputStream
@@ -46,14 +47,12 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
*
* When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
*
* @param obj object to broadcast
* @param isLocal whether Spark is running in local mode (single JVM process).
* @param id A unique identifier for the broadcast variable.
*/
private[spark] class TorrentBroadcast[T: ClassTag](
obj : T,
@transient private val isLocal: Boolean,
id: Long)
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

/**
@@ -62,6 +61,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* blocks from the driver and/or other executors.
*/
@transient private var _value: T = obj
/** The compression codec to use, or None if compression is disabled */
@transient private var compressionCodec: Option[CompressionCodec] = _
/** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
@transient private var blockSize: Int = _

private def setConf(conf: SparkConf) {
compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
Some(CompressionCodec.createCodec(conf))
} else {
None
}
blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
}
setConf(SparkEnv.get.conf)

private val broadcastId = BroadcastBlockId(id)

@@ -76,23 +89,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* @return number of blocks this broadcast variable is divided into
*/
private def writeBlocks(): Int = {
// For local mode, just put the object in the BlockManager so we can find it later.
SparkEnv.get.blockManager.putSingle(
broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

if (!isLocal) {
val blocks = TorrentBroadcast.blockifyObject(_value)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
} else {
0
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
val blocks =
TorrentBroadcast.blockifyObject(_value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
}

/** Fetch torrent blocks from the driver and/or other executors. */
@@ -104,29 +114,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)

// First try getLocalBytes because there is a chance that previous attempts to fetch the
logDebug(s"Reading piece $pieceId of $broadcastId")
// First try getLocalBytes because there is a chance that previous attempts to fetch the
// broadcast blocks have already fetched some of the blocks. In that case, some blocks
// would be available locally (on this executor).
var blockOpt = bm.getLocalBytes(pieceId)
if (!blockOpt.isDefined) {
blockOpt = bm.getRemoteBytes(pieceId)
blockOpt match {
case Some(block) =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)

case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
block
}
// If we get here, the option is defined.
blocks(pid) = blockOpt.get
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
blocks(pid) = block
}
blocks
}
@@ -156,6 +161,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) =>
_value = x.asInstanceOf[T]
@@ -167,7 +173,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
val time = (System.nanoTime() - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")

_value = TorrentBroadcast.unBlockifyObject[T](blocks)
_value =
TorrentBroadcast.unBlockifyObject[T](blocks, SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle(
@@ -179,43 +186,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](


private object TorrentBroadcast extends Logging {
/** Size of each block. Default value is 4MB. */
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
private var compress: Boolean = false
private var compressionCodec: CompressionCodec = null

def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
compress = conf.getBoolean("spark.broadcast.compress", true)
compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
}

def stop() {
initialized = false
}

def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
val ser = SparkEnv.get.serializer.newInstance()
def blockifyObject[T: ClassTag](
obj: T,
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(blockSize)
val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
val ser = serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
bos.toArrays.map(ByteBuffer.wrap)
}

def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = {
def unBlockifyObject[T: ClassTag](
blocks: Array[ByteBuffer],
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): T = {
require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
val is = new SequenceInputStream(
asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block))))
val in: InputStream = if (compress) compressionCodec.compressedInputStream(is) else is

val ser = SparkEnv.get.serializer.newInstance()
val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
val ser = serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
serIn.close()
@@ -227,6 +220,7 @@ private object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
logDebug(s"Unpersisting TorrentBroadcast $id")
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -28,14 +28,13 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = {
new TorrentBroadcast[T](value_, id)
}

override def stop() { TorrentBroadcast.stop() }
override def stop() { }

/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
42 changes: 27 additions & 15 deletions core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,12 +17,15 @@

package org.apache.spark.broadcast

import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage._


class BroadcastSuite extends FunSuite with LocalSparkContext {

private val httpConf = broadcastConf("HttpBroadcastFactory")
@@ -84,6 +87,24 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
}

test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") {
import org.apache.spark.broadcast.TorrentBroadcast._
val blockSize = 1024
val conf = new SparkConf()
val compressionCodec = Some(new SnappyCompressionCodec(conf))
val serializer = new JavaSerializer(conf)
val seed = 42
val rand = new Random(seed)
for (trial <- 1 to 100) {
val size = 1 + rand.nextInt(1024 * 10)
val data: Array[Byte] = new Array[Byte](size)
rand.nextBytes(data)
val blocks = blockifyObject(data, blockSize, serializer, compressionCodec)
val unblockified = unBlockifyObject[Array[Byte]](blocks, serializer, compressionCodec)
assert(unblockified === data)
}
}

test("Unpersisting HttpBroadcast on executors only in local mode") {
testUnpersistHttpBroadcast(distributed = false, removeFromDriver = false)
}
@@ -193,26 +214,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {

blockId = BroadcastBlockId(broadcastId, "piece0")
statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === (if (distributed) 1 else 0))
assert(statuses.size === 1)
}

// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(broadcastId: Long, bmm: BlockManagerMaster) {
var blockId = BroadcastBlockId(broadcastId)
var statuses = bmm.getBlockStatus(blockId, askSlaves = true)
if (distributed) {
assert(statuses.size === numSlaves + 1)
} else {
assert(statuses.size === 1)
}
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === numSlaves + 1)

blockId = BroadcastBlockId(broadcastId, "piece0")
statuses = bmm.getBlockStatus(blockId, askSlaves = true)
if (distributed) {
assert(statuses.size === numSlaves + 1)
} else {
assert(statuses.size === 0)
}
assert(statuses.size === numSlaves + 1)
}

// Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver
@@ -224,7 +236,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
assert(statuses.size === expectedNumBlocks)

blockId = BroadcastBlockId(broadcastId, "piece0")
expectedNumBlocks = if (removeFromDriver || !distributed) 0 else 1
expectedNumBlocks = if (removeFromDriver) 0 else 1
statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === expectedNumBlocks)
}
Expand Down