Add test for snappy-java handling of magic header
Probably needs TLC on the while loop. This is based on https://github.com/xerial/snappy-java/blob/60cc0c2e1d1a76ae2981d0572a5164fcfdfba5f1/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java, but it lives outside the snappy package, so we can't use MAGIC_HEADER[0].
a-roberts authored Sep 6, 2016
commit accc88f8c73864d50b5588f43c53c11323052d2e
@@ -17,7 +17,7 @@

package org.apache.spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

import com.google.common.io.ByteStreams

@@ -130,4 +130,58 @@ class CompressionCodecSuite extends SparkFunSuite {
    ByteStreams.readFully(concatenatedBytes, decompressed)
    assert(decompressed.toSeq === (0 to 127))
  }

  // Based on https://github.com/xerial/snappy-java/blob/60cc0c2e1d1a76ae2981d0572a5164fcfdfba5f1/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java
  test("SPARK-17378: snappy-java should handle magic header when reading stream") {
    val b = new ByteArrayOutputStream()
    // Write an uncompressed length whose first byte is -126 (the same value as MAGIC_HEADER[0])
    b.write(-126) // MAGIC_HEADER[0] isn't public, so write the raw value instead
Member
Hm, this seems to tie this test to this internal detail of Snappy though. Spark itself doesn't really need to assert this detail in a test.

I feel like this test is just testing snappy, which snappy can test. I could see testing a case at the level of Spark that triggers this bug and verifies it's fixed.

Contributor Author
Yeah, I agree. How about we just revert the test case commit here and merge the 1.1.2.6 change itself, since folks want it, and then add an extra robustness test in a later PR if we want to.
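For reference, a rough sketch of the kind of Spark-level round-trip test suggested above, assuming the suite's existing CompressionCodec/ByteStreams style; it exercises the snappy codec end to end but may not reproduce the exact header collision on its own:

test("SPARK-17378: snappy codec round-trip (sketch)") {
  val codec = CompressionCodec.createCodec(new org.apache.spark.SparkConf(), "snappy")
  val compressedBytes = new ByteArrayOutputStream()
  val out = codec.compressedOutputStream(compressedBytes)
  // 0x82 (-126) is the byte that snappy-java's stream magic starts with
  (0 until 130).foreach(_ => out.write(0x82))
  out.close()

  val in = codec.compressedInputStream(new ByteArrayInputStream(compressedBytes.toByteArray))
  val decompressed = new Array[Byte](130)
  ByteStreams.readFully(in, decompressed)
  assert(decompressed.toSeq === Seq.fill(130)(0x82.toByte))
}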

    b.write(0x01)
    // uncompressed data length = 130

    val data = new ByteArrayOutputStream()

    for (i <- 0 until 130) {
      data.write('A')
    }

    val dataMoreThan8Len = data.toByteArray()

    // Write a literal (the lower 2 bits of the first tag byte are 00; the upper 6 bits represent the data size)
    b.write(60 << 2) // 1-byte data length follows
    b.write(dataMoreThan8Len.length - 1) // subsequent data length
    b.write(dataMoreThan8Len)
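    // Byte layout of the handcrafted block (raw snappy format):
    //   -126, 0x01          little-endian varint 0x82 0x01 = (0x82 & 0x7f) + (0x01 << 7) = 130, the uncompressed length
    //   60 << 2 = 0xf0      literal tag (lower 2 bits 00); upper 6 bits = 60 means a 1-byte length follows
    //   length - 1 = 129    literal lengths are stored minus one, so 129 stands for the 130 data bytes
    // Note that the first byte, -126 (0x82), is also the first byte of snappy-java's stream magic header,
    // which is exactly the collision this test exercises.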

    val compressed = b.toByteArray()

    // This should succeed
    assert(dataMoreThan8Len === org.xerial.snappy.Snappy.uncompress(compressed))

    // Reproduce the error in snappy-java #142
    val in = new org.xerial.snappy.SnappyInputStream(new ByteArrayInputStream(b.toByteArray()))

    val uncompressed = readFully(in)
    assert(dataMoreThan8Len === uncompressed) // before the snappy-java fix this failed: uncompressed came back empty
  }

  private def readFully(input: InputStream): Array[Byte] = {
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)

      var readBytes = 0
      while (readBytes != -1) {
Contributor Author
This loop makes me weep; it's an attempt at rewriting the following Java in Scala:

for (int readBytes = 0; (readBytes = input.read(buf)) != -1; ) {
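A more idiomatic Scala rendering of that loop (just a sketch, not what this commit uses) would drop the sentinel variable entirely:

// read until EOF, writing each chunk that was actually read
Iterator.continually(input.read(buf))
  .takeWhile(_ != -1)
  .foreach(bytesRead => out.write(buf, 0, bytesRead))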

        readBytes = input.read(buf)
        if (readBytes != -1) {
          out.write(buf, 0, readBytes)
        }
      }
      out.flush()
      out.toByteArray()
    } finally {
      input.close()
    }
  }

}