Pull request (Closed), showing changes from one commit of 36.

Commit history (all by squito):
- 149ea3e: ShuffleWriters write to temp file, then go through (Oct 21, 2015)
- cf8118e: assorted cleanup (Oct 22, 2015)
- ea1ae07: style (Oct 22, 2015)
- 9356c67: fix compilation in StoragePerfTester (Oct 22, 2015)
- 2b42eb5: mima (Oct 22, 2015)
- 32d4b3b: update UnsafeShuffleWriterSuite (Oct 22, 2015)
- 550e198: fix imports (Oct 22, 2015)
- 4ff98bf: should work now, but needs cleanup (Oct 23, 2015)
- 4a19702: only consider tmp files that exist; only consider the dest pre-existi… (Oct 23, 2015)
- 89063dd: cleanup (Oct 23, 2015)
- 4145651: ShuffleOutputCoordinatorSuite (Oct 23, 2015)
- 2089e12: cleanup (Oct 23, 2015)
- 2e9bbaa: Merge branch 'master' into SPARK-8029_first_wins (Oct 26, 2015)
- 4cd423e: write the winning mapStatus to disk, so subsequent tasks can respond … (Oct 26, 2015)
- dc4b7f6: fix imports (Oct 26, 2015)
- b7a0981: fixes (Oct 26, 2015)
- 830a097: shuffle writers must always write all tmp files (Oct 27, 2015)
- 5d11eca: more fixes for zero-sized blocks (Oct 27, 2015)
- 3f5af9c: don't make ShuffleWriter return mapStatusFile (Oct 27, 2015)
- 4b7c71a: rather than requiring all tmp files to exist, just write a zero-lengt… (Oct 27, 2015)
- eabf978: update test case (Oct 27, 2015)
- 5bbeec3: minor cleanup (Oct 27, 2015)
- e141d82: test that shuffle output files are always the same (Oct 27, 2015)
- 4df7955: fix compression settings of tmp files; minor cleanup (Oct 27, 2015)
- dc076b8: fix tests (Oct 27, 2015)
- cfdfd2c: review feedback (Nov 3, 2015)
- 86f468a: Merge branch 'master' into SPARK-8029_first_wins (Nov 4, 2015)
- 5c8b247: fix imports (Nov 4, 2015)
- 4d66df1: fix more imports (Nov 4, 2015)
- e59df41: couple more nits ... (Nov 4, 2015)
- c206fc5: minor cleanup (Nov 5, 2015)
- c0edff1: style (Nov 5, 2015)
- da33519: Merge branch 'master' into SPARK-8029_first_wins (Nov 11, 2015)
- c0b93a5: create temporary files in same location as destination files (Nov 11, 2015)
- 9d0d9d9: no more @VisibleForTesting (Nov 11, 2015)
- 80e037d: unused import (Nov 11, 2015)
Selected commit: 5d11eca843ab19fc4d1b83d50a677bd6f2b6f0d8 ("more fixes for zero-sized blocks")
squito committed Oct 27, 2015
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

@@ -639,7 +639,6 @@ private[spark] class ExternalSorter[K, V, C](
* called by the SortShuffleWriter.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
Comment (squito, contributor, author): this got removed in SPARK-10984, just cleaning up the comment.

*/
def writePartitionedFile(
@@ -680,6 +679,12 @@ private[spark] class ExternalSorter[K, V, C](
}
}

// SPARK-8029 the ShuffleOutputCoordinator requires all shuffle output files to always exist,
// even if they are zero-length
if (!outputFile.exists()) {
outputFile.createNewFile()
}

context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.internalMetricsToAccumulators(
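For context on why the new createNewFile() call matters, here is a minimal, hypothetical sketch of the "first wins" commit step this diff supports. The names FirstWinsCommitSketch and commitOutputs are illustrative only, not the PR's actual ShuffleOutputCoordinator API: the idea is that the coordinator publishes or discards a complete set of files per attempt, which only works if every attempt materializes every file, including empty ones.

import java.io.File

object FirstWinsCommitSketch {
  /**
   * Assumed simplification of the coordinator's job: atomically publish one
   * attempt's complete output set, or discard it if an earlier attempt won.
   *
   * @param tmpToDest (tmpFile, destFile) pairs; every tmpFile must exist,
   *                  possibly zero-length, which is why ExternalSorter now
   *                  calls createNewFile() for otherwise-empty output.
   * @return true if this attempt's files were committed.
   */
  def commitOutputs(tmpToDest: Seq[(File, File)]): Boolean = synchronized {
    require(tmpToDest.forall(_._1.exists()), "all tmp files must exist, even if empty")
    if (tmpToDest.forall { case (_, dest) => !dest.exists() }) {
      // No earlier attempt committed: move the whole set into place.
      tmpToDest.foreach { case (tmp, dest) => tmp.renameTo(dest) }
      true
    } else {
      // An earlier attempt already published a complete set; drop ours so
      // readers never see a mix of two attempts' files.
      tmpToDest.foreach { case (tmp, _) => tmp.delete() }
      false
    }
  }
}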
core/src/test/scala/org/apache/spark/ShuffleSuite.scala (31 changes: 7 additions & 24 deletions)

@@ -25,7 +25,7 @@ import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.{KryoSerializer, Serializer}
import org.apache.spark.shuffle.{ShuffleOutputCoordinator, ShuffleWriter}
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleMapStatusBlockId}
import org.apache.spark.util.MutablePair
@@ -92,33 +92,16 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}

test("zero sized blocks") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)

// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
val a = sc.parallelize(1 to 4, NUM_BLOCKS)
val b = a.map(x => (x, x*2))

// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
.setSerializer(new KryoSerializer(conf))

val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)

val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id)
statuses.flatMap(_._2.map(_._2))
}
val nonEmptyBlocks = blockSizes.filter(x => x > 0)

// We should have at most 4 non-zero sized partitions
assert(nonEmptyBlocks.size <= 4)
testZeroSizedBlocks(Some(new KryoSerializer(conf)))
}

test("zero sized blocks without kryo") {
testZeroSizedBlocks(None)
}

def testZeroSizedBlocks(serOpt: Option[Serializer]): Unit = {
Comment (squito, contributor, author): unrelated to this change, but "zero sized blocks" and "zero sized blocks without kryo" were almost identical, so I made a helper.

// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)

@@ -127,8 +110,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
val a = sc.parallelize(1 to 4, NUM_BLOCKS)
val b = a.map(x => (x, x*2))

// NOTE: The default Java serializer should create zero-sized blocks
val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
serOpt.foreach(c.setSerializer(_))

val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
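To make the test's assertion concrete: with 4 keys hashed into 201 reduce partitions, at most 4 partitions can receive data, so at least 197 shuffle blocks must be zero-sized. A small standalone sketch of that arithmetic (HashPartitioner is Spark's real class; the object name and output are illustrative):

import org.apache.spark.HashPartitioner

object ZeroSizedBlocksArithmetic {
  def main(args: Array[String]): Unit = {
    val numBlocks = 201 // greater than "spark.shuffle.sort.bypassMergeThreshold"
    val partitioner = new HashPartitioner(numBlocks)
    // The test's 4 keys (1 to 4) land in at most 4 distinct reduce partitions...
    val nonEmpty = (1 to 4).map(partitioner.getPartition(_)).toSet
    assert(nonEmpty.size <= 4)
    // ...so at least 197 of the 201 blocks are zero-sized, which is what
    // "nonEmptyBlocks.size <= 4" asserts in the suite above.
    println(s"non-empty partitions: ${nonEmpty.size} of $numBlocks")
  }
}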