Make tests as functions so that other lineSep values can be tested easily.
HyukjinKwon committed Oct 28, 2017
commit e7983d6cc72c11b67a6109c90ec83abc53a455e3
@@ -185,46 +185,54 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
-  test("SPARK-21289: Support line separator") {
-    import testImplicits._
-
-    val data = """1 1:1.0 3:2.0 5:3.0|0|0 2:4.0 4:5.0 6:6.0"""
-    val lineSep = "|"
-    Seq(data, s"$data$lineSep").foreach { lines =>
-      val path0 = new File(tempDir.getCanonicalPath, "write0")
-      val path1 = new File(tempDir.getCanonicalPath, "write1")
-      try {
-        // Read
-        java.nio.file.Files.write(path0.toPath, lines.getBytes(StandardCharsets.UTF_8))
-        val df = spark.read
-          .option("lineSep", lineSep)
-          .format("libsvm")
-          .load(path0.getAbsolutePath)
-
-        assert(df.columns(0) == "label")
-        assert(df.columns(1) == "features")
-        val row1 = df.first()
-        assert(row1.getDouble(0) == 1.0)
-        val v = row1.getAs[SparseVector](1)
-        assert(v == Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0))))
-
-        // Write
-        df.coalesce(1).write.option("lineSep", "^").format("libsvm").save(path1.getAbsolutePath)
-        val partFile = Utils.recursiveList(path1).filter(f => f.getName.startsWith("part-")).head
-        val readBack = new String(
-          java.nio.file.Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
-        assert(readBack === "1.0 1:1.0 3:2.0 5:3.0^0.0^0.0 2:4.0 4:5.0 6:6.0^")
-
-        // Roundtrip
-        val readBackDF = spark.read
-          .option("lineSep", "^")
-          .format("libsvm")
-          .load(path1.getAbsolutePath)
-        assert(df.collect().toSet === readBackDF.collect().toSet)
-      } finally {
-        Utils.deleteRecursively(path0)
-        Utils.deleteRecursively(path1)
+  def testLineSeparator(lineSep: String): Unit = {
+    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
+      val data = Seq(
+        "1.0 1:1.0 3:2.0 5:3.0", "0.0", "0.0", "0.0 2:4.0 4:5.0 6:6.0").mkString(lineSep)
+      val dataWithTrailingLineSep = s"$data$lineSep"
+
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
+        val path0 = new File(tempDir.getCanonicalPath, "write0")
+        val path1 = new File(tempDir.getCanonicalPath, "write1")
+        try {
+          // Read
+          java.nio.file.Files.write(path0.toPath, lines.getBytes(StandardCharsets.UTF_8))
Contributor:
Why not import this? java.nio.file.Files

HyukjinKwon (Member, Author), Dec 4, 2017:
To differentiate it from Google's Files explicitly above. Not a big deal.
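A minimal standalone sketch of the name clash being sidestepped, assuming Guava's Files is imported elsewhere in this suite (the "Google's Files" mentioned above); the object and method names here are illustrative:

    import java.nio.charset.StandardCharsets
    import com.google.common.io.Files  // Guava's Files, used elsewhere in the suite

    object NioWriteSketch {
      def write(path: java.io.File, lines: String): Unit = {
        // A bare `Files` would resolve to the Guava import above, so the NIO
        // call is spelled out fully to keep the two APIs unambiguous:
        java.nio.file.Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
      }
    }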

+          val df = spark.read
+            .option("lineSep", lineSep)
+            .format("libsvm")
+            .load(path0.getAbsolutePath)
+
+          assert(df.columns(0) == "label")
+          assert(df.columns(1) == "features")
+          val row1 = df.first()
+          assert(row1.getDouble(0) == 1.0)
+          val v = row1.getAs[SparseVector](1)
+          assert(v == Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0))))
Contributor:
Use === instead of ==.

HyukjinKwon (Member, Author):
I'd like to ask why, actually. I had a discussion about this before that ended without a conclusion. The doc says === is preferred, but the actual error messages are sometimes even clearer with ==.

Contributor:
I would prefer to keep this consistent with the other tests.

HyukjinKwon (Member, Author):
OK, but let me use ==. It seems to be what the test cases in this file already use.
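For reference, a minimal standalone sketch of the two assertion styles under discussion; the suite name is illustrative, and the failure-message behavior described assumes ScalaTest's assert macro, which SparkFunSuite inherits:

    import org.scalatest.FunSuite

    class EqualityStyleSuite extends FunSuite {
      test("=== vs == failure messages") {
        val label = 1.0
        // === comes from ScalaTest's TripleEquals: on failure it reports
        // e.g. "1.0 did not equal 2.0" and can plug in a custom Equality[T].
        assert(label === 1.0)
        // Plain == inside ScalaTest's assert macro also yields a descriptive
        // message on failure, which is why it can read just as clearly.
        assert(label == 1.0)
      }
    }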


Contributor:
So here you only test the first line? Why not use df.collect() to test every line?

HyukjinKwon (Member, Author):
Why not just test the first line?

Contributor:
The following check only covers equality between df and readBackDF, but it seems we should also test the whole loaded df against the raw file content.

HyukjinKwon (Member, Author):
What we change here is how each line is handled in the iteration, so I think comparing a single line or several repeated lines are both fine. Many tests in this file already check only the first line, I think.

HyukjinKwon (Member, Author):
OK, let me update it. It's easy to change anyway.
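A sketch of the fuller check the reviewer is asking for, as it could appear inside the Read block above; the expected tuples are derived by hand from the four LibSVM lines in `data` (LibSVM indices are 1-based, hence the shift):

    // Compare every parsed (label, features) row, not just the first one.
    val expected = Seq(
      (1.0, Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0)))),
      (0.0, Vectors.sparse(6, Seq.empty[(Int, Double)])),
      (0.0, Vectors.sparse(6, Seq.empty[(Int, Double)])),
      (0.0, Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), (5, 6.0)))))
    val actual = df.collect().toSeq
      .map(r => (r.getDouble(0), r.getAs[SparseVector](1)))
    assert(actual === expected)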

+          // Write
+          df.coalesce(1)
+            .write.option("lineSep", lineSep).format("libsvm").save(path1.getAbsolutePath)
+          val partFile = Utils.recursiveList(path1).filter(f => f.getName.startsWith("part-")).head
+          val readBack = new String(
+            java.nio.file.Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+          assert(readBack === dataWithTrailingLineSep)
+
+          // Roundtrip
+          val readBackDF = spark.read
+            .option("lineSep", lineSep)
+            .format("libsvm")
+            .load(path1.getAbsolutePath)
+          assert(df.collect().toSet === readBackDF.collect().toSet)
+        } finally {
+          Utils.deleteRecursively(path0)
+          Utils.deleteRecursively(path1)
+        }
       }
     }
   }
 
+  Seq("123!!@", "^", "&@").foreach { lineSep =>
+    testLineSeparator(lineSep)
+  }
 }
@@ -1247,15 +1247,18 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     )
   }
 
-  test("SPARK-21289: Support line separator") {
-    Seq(false, true).foreach { multiLine =>
+  def testLineSeparator(lineSep: String, multiLine: Boolean): Unit = {
+    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep' and multiLine: $multiLine") {
       // Read
-      Seq("a,b|1,\"a\nd\"|1,f|", "a,b|1,\"a\nd\"|1,f").foreach { lines =>
+      val data = Seq("a,b", "1,\"a\nd\"", "1,f").mkString(lineSep)
+      val dataWithTrailingLineSep = s"$data$lineSep"
+
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
         withTempPath { path =>
           Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
           val df = spark.read
             .option("multiLine", multiLine)
-            .option("lineSep", "|")
+            .option("lineSep", lineSep)
             .option("header", true)
             .option("inferSchema", true)
             .csv(path.getAbsolutePath)
@@ -1270,22 +1273,28 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       // Write
       withTempPath { path =>
         Seq("a", "b", "c").toDF().coalesce(1)
-          .write.option("lineSep", "^").csv(path.getAbsolutePath)
+          .write.option("lineSep", lineSep).csv(path.getAbsolutePath)
         val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
         val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
-        assert(readBack === "a^b^c^")
+        assert(readBack === s"a${lineSep}b${lineSep}c${lineSep}")
       }
 
       // Roundtrip
       withTempPath { path =>
         val df = Seq("a", "b", "c").toDF()
-        df.write.option("lineSep", ":").csv(path.getAbsolutePath)
+        df.write.option("lineSep", lineSep).csv(path.getAbsolutePath)
         val readBack = spark.read
           .option("multiLine", multiLine)
-          .option("lineSep", ":")
+          .option("lineSep", lineSep)
           .csv(path.getAbsolutePath)
         checkAnswer(df, readBack)
       }
     }
   }
 
+  Seq("|", "^", "::", "\r\n").foreach { lineSep =>
+    Seq(true, false).foreach { multiLine =>
+      testLineSeparator(lineSep, multiLine)
+    }
+  }
 }
@@ -2065,42 +2065,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
-  test("SPARK-21289: Support line separator") {
-    // Read
-    val data =
-      """
-        | {"f":
-        |"a", "f0": 1}|{"f":
-        |
-        |"c", "f0": 2}|{"f": "d", "f0": 3}
-      """.stripMargin
-    val lineSep = "|"
-    Seq(data, s"$data$lineSep").foreach { lines =>
-      withTempPath { path =>
-        Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-        val df = spark.read.option("lineSep", "|").json(path.getAbsolutePath)
-        val expectedSchema =
-          StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
-        checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-        assert(df.schema === expectedSchema)
-      }
-    }
-
-    // Write
-    withTempPath { path =>
-      Seq("a", "b", "c").toDF("value").coalesce(1)
-        .write.option("lineSep", "^").json(path.getAbsolutePath)
-      val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
-      val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
-      assert(readBack === """{"value":"a"}^{"value":"b"}^{"value":"c"}^""")
-    }
-
-    // Roundtrip
-    withTempPath { path =>
-      val df = Seq("a", "b", "c").toDF()
-      df.write.option("lineSep", "!").json(path.getAbsolutePath)
-      val readBack = spark.read.option("lineSep", "!").json(path.getAbsolutePath)
-      checkAnswer(df, readBack)
-    }
-  }
+  def testLineSeparator(lineSep: String): Unit = {
+    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
+      // Read
+      val data =
+        s"""
+          | {"f":
+          |"a", "f0": 1}${lineSep}{"f":
+          |
+          |"c", "f0": 2}${lineSep}{"f": "d", "f0": 3}
+        """.stripMargin
+      val dataWithTrailingLineSep = s"$data$lineSep"
+
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
+        withTempPath { path =>
+          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+          val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
+          val expectedSchema =
+            StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
+          checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
+          assert(df.schema === expectedSchema)
+        }
+      }
+
+      // Write
+      withTempPath { path =>
+        Seq("a", "b", "c").toDF("value").coalesce(1)
+          .write.option("lineSep", lineSep).json(path.getAbsolutePath)
+        val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
+        val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+        assert(
+          readBack === s"""{"value":"a"}${lineSep}{"value":"b"}${lineSep}{"value":"c"}${lineSep}""")
+      }
+
+      // Roundtrip
+      withTempPath { path =>
+        val df = Seq("a", "b", "c").toDF()
+        df.write.option("lineSep", lineSep).json(path.getAbsolutePath)
+        val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
+        checkAnswer(df, readBack)
+      }
+    }
+  }
+
+  Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
+    testLineSeparator(lineSep)
+  }
 }
@@ -174,32 +174,41 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-21289: Support line separator") {
-    // Read
-    Seq("a|b|\nc|", "a|b|\nc").foreach { lines =>
-      withTempPath { path =>
-        Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-        val df = spark.read.option("lineSep", "|").text(path.getAbsolutePath)
-        checkAnswer(df, Seq("a", "b", "\nc").toDF())
-      }
-    }
-
-    // Write
-    withTempPath { path =>
-      Seq("a", "b", "\nc").toDF().coalesce(1)
-        .write.option("lineSep", "^").text(path.getAbsolutePath)
-      val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
-      val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
-      assert(readBack === "a^b^\nc^")
-    }
-
-    // Roundtrip
-    withTempPath { path =>
-      val df = Seq("a", "b", "\nc").toDF()
-      df.write.option("lineSep", ":").text(path.getAbsolutePath)
-      val readBack = spark.read.option("lineSep", ":").text(path.getAbsolutePath)
-      checkAnswer(df, readBack)
-    }
-  }
+  def testLineSeparator(lineSep: String): Unit = {
+    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
+      // Read
+      val values = Seq("a", "b", "\nc")
+      val data = values.mkString(lineSep)
+      val dataWithTrailingLineSep = s"$data$lineSep"
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
+        withTempPath { path =>
+          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+          val df = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+          checkAnswer(df, Seq("a", "b", "\nc").toDF())
+        }
+      }
+
+      // Write
+      withTempPath { path =>
+        values.toDF().coalesce(1)
+          .write.option("lineSep", lineSep).text(path.getAbsolutePath)
+        val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
+        val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+        assert(readBack === s"a${lineSep}b${lineSep}\nc${lineSep}")
+      }
+
+      // Roundtrip
+      withTempPath { path =>
+        val df = values.toDF()
+        df.write.option("lineSep", lineSep).text(path.getAbsolutePath)
+        val readBack = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+        checkAnswer(df, readBack)
+      }
+    }
+  }
+
+  Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
+    testLineSeparator(lineSep)
+  }
 
   private def testFile: String = {