Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,19 @@ class SessionCatalog(
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
validateName(table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))

val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
val qualifiedTableLocation =
makeQualifiedPath(tableDefinition.storage.locationUri.get)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = TableIdentifier(table, Some(db)))
} else {
tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
}

requireDbExists(db)
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}

private def getDBPath(dbName: String): URI = {
val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
new Path(warehousePath, s"$dbName.db").toUri
val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just write spark.sessionState.conf.warehousePath, no need to wrap it with s""

new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
}

test("the qualified path of a database is stored in the catalog") {
Expand Down Expand Up @@ -1360,7 +1360,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val partitionLocation = if (isUsingHiveMetastore) {
val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
assert(tableLocation.isDefined)
makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
makeQualifiedPath(new Path(tableLocation.get.toString, "paris").toString)
} else {
new URI("paris")
}
Expand Down Expand Up @@ -1909,7 +1909,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new URI(dir.getAbsolutePath))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

dir.delete
assert(!dir.exists)
Expand Down Expand Up @@ -1950,7 +1950,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new URI(dir.getAbsolutePath))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
Expand All @@ -1976,7 +1976,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new URI(dir.getAbsolutePath))

assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

dir.delete()
checkAnswer(spark.table("t"), Nil)
Expand Down Expand Up @@ -2032,7 +2033,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new URI(dir.getAbsolutePath))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
Expand All @@ -2051,7 +2052,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == new URI(dir.getAbsolutePath))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

val partDir = new File(dir, "a=3")
assert(partDir.exists())
Expand Down Expand Up @@ -2099,7 +2100,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new Path(loc.getAbsolutePath).toUri)
assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))

assert(loc.listFiles().isEmpty)
Expand All @@ -2120,7 +2121,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == new Path(loc.getAbsolutePath).toUri)
assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))

assert(loc.listFiles().isEmpty)
Expand Down Expand Up @@ -2162,4 +2163,33 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}

test("the qualified path of a datasource table is stored in the catalog") {
withTable("t", "t1") {
withTempDir { dir =>
assert(!dir.getAbsolutePath.startsWith("file:/"))
spark.sql(
s"""
|CREATE TABLE t(a string)
|USING parquet
|LOCATION '$dir'
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location.toString.startsWith("file:/"))
}

withTempDir { dir =>
assert(!dir.getAbsolutePath.startsWith("file:/"))
spark.sql(
s"""
|CREATE TABLE t1(a string, b string)
|USING parquet
|PARTITIONED BY(b)
|LOCATION '$dir'
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location.toString.startsWith("file:/"))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.internal

import java.io.File
import java.net.URI

import org.scalatest.BeforeAndAfterEach

Expand Down Expand Up @@ -459,7 +458,7 @@ class CatalogSuite
options = Map("path" -> dir.getAbsolutePath))
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.EXTERNAL)
assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))

Seq((1)).toDF("i").write.insertInto("t")
assert(dir.exists() && dir.listFiles().nonEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')
""".stripMargin)
assert(getPathOption("src") == Some("/tmp/path"))
assert(getPathOption("src") == Some("file:/tmp/path"))
}

// should exist even path option is not specified when creating table
Expand All @@ -88,15 +88,16 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
test("path option also exist for write path") {
withTable("src") {
withTempPath { p =>
val path = new Path(p.getAbsolutePath).toString
sql(
s"""
|CREATE TABLE src
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '$path')
|OPTIONS (PATH '$p')
|AS SELECT 1
""".stripMargin)
assert(spark.table("src").schema.head.metadata.getString("path") == path)
assert(CatalogUtils.stringToURI(
spark.table("src").schema.head.metadata.getString("path")) ==
makeQualifiedPath(p.getAbsolutePath))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,6 @@ private[sql] trait SQLTestUtils
val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(hadoopPath).toUri
}

def makeQualifiedPath(path: Path): URI = {
val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(path).toUri
}
}

private[sql] object SQLTestUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.hive

import java.net.URI

import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
Expand Down Expand Up @@ -142,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))

assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
assert(hiveTable.storage.locationUri === Some(makeQualifiedPath(dir.getAbsolutePath)))

val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,19 +658,17 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w

val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
Seq("1").toDF("a").write.saveAsTable("t")
val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))

assert(table.location == CatalogUtils.stringToURI(expectedPath))
assert(table.location == makeQualifiedPath(tPath.toString))
assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
checkAnswer(spark.table("t"), Row("1") :: Nil)

val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
spark.sql("create table t1 using parquet as select 2 as a")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"

assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
assert(table1.location == makeQualifiedPath(t1Path.toString))
assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
checkAnswer(spark.table("t1"), Row(2) :: Nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,7 @@ class HiveDDLSuite
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new URI(dir.getAbsolutePath))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
Expand All @@ -1701,7 +1701,7 @@ class HiveDDLSuite
""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == new URI(dir.getAbsolutePath))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

val partDir = new File(dir, "a=3")
assert(partDir.exists())
Expand Down