@@ -137,9 +137,10 @@ class RPackageUtilsSuite
  IvyTestUtils.withRepository(main, None, None) { repo =>
    val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
      useIvyLayout = false, withR = false, None)
-   val jarFile = new JarFile(jar)
-   assert(jarFile.getManifest == null, "jar file should have null manifest")
-   assert(!RPackageUtils.checkManifestForR(jarFile), "null manifest should return false")
+   Utils.tryWithResource(new JarFile(jar)) { jarFile =>
+     assert(jarFile.getManifest == null, "jar file should have null manifest")
+     assert(!RPackageUtils.checkManifestForR(jarFile), "null manifest should return false")
+   }

HyukjinKwon (Member, Author):
This simply closes the JarFile; it should be closed after use.

  }
}

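For context, Utils.tryWithResource is Spark's loan-pattern helper. A minimal sketch of the idea (not Spark's exact implementation) looks like this:

import java.io.Closeable

// Loan pattern: create the resource, hand it to f, and close it even if
// f throws. Every tryWithResource change in this PR relies on this shape.
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f(resource) finally resource.close()
}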

@@ -831,7 +831,7 @@ class SparkSubmitSuite
val hadoopConf = new Configuration()
val tmpDir = Files.createTempDirectory("tmp").toFile
updateConfWithFakeS3Fs(hadoopConf)
- val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
+ val sourcePath = s"s3a://${jarFile.toURI.getPath}"

HyukjinKwon (Member, Author), Aug 26, 2017:
Windows:

Before:

scala> f.getAbsolutePath
res2: String = C:\a\b\c

After:

scala> f.toURI.getPath
res1: String = /C:/a/b/c

Linux:

Before:

scala> new File("/a/b/c").getAbsolutePath
res0: String = /a/b/c

After:

scala> new File("/a/b/c").toURI.getPath
res1: String = /a/b/c

val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf,
new SecurityManager(sparkConf))
checkDownloadedFile(sourcePath, outputPath)
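
To make the Windows breakage concrete, here is a small sketch of the composed source path under each accessor (the Windows path below is hypothetical, and the outputs are shown as they would appear on Windows):

import java.io.File

// On Windows, getAbsolutePath yields backslashes and no leading slash,
// so the interpolated s3a URL is malformed; toURI.getPath normalises it.
val jarFile = new File("C:\\a\\b\\c")
s"s3a://${jarFile.getAbsolutePath}"  // s3a://C:\a\b\c   (broken)
s"s3a://${jarFile.toURI.getPath}"    // s3a:///C:/a/b/c  (well-formed)
// On Linux, both forms give s3a:///a/b/c for new File("/a/b/c").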

@@ -847,7 +847,7 @@ class SparkSubmitSuite
val hadoopConf = new Configuration()
val tmpDir = Files.createTempDirectory("tmp").toFile
updateConfWithFakeS3Fs(hadoopConf)
- val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
+ val sourcePaths = Seq("/local/file", s"s3a://${jarFile.toURI.getPath}")
val outputPaths = DependencyUtils
.downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf,
new SecurityManager(sparkConf))

@@ -21,6 +21,7 @@ import java.io._
import java.net.URI
import java.util.concurrent.atomic.AtomicInteger

+ import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter

@@ -84,24 +85,23 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
  val buffered = new ByteArrayOutputStream
  val codec = new LZ4CompressionCodec(new SparkConf())
  val compstream = codec.compressedOutputStream(buffered)
- val writer = new PrintWriter(compstream)
-
- val applicationStart = SparkListenerApplicationStart("AppStarts", None,
-   125L, "Mickey", None)
- val applicationEnd = SparkListenerApplicationEnd(1000L)
-
- // scalastyle:off println
- writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
- writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
- // scalastyle:on println
- writer.close()
+ Utils.tryWithResource(new PrintWriter(compstream)) { writer =>
+   val applicationStart = SparkListenerApplicationStart("AppStarts", None,
+     125L, "Mickey", None)
+   val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+   // scalastyle:off println
+   writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+   writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+   // scalastyle:on println
+ }

  val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
- val fstream = fileSystem.create(logFilePath)
  val bytes = buffered.toByteArray
-
- fstream.write(bytes, 0, buffered.size)
- fstream.close
+ Utils.tryWithResource(fileSystem.create(logFilePath)) { fstream =>
+   fstream.write(bytes, 0, buffered.size)
+ }

// Read the compressed .inprogress file and verify only first event was parsed.
val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)

@@ -112,17 +112,19 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

  // Verify the replay returns the events given the input maybe truncated.
  val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem)
- val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10)
- replayer.replay(failingStream, logFilePath.toString, true)
+ Utils.tryWithResource(new EarlyEOFInputStream(logData, buffered.size - 10)) { failingStream =>

HyukjinKwon (Member, Author):
Here, the EarlyEOFInputStream was not being closed.

+   replayer.replay(failingStream, logFilePath.toString, true)

- assert(eventMonster.loggedEvents.size === 1)
- assert(failingStream.didFail)
+   assert(eventMonster.loggedEvents.size === 1)
+   assert(failingStream.didFail)
+ }

  // Verify the replay throws the EOF exception since the input may not be truncated.
  val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem)
- val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10)
- intercept[EOFException] {
-   replayer.replay(failingStream2, logFilePath.toString, false)
- }
+ Utils.tryWithResource(new EarlyEOFInputStream(logData2, buffered.size - 10)) { failingStream2 =>

Contributor:
nit: I think we can still use failingStream here?

HyukjinKwon (Member, Author):
It looks like we could, but I am not confident enough to make that change here. I will keep it in mind and point it out when someone fixes the code around this.

+   intercept[EOFException] {
+     replayer.replay(failingStream2, logFilePath.toString, false)
+   }
+ }
}

@@ -151,7 +153,10 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
* assumption that the event logging behavior is correct (tested in a separate suite).
*/
private def testApplicationReplay(codecName: Option[String] = None) {
- val logDirPath = Utils.getFilePath(testDir, "test-replay")
+ val logDir = new File(testDir.getAbsolutePath, "test-replay")
+ // Here, it creates `Path` from the URI instead of the absolute path for the explicit file
+ // scheme so that the string representation of this `Path` has leading file scheme correctly.
+ val logDirPath = new Path(logDir.toURI)

HyukjinKwon (Member, Author):
If we create this from the absolute path, the string ends up in the C:/../.. form and Utils.resolveURI recognises "C" as the scheme, causing a "No FileSystem for scheme: C" exception.

It looks like Path can handle this, but we can't currently replace Utils.resolveURI with Path because of some corner-case behaviour changes.

For example, with Path, "hdfs:///root/spark.jar#app.jar" becomes "hdfs:///root/spark.jar%23app.jar", but with Utils.resolveURI it stays "hdfs:///root/spark.jar#app.jar".

Utils.resolveURI is being called via:

sc = new SparkContext("local-cluster[2,1,1024]", "Test replay", conf)

Some(Utils.resolveURI(unresolvedDir))

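A minimal sketch of that corner case, using only the strings from the comment above:

import java.net.URI
import org.apache.hadoop.fs.Path

// Hadoop's Path treats the whole string as a path and percent-encodes the
// fragment marker, while java.net.URI keeps "#app.jar" as a real fragment.
val s = "hdfs:///root/spark.jar#app.jar"
println(new Path(s).toUri)  // hdfs:///root/spark.jar%23app.jar
println(new URI(s))         // hdfs:///root/spark.jar#app.jar
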

HyukjinKwon (Member, Author), Aug 17, 2017:
This test itself was added a long time ago, but it looks like there was a recent change related to this code path - edcb878:

val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)

I think this simple test describes what I intended:

Before

scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> val path = new Path("C:\\a\\b\\c")
path: org.apache.hadoop.fs.Path = C:/a/b/c

scala> path.toString
res0: String = C:/a/b/c

scala> path.toUri.toString
res1: String = /C:/a/b/c

After

scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path

scala> import java.io.File
import java.io.File

scala> val file = new File("C:\\a\\b\\c")
file: java.io.File = C:\a\b\c

scala> val path = new Path(file.toURI)
path: org.apache.hadoop.fs.Path = file:/C:/a/b/c

scala> path.toString
res2: String = file:/C:/a/b/c

scala> path.toUri.toString
res3: String = file:/C:/a/b/c

Please correct me if I am mistaken here.


HyukjinKwon (Member, Author):
cc @vanzin, I believe I need your review here. Could you take a look when you have some time?


HyukjinKwon (Member, Author):
cc @sarutak too. The other changes should be fine, as they are the kind I have usually fixed, but I am less sure about this one. The current status conservatively fixes the test only, but I think I need a sign-off on this.


Contributor:
From your description it sounds like Utils.resolveURI might not be correct for Windows paths. I don't have Windows available, so if you could try these it might help in understanding:

Utils.resolveURI("C:\\WINDOWS")
Utils.resolveURI("/C:/WINDOWS")
Utils.resolveURI("C:/WINDOWS")

The first two should return the same thing ("file:/C:/WINDOWS" or something along those lines), while for the third I'm not sure, since it's ambiguous. But that's probably the cause of the change in behavior.

Anyway, the code change looks correct.


HyukjinKwon (Member, Author), Aug 23, 2017:
I have a Windows machine set up properly as a dev environment, and also a set of scripts to run specific Scala tests by test-only via AppVeyor automatically against a PR, so it is not really hard to test. I am fine with checking more cases in the future.

println("=== org.apache.spark.util.Utils.resolveURI")
println(Utils.resolveURI("C:\\WINDOWS").toString)
println(Utils.resolveURI("/C:/WINDOWS").toString)
println(Utils.resolveURI("C:/WINDOWS").toString)
println
println(Utils.resolveURI("C:\\WINDOWS").getScheme)
println(Utils.resolveURI("/C:/WINDOWS").getScheme)
println(Utils.resolveURI("C:/WINDOWS").getScheme)
println

println("=== java.io.File")
println(new File("C:\\WINDOWS").toURI.toString)
println(new File("/C:/WINDOWS").toURI.toString)
println(new File("C:/WINDOWS").toURI.toString)
println
println(new File("C:\\WINDOWS").toURI.getScheme)
println(new File("/C:/WINDOWS").toURI.getScheme)
println(new File("C:/WINDOWS").toURI.getScheme)
println

println("=== org.apache.hadoop.fs.Path")
println(new Path("C:\\WINDOWS").toUri.toString)
println(new Path("/C:/WINDOWS").toUri.toString)
println(new Path("C:/WINDOWS").toUri.toString)
println
println(new Path("C:\\WINDOWS").toString)
println(new Path("/C:/WINDOWS").toString)
println(new Path("C:/WINDOWS").toString)
println
println(new Path("C:\\WINDOWS").toUri.getScheme)
println(new Path("/C:/WINDOWS").toUri.getScheme)
println(new Path("C:/WINDOWS").toUri.getScheme)
println

println("=== java.io.File.toURI and org.apache.hadoop.fs.Path")
println(new Path(new File("C:\\WINDOWS").toURI).toUri.toString)
println(new Path(new File("/C:/WINDOWS").toURI).toUri.toString)
println(new Path(new File("C:/WINDOWS").toURI).toUri.toString)
println
println(new Path(new File("C:\\WINDOWS").toURI).toString)
println(new Path(new File("/C:/WINDOWS").toURI).toString)
println(new Path(new File("C:/WINDOWS").toURI).toString)
println
println(new Path(new File("C:\\WINDOWS").toURI).toUri.getScheme)
println(new Path(new File("/C:/WINDOWS").toURI).toUri.getScheme)
println(new Path(new File("C:/WINDOWS").toURI).toUri.getScheme)

produced:

=== org.apache.spark.util.Utils.resolveURI
file:/C:/WINDOWS/
file:/C:/WINDOWS/
C:/WINDOWS

file
file
C

=== java.io.File
file:/C:/WINDOWS/
file:/C:/WINDOWS/
file:/C:/WINDOWS/

file
file
file

=== org.apache.hadoop.fs.Path
/C:/WINDOWS
/C:/WINDOWS
/C:/WINDOWS

C:/WINDOWS
C:/WINDOWS
C:/WINDOWS

null
null
null

=== java.io.File.toURI and org.apache.hadoop.fs.Path
file:/C:/WINDOWS/
file:/C:/WINDOWS/
file:/C:/WINDOWS/

file:/C:/WINDOWS/
file:/C:/WINDOWS/
file:/C:/WINDOWS/

file
file
file


sarutak (Member), Aug 23, 2017:
@HyukjinKwon I think this change itself looks reasonable. resolveURI seems like it needs to be fixed so that Windows paths are handled correctly: if C:/path/to/some/file is passed to resolveURI, the drive letter "C" should not be parsed as a URI scheme.

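For illustration only, a hypothetical sketch of the drive-letter special case described above (this is not Spark's actual resolveURI; resolveURISketch and its logic are invented for the example):

import java.io.File
import java.net.URI
import scala.util.Try

// Treat a one-letter "scheme" as a Windows drive letter, not a URI scheme.
def resolveURISketch(path: String): URI = {
  Try(new URI(path)).toOption match {
    case Some(uri) if uri.getScheme != null && uri.getScheme.length > 1 =>
      uri                   // a real scheme such as "hdfs" or "file"
    case _ =>
      new File(path).toURI  // bare path, backslashes, or a drive letter like "C"
  }
}

resolveURISketch("C:/WINDOWS")                      // file-based, not scheme "C"
resolveURISketch("hdfs:///root/spark.jar#app.jar")  // fragment preserved
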
fileSystem.mkdirs(logDirPath)

val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)

@@ -221,12 +226,14 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
  def didFail: Boolean = countDown.get == 0

  @throws[IOException]
- def read: Int = {
+ override def read(): Int = {
    if (countDown.get == 0) {
      throw new EOFException("Stream ended prematurely")
    }
    countDown.decrementAndGet()
-   in.read
+   in.read()
  }

+ override def close(): Unit = in.close()

HyukjinKwon (Member, Author):
EarlyEOFInputStream was not being closed.

}
}
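
Why the close() override matters: java.io.InputStream.close() is a no-op by default, so a wrapper that omits it silently leaks the stream it wraps. A reconstruction of the helper assembled from the diff above (the constructor parameter names are assumed):

import java.io.{EOFException, IOException, InputStream}
import java.util.concurrent.atomic.AtomicInteger

// Fails with EOFException after a fixed number of reads; without the
// close() override, the wrapped `in` would never be closed.
class EarlyEOFInputStream(in: InputStream, failAtPos: Int) extends InputStream {
  private val countDown = new AtomicInteger(failAtPos)

  def didFail: Boolean = countDown.get == 0

  @throws[IOException]
  override def read(): Int = {
    if (countDown.get == 0) {
      throw new EOFException("Stream ended prematurely")
    }
    countDown.decrementAndGet()
    in.read()
  }

  override def close(): Unit = in.close()
}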

@@ -203,7 +203,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') SELECT * FROM src")
}

sql(s"ALTER TABLE $tableName SET LOCATION '$path'")
sql(s"ALTER TABLE $tableName SET LOCATION '${path.toURI}'")

HyukjinKwon (Member, Author):
The tests here do not look dedicated to testing paths; I have just fixed the paths in them, as elsewhere so far.



sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")

@@ -222,7 +222,7 @@
s"""
|CREATE TABLE $sourceTableName (key STRING, value STRING)
|PARTITIONED BY (ds STRING)
- |LOCATION '$path'
+ |LOCATION '${path.toURI}'
""".stripMargin)

val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")

@@ -239,7 +239,7 @@
s"""
|CREATE TABLE $tableName (key STRING, value STRING)
|PARTITIONED BY (ds STRING)
- |LOCATION '$path'
+ |LOCATION '${path.toURI}'
""".stripMargin)

// Register only one of the partitions found on disk
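
To illustrate why these LOCATION clauses change, a small sketch (the Windows path is hypothetical; outputs shown as they would appear on Windows):

import java.io.File

// Interpolating a File directly uses its platform-specific string form,
// while toURI yields a scheme-qualified, forward-slash form on any OS.
val path = new File("C:\\tmp\\warehouse\\t")
println(s"LOCATION '$path'")          // LOCATION 'C:\tmp\warehouse\t'
println(s"LOCATION '${path.toURI}'")  // LOCATION 'file:/C:/tmp/warehouse/t'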