Merge remote-tracking branch 'upstream/master' into dfWriterAudit
gatorsmile committed Nov 12, 2016
commit 6bf0779f7216ab602da12c3837c88243ed7e888d
@@ -323,6 +323,9 @@ case class DataSource(
val equality = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
}.orElse {
if (allPaths.isEmpty && !format.isInstanceOf[TextFileFormat]) {
@HyukjinKwon (Member) commented on Nov 14, 2016:

Hi @gatorsmile, would it be better if we explained here that the text data source is excluded because it always uses a schema consisting of a single string field when the schema is not explicitly given?

BTW, should we maybe change `text.TextFileFormat` to `TextFileFormat` here: https://github.com/gatorsmile/spark/blob/45110370fb1889f244a6750ef2a18dbc9f1ba9c2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L139 ?
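A minimal sketch (not part of the diff) of the behaviour this comment refers to: with no user-specified schema, the text source always falls back to a single string column, so it does not need a path to infer a schema. The session and path below are assumed for illustration only.

import org.apache.spark.sql.SparkSession

// Sketch: the text source's schema is always a single "value: string" field
// when no schema is given, which is why it is exempted from the path check above.
val spark = SparkSession.builder().master("local[*]").appName("text-schema-sketch").getOrCreate()
val df = spark.read.text("/tmp/example.txt")  // hypothetical path
df.printSchema()
// root
//  |-- value: string (nullable = true)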

throw new IllegalArgumentException(s"'path' is not specified")
}
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
@@ -370,6 +373,8 @@ case class DataSource(
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else if (allPaths.length < 1) {
throw new IllegalArgumentException("'path' is not specified")
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
@@ -552,220 +557,9 @@ object DataSource {
* This method extracts the `path` option and treats it as the table location to build a
* [[CatalogStorageFormat]]. Note that the `path` option is removed from the options after this.
*/
def hasMetadata(path: Seq[String]): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
val res = fs.exists(metadataPath)
res
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}

/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
*
* @param checkPathExist A flag to indicate whether to check the existence of path or not.
* This flag will be set to false when we create an empty table (the
* path of the table does not exist).
*/
def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
case (dataSource: RelationProvider, None) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
case (_: RelationProvider, Some(_)) =>
throw new AnalysisException(s"$className does not allow user-specified schemas.")

// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
"It must be specified manually")
}

HadoopFsRelation(
sparkSession,
fileCatalog,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema,
bucketSpec = None,
format,
options)

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)

if (checkPathExist && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
if (checkPathExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}.toArray

// If they gave a schema, then we try and figure out the types of the partition columns
// from that schema.
val partitionSchema = userSpecifiedSchema.map { schema =>
StructType(
partitionColumns.map { c =>
// TODO: Case sensitivity.
schema
.find(_.name.toLowerCase() == c.toLowerCase())
.getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
})
}

val fileCatalog =
new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema)

val dataSchema = userSpecifiedSchema.map { schema =>
val equality =
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
}

StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
}.orElse {
if (allPaths.isEmpty && !format.isInstanceOf[TextFileFormat]) {
throw new IllegalArgumentException(s"'path' is not specified")
}
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
"It must be specified manually")
}

HadoopFsRelation(
sparkSession,
fileCatalog,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
caseInsensitiveOptions)

case _ =>
throw new AnalysisException(
s"$className is not a valid Spark SQL Data Source.")
}

relation
}

/** Writes the given [[DataFrame]] out to this [[DataSource]]. */
def write(
mode: SaveMode,
data: DataFrame): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}

providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
case format: FileFormat =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val outputPath = {
val path = new Path(caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
}))
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)

// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
resolveRelation()
.asInstanceOf[HadoopFsRelation]
.location
.partitionSpec()
.partitionColumns
.fieldNames
.toSeq
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.size > 0 && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}

// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath,
partitionColumns.map(UnresolvedAttribute.quoted),
bucketSpec,
format,
() => Unit, // No existing table needs to be refreshed.
options,
data.logicalPlan,
mode)
sparkSession.sessionState.executePlan(plan).toRdd

case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}

// We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
val path = new CaseInsensitiveMap(options).get("path")
val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
}
}
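A small usage sketch (not part of the diff) for the new helper; the option values are hypothetical, and `locationUri` is assumed to be an `Option[String]` as in the surrounding code.

// Sketch: the "path" option becomes the table location and is removed
// from the remaining storage properties.
val storage = DataSource.buildStorageFormatFromOptions(
  Map("path" -> "/data/events", "compression" -> "gzip"))
assert(storage.locationUri == Some("/data/events"))
assert(storage.properties == Map("compression" -> "gzip"))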
@@ -1728,4 +1728,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(e.contains("Codec [illegal] is not available. Known codecs are"))
}
}

test("Write dates correctly with dateFormat option") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
withTempDir { dir =>
// With dateFormat option.
val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
val datesWithFormat = spark.read
.schema(customSchema)
.option("dateFormat", "dd/MM/yyyy HH:mm")
.json(datesRecords)

datesWithFormat.write
.format("json")
.option("dateFormat", "yyyy/MM/dd")
.save(datesWithFormatPath)

// This will load back the dates as strings.
val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
val stringDatesWithFormat = spark.read
.schema(stringSchema)
.json(datesWithFormatPath)
val expectedStringDatesWithFormat = Seq(
Row("2015/08/26"),
Row("2014/10/27"),
Row("2016/01/28"))

checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
}
}

test("Write timestamps correctly with dateFormat option") {
val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
withTempDir { dir =>
// With timestampFormat option.
val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
val timestampsWithFormat = spark.read
.schema(customSchema)
.option("timestampFormat", "dd/MM/yyyy HH:mm")
.json(datesRecords)
timestampsWithFormat.write
.format("json")
.option("timestampFormat", "yyyy/MM/dd HH:mm")
.save(timestampsWithFormatPath)

// This will load back the timestamps as strings.
val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
val stringTimestampsWithFormat = spark.read
.schema(stringSchema)
.json(timestampsWithFormatPath)
val expectedStringDatesWithFormat = Seq(
Row("2015/08/26 18:00"),
Row("2014/10/27 18:30"),
Row("2016/01/28 20:00"))

checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
}
}
}
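A complementary sketch (not part of the diff): because the write-side dateFormat/timestampFormat options mirror the read-side ones, the files written above could also be loaded back as typed columns by supplying the matching pattern on read. The path is hypothetical, and the spark session and struct-type imports are assumed from the surrounding suite.

// Sketch: round-trip the dates written with "yyyy/MM/dd" back into DateType
// by giving the reader the same pattern (instead of reading them as strings).
val reloadedDates = spark.read
  .schema(new StructType(Array(StructField("date", DateType, true))))
  .option("dateFormat", "yyyy/MM/dd")
  .json("/tmp/datesWithFormat.json")  // hypothetical path
reloadedDates.printSchema()
// root
//  |-- date: date (nullable = true)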
@@ -307,10 +307,10 @@ class CatalogSuite
}

test("createExternalTable should fail if path is not given for file-based data source") {
val e = intercept[AnalysisException] {
val e = intercept[IllegalArgumentException] {
spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
}
assert(e.message.contains("Unable to infer schema"))
assert(e.getMessage.contains("'path' is not specified"))

val e2 = intercept[AnalysisException] {
spark.catalog.createExternalTable(
@@ -292,10 +292,15 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
test("prevent all column partitioning") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val e = intercept[AnalysisException] {
var e = intercept[AnalysisException] {
spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
}.getMessage
assert(e.contains("Cannot use all columns for partition columns"))
e = intercept[AnalysisException] {
spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
}.getMessage
assert(e.contains("Cannot use all columns for partition columns"))
spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
}
}

@@ -457,6 +462,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema)
}

/**
* This only tests whether the API compiles; it does not run, because orc()
* cannot be run without Hive classes.
*/
ignore("orc - API") {
// Reader, with user specified schema
// Refer to csv-specific test suites for behavior without user specified schema
spark.read.schema(userSchema).orc()
spark.read.schema(userSchema).orc(dir)
spark.read.schema(userSchema).orc(dir, dir, dir)
spark.read.schema(userSchema).orc(Seq(dir, dir): _*)
Option(dir).map(spark.read.schema(userSchema).orc)

// Writer
spark.range(10).write.orc(dir)
}

test("column nullability and comment - write and then read") {
import testImplicits._

Seq("json", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
}
}
}

test("SPARK-17230: write out results of decimal calculation") {
val df = spark.range(99, 101)
.selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
df.write.mode(SaveMode.Overwrite).parquet(dir)
val df2 = spark.read.parquet(dir)
checkAnswer(df2, df)
}

private def testRead(
df: => DataFrame,
expectedResult: Seq[String],