diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d31f3fb8f604..167833488980 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -158,7 +158,8 @@ def load(self, path=None, format=None, schema=None, **options):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+ timeZone=None):
"""
Loads a JSON file (`JSON Lines text format or newline-delimited JSON
`_) or an RDD of Strings storing JSON objects (one object per
@@ -204,11 +205,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -225,7 +228,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat)
+ timestampFormat=timestampFormat, timeZone=timeZone)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -298,7 +301,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -341,11 +344,11 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -357,6 +360,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
uses the default value, ``10``.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
@@ -374,7 +379,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -591,7 +596,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+ timeZone=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
- compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+ timeZone=timeZone)
self._jwrite.json(path)
@since(1.4)
@@ -664,7 +673,7 @@ def text(self, path, compression=None):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, timeZone=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
- dateFormat=dateFormat, timestampFormat=timestampFormat)
+ dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
self._jwrite.csv(path)
@since(1.5)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a10b185cd4c7..d988e596a86d 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -429,7 +429,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, timeZone=None):
"""
Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
`_) and returns a :class:`DataFrame`.
@@ -476,11 +476,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat)
+ timestampFormat=timestampFormat, timeZone=timeZone)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -552,7 +554,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -597,11 +599,11 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -609,6 +611,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``-1`` meaning unlimited length.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
@@ -628,7 +632,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c410e7919a35..bd852a50fe71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
/**
* Converts a json input string to a [[StructType]] with the specified schema.
*/
-case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class JsonToStruct(
+ schema: StructType,
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
+ def this(schema: StructType, options: Map[String, String], child: Expression) =
+ this(schema, options, child, None)
+
@transient
lazy val parser =
new JacksonParser(
schema,
"invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
- new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+ new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
override def dataType: DataType = schema
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
override def nullSafeEval(json: Any): Any = {
try parser.parse(json.toString).headOption.orNull catch {
case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
/**
* Converts a [[StructType]] to a json output string.
*/
-case class StructToJson(options: Map[String, String], child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class StructToJson(
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
+ def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
@transient
lazy val writer = new CharArrayWriter()
@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression)
new JacksonGenerator(
child.dataType.asInstanceOf[StructType],
writer,
- new JSONOptions(options))
+ new JSONOptions(options, timeZoneId.get))
override def dataType: DataType = StringType
@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression)
}
}
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
override def nullSafeEval(row: Any): Any = {
gen.write(row.asInstanceOf[InternalRow])
gen.flush()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02bd8dede43c..5307ce1cb711 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.json
-import java.util.Locale
+import java.util.{Locale, TimeZone}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
- @transient private val parameters: CaseInsensitiveMap[String])
+ @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
extends Logging with Serializable {
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+ this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -58,13 +59,15 @@ private[sql] class JSONOptions(
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+ val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
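A small illustration of the new constructor contract (again not part of the patch; `JSONOptions` is `private[sql]`, so this only compiles inside the `org.apache.spark.sql` packages, as in the tests below). The caller now has to supply a default zone, and an explicit `timeZone` option overrides it:

```scala
import org.apache.spark.sql.catalyst.json.JSONOptions

// The default comes from the caller, typically the session-local time zone.
val opts = new JSONOptions(Map.empty[String, String], "America/Los_Angeles")
assert(opts.timeZone.getID == "America/Los_Angeles")

// A per-source "timeZone" option takes precedence over that default, and
// timestampFormat is now built with the resolved zone.
val gmtOpts = new JSONOptions(Map("timeZone" -> "GMT"), "America/Los_Angeles")
assert(gmtOpts.timeZone.getID == "GMT")
assert(gmtOpts.timestampFormat.getTimeZone.getID == "GMT")
```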
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index bf8e3c812ee8..dec55279c9fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
private[sql] class JacksonGenerator(
schema: StructType,
writer: Writer,
- options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+ options: JSONOptions) {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
// JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 8e20bd1d9724..0c46819cdb9c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql.catalyst.expressions
+import java.util.Calendar
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -305,51 +307,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("json_tuple - hive key 4 - null json") {
checkJsonTuple(
JsonTuple(Literal(null) :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - hive key 5 - null and empty fields") {
checkJsonTuple(
JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null)))
+ InternalRow(UTF8String.fromString(""), null, null, null, null))
}
test("json_tuple - hive key 6 - invalid json (array)") {
checkJsonTuple(
JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - invalid json (object start only)") {
checkJsonTuple(
JsonTuple(Literal("{") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - invalid json (no object end)") {
checkJsonTuple(
JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - invalid json (invalid json)") {
checkJsonTuple(
JsonTuple(Literal("\\") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - preserve newlines") {
checkJsonTuple(
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
- InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
+ InternalRow(UTF8String.fromString("b\nc")))
}
+ val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
test("from_json") {
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData)),
- InternalRow.fromSeq(1 :: Nil)
+ JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+ InternalRow(1)
)
}
@@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData)),
+ JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
null
)
// Other modes should still return `null`.
checkEvaluation(
- JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+ JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
null
)
}
@@ -371,15 +375,58 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
+ JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
null
)
}
+ test("from_json with timestamp") {
+ val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+ val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}"""
+ var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+ c.set(2016, 0, 1, 0, 0, 0)
+ c.set(Calendar.MILLISECOND, 123)
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+ // The result doesn't change because the json string includes a timezone ("Z" here),
+ // meaning the timestamp's timezone is specified in the string itself, regardless of
+ // the timeZoneId parameter.
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+
+ val jsonData2 = """{"t": "2016-01-01T00:00:00"}"""
+ for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+ c = Calendar.getInstance(tz)
+ c.set(2016, 0, 1, 0, 0, 0)
+ c.set(Calendar.MILLISECOND, 0)
+ checkEvaluation(
+ JsonToStruct(
+ schema,
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+ Literal(jsonData2),
+ Option(tz.getID)),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+ checkEvaluation(
+ JsonToStruct(
+ schema,
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID),
+ Literal(jsonData2),
+ gmtId),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+ }
+ }
+
test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(" ", StringType)),
+ JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
null
)
}
@@ -388,7 +435,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(create_row(1), schema)
checkEvaluation(
- StructToJson(Map.empty, struct),
+ StructToJson(Map.empty, struct, gmtId),
"""{"a":1}"""
)
}
@@ -397,8 +444,40 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(null, schema)
checkEvaluation(
- StructToJson(Map.empty, struct),
+ StructToJson(Map.empty, struct, gmtId),
null
)
}
+
+ test("to_json with timestamp") {
+ val schema = StructType(StructField("t", TimestampType) :: Nil)
+ val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+ c.set(2016, 0, 1, 0, 0, 0)
+ c.set(Calendar.MILLISECOND, 0)
+ val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
+
+ checkEvaluation(
+ StructToJson(Map.empty, struct, gmtId),
+ """{"t":"2016-01-01T00:00:00.000Z"}"""
+ )
+ checkEvaluation(
+ StructToJson(Map.empty, struct, Option("PST")),
+ """{"t":"2015-12-31T16:00:00.000-08:00"}"""
+ )
+
+ checkEvaluation(
+ StructToJson(
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get),
+ struct,
+ gmtId),
+ """{"t":"2016-01-01T00:00:00"}"""
+ )
+ checkEvaluation(
+ StructToJson(
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"),
+ struct,
+ gmtId),
+ """{"t":"2015-12-31T16:00:00"}"""
+ )
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839aeebb..780fe51ac699 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.
*
*
* @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def json(jsonRDD: RDD[String]): DataFrame = {
- val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.
* `maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.
* `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
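As a user-facing sketch of the option documented above (not part of the patch; the path is hypothetical): timestamps without an explicit offset are read in the given zone instead of the JVM default, and omitting the option falls back to the session-local time zone.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("timeZone-demo").getOrCreate()

// Read JSON whose naive timestamp strings should be interpreted as GMT.
val df = spark.read
  .option("timestampFormat", "yyyy/MM/dd HH:mm")
  .option("timeZone", "GMT")
  .json("/tmp/timestamps.json")  // hypothetical path
```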
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 81657d9e47fe..55f7ba2359bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -492,6 +492,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.
*
*
* @since 1.4.0
@@ -598,6 +600,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.
*
*
* @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ce6e8be8b0ab..7fe51c9e8c4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonGenerator
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans._
@@ -2670,10 +2670,12 @@ class Dataset[T] private[sql](
*/
def toJSON: Dataset[String] = {
val rowSchema = this.schema
+ val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
- val gen = new JacksonGenerator(rowSchema, writer)
+ val gen = new JacksonGenerator(rowSchema, writer,
+ new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
new Iterator[String] {
override def hasNext: Boolean = iter.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2bf07047a2..566f40f45439 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.sources._
@@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
files: Seq[FileStatus]): Option[StructType] = {
require(files.nonEmpty, "Cannot infer schema from an empty set of files")
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val paths = files.map(_.getPath.toString)
val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
dataSchema: StructType): OutputWriterFactory = {
CSVUtils.verifySchema(dataSchema)
val conf = job.getConfiguration
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
csvOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9d79ea6ed178..b7fbaa4f44a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, TimeZone}
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+ @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
extends Logging with Serializable {
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+ this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
private def getChar(paramName: String, default: Char): Char = {
val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
name.map(CompressionCodecs.getCodecClassName)
}
+ val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
val maxColumns = getInt("maxColumns", 20480)
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
settings
}
}
-
-object CSVOptions {
-
- def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
- def apply(paramName: String, paramValue: String): CSVOptions = {
- new CSVOptions(Map(paramName -> paramValue))
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138c0f19..4082a0df8ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
private[csv] class UnivocityGenerator(
schema: StructType,
writer: Writer,
- options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+ options: CSVOptions) {
private val writerSettings = options.asWriterSettings
writerSettings.setHeaders(schema.fieldNames: _*)
private val gen = new CsvWriter(writer, writerSettings)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa60b024..2e409b3f5fbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
name: String,
dataType: DataType,
nullable: Boolean = true,
- options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+ options: CSVOptions): ValueConverter = dataType match {
case _: ByteType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(_.toByte)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d285000..b4a8ff2cf01a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
if (files.isEmpty) {
None
} else {
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val conf = job.getConfiguration
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
parsedOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3cddb47..4e706da184c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.
*
*
* @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.
* `maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.
* `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
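The streaming readers documented here take the same option; a minimal sketch (schema and directory are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val schema = StructType(StructField("time", TimestampType) :: Nil)

// A streaming CSV source that parses naive timestamps as GMT rather than the JVM default.
val stream = spark.readStream
  .schema(schema)
  .option("timestampFormat", "yyyy/MM/dd HH:mm")
  .option("timeZone", "GMT")
  .csv("/tmp/csv-input/")  // hypothetical directory
```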
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index d8c6c2550478..661742087112 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
class CSVInferSchemaSuite extends SparkFunSuite {
test("String fields types are inferred correctly from null types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(NullType, "", options) == NullType)
assert(CSVInferSchema.inferField(NullType, null, options) == NullType)
assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType)
@@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("String fields types are inferred correctly from other types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType)
assert(CSVInferSchema.inferField(LongType, "test", options) == StringType)
assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType)
@@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Timestamp field types are inferred correctly via custom data format") {
- var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
+ var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
- options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
+ options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}
test("Timestamp field types are inferred correctly from other types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType)
assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType)
}
test("Boolean fields types are inferred correctly from other types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType)
assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType)
}
@@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Null fields are handled properly when a nullValue is specified") {
- var options = new CSVOptions(Map("nullValue" -> "null"))
+ var options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
assert(CSVInferSchema.inferField(NullType, "null", options) == NullType)
assert(CSVInferSchema.inferField(StringType, "null", options) == StringType)
assert(CSVInferSchema.inferField(LongType, "null", options) == LongType)
- options = new CSVOptions(Map("nullValue" -> "\\N"))
+ options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT")
assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType)
assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType)
@@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
- val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+ val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
}
test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
// 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9).
assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) ==
@@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
test("DoubleType should be infered when user defined nan/inf are provided") {
val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf",
- "positiveInf" -> "inf"))
+ "positiveInf" -> "inf"), "GMT")
assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9cebbe58d5..0c9a7298c3fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -839,7 +839,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
- test("Write timestamps correctly with dateFormat option") {
+ test("Write timestamps correctly with timestampFormat option") {
withTempDir { dir =>
// With dateFormat option.
val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
@@ -870,6 +870,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
+ test("Write timestamps correctly with timestampFormat option and timeZone option") {
+ withTempDir { dir =>
+ // With timestampFormat option and timeZone option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+ val timestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestampsWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringTimestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/27 01:00"),
+ Row("2014/10/28 01:30"),
+ Row("2016/01/29 04:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+
+ val readBack = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .load(timestampsWithFormatPath)
+
+ checkAnswer(readBack, timestampsWithFormat)
+ }
+ }
+
test("load duplicated field names consistently with null or empty strings - case sensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
withTempPath { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 62dae08861df..a74b22a4a88a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String
class UnivocityParserSuite extends SparkFunSuite {
private val parser =
- new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String]))
+ new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT"))
private def assertNull(v: Any) = assert(v == null)
@@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite {
stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
val decimalValue = new BigDecimal(decimalVal.toString)
- assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
Decimal(decimalValue, decimalType.precision, decimalType.scale))
}
}
@@ -50,20 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite {
// Nullable field with nullValue option.
types.foreach { t =>
// Tests that a custom nullValue.
+ val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT")
val converter =
- parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", "-"))
+ parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
assertNull(converter.apply("-"))
assertNull(converter.apply(null))
// Tests that the default nullValue is empty string.
- assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
}
// Not nullable field with nullValue option.
types.foreach { t =>
// Casts a null to not nullable field should throw an exception.
+ val options = new CSVOptions(Map("nullValue" -> "-"), "GMT")
val converter =
- parser.makeConverter("_1", t, nullable = false, CSVOptions("nullValue", "-"))
+ parser.makeConverter("_1", t, nullable = false, options = options)
var message = intercept[RuntimeException] {
converter.apply("-")
}.getMessage
@@ -77,48 +81,52 @@ class UnivocityParserSuite extends SparkFunSuite {
// If nullValue is different from empty string, then empty string should not be cast into
// null.
Seq(true, false).foreach { b =>
+ val options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
val converter =
- parser.makeConverter("_1", StringType, nullable = b, CSVOptions("nullValue", "null"))
+ parser.makeConverter("_1", StringType, nullable = b, options = options)
assert(converter.apply("") == UTF8String.fromString(""))
}
}
test("Throws exception for empty string with non null type") {
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
val exception = intercept[RuntimeException]{
- parser.makeConverter("_1", IntegerType, nullable = false, CSVOptions()).apply("")
+ parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
}
assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
}
test("Types are cast correctly") {
- assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
- assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
- assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
- assert(parser.makeConverter("_1", LongType).apply("10") == 10)
- assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
- assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
- assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
-
- val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0)
+ assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0)
+ assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
+
+ val timestampsOptions =
+ new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
val customTimestamp = "31/01/2015 00:00"
val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
val castedTimestamp =
- parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
+ parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)
val customDate = "31/01/2015"
- val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+ val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
val castedDate =
- parser.makeConverter("_1", DateType, nullable = true, dateOptions)
+ parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
.apply(customTimestamp)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
val timestamp = "2015-01-01 00:00:00"
- assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+ assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) ==
DateTimeUtils.stringToTime(timestamp).getTime * 1000L)
- assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+ assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") ==
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
}
@@ -127,16 +135,18 @@ class UnivocityParserSuite extends SparkFunSuite {
try {
Locale.setDefault(new Locale("fr", "FR"))
// Would parse as 1.0 in fr-FR
- assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0)
- assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0)
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0)
+ assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0)
} finally {
Locale.setDefault(originalLocale)
}
}
test("Float NaN values are parsed correctly") {
+ val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT")
val floatVal: Float = parser.makeConverter(
- "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn")
+ "_1", FloatType, nullable = true, options = options
).apply("nn").asInstanceOf[Float]
// Java implements the IEEE-754 floating point standard which guarantees that any comparison
@@ -145,36 +155,41 @@ class UnivocityParserSuite extends SparkFunSuite {
}
test("Double NaN values are parsed correctly") {
+ val options = new CSVOptions(Map("nanValue" -> "-"), "GMT")
val doubleVal: Double = parser.makeConverter(
- "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-")
+ "_1", DoubleType, nullable = true, options = options
).apply("-").asInstanceOf[Double]
assert(doubleVal.isNaN)
}
test("Float infinite values can be parsed") {
+ val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
val floatVal1 = parser.makeConverter(
- "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max")
+ "_1", FloatType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Float]
assert(floatVal1 == Float.NegativeInfinity)
+ val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
val floatVal2 = parser.makeConverter(
- "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max")
+ "_1", FloatType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Float]
assert(floatVal2 == Float.PositiveInfinity)
}
test("Double infinite values can be parsed") {
+ val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
val doubleVal1 = parser.makeConverter(
- "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max")
+ "_1", DoubleType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Double]
assert(doubleVal1 == Double.NegativeInfinity)
+ val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
val doubleVal2 = parser.makeConverter(
- "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max")
+ "_1", DoubleType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Double]
assert(doubleVal2 == Double.PositiveInfinity)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 156fd965b468..9344aeda0017 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
generator.flush()
}
- val dummyOption = new JSONOptions(Map.empty[String, String])
+ val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, "", dummyOption)
@@ -1366,7 +1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
// This is really a test that it doesn't throw an exception
- val emptySchema = JsonInferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String]))
+ val emptySchema = JsonInferSchema.infer(
+ empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
assert(StructType(Seq()) === emptySchema)
}
@@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-8093 Erase empty structs") {
val emptySchema = JsonInferSchema.infer(
- emptyRecords, "", new JSONOptions(Map.empty[String, String]))
+ emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
assert(StructType(Seq()) === emptySchema)
}
@@ -1723,7 +1724,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
- test("Write timestamps correctly with dateFormat option") {
+ test("Write timestamps correctly with timestampFormat option") {
val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
withTempDir { dir =>
// With dateFormat option.
@@ -1751,6 +1752,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
+ test("Write timestamps correctly with timestampFormat option and timeZone option") {
+ val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+ withTempDir { dir =>
+ // With timestampFormat option and timeZone option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+ val timestampsWithFormat = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+ timestampsWithFormat.write
+ .format("json")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringTimestampsWithFormat = spark.read
+ .schema(stringSchema)
+ .json(timestampsWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/27 01:00"),
+ Row("2014/10/28 01:30"),
+ Row("2016/01/29 04:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+
+ val readBack = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .json(timestampsWithFormatPath)
+
+ checkAnswer(readBack, timestampsWithFormat)
+ }
+ }
+
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
val records = sparkContext
.parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 76ffb949f129..9b5e364e512a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.sources
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
private def getProvidingClass(name: String): Class[_] =
- DataSource(sparkSession = null, className = name).providingClass
+ DataSource(
+ sparkSession = null,
+ className = name,
+ options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass
test("jdbc") {
assert(