@@ -418,7 +418,7 @@ private[hive] class HiveClientImpl(
// Note that this statistics could be overridden by Spark's statistics if that's available.
val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
-val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0)
+val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ > 0)
Contributor:

Hive has a flag called StatsSetupConst.COLUMN_STATS_ACCURATE. If I remember correctly, this flag becomes false when the user changes table properties or table data. Can you check whether the flag exists in your case? If so, we can use it to decide whether to read statistics from Hive.
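
For reference, a minimal sketch of such a gate (a hypothetical helper, not part of this patch; it assumes the flag is stored either as the literal "true" in older Hive or as JSON like {"BASIC_STATS":"true"} in newer Hive, which is what the desc extended output further down shows):

import org.apache.hadoop.hive.common.StatsSetupConst

// Hypothetical helper: trust Hive's basic stats only when the accuracy
// flag is present and set. Older Hive stores the literal "true"; newer
// Hive stores JSON such as {"BASIC_STATS":"true"}.
def basicStatsAccurate(properties: Map[String, String]): Boolean =
  properties.get(StatsSetupConst.COLUMN_STATS_ACCURATE).exists { flag =>
    flag == "true" || flag.contains("\"BASIC_STATS\":\"true\"")
  }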

Contributor:

The root problem is that a user can set "wrong" table properties. So if we want to avoid using wrong stats, we need to detect changes to those properties; otherwise, cases like yours can't be avoided.

Member Author:

StatsSetupConst.COLUMN_STATS_ACCURATE ensures that statistics have been updated, but it cannot guarantee that they are correct:

cat <<EOF > data
1,1
2,2
3,3
4,4
5,5
EOF

hive -e "CREATE TABLE spark_22626(c1 int, c2 int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';"

hive -e "LOAD DATA local inpath 'data' into table spark_22626;"

hive -e "INSERT INTO table spark_22626 values(6, 6);"

hive -e "desc extended spark_22626;"

The result is:

parameters:{totalSize=24, numRows=1, rawDataSize=3, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}}

numRows should be 6, but got 1, presumably because LOAD DATA does not gather row counts, so the subsequent INSERT recorded only its own row while still marking BASIC_STATS as accurate.

Member Author:

Maybe this would be clearer:

val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
val stats =
  if (totalSize.isDefined && totalSize.get > 0L) {
    Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
  } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
    Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
  } else {
    None
  }
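
As a quick sanity check of the proposed filter, a self-contained snippet (plain Scala; the values are copied from the big table in the test below):

// Zero rows reported alongside a huge totalSize: exactly the
// inconsistent combination the filter has to reject.
val props = Map(
  "numRows" -> "0",
  "rawDataSize" -> "60000000000",
  "totalSize" -> "8000000000000")

val rc = props.get("numRows").map(BigInt(_))
assert(rc.filter(_ >= 0).isDefined) // old behavior keeps the bogus zero
assert(rc.filter(_ > 0).isEmpty)    // new behavior drops it, so the
                                    // optimizer falls back to sizeInBytes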

Contributor:

Thanks for the investigation. It seems Hive can't protect its stats properties.

// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore.
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, BuildLeft}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -1187,6 +1187,22 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
}
}
}

test("Wrong Hive table statistics may trigger OOM if enables join reorder in CBO") {
Contributor:

IMHO you can just test the read logic for Hive's stats properties in StatisticsSuite instead of an end-to-end test case; developers may not understand what's going on from this test case.
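
For illustration, a rough sketch of such a unit-level check (hypothetical and untested; it assumes a table t created with the same bogus properties and the usual spark test session):

import org.apache.spark.sql.catalyst.TableIdentifier

// Assert on the CatalogStatistics parsed from Hive's table properties
// directly, instead of on the shape of a physical plan.
val tableStats = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("t")).stats
// numRows='0' should be rejected; sizeInBytes should come from totalSize.
assert(tableStats.exists(s => s.rowCount.isEmpty && s.sizeInBytes > 0))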

withTable("small", "big") {
sql("CREATE TABLE small (c1 bigint)" +
"TBLPROPERTIES ('numRows'='3', 'rawDataSize'='600','totalSize'='800')")
sql("CREATE TABLE big (c1 bigint)" +
"TBLPROPERTIES ('numRows'='0', 'rawDataSize'='60000000000', 'totalSize'='8000000000000')")

withSQLConf(SQLConf.CBO_ENABLED.key -> "true", SQLConf.JOIN_REORDER_ENABLED.key -> "true") {
val plan = sql("select * from small t1 join big t2 on (t1.c1 = t2.c1)")
.queryExecution.executedPlan
val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
assert(buildSide === BuildLeft)
}
}
}
}

// for SPARK-2180 test