From 4b7ec9a9c7d136eee43adf5b6708944d76f7fe3e Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Mon, 5 Sep 2016 09:48:26 +0100
Subject: [PATCH 1/8] Update Snappy to 1.1.2.6

Pom change
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 2c265c1fa325..e6c28977ca78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
     2.11
     1.9.13
     2.6.5
-    1.1.2.4
+    1.1.2.6
     1.1.2
     1.2.0-incubating
     1.10

From 440b1df04f7f85a9b99b3c57e83a9ca9b19e8b76 Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Mon, 5 Sep 2016 09:49:57 +0100
Subject: [PATCH 2/8] Update spark-deps-hadoop-2.2

---
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index eaed0889ac36..81adde6a13a1 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -152,7 +152,7 @@ shapeless_2.11-2.0.0.jar
 slf4j-api-1.7.16.jar
 slf4j-log4j12-1.7.16.jar
 snappy-0.2.jar
-snappy-java-1.1.2.4.jar
+snappy-java-1.1.2.6.jar
 spire-macros_2.11-0.7.4.jar
 spire_2.11-0.7.4.jar
 stax-api-1.0.1.jar

From 140f70acf1d1940ccbddb5408dbed9ee0feee459 Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Mon, 5 Sep 2016 09:50:06 +0100
Subject: [PATCH 3/8] Update spark-deps-hadoop-2.3

---
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index d68a7f462ba7..75ab6286dec3 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -159,7 +159,7 @@ shapeless_2.11-2.0.0.jar
 slf4j-api-1.7.16.jar
 slf4j-log4j12-1.7.16.jar
 snappy-0.2.jar
-snappy-java-1.1.2.4.jar
+snappy-java-1.1.2.6.jar
 spire-macros_2.11-0.7.4.jar
 spire_2.11-0.7.4.jar
 stax-api-1.0-2.jar

From cda5016f03f184c3bb9e35038eb00a24c1aaf970 Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Mon, 5 Sep 2016 09:50:17 +0100
Subject: [PATCH 4/8] Update spark-deps-hadoop-2.4

---
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 346f19767d36..897d802a9d6a 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -159,7 +159,7 @@ shapeless_2.11-2.0.0.jar
 slf4j-api-1.7.16.jar
 slf4j-log4j12-1.7.16.jar
 snappy-0.2.jar
-snappy-java-1.1.2.4.jar
+snappy-java-1.1.2.6.jar
 spire-macros_2.11-0.7.4.jar
 spire_2.11-0.7.4.jar
 stax-api-1.0-2.jar

From 36253baf6abffa90c4480b98f38b78f2b1507447 Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Mon, 5 Sep 2016 09:50:26 +0100
Subject: [PATCH 5/8] Update spark-deps-hadoop-2.6

---
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 6f4695f345a4..f95ddb1c3065 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -167,7 +167,7 @@ shapeless_2.11-2.0.0.jar
 slf4j-api-1.7.16.jar
 slf4j-log4j12-1.7.16.jar
 snappy-0.2.jar
-snappy-java-1.1.2.4.jar
+snappy-java-1.1.2.6.jar
 spire-macros_2.11-0.7.4.jar
 spire_2.11-0.7.4.jar
 stax-api-1.0-2.jar

From 3795997a116c868343930ac9a91afb861c82705c Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Mon, 5 Sep 2016 09:50:35 +0100
Subject: [PATCH 6/8] Update spark-deps-hadoop-2.7

---
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 7a86a8bd8884..8df02c032bf2 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -168,7 +168,7 @@ shapeless_2.11-2.0.0.jar
 slf4j-api-1.7.16.jar
 slf4j-log4j12-1.7.16.jar
 snappy-0.2.jar
-snappy-java-1.1.2.4.jar
+snappy-java-1.1.2.6.jar
 spire-macros_2.11-0.7.4.jar
 spire_2.11-0.7.4.jar
 stax-api-1.0-2.jar

From accc88f8c73864d50b5588f43c53c11323052d2e Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Tue, 6 Sep 2016 16:38:05 +0100
Subject: [PATCH 7/8] Add test for snappy-java handling of magic header

Probably needs TLC with the while loop. This is based on
https://github.com/xerial/snappy-java/blob/60cc0c2e1d1a76ae2981d0572a5164fcfdfba5f1/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java
but lives outside the snappy package, so we can't reference MAGIC_HEADER[0]
directly.
---
 .../spark/io/CompressionCodecSuite.scala | 56 ++++++++++++++++++-
 1 file changed, 55 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 9e9c2b0165e1..57055274b878 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
 
 import com.google.common.io.ByteStreams
 
@@ -130,4 +130,58 @@ class CompressionCodecSuite extends SparkFunSuite {
     ByteStreams.readFully(concatenatedBytes, decompressed)
     assert(decompressed.toSeq === (0 to 127))
   }
+
+  // Based on https://github.com/xerial/snappy-java/blob/60cc0c2e1d1a76ae2981d0572a5164fcfdfba5f1/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java
+  test("SPARK-17378: snappy-java should handle magic header when reading stream") {
+    val b = new ByteArrayOutputStream()
+    // Write an uncompressed length whose first byte is -126, the same as MAGIC_HEADER[0]
+    b.write(-126) // MAGIC_HEADER isn't public, so write the raw byte directly
+    b.write(0x01)
+    // uncompressed data length = 130
+
+    val data = new ByteArrayOutputStream()
+
+    for (i <- 0 until 130) {
+      data.write('A')
+    }
+
+    val dataMoreThan8Len = data.toByteArray()
+
+    // Write a literal (lower 2 bits of the first tag byte are 00, upper 6 bits give the data size)
+    b.write(60 << 2) // 60 means a 1-byte data length follows
+    b.write(dataMoreThan8Len.length - 1) // subsequent data length
+    b.write(dataMoreThan8Len)
+
+    val compressed = b.toByteArray()
+
+    // Decompressing the raw block directly should succeed
+    assert(dataMoreThan8Len === org.xerial.snappy.Snappy.uncompress(compressed))
+
+    // Reproduce the error in snappy-java issue #142
+    val in = new org.xerial.snappy.SnappyInputStream(new ByteArrayInputStream(b.toByteArray()))
+
+    val uncompressed = readFully(in)
+    assert(dataMoreThan8Len === uncompressed) // fails before 1.1.2.6: uncompressed comes back empty
+  }
+
+  private def readFully(input: InputStream): Array[Byte] = {
+    try {
+      val out = new ByteArrayOutputStream()
+      val buf = new Array[Byte](4096)
+
+      var readBytes = 0
+      while (readBytes != -1) {
+        readBytes = input.read(buf)
+        if (readBytes != -1) {
+          out.write(buf, 0, readBytes)
+        }
+      }
+
+      out.flush()
+      out.toByteArray()
+    }
+    finally {
+      input.close()
+    }
+  }
 }

From 022cad7b69f75b2e358fdc1b0f01224c3c067424 Mon Sep 17 00:00:00 2001
From: Adam Roberts
Date: Tue, 6 Sep 2016 17:03:59 +0100
Subject: [PATCH 8/8] Revert test addition in CompressionCodecSuite

It was a duplicate of the upstream Snappy test rather than one exercising the problem from
Spark; something to think about later.
---
 .../spark/io/CompressionCodecSuite.scala | 56 +------------------
 1 file changed, 1 insertion(+), 55 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 57055274b878..9e9c2b0165e1 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import com.google.common.io.ByteStreams
 
@@ -130,58 +130,4 @@ class CompressionCodecSuite extends SparkFunSuite {
     ByteStreams.readFully(concatenatedBytes, decompressed)
     assert(decompressed.toSeq === (0 to 127))
   }
-
-  // Based on https://github.com/xerial/snappy-java/blob/60cc0c2e1d1a76ae2981d0572a5164fcfdfba5f1/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java
-  test("SPARK-17378: snappy-java should handle magic header when reading stream") {
-    val b = new ByteArrayOutputStream()
-    // Write an uncompressed length whose first byte is -126, the same as MAGIC_HEADER[0]
-    b.write(-126) // MAGIC_HEADER isn't public, so write the raw byte directly
-    b.write(0x01)
-    // uncompressed data length = 130
-
-    val data = new ByteArrayOutputStream()
-
-    for (i <- 0 until 130) {
-      data.write('A')
-    }
-
-    val dataMoreThan8Len = data.toByteArray()
-
-    // Write a literal (lower 2 bits of the first tag byte are 00, upper 6 bits give the data size)
-    b.write(60 << 2) // 60 means a 1-byte data length follows
-    b.write(dataMoreThan8Len.length - 1) // subsequent data length
-    b.write(dataMoreThan8Len)
-
-    val compressed = b.toByteArray()
-
-    // Decompressing the raw block directly should succeed
-    assert(dataMoreThan8Len === org.xerial.snappy.Snappy.uncompress(compressed))
-
-    // Reproduce the error in snappy-java issue #142
-    val in = new org.xerial.snappy.SnappyInputStream(new ByteArrayInputStream(b.toByteArray()))
-
-    val uncompressed = readFully(in)
-    assert(dataMoreThan8Len === uncompressed) // fails before 1.1.2.6: uncompressed comes back empty
-  }
-
-  private def readFully(input: InputStream): Array[Byte] = {
-    try {
-      val out = new ByteArrayOutputStream()
-      val buf = new Array[Byte](4096)
-
-      var readBytes = 0
-      while (readBytes != -1) {
-        readBytes = input.read(buf)
-        if (readBytes != -1) {
-          out.write(buf, 0, readBytes)
-        }
-      }
-
-      out.flush()
-      out.toByteArray()
-    }
-    finally {
-      input.close()
-    }
-  }
 }
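
Note, not part of the patch series above: the reverted test hinges on one framing
detail of Snappy's raw format. A raw block begins with the uncompressed length
encoded as a little-endian varint, so a length of 130 encodes as the bytes
0x82 0x01, and 0x82 read as a signed byte is -126, the same value as the first
byte of snappy-java's stream magic header (MAGIC_HEADER[0]). That collision is
what confused SnappyInputStream before 1.1.2.6 (snappy-java #142). The Scala
sketch below illustrates the collision only; the object name and the varint
helper are invented for this note and are not snappy-java API.

object VarintHeaderCollision {
  // Encode a non-negative Int as the little-endian varint Snappy uses for
  // the uncompressed-length prefix of a raw block.
  def varint(n: Int): Array[Byte] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[Byte]
    var v = n
    while ((v & ~0x7f) != 0) {
      out += ((v & 0x7f) | 0x80).toByte // low 7 bits, continuation bit set
      v >>>= 7
    }
    out += v.toByte
    out.toArray
  }

  def main(args: Array[String]): Unit = {
    val prefix = varint(130)
    // 130 = 0b10000010, so the varint is 0x82 0x01; 0x82 as a signed byte is
    // -126, which is exactly what the test wrote with b.write(-126).
    assert(prefix.sameElements(Array(0x82.toByte, 0x01.toByte)))
    println(prefix.map(b => f"0x$b%02x").mkString(" ")) // prints: 0x82 0x01
  }
}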