From 72095b9cc418c1aebaf750eb3e6e58c468292b21 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 21 Oct 2019 17:20:39 -0700 Subject: [PATCH 1/4] Extend FilterInputStream and use the stream directly Instead of trying to return a composite struct. This is because codegen will attempt to cast the returned stream as a java.io.InputStream --- .../spark/shuffle/api/ShuffleBlockInputStream.java | 10 +++------- .../apache/spark/shuffle/BlockStoreShuffleReader.scala | 3 +-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleBlockInputStream.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleBlockInputStream.java index 19b700101c58..6f04e9042a92 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleBlockInputStream.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleBlockInputStream.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle.api; +import java.io.FilterInputStream; import org.apache.spark.storage.BlockId; import java.io.InputStream; @@ -26,20 +27,15 @@ * An object defining the shuffle block and length metadata associated with the block. * @since 3.0.0 */ -public class ShuffleBlockInputStream { +public class ShuffleBlockInputStream extends FilterInputStream { private final BlockId blockId; - private final InputStream inputStream; public ShuffleBlockInputStream(BlockId blockId, InputStream inputStream) { + super(inputStream); this.blockId = blockId; - this.inputStream = inputStream; } public BlockId getBlockId() { return this.blockId; } - - public InputStream getInputStream() { - return this.inputStream; - } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 15df30c313b6..5dca16a41293 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -73,8 +73,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( } }.asJava).iterator() - val retryingWrappedStreams = streamsIterator.asScala.map { shuffleBlock => - val rawReaderStream = shuffleBlock.getInputStream + val retryingWrappedStreams = streamsIterator.asScala.map { rawReaderStream => if (shuffleExecutorComponents.shouldWrapPartitionReaderStream()) { if (compressShuffle) { compressionCodec.compressedInputStream( From 007741cd23b229c6ec28387a5f552078f9ffdbea Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 21 Oct 2019 17:44:58 -0700 Subject: [PATCH 2/4] Invalidate sbt caches --- .circleci/config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 13a8051e1486..cfdf3df84ff9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -51,10 +51,10 @@ step_templates: restore-home-sbt-cache: &restore-home-sbt-cache restore_cache: keys: - - v2-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} + - v3-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} restore-build-sbt-cache: &restore-build-sbt-cache restore_cache: - key: v1-build-sbt-{{ .Branch }}-{{ .Revision }} + key: v2-build-sbt-{{ .Branch }}-{{ .Revision }} link-in-build-sbt-cache: &link-in-build-sbt-cache run: name: Hard link cache contents into current build directory @@ -244,7 +244,7 @@ jobs: path: /tmp/build-apache-spark.log destination: build-apache-spark.log - save_cache: - key: v2-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} + key: v3-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} paths: ~/.sbt # Also hard link all the things so we can save it as a cache and restore it in future builds - run: @@ -254,7 +254,7 @@ jobs: --exclude '***/*.jar' --include 'target/***' --include '**/' --exclude '*' . "$BUILD_SBT_CACHE/" - save_cache: - key: v1-build-sbt-{{ .Branch }}-{{ .Revision }} + key: v2-build-sbt-{{ .Branch }}-{{ .Revision }} paths: - "~/build-sbt-cache" # Also save all the assembly jars directories to the workspace - need them for spark submitting From 92f342d6611dc9ae6c428cccc8ad23723cd0acfc Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 21 Oct 2019 18:07:59 -0700 Subject: [PATCH 3/4] Update to sbt-git 1.0.0 --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index e2cfbe7bd6d4..e3ab387044e6 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.8.5") +addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0") addSbtPlugin("com.etsy" % "sbt-checkstyle-plugin" % "3.1.1") From 13e7b6aba500325bcbaaa10e044a3e972eb1a10c Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 21 Oct 2019 18:10:29 -0700 Subject: [PATCH 4/4] Revert "Invalidate sbt caches" This reverts commit 007741cd23b229c6ec28387a5f552078f9ffdbea. --- .circleci/config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index cfdf3df84ff9..13a8051e1486 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -51,10 +51,10 @@ step_templates: restore-home-sbt-cache: &restore-home-sbt-cache restore_cache: keys: - - v3-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} + - v2-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} restore-build-sbt-cache: &restore-build-sbt-cache restore_cache: - key: v2-build-sbt-{{ .Branch }}-{{ .Revision }} + key: v1-build-sbt-{{ .Branch }}-{{ .Revision }} link-in-build-sbt-cache: &link-in-build-sbt-cache run: name: Hard link cache contents into current build directory @@ -244,7 +244,7 @@ jobs: path: /tmp/build-apache-spark.log destination: build-apache-spark.log - save_cache: - key: v3-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} + key: v2-home-sbt-{{ checksum "build/sbt" }}-{{ checksum "project/target/streams/$global/update/$global/streams/update_cache_2.10/inputs" }} paths: ~/.sbt # Also hard link all the things so we can save it as a cache and restore it in future builds - run: @@ -254,7 +254,7 @@ jobs: --exclude '***/*.jar' --include 'target/***' --include '**/' --exclude '*' . "$BUILD_SBT_CACHE/" - save_cache: - key: v2-build-sbt-{{ .Branch }}-{{ .Revision }} + key: v1-build-sbt-{{ .Branch }}-{{ .Revision }} paths: - "~/build-sbt-cache" # Also save all the assembly jars directories to the workspace - need them for spark submitting