Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Simply check the count via providing schema instead of checking exception message on schema inference
  • Loading branch information
HeartSaVioR committed Nov 18, 2020
commit d216fcb9562e05116b6d01e29255cc64e8a10d4c
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import java.time.format.DateTimeFormatter

import scala.util.Random

import org.apache.spark.sql.{AnalysisException, DataFrameReader, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class PathFilterSuite extends QueryTest with SharedSparkSession {
import testImplicits._
Expand All @@ -34,17 +35,15 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
" and sharing same timestamp with file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formatTime(curTime)),
expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formatTime(curTime)))
}
}

test("SPARK-31962: modifiedAfter specified" +
" and sharing same timestamp with file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
executeTest(dir, Seq(curTime), modifiedAfter = Some(formatTime(curTime)),
expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedAfter = Some(formatTime(curTime)))
}
}

Expand All @@ -53,8 +52,8 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -64,8 +63,8 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val fileTime = curTime.minusDays(3)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(fileTime), modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(fileTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -74,8 +73,8 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -84,8 +83,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime), modifiedAfter = Some(formattedTime),
expectedCount = Some(1))
executeTest(dir, Seq(curTime), 1, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -94,7 +92,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val futureTime = curTime.plusYears(1)
val formattedTime = formatTime(futureTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime), expectedCount = Some(1))
executeTest(dir, Seq(curTime), 1, modifiedBefore = Some(formattedTime))
}
}

Expand All @@ -103,7 +101,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime))
}
}

Expand All @@ -113,8 +111,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime2 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val pastTime = fileTime1.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(fileTime1, fileTime2), modifiedAfter = Some(formattedTime),
expectedCount = Some(1))
executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -123,8 +120,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime, curTime), modifiedAfter = Some(formattedTime),
expectedCount = Some(2))
executeTest(dir, Seq(curTime, curTime), 2, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -133,8 +129,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val pastTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(fileTime, fileTime), modifiedAfter = Some(formattedTime),
expectedCount = None)
executeTest(dir, Seq(fileTime, fileTime), 0, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -143,8 +138,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1)
val formattedTime = formatTime(futureTime)
executeTest(dir, Seq(fileTime, fileTime), modifiedBefore = Some(formattedTime),
expectedCount = Some(2))
executeTest(dir, Seq(fileTime, fileTime), 2, modifiedBefore = Some(formattedTime))
}
}

Expand All @@ -154,17 +148,15 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime1 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val fileTime2 = curTime.plusDays(3)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(fileTime1, fileTime2), modifiedBefore = Some(formattedTime),
expectedCount = Some(1))
executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedBefore = Some(formattedTime))
}
}

test("SPARK-31962: modifiedBefore specified with a future date, multiple files, none valid") {
withTempDir { dir =>
val fileTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(1)
val formattedTime = formatTime(fileTime)
executeTest(dir, Seq(fileTime, fileTime), modifiedBefore = Some(formattedTime),
expectedCount = None)
executeTest(dir, Seq(fileTime, fileTime), 0, modifiedBefore = Some(formattedTime))
}
}

Expand Down Expand Up @@ -233,41 +225,35 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
private def executeTest(
dir: File,
fileDates: Seq[LocalDateTime],
expectedCount: Option[Long],
expectedCount: Long,
modifiedBefore: Option[String] = None,
modifiedAfter: Option[String] = None): Unit = {
fileDates.foreach { fileDate =>
val file = createSingleFile(dir)
setFileTime(fileDate, file)
}

var dfReader = spark.read.format("csv").option("timeZone", "UTC")
val schema = StructType(Seq(StructField("a", StringType)))

var dfReader = spark.read.format("csv").option("timeZone", "UTC").schema(schema)
modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) }
modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) }

def assertQueryFailure(dfReader: DataFrameReader): Unit = {
val exc = intercept[AnalysisException] {
dfReader.load(dir.getCanonicalPath)
}
assert(exc.getMessage.contains("Unable to infer schema for CSV"))
}

expectedCount match {
case Some(count) =>
// without pathGlobFilter
val df1 = dfReader.load(dir.getCanonicalPath)
assert(df1.count() === count)

// pathGlobFilter matched
val df2 = dfReader.option("pathGlobFilter", "*.csv").load(dir.getCanonicalPath)
assert(df2.count() === count)

// pathGlobFilter mismatched
assertQueryFailure(dfReader.option("pathGlobFilter", "*.txt"))

case None =>
// expecting failure
assertQueryFailure(dfReader)
if (expectedCount > 0) {
// without pathGlobFilter
val df1 = dfReader.load(dir.getCanonicalPath)
assert(df1.count() === expectedCount)

// pathGlobFilter matched
val df2 = dfReader.option("pathGlobFilter", "*.csv").load(dir.getCanonicalPath)
assert(df2.count() === expectedCount)

// pathGlobFilter mismatched
val df3 = dfReader.option("pathGlobFilter", "*.txt").load(dir.getCanonicalPath)
assert(df3.count() === 0)
} else {
val df = dfReader.load(dir.getCanonicalPath)
assert(df.count() === 0)
}
}

Expand Down