
Commit a2abc51

Author: Chris Cope (committed)
Merge remote-tracking branch 'upstream/master'
2 parents: e623b78 + 0c7b452


76 files changed: +1167 -599 lines


README.md

Lines changed: 9 additions & 0 deletions
@@ -115,6 +115,15 @@ If your project is built with Maven, add this to your POM file's `<dependencies>
     </dependency>
 
 
+## A Note About Thrift JDBC server and CLI for Spark SQL
+
+Spark SQL supports Thrift JDBC server and CLI.
+See sql-programming-guide.md for more information about those features.
+You can use those features by setting `-Phive-thriftserver` when building Spark as follows.
+
+    $ sbt/sbt -Phive-thriftserver assembly
+
+
 ## Configuration
 
 Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 9 additions & 2 deletions
@@ -46,17 +46,24 @@ trait CompressionCodec {
 
 
 private[spark] object CompressionCodec {
+
+  private val shortCompressionCodecNames = Map(
+    "lz4" -> classOf[LZ4CompressionCodec].getName,
+    "lzf" -> classOf[LZFCompressionCodec].getName,
+    "snappy" -> classOf[SnappyCompressionCodec].getName)
+
   def createCodec(conf: SparkConf): CompressionCodec = {
     createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
-    val ctor = Class.forName(codecName, true, Utils.getContextOrSparkClassLoader)
+    val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
+    val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
       .getConstructor(classOf[SparkConf])
     ctor.newInstance(conf).asInstanceOf[CompressionCodec]
   }
 
-  val DEFAULT_COMPRESSION_CODEC = classOf[SnappyCompressionCodec].getName
+  val DEFAULT_COMPRESSION_CODEC = "snappy"
 }
 
 

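With the lookup table above, `spark.io.compression.codec` accepts the short names `lz4`, `lzf`, and `snappy` (the configured name is lower-cased before the lookup) as well as fully qualified class names, and the default is now the short name `snappy`. A minimal sketch of how the resolution behaves, in the spirit of the new tests added below; `CompressionCodec` is `private[spark]`, so a snippet like this only compiles inside the Spark source tree, and the wrapper object is purely illustrative:

    package org.apache.spark.io

    import org.apache.spark.SparkConf

    // Illustrative wrapper, not part of the commit.
    object ShortCodecNameExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)

        // Short form: resolved through shortCompressionCodecNames.
        val lz4 = CompressionCodec.createCodec(conf, "lz4")

        // Fully qualified class names still work; unknown keys fall through to Class.forName.
        val lzf = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)

        // With no explicit codec configured, DEFAULT_COMPRESSION_CODEC ("snappy") is used.
        val default = CompressionCodec.createCodec(conf)

        println(Seq(lz4, lzf, default).map(_.getClass.getSimpleName).mkString(", "))
      }
    }
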
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 2 additions & 2 deletions
@@ -238,7 +238,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           // If our vector's size has exceeded the threshold, request more memory
           val currentSize = vector.estimateSize()
           if (currentSize >= memoryThreshold) {
-            val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
+            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
             // Hold the accounting lock, in case another thread concurrently puts a block that
             // takes up the unrolling space we just ensured here
             accountingLock.synchronized {
@@ -254,7 +254,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
               }
             }
             // New threshold is currentSize * memoryGrowthFactor
-            memoryThreshold = currentSize + amountToRequest
+            memoryThreshold += amountToRequest
           }
         }
         elementsUnrolled += 1

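The old code requested `currentSize * (memoryGrowthFactor - 1)` bytes but then set the threshold to `currentSize + amountToRequest`, so whenever the vector had already grown past the previous threshold, the threshold advanced by more than was actually reserved. The new code requests exactly the gap up to `currentSize * memoryGrowthFactor` and advances the threshold by the same amount. A worked example with made-up numbers (a growth factor of 1.5, a previous threshold of 1000 bytes, and a vector that has reached 1200 bytes):

    // Made-up numbers; only the arithmetic of the two formulas is compared.
    object ThresholdGrowthExample extends App {
      val memoryGrowthFactor = 1.5
      val previousThreshold = 1000L
      val currentSize = 1200L

      // Old: request 1200 * 0.5 = 600 bytes, then jump the threshold to 1200 + 600 = 1800,
      // i.e. the threshold moves up by 800 while only 600 bytes were reserved.
      val oldRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
      val oldThreshold = currentSize + oldRequest

      // New: request the full gap 1200 * 1.5 - 1000 = 800 bytes and advance the threshold
      // by exactly that amount, so the reservation and the threshold stay in step.
      val newRequest = (currentSize * memoryGrowthFactor - previousThreshold).toLong
      val newThreshold = previousThreshold + newRequest

      println(s"old: request=$oldRequest threshold=$oldThreshold")   // 600, 1800
      println(s"new: request=$newRequest threshold=$newThreshold")   // 800, 1800
    }
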
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 1 addition & 0 deletions
@@ -153,6 +153,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
       taskEnd.reason match {
         case org.apache.spark.Success =>
+          stageData.completedIndices.add(info.index)
           stageData.numCompleteTasks += 1
           (None, Option(taskEnd.taskMetrics))
         case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ private[ui] class StageTableBase(
       <td valign="middle">{submissionTime}</td>
       <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
       <td class="progress-cell">
-        {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+        {makeProgressBar(stageData.numActiveTasks, stageData.completedIndices.size,
         stageData.numFailedTasks, s.numTasks)}
       </td>
       <td sorttable_customekey={inputRead.toString}>{inputReadWithUnit}</td>

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
+import org.apache.spark.util.collection.OpenHashSet
 
 import scala.collection.mutable.HashMap
 
@@ -38,6 +39,7 @@ private[jobs] object UIData {
   class StageUIData {
     var numActiveTasks: Int = _
     var numCompleteTasks: Int = _
+    var completedIndices = new OpenHashSet[Int]()
     var numFailedTasks: Int = _
 
     var executorRunTime: Long = _

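Together with the JobProgressListener and StageTable changes above, `completedIndices` lets the progress bar count each task index at most once even if the same index reports success more than once (for example after re-run or speculative attempts), while `numCompleteTasks` keeps the raw event count. A small sketch of the deduplication idea, using the standard library's `mutable.HashSet` in place of Spark's `OpenHashSet`:

    import scala.collection.mutable

    // Standard-library stand-in for OpenHashSet; the counting logic is the point.
    object CompletedIndicesExample extends App {
      var numCompleteTasks = 0
      val completedIndices = mutable.HashSet.empty[Int]

      // Five success events, but task index 3 succeeds twice.
      for (index <- Seq(0, 1, 2, 3, 3)) {
        numCompleteTasks += 1
        completedIndices.add(index)
      }

      // The raw counter reaches 5, while the set of distinct indices stays at 4,
      // which is why the stage table now renders completedIndices.size.
      println(s"numCompleteTasks = $numCompleteTasks, completedIndices.size = ${completedIndices.size}")
    }
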
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 22 additions & 7 deletions
@@ -284,17 +284,32 @@ private[spark] object Utils extends Logging {
   /** Copy all data from an InputStream to an OutputStream */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false)
+                 closeStreams: Boolean = false): Long =
   {
+    var count = 0L
     try {
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          out.write(buf, 0, n)
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+        // When both streams are File stream, use transferTo to improve copy performance.
+        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
+        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val size = inChannel.size()
+
+        // In case transferTo method transferred less data than we have required.
+        while (count < size) {
+          count += inChannel.transferTo(count, size - count, outChannel)
+        }
+      } else {
+        val buf = new Array[Byte](8192)
+        var n = 0
+        while (n != -1) {
+          n = in.read(buf)
+          if (n != -1) {
+            out.write(buf, 0, n)
+            count += n
+          }
         }
       }
+      count
     } finally {
       if (closeStreams) {
         try {

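`copyStream` now returns the number of bytes copied and, when both ends are file streams, hands the copy to `FileChannel.transferTo`, which lets the OS move the data without going through a user-space buffer. A self-contained sketch of the same channel-to-channel loop using only JDK APIs; the file paths are placeholders:

    import java.io.{FileInputStream, FileOutputStream}

    object TransferToExample extends App {
      val in = new FileInputStream("input.bin")     // placeholder path
      val out = new FileOutputStream("output.bin")  // placeholder path
      try {
        val inChannel = in.getChannel
        val outChannel = out.getChannel
        val size = inChannel.size()
        var copied = 0L
        // transferTo may move fewer bytes than requested, so loop until everything
        // is copied, exactly as the new copyStream implementation does.
        while (copied < size) {
          copied += inChannel.transferTo(copied, size - copied, outChannel)
        }
        println(s"copied $copied bytes")
      } finally {
        in.close()
        out.close()
      }
    }
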
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 3 additions & 4 deletions
@@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C](
     try {
       out = new FileOutputStream(outputFile)
       for (i <- 0 until numPartitions) {
-        val file = partitionWriters(i).fileSegment().file
-        in = new FileInputStream(file)
-        org.apache.spark.util.Utils.copyStream(in, out)
+        in = new FileInputStream(partitionWriters(i).fileSegment().file)
+        val size = org.apache.spark.util.Utils.copyStream(in, out, false)
         in.close()
         in = null
-        lengths(i) = file.length()
+        lengths(i) = size
         offsets(i + 1) = offsets(i) + lengths(i)
       }
     } finally {

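Because `copyStream` now reports how many bytes it wrote, the per-partition lengths can be taken straight from the copy instead of re-reading `file.length()` afterwards, and the `offsets` array remains a running sum of those lengths. A tiny sketch of that bookkeeping, with made-up partition sizes standing in for `copyStream`'s return values:

    object PartitionOffsetsExample extends App {
      // Made-up byte counts, one per partition, standing in for copyStream's results.
      val lengths = Array(120L, 0L, 340L, 75L)
      val offsets = new Array[Long](lengths.length + 1)

      // offsets(i + 1) = offsets(i) + lengths(i), as in the loop shown in the diff above.
      for (i <- lengths.indices) {
        offsets(i + 1) = offsets(i) + lengths(i)
      }

      println(offsets.mkString(", "))  // 0, 120, 120, 460, 535
    }
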
core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -56,15 +56,33 @@ class CompressionCodecSuite extends FunSuite {
     testCodec(codec)
   }
 
+  test("lz4 compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "lz4")
+    assert(codec.getClass === classOf[LZ4CompressionCodec])
+    testCodec(codec)
+  }
+
   test("lzf compression codec") {
     val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
 
+  test("lzf compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "lzf")
+    assert(codec.getClass === classOf[LZFCompressionCodec])
+    testCodec(codec)
+  }
+
   test("snappy compression codec") {
     val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
+
+  test("snappy compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "snappy")
+    assert(codec.getClass === classOf[SnappyCompressionCodec])
+    testCodec(codec)
+  }
 }

docs/building-with-maven.md

Lines changed: 9 additions & 0 deletions
@@ -96,6 +96,15 @@ mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
 mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package
 {% endhighlight %}
 
+# Building Thrift JDBC server and CLI for Spark SQL
+
+Spark SQL supports Thrift JDBC server and CLI.
+See sql-programming-guide.md for more information about those features.
+You can use those features by setting `-Phive-thriftserver` when building Spark as follows.
+{% highlight bash %}
+mvn -Phive-thriftserver assembly
+{% endhighlight %}
+
 # Spark Tests in Maven
 
 Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin).
