Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,18 @@ class SessionCatalog(
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
validateName(table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))

val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be easier if locationUri is of type URI?

Copy link
Contributor Author

@windpiger windpiger Mar 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A URI without a scheme is also legal, so this fix is still needed even if it is a URI.
However, if it is a URI, we can do this when the URI is created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why do we have to store the fully qualified path? What do we gain from this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the location has no scheme (such as hdfs or file), then when we restore it from the metastore we do not know which filesystem the table is stored on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we apply it to all locations, such as database locations and partition locations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this logic should be applied to all of them.
The database location already has this logic; shall I add the partition logic in another PR?

// make the location of the table qualified.
val qualifiedTableLocation =
makeQualifiedPath(tableDefinition.storage.locationUri.get).toString
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation.toString)),
identifier = TableIdentifier(table, Some(db)))
} else {
tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
}

requireDbExists(db)
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
import java.io.File
import java.net.URI

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkFunSuite
Expand Down Expand Up @@ -459,7 +460,9 @@ class CatalogSuite
options = Map("path" -> dir.getAbsolutePath))
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.EXTERNAL)
assert(table.storage.locationUri.get == dir.getAbsolutePath)
val dirPath = new Path(dir.getAbsolutePath)
val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
assert(new Path(table.storage.locationUri.get) == fs.makeQualified(dirPath))

Seq((1)).toDF("i").write.insertInto("t")
assert(dir.exists() && dir.listFiles().nonEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')
""".stripMargin)
assert(getPathOption("src") == Some("/tmp/path"))
assert(getPathOption("src") == Some("file:/tmp/path"))
}

// should exist even path option is not specified when creating table
Expand All @@ -85,15 +85,17 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
test("path option also exist for write path") {
withTable("src") {
withTempPath { p =>
val path = new Path(p.getAbsolutePath).toString
val path = new Path(p.getAbsolutePath)
sql(
s"""
|CREATE TABLE src
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '$path')
|AS SELECT 1
""".stripMargin)
assert(spark.table("src").schema.head.metadata.getString("path") == path)
val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
assert(new Path(spark.table("src").schema.head.metadata.getString("path")) ==
fs.makeQualified(path))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
Expand Down Expand Up @@ -140,7 +142,9 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))

assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
assert(hiveTable.storage.locationUri === Some(path.toString))
val dirPath = new Path(dir.getAbsolutePath)
val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
assert(hiveTable.storage.locationUri === Some(fs.makeQualified(dirPath).toString))

val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,20 +651,19 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
import spark.implicits._

val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
Seq("1").toDF("a").write.saveAsTable("t")
val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))

assert(table.location.stripSuffix("/") == expectedPath)
assert(new Path(table.location) == fs.makeQualified(tPath))
assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
checkAnswer(spark.table("t"), Row("1") :: Nil)

val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
spark.sql("create table t1 using parquet as select 2 as a")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"

assert(table1.location.stripSuffix("/") == expectedPath1)
assert(new Path(table1.location) == fs.makeQualified(t1Path))
assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
checkAnswer(spark.table("t1"), Row(2) :: Nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1587,4 +1587,33 @@ class HiveDDLSuite
}
}
}

test("the qualified path of a datasource table is stored in the catalog") {
  // Runs the DDL produced by `createTableSql` against a fresh temporary directory whose
  // absolute path does not start with "file:/", then checks that the location read back
  // from the session catalog for `tableName` does start with "file:/".
  def checkQualifiedLocation(tableName: String)(createTableSql: String => String): Unit = {
    withTempDir { tmpDir =>
      assert(!tmpDir.getAbsolutePath.startsWith("file:/"))
      spark.sql(createTableSql(tmpDir.toString))
      val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
      assert(metadata.location.startsWith("file:/"))
    }
  }

  withTable("t", "t1") {
    // Non-partitioned datasource table.
    checkQualifiedLocation("t") { location =>
      s"""
         |CREATE TABLE t(a string)
         |USING parquet
         |LOCATION '$location'
         |""".stripMargin
    }

    // Partitioned datasource table.
    checkQualifiedLocation("t1") { location =>
      s"""
         |CREATE TABLE t1(a string, b string)
         |USING parquet
         |PARTITIONED BY(b)
         |LOCATION '$location'
         |""".stripMargin
    }
  }
}
}