Changes from all commits (49 commits)
d7a06b8
Updated SparkConf class to add getOrCreate method. Started test suite…
Apr 13, 2015
a99032f
Spacing fix
Apr 14, 2015
e92caf7
[SPARK-6703] Added test to ensure that getOrCreate both allows creati…
Apr 14, 2015
8be2f83
Replaced match with if
Apr 14, 2015
733ec9f
Fixed some bugs in test code
Apr 14, 2015
dfec4da
Changed activeContext to AtomicReference
Apr 14, 2015
0e1567c
Got rid of unecessary option for AtomicReference
Apr 14, 2015
15e8dea
Updated comments and added MiMa Exclude
Apr 14, 2015
270cfe3
[SPARK-6703] Documentation fixes
Apr 14, 2015
cb0c6b7
Doc updates and code cleanup
Apr 14, 2015
8c884fa
Made getOrCreate synchronized
Apr 14, 2015
1dc0444
Added ref equality check
Apr 14, 2015
db9a963
Closing second spark context
Apr 17, 2015
5390fd9
Merge remote-tracking branch 'upstream/master' into SPARK-5932
Apr 18, 2015
09ea450
[SPARK-5932] Added byte string conversion to Jav utils
Apr 18, 2015
747393a
[SPARK-5932] Added unit tests for ByteString conversion
Apr 18, 2015
a9f4fcf
[SPARK-5932] Added unit tests for unit conversion
Apr 18, 2015
851d691
[SPARK-5932] Updated memoryStringToMb to use new interfaces
Apr 18, 2015
475370a
[SPARK-5932] Simplified ByteUnit code, switched to using longs. Updat…
Apr 18, 2015
0cdff35
[SPARK-5932] Updated to use bibibytes in method names. Updated spark.…
Apr 18, 2015
b809a78
[SPARK-5932] Updated spark.kryoserializer.buffer.max
Apr 18, 2015
eba4de6
[SPARK-5932] Updated spark.shuffle.file.buffer.kb
Apr 18, 2015
1fbd435
[SPARK-5932] Updated spark.broadcast.blockSize
Apr 18, 2015
2d15681
[SPARK-5932] Updated spark.executor.logs.rolling.size.maxBytes
Apr 18, 2015
ae7e9f6
[SPARK-5932] Updated spark.io.compression.snappy.block.size
Apr 18, 2015
afc9a38
[SPARK-5932] Updated spark.broadcast.blockSize and spark.storage.memo…
Apr 18, 2015
7a6c847
[SPARK-5932] Updated spark.shuffle.file.buffer
Apr 18, 2015
5d29f90
[SPARK-5932] Finished documentation updates
Apr 18, 2015
928469e
[SPARK-5932] Converted some longs to ints
Apr 18, 2015
35a7fa7
Minor formatting
Apr 18, 2015
0f4443e
Merge remote-tracking branch 'upstream/master' into SPARK-5932
Apr 18, 2015
f15f209
Fixed conversion of kryo buffer size
Apr 19, 2015
f32bc01
[SPARK-5932] Fixed error in API in SparkConf.scala where Kb conversio…
Apr 19, 2015
69e2f20
Updates to code
Apr 21, 2015
54b78b4
Simplified byteUnit class
Apr 21, 2015
c7803cd
Empty lines
Apr 21, 2015
fe286b4
Resolved merge conflict
Apr 21, 2015
d3d09b6
[SPARK-5932] Fixing error in KryoSerializer
Apr 21, 2015
84a2581
Added smoother handling of fractional values for size parameters. Thi…
Apr 21, 2015
8b43748
Fixed error in pattern matching for doubles
Apr 21, 2015
e428049
resolving merge conflict
Apr 22, 2015
3dfae96
Fixed some nits. Added automatic conversion of old paramter for kryos…
Apr 22, 2015
22413b1
Made MAX private
Apr 22, 2015
9ee779c
Simplified fraction matches
Apr 22, 2015
852a407
[SPARK-5932] Added much improved overflow handling. Can now handle si…
Apr 23, 2015
fc85733
Got rid of floating point math
Apr 24, 2015
2ab886b
Scala style
Apr 24, 2015
49a8720
Whitespace fix
Apr 24, 2015
11f6999
Nit fixes
Apr 24, 2015
90 changes: 89 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -211,7 +211,74 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
Utils.timeStringAsMs(get(key, defaultValue))
}

/**
* Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then bytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsBytes(key: String): Long = {
Utils.byteStringAsBytes(get(key))
}

/**
* Get a size parameter as bytes, falling back to a default if not set. If no
* suffix is provided then bytes are assumed.
*/
def getSizeAsBytes(key: String, defaultValue: String): Long = {
Utils.byteStringAsBytes(get(key, defaultValue))
}

/**
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Kibibytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsKb(key: String): Long = {
Utils.byteStringAsKb(get(key))
}

/**
* Get a size parameter as Kibibytes, falling back to a default if not set. If no
* suffix is provided then Kibibytes are assumed.
*/
def getSizeAsKb(key: String, defaultValue: String): Long = {
Utils.byteStringAsKb(get(key, defaultValue))
}

/**
* Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Mebibytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsMb(key: String): Long = {
Utils.byteStringAsMb(get(key))
}

/**
* Get a size parameter as Mebibytes, falling back to a default if not set. If no
* suffix is provided then Mebibytes are assumed.
*/
def getSizeAsMb(key: String, defaultValue: String): Long = {
Utils.byteStringAsMb(get(key, defaultValue))
}

/**
* Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Gibibytes are assumed.
* @throws NoSuchElementException
*/
def getSizeAsGb(key: String): Long = {
Utils.byteStringAsGb(get(key))
}

/**
* Get a size parameter as Gibibytes, falling back to a default if not set. If no
* suffix is provided then Gibibytes are assumed.
*/
def getSizeAsGb(key: String, defaultValue: String): Long = {
Utils.byteStringAsGb(get(key, defaultValue))
}
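A brief usage sketch of the new size getters (keys and values here are illustrative, not part of this diff; an unsuffixed value is interpreted in the unit named by the getter):

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.shuffle.file.buffer", "64k")

conf.getSizeAsBytes("spark.shuffle.file.buffer")      // 65536
conf.getSizeAsKb("spark.shuffle.file.buffer")         // 64
conf.getSizeAsMb("spark.broadcast.blockSize", "4m")   // 4 -- key unset, default "4m" used
conf.getSizeAsKb("some.unset.key")                    // hypothetical unset key: throws NoSuchElementException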

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
@@ -407,7 +474,13 @@ private[spark] object SparkConf extends Logging {
"The spark.cache.class property is no longer being used! Specify storage levels using " +
"the RDD.persist() method instead."),
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
"Please use spark.{driver,executor}.userClassPathFirst instead."))
"Please use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
Contributor: You could have a translation function for this, no? Like spark.yarn.applicationMaster.waitTries below.

Author: Automatic translation would fail with a NumberFormatException, though, if they try to pass in a correctly formatted new value for the old config (e.g. 64k), and there's nowhere to catch that exception in this code.

Contributor: Since you added the alternate config, you can remove this entry.

Author: I wanted to still provide this message to clarify that fractional values are no longer allowed.

"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
)

Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
}

@@ -432,6 +505,21 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s")),
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${s.toDouble * 1000}k")),
Author: This automatic translation may throw a NumberFormatException if someone tries to use the .mb parameter as "64k" (i.e. the correct new format). Is that a case we should be concerned with? There will be enough warnings and errors thrown for them to readily track down the problem and fix the erroneous config, so this should be OK, but I want to confirm that.

Contributor: I think since you're adding the new config, it's fine not to let the old config take the new style. If you really want to support that, you could try to parse the config using the new API in an exception handler.

Contributor: Very small nit, but I would put the Seq( on L510 to be consistent with the rest.

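A minimal sketch of the fallback the reviewer suggests above (not part of this diff): keep the legacy fractional-MB translation, but re-parse with the new byte-string API when the old key already holds a suffixed value. It mirrors the translation lambdas used elsewhere in this map and assumes JavaUtils is in scope in SparkConf.scala.

"spark.kryoserializer.buffer" -> Seq(
  AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
    translation = s =>
      try {
        s"${(s.toDouble * 1000).toLong}k"    // legacy style, e.g. "0.064" -> "64k"
      } catch {
        case _: NumberFormatException =>
          s"${JavaUtils.byteStringAsKb(s)}k" // already new style, e.g. "64k"
      })),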
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
"spark.executor.logs.rolling.maxSize" -> Seq(
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
"spark.io.compression.snappy.blockSize" -> Seq(
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
"spark.io.compression.lz4.blockSize" -> Seq(
AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
"spark.rpc.numRetries" -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
@@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
} else {
None
}
blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
// Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
}
setConf(SparkEnv.get.conf)
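To illustrate the compatibility note above (illustrative values, not part of the diff): a legacy unitless setting is read as KiB, which reproduces the old getInt-times-1024 behavior, and the new suffixed form yields the same result.

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.broadcast.blockSize", "4096")         // legacy, unitless value
conf.getSizeAsKb("spark.broadcast.blockSize", "4m")   // 4096 KiB, same as getInt(..., 4096)
conf.set("spark.broadcast.blockSize", "4m")           // new suffixed form
conf.getSizeAsKb("spark.broadcast.blockSize", "4m")   // 4096 KiB as well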

@@ -97,7 +97,7 @@ private[spark] object CompressionCodec {
/**
* :: DeveloperApi ::
* LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by `spark.io.compression.lz4.block.size`.
* Block size can be configured by `spark.io.compression.lz4.blockSize`.
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
@@ -107,7 +107,7 @@ private[spark] object CompressionCodec {
class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
new LZ4BlockOutputStream(s, blockSize)
}

@@ -137,7 +137,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
/**
* :: DeveloperApi ::
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by `spark.io.compression.snappy.block.size`.
* Block size can be configured by `spark.io.compression.snappy.blockSize`.
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
Expand All @@ -153,7 +153,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
}

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
new SnappyOutputStream(s, blockSize)
}

@@ -49,16 +49,17 @@ class KryoSerializer(conf: SparkConf)
with Logging
with Serializable {

private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064)
if (bufferSizeMb >= 2048) {
throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " +
s"2048 mb, got: + $bufferSizeMb mb.")
private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
Author: All - I don't know what the right solution here is. The old value simply can't work using the new framework. Fractional values are no longer supported, and as near as I can tell this is the only instance of such usage. The only way to truly maintain backwards compatibility (short of throwing an exception) is to leave this as conf.getDouble, but then this is an exception to the rule for how we handle size variables.

Proper handling of fractional values throughout the code will be more intrusive and will require a lot of code modification, all for the sake of this one parameter.

This needs to be getSizeAsKb() and the value must be specified in the right format, otherwise an exception is thrown.


if (bufferSizeKb >= 2048) {
throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
s"2048 mb, got: + $bufferSizeKb mb.")
}
private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt
private val bufferSize = (bufferSizeKb * 1024).toInt

val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64)
val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
if (maxBufferSizeMb >= 2048) {
throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " +
throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
s"2048 mb, got: + $maxBufferSizeMb mb.")
}
private val maxBufferSize = maxBufferSizeMb * 1024 * 1024
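To make the behavior change discussed above concrete (a hedged sketch, not part of the diff): the new suffixed form is accepted, while the legacy fractional default now fails fast when parsed.

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
conf.set("spark.kryoserializer.buffer", "64k")       // new style: 64 KiB buffer
conf.getSizeAsKb("spark.kryoserializer.buffer")      // 64

conf.set("spark.kryoserializer.buffer", "0.064")     // legacy fractional-MB style
conf.getSizeAsKb("spark.kryoserializer.buffer")      // throws NumberFormatException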
@@ -173,7 +174,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
"increase spark.kryoserializer.buffer.max.mb value.")
"increase spark.kryoserializer.buffer.max value.")
}
ByteBuffer.wrap(output.toBytes)
}
@@ -78,7 +78,8 @@ class FileShuffleBlockManager(conf: SparkConf)
private val consolidateShuffleFiles =
conf.getBoolean("spark.shuffle.consolidateFiles", false)

private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024

/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
@@ -79,7 +79,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockManager,
blocksByAddress,
serializer,
SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
val itr = blockFetcherItr.flatMap(unpackBlock)

val completionIter = CompletionIterator[T, Iterator[T]](itr, {
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -31,8 +31,7 @@ import org.apache.spark.util.Utils
private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {

val minMemoryMapBytes = blockManager.conf.getLong(
"spark.storage.memoryMapThreshold", 2 * 1024L * 1024L)
val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")

override def getSize(blockId: BlockId): Long = {
diskManager.getFile(blockId.name).length
53 changes: 40 additions & 13 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1041,21 +1041,48 @@ private[spark] object Utils extends Logging {
}

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for internal use.
*
* If no suffix is provided, the passed number is assumed to be in bytes.
*/
def byteStringAsBytes(str: String): Long = {
JavaUtils.byteStringAsBytes(str)
}

/**
* Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for internal use.
*
* If no suffix is provided, the passed number is assumed to be in kibibytes.
*/
def byteStringAsKb(str: String): Long = {
JavaUtils.byteStringAsKb(str)
}

/**
* Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for internal use.
*
* If no suffix is provided, the passed number is assumed to be in mebibytes.
*/
def byteStringAsMb(str: String): Long = {
JavaUtils.byteStringAsMb(str)
}

/**
* Convert a passed byte string (e.g. 50b, 100k, or 250m, 500g) to gibibytes for internal use.
*
* If no suffix is provided, the passed number is assumed to be in gibibytes.
*/
def byteStringAsGb(str: String): Long = {
JavaUtils.byteStringAsGb(str)
}

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of mebibytes.
*/
def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase
if (lower.endsWith("k")) {
(lower.substring(0, lower.length-1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
lower.substring(0, lower.length-1).toInt
} else if (lower.endsWith("g")) {
lower.substring(0, lower.length-1).toInt * 1024
} else if (lower.endsWith("t")) {
lower.substring(0, lower.length-1).toInt * 1024 * 1024
} else {// no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt
}
// Convert to bytes, rather than directly to MB, because when no units are specified the unit
// is assumed to be bytes
(JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
Contributor: Super nit: you could drop the JavaUtils. prefix in the call.

}
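For reference, a small sketch of how the byte-string helpers above behave (values illustrative; Utils is private[spark], so these calls only apply inside Spark itself):

import org.apache.spark.util.Utils

Utils.byteStringAsBytes("1k")    // 1024 -- suffix honored
Utils.byteStringAsBytes("1024")  // 1024 -- no suffix, bytes assumed
Utils.byteStringAsKb("1m")       // 1024 -- MiB to KiB
Utils.byteStringAsMb("1g")       // 1024 -- GiB to MiB
Utils.byteStringAsGb("2048m")    // 2    -- MiB to GiB
Utils.memoryStringToMb("300m")   // 300  -- external behavior unchanged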

@@ -89,8 +89,10 @@ class ExternalAppendOnlyMap[K, V, C](

// Number of bytes spilled in total
private var _diskBytesSpilled = 0L

private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val fileBufferSize =
sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024

// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _
@@ -108,7 +108,9 @@ private[spark] class ExternalSorter[K, V, C](

private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)

// Size of object batches when reading/writing from serializers.
@@ -138,7 +138,7 @@ private[spark] object RollingFileAppender {
val STRATEGY_DEFAULT = ""
val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
val INTERVAL_DEFAULT = "daily"
val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes"
val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize"
val SIZE_DEFAULT = (1024 * 1024).toString
val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
@@ -77,7 +77,7 @@ class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
}

test("groupByKey where map output sizes exceed maxMbInFlight") {
val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m")
sc = new SparkContext(clusterUrl, "test", conf)
// This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
// file should be about 2.5 MB
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -24,11 +24,30 @@ import scala.language.postfixOps
import scala.util.{Try, Random}

import org.scalatest.FunSuite
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("Test byteString conversion") {
val conf = new SparkConf()
// Simply exercise the API, we don't need a complete conversion test since that's handled in
// UtilsSuite.scala
assert(conf.getSizeAsBytes("fake","1k") === ByteUnit.KiB.toBytes(1))
assert(conf.getSizeAsKb("fake","1k") === ByteUnit.KiB.toKiB(1))
assert(conf.getSizeAsMb("fake","1k") === ByteUnit.KiB.toMiB(1))
assert(conf.getSizeAsGb("fake","1k") === ByteUnit.KiB.toGiB(1))
}

test("Test timeString conversion") {
val conf = new SparkConf()
// Simply exercise the API, we don't need a complete conversion test since that's handled in
// UtilsSuite.scala
assert(conf.getTimeAsMs("fake","1ms") === TimeUnit.MILLISECONDS.toMillis(1))
assert(conf.getTimeAsSeconds("fake","1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
}

test("loading from system properties") {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
@@ -33,8 +33,8 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
test("kryo without resizable output buffer should fail on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb", "1")
conf.set("spark.kryoserializer.buffer.max.mb", "1")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.kryoserializer.buffer.max", "1m")
val sc = new SparkContext("local", "test", conf)
intercept[SparkException](sc.parallelize(x).collect())
LocalSparkContext.stop(sc)
@@ -43,8 +43,8 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
test("kryo with resizable output buffer should succeed on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb", "1")
conf.set("spark.kryoserializer.buffer.max.mb", "2")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.kryoserializer.buffer.max", "2m")
val sc = new SparkContext("local", "test", conf)
assert(sc.parallelize(x).collect() === x)
LocalSparkContext.stop(sc)
@@ -269,7 +269,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {

test("serialization buffer overflow reporting") {
import org.apache.spark.SparkException
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max.mb"
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"

val largeObject = (1 to 1000000).toArray

@@ -50,7 +50,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
val allStores = new ArrayBuffer[BlockManager]

// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer.mb", "1")
conf.set("spark.kryoserializer.buffer", "1m")
val serializer = new KryoSerializer(conf)

// Implicitly convert strings to BlockIds for test clarity.