
Commit 07c4b9b

gatorsmile authored and dongjoon-hyun committed
Revert "[SPARK-25474][SQL] Support spark.sql.statistics.fallBackToHdfs in data source tables"
This reverts commit 485ae6d.

Closes apache#25563 from gatorsmile/revert.

Authored-by: Xiao Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8258660 commit 07c4b9b
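
For context, spark.sql.statistics.fallBackToHdfs lets the planner estimate the size of a table that has no statistics from its files on the filesystem instead of using the spark.sql.defaultSizeInBytes default. A minimal sketch of enabling it, assuming a running SparkSession named `spark`; with this revert the setting again applies only to Hive tables (DetermineTableStats), not to data source tables read through HadoopFsRelation:

// Sketch only: toggles the fallback used when estimating tables without statistics.
// After this revert it affects Hive tables, but no longer data source tables backed
// by a CatalogFileIndex.
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")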

File tree

4 files changed (+15, -63 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 0 additions & 11 deletions
@@ -344,15 +344,4 @@ object CommandUtils extends Logging {
   private def isDataPath(path: Path, stagingDir: String): Boolean = {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
-
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
-    try {
-      val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
-        defaultSize
-    }
-  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala

Lines changed: 4 additions & 9 deletions
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.hadoop.fs.Path
+import java.util.Locale
+
+import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -70,13 +71,7 @@ case class HadoopFsRelation(
 
   override def sizeInBytes: Long = {
     val compressionFactor = sqlContext.conf.fileCompressionFactor
-    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
-    location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
-        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
-          defaultSize)
-      case _ => defaultSize
-    }
+    (location.sizeInBytes * compressionFactor).toLong
   }
 
 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 11 additions & 3 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
   ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -118,8 +118,16 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
-          session.sessionState.conf.defaultSizeInBytes)
+        try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          val tablePath = new Path(table.location)
+          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+          fs.getContentSummary(tablePath).getLength
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
       } else {
         session.sessionState.conf.defaultSizeInBytes
       }

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 0 additions & 40 deletions
@@ -1484,44 +1484,4 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
-
-  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
-    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
-      withTable("t1", "t2") {
-        sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-        sql("INSERT INTO t1 VALUES (1, 'a')")
-        checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-        sql("INSERT INTO t2 VALUES (1, 'a')")
-        checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"),
-          "BroadcastHashJoin")
-      }
-    }
-  }
-
-  test("SPARK-25474: should not fall back to hdfs when table statistics exists" +
-    " for CatalogFileIndex dataSourceTable") {
-
-    var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
-    Seq(true, false).foreach { fallBackToHdfs =>
-      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
-        withTable("t1") {
-          sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-          sql("INSERT INTO t1 VALUES (1, 'a')")
-          // Analyze command updates the statistics of table `t1`
-          sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
-          val catalogTable = getCatalogTable("t1")
-          assert(catalogTable.stats.isDefined)
-
-          if (!fallBackToHdfs) {
-            sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          } else {
-            sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          }
-          checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        }
-      }
-    }
-    assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
-  }
 }
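
The first removed test above guarded against the planner falling back to the 8.0 EiB default estimate for a partitioned data source table. A standalone sketch of that scenario, assuming a running SparkSession named `spark` (table name and values are illustrative, taken from the removed test):

// Sketch of the scenario the removed test covered; not part of this commit.
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
spark.sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
spark.sql("INSERT INTO t1 VALUES (1, 'a')")
// With the fallback removed for data source tables, the plan's sizeInBytes can again
// show the spark.sql.defaultSizeInBytes default (8.0 EiB) instead of the on-disk size.
spark.sql("EXPLAIN COST SELECT * FROM t1").show(truncate = false)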
