sync #12 (Merged)
core/src/main/resources/org/apache/spark/ui/static/webui.js (2 additions, 1 deletion)
@@ -87,7 +87,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
-collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
+collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
+collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
docs/structured-streaming-programming-guide.md (4 additions, 3 deletions)
@@ -540,12 +540,13 @@ Here are the details of all the sources in Spark.
<br/>
<code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
<br/>
-<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
-<br/>
"file:///dataset.txt"<br/>
"s3://a/dataset.txt"<br/>
"s3n://a/b/dataset.txt"<br/>
"s3a://a/b/c/dataset.txt"<br/>
"s3a://a/b/c/dataset.txt"
<br/>
<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
@@ -58,12 +58,15 @@ class Iso8601DateFormatter(
try {
val localDate = toLocalDate(formatter.parse(s))
localDateToDays(localDate)
-} catch checkDiffResult(s, legacyFormatter.parse)
+} catch checkParsedDiff(s, legacyFormatter.parse)
}
}

override def format(localDate: LocalDate): String = {
-localDate.format(formatter)
+try {
+localDate.format(formatter)
+} catch checkFormattedDiff(toJavaDate(localDateToDays(localDate)),
+(d: Date) => format(d))
}

override def format(days: Int): String = {
@@ -21,7 +21,7 @@ import java.time._
import java.time.chrono.IsoChronology
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
-import java.util.Locale
+import java.util.{Date, Locale}

import com.google.common.cache.CacheBuilder

@@ -109,13 +109,17 @@ trait DateTimeFormatterHelper {
formatter
}

+private def needConvertToSparkUpgradeException(e: Throwable): Boolean = e match {
+case _: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => true
+case _ => false
+}
// When legacy time parser policy set to EXCEPTION, check whether we will get different results
// between legacy parser and new parser. If new parser fails but legacy parser works, throw a
// SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED,
// DateTimeParseException will address by the caller side.
-protected def checkDiffResult[T](
+protected def checkParsedDiff[T](
s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = {
-case e: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION =>
+case e if needConvertToSparkUpgradeException(e) =>
try {
legacyParseFunc(s)
} catch {
@@ -126,6 +130,25 @@
s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
}

+// When legacy time parser policy set to EXCEPTION, check whether we will get different results
+// between legacy formatter and new formatter. If new formatter fails but legacy formatter works,
+// throw a SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED,
+// DateTimeParseException will address by the caller side.
+protected def checkFormattedDiff[T <: Date](
+d: T,
+legacyFormatFunc: T => String): PartialFunction[Throwable, String] = {
+case e if needConvertToSparkUpgradeException(e) =>
+val resultCandidate = try {
+legacyFormatFunc(d)
+} catch {
+case _: Throwable => throw e
+}
+throw new SparkUpgradeException("3.0", s"Fail to format it to '$resultCandidate' in the new" +
+s" formatter. You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore" +
+" the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid" +
+" datetime string.", e)
+}
+
/**
* When the new DateTimeFormatter failed to initialize because of invalid datetime pattern, it
* will throw IllegalArgumentException. If the pattern can be recognized by the legacy formatter
@@ -137,7 +160,6 @@
* @param tryLegacyFormatter a func to capture exception, identically which forces a legacy
* datetime formatter to be initialized
*/

protected def checkLegacyFormatter(
pattern: String,
tryLegacyFormatter: => Unit): PartialFunction[Throwable, DateTimeFormatter] = {
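The two helpers above lean on one Scala idiom worth spelling out: a `catch` clause can take any expression of type `PartialFunction[Throwable, T]`, which is why `checkParsedDiff` and `checkFormattedDiff` can be dropped in directly after `catch`. Below is a hedged, Spark-free sketch of the same fallback-check shape; the class and method names are invented for illustration only.

object FallbackCheckSketch {
  final class UpgradeError(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

  // Mirrors the shape of checkParsedDiff: if the new parser fails but a legacy
  // fallback succeeds, report the behavior change instead of the raw error.
  def fallbackCheck[T](s: String, legacyParse: String => T): PartialFunction[Throwable, T] = {
    case e: RuntimeException =>
      try legacyParse(s) catch { case _: Throwable => throw e }  // legacy failed too: keep the original error
      throw new UpgradeError(s"'$s' parses only with the legacy parser", e)
  }

  def parseStrict(s: String): Int =
    try s.toInt catch fallbackCheck(s, _.trim.toInt)  // a PartialFunction is a valid catch handler

  def main(args: Array[String]): Unit = {
    println(parseStrict("42"))  // 42
    // parseStrict(" 7 ") would throw UpgradeError: only the legacy (trimming) parser accepts it.
  }
}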
@@ -84,12 +84,15 @@ class Iso8601TimestampFormatter(
val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)

Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
-} catch checkDiffResult(s, legacyFormatter.parse)
+} catch checkParsedDiff(s, legacyFormatter.parse)
}
}

override def format(instant: Instant): String = {
-formatter.withZone(zoneId).format(instant)
+try {
+formatter.withZone(zoneId).format(instant)
+} catch checkFormattedDiff(toJavaTimestamp(instantToMicros(instant)),
+(t: Timestamp) => format(t))
}

override def format(us: Long): String = {
@@ -418,4 +418,19 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite {
val t5 = f3.parse("AM")
assert(t5 === date(1970))
}

test("check result differences for datetime formatting") {
val formatter = TimestampFormatter("DD", UTC, isParsing = false)
assert(formatter.format(date(1970, 1, 3)) == "03")
assert(formatter.format(date(1970, 4, 9)) == "99")

if (System.getProperty("java.version").split("\\D+")(0).toInt < 9) {
// https://bugs.openjdk.java.net/browse/JDK-8079628
intercept[SparkUpgradeException] {
formatter.format(date(1970, 4, 10))
}
} else {
assert(formatter.format(date(1970, 4, 10)) == "100")
}
}
}
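The JDK behavior this test guards against (JDK-8079628) can be reproduced with plain java.time, no Spark involved. A hedged sketch: on JDK 8 the two-letter day-of-year pattern "DD" cannot print three-digit values, while JDK 9+ prints them as-is.

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DayOfYearPattern {
  def main(args: Array[String]): Unit = {
    val fmt = DateTimeFormatter.ofPattern("DD")
    println(fmt.format(LocalDate.of(1970, 4, 9)))   // "99" on all JDKs (day-of-year 99)
    println(fmt.format(LocalDate.of(1970, 4, 10)))  // "100" on JDK 9+, DateTimeException on JDK 8
  }
}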
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/** A singleton object for the master program. The slaves should not access this. */
@@ -45,6 +46,8 @@

sparkConf
.setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))
+.set(SQLConf.DATETIME_JAVA8API_ENABLED, true)


val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
sparkContext = sparkSession.sparkContext
@@ -30,6 +30,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+import org.apache.spark.sql.internal.SQLConf


private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
@@ -63,6 +64,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
sqlContext.newSession()
}
ctx.setConf(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion)
+ctx.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true)
val hiveSessionState = session.getSessionState
setConfMap(ctx, hiveSessionState.getOverriddenConfigurations)
setConfMap(ctx, hiveSessionState.getHiveVariables)
@@ -551,4 +551,10 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
errorResponses = Seq("AnalysisException"))(
("", "Error in query: The second argument of 'date_sub' function needs to be an integer."))
}

test("SPARK-30808: use Java 8 time API in Thrift SQL CLI by default") {
// If Java 8 time API is enabled via the SQL config `spark.sql.datetime.java8API.enabled`,
+// the date formatter for `java.time.LocalDate` must output negative years with sign.
+runCliWithin(1.minute)("SELECT MAKE_DATE(-44, 3, 15);" -> "-0044-03-15")
+}
}
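For context on what flipping `spark.sql.datetime.java8API.enabled` (set in the two Thrift server files above and exercised by this test) actually changes, here is a hedged, spark-shell style sketch; `spark` is assumed to be an active SparkSession and is not part of the PR.

// With the Java 8 time API enabled, DATE/TIMESTAMP values collected to the driver
// arrive as java.time.LocalDate / java.time.Instant (proleptic Gregorian calendar)
// instead of java.sql.Date / java.sql.Timestamp.
spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
val row = spark.sql("SELECT MAKE_DATE(-44, 3, 15) AS d").head()
val d = row.getAs[java.time.LocalDate]("d")
println(d)  // -0044-03-15: negative years keep their sign, as the CLI test expects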
@@ -862,6 +862,18 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}
}

test("SPARK-30808: use Java 8 time API and Proleptic Gregorian calendar by default") {
withJdbcStatement() { st =>
// Proleptic Gregorian calendar has no gap in the range 1582-10-04..1582-10-15
val date = "1582-10-10"
val rs = st.executeQuery(s"select date '$date'")
rs.next()
val expected = java.sql.Date.valueOf(date)
assert(rs.getDate(1) === expected)
assert(rs.getString(1) === expected.toString)
}
}
}

class SingleSessionSuite extends HiveThriftJdbcTest {
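The calendar difference the JDBC test above relies on can be shown with the plain JDK; a hedged sketch, not Spark code: java.time uses the proleptic Gregorian calendar, so 1582-10-10 is a valid date, while the legacy hybrid GregorianCalendar jumps from 1582-10-04 straight to 1582-10-15.

import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

object CalendarGap {
  def main(args: Array[String]): Unit = {
    println(LocalDate.of(1582, 10, 4).plusDays(1))   // 1582-10-05: no gap in proleptic Gregorian

    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.clear()
    cal.set(1582, Calendar.OCTOBER, 4)
    cal.add(Calendar.DAY_OF_MONTH, 1)
    val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
    println(fmt.format(cal.getTime))                 // 1582-10-15: the hybrid calendar skips 10 days
  }
}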
@@ -708,7 +708,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
.map(col => col.getName).toSet

def unapply(attr: Attribute): Option[String] = {
-if (varcharKeys.contains(attr.name)) {
+val resolver = SQLConf.get.resolver
+if (varcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType) {
Some(attr.name)
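A hedged sketch of why the resolver-based check above fixes the case-sensitivity problem: `SQLConf.get.resolver` is simply a `(String, String) => Boolean` that is exact or case-insensitive equality depending on `spark.sql.caseSensitive`. The locally re-created resolver and column names below are illustrative only.

// Re-creating the resolver for illustration (Spark picks one of these two
// behaviors based on spark.sql.caseSensitive, which defaults to false).
val caseSensitive = false
val resolver: (String, String) => Boolean =
  if (caseSensitive) (a, b) => a == b
  else (a, b) => a.equalsIgnoreCase(b)

val varcharKeys = Set("B", "C")  // partition columns declared as VARCHAR/CHAR in the DDL
println(varcharKeys.contains("b"))                   // false -> the old check misses the column
println(varcharKeys.exists(c => resolver(c, "b")))   // true  -> the new check matches it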
@@ -2720,4 +2720,14 @@ class HiveDDLSuite
checkAnswer(sql("SHOW PARTITIONS ta_part"), Row("ts=10") :: Nil)
}
}

test("SPARK-31904: Fix case sensitive problem of char and varchar partition columns") {
withTable("t1", "t2") {
sql("CREATE TABLE t1(a STRING, B VARCHAR(10), C CHAR(10)) STORED AS parquet")
sql("CREATE TABLE t2 USING parquet PARTITIONED BY (b, c) AS SELECT * FROM t1")
// make sure there is no exception
assert(sql("SELECT * FROM t2 WHERE b = 'A'").collect().isEmpty)
assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty)
}
}
}