14 changes: 13 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -15,9 +15,17 @@
# limitations under the License.
#

from __future__ import print_function
from functools import total_ordering
import itertools
import re
import os

if os.environ.get("AMPLAB_JENKINS"):
Member:
@wangyum, this will show the info every time this module is imported. Why did we do this here?

Member:
Okay, it's a temporary fix, so I'm fine with it. I will make a follow-up to handle https://github.com/apache/spark/pull/24639/files

Member Author:
Skip the hive-thriftserver module when running the hadoop-3.2 tests. I will remove it in another PR: https://github.com/apache/spark/pull/24628/files

Member Author:
Thank you @HyukjinKwon

hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop2.7")
else:
hadoop_version = os.environ.get("HADOOP_PROFILE", "hadoop2.7")
print("[info] Choosing supported modules with Hadoop profile", hadoop_version)

all_modules = []

Member (@gatorsmile, May 6, 2019):
We should have a log message to show which profile we are using; otherwise, it is hard for us to know which profile is activated.

Member:
What I mean is, we should have a log message to show which profile is actually being used.

Just relying on the command-line parameters does not sound very reliable.

Member Author:
OK, updated to:
[image]

@@ -72,7 +80,11 @@ def __init__(self, name, dependencies, source_file_regexes, build_profile_flags=
self.dependent_modules = set()
for dep in dependencies:
dep.dependent_modules.add(self)
all_modules.append(self)
# TODO: Skip the hive-thriftserver module for hadoop-3.2. Remove this once hadoop-3.2 supports it.
if name == "hive-thriftserver" and hadoop_version == "hadoop3.2":
Member Author:
Used to skip the hive-thriftserver module for hadoop-3.2. Will revert this change once we can merge it.

Member:
Maybe leave a TODO here, just to make sure that doesn't get lost.

print("[info] Skip unsupported module:", name)
else:
all_modules.append(self)

def contains_file(self, filename):
return any(re.match(p, filename) for p in self.source_file_prefixes)
@@ -1371,7 +1371,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
// if (isUsingHiveMetastore) {
// assert(storageFormat.properties.get("path") === expected)
// }
assert(storageFormat.locationUri === Some(expected))
assert(storageFormat.locationUri.map(_.getPath) === Some(expected.getPath))
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
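The loosened assertion compares only the path component, presumably because the catalog can return a scheme-qualified URI (for example file:/...) while the expected value is a bare path. A small sketch of the distinction (the paths here are illustrative, not from the suite):

```scala
import java.net.URI

object LocationUriPathCheck {
  def main(args: Array[String]): Unit = {
    val expected = new URI("/tmp/spark-warehouse/tab1")      // bare path, as built by the test
    val returned = new URI("file:/tmp/spark-warehouse/tab1") // scheme-qualified, as the catalog may return it
    println(expected == returned)                  // false: the full URIs differ by scheme
    println(expected.getPath == returned.getPath)  // true: the path components match
  }
}
```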
@@ -118,7 +118,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}

protected def testSelectiveDictionaryEncoding(isSelective: Boolean) {
protected def testSelectiveDictionaryEncoding(isSelective: Boolean, isHive23: Boolean = false) {
val tableName = "orcTable"

withTempDir { dir =>
@@ -171,7 +171,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
// Hive 0.11 and RLE v2 is introduced in Hive 0.12 ORC with more improvements.
// For more details, see https://orc.apache.org/specification/
assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
if (isSelective) {
if (isSelective || isHive23) {
assert(stripe.getColumns(2).getKind === DIRECT_V2)
} else {
assert(stripe.getColumns(2).getKind === DICTIONARY_V2)
@@ -52,6 +52,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -191,7 +192,29 @@ private[hive] class HiveClientImpl(
}

/** Returns the configuration for the current session. */
def conf: HiveConf = state.getConf
def conf: HiveConf = if (!HiveUtils.isHive23) {
state.getConf
} else {
val hiveConf = state.getConf
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false
// and hive.metastore.schema.verification from false to true since Hive 2.0.
// For details, see the JIRA HIVE-6113, HIVE-12463 and HIVE-1841.
// isEmbeddedMetaStore should not be true in the production environment.
// We hard-code hive.metastore.schema.verification and datanucleus.schema.autoCreateAll to allow
// bin/spark-shell, bin/spark-sql and sbin/start-thriftserver.sh to automatically create the
// Derby Metastore when running Spark in the non-production environment.
val isEmbeddedMetaStore = {
val msUri = hiveConf.getVar(ConfVars.METASTOREURIS)
val msConnUrl = hiveConf.getVar(ConfVars.METASTORECONNECTURLKEY)
(msUri == null || msUri.trim().isEmpty) &&
(msConnUrl != null && msConnUrl.startsWith("jdbc:derby"))
}
if (isEmbeddedMetaStore) {
hiveConf.setBoolean("hive.metastore.schema.verification", false)
hiveConf.setBoolean("datanucleus.schema.autoCreateAll", true)
}
hiveConf
}

private val userName = conf.getUser

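For reviewers who want to exercise the embedded-metastore condition outside Spark, here is a rough standalone sketch of the same check (the Derby URL is illustrative):

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

object EmbeddedMetastoreCheck {
  def main(args: Array[String]): Unit = {
    val hiveConf = new HiveConf()
    // Simulate the default spark-shell setup: no remote metastore URI, Derby-backed metastore.
    hiveConf.setVar(ConfVars.METASTOREURIS, "")
    hiveConf.setVar(ConfVars.METASTORECONNECTURLKEY,
      "jdbc:derby:;databaseName=metastore_db;create=true")

    // Same condition as the new HiveClientImpl.conf branch above.
    val msUri = hiveConf.getVar(ConfVars.METASTOREURIS)
    val msConnUrl = hiveConf.getVar(ConfVars.METASTORECONNECTURLKEY)
    val isEmbeddedMetaStore =
      (msUri == null || msUri.trim().isEmpty) &&
        (msConnUrl != null && msConnUrl.startsWith("jdbc:derby"))

    println(s"embedded metastore: $isEmbeddedMetaStore") // expected: true
  }
}
```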
@@ -28,6 +28,7 @@ import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.ShimLoader

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
@@ -196,6 +197,7 @@ private[hive] class IsolatedClientLoader(
protected def isBarrierClass(name: String): Boolean =
name.startsWith(classOf[HiveClientImpl].getName) ||
name.startsWith(classOf[Shim].getName) ||
name.startsWith(classOf[ShimLoader].getName) ||
Member Author:
Add org.apache.hadoop.hive.shims.ShimLoader to the barrier classes; otherwise the hadoop-3.2 build can't access Hive metastore versions 0.12 through 2.2:

build/sbt "hive/testOnly *.VersionsSuite" -Phadoop-3.2 -Phive
...
[info] - 0.12: create client *** FAILED *** (36 seconds, 207 milliseconds)
[info]   java.lang.reflect.InvocationTargetException:
[info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[info]   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info]   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
[info]   at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:295)
[info]   at org.apache.spark.sql.hive.client.HiveClientBuilder$.buildClient(HiveClientBuilder.scala:58)
[info]   at org.apache.spark.sql.hive.client.VersionsSuite.$anonfun$new$7(VersionsSuite.scala:130)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:105)
[info]   at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
[info]   at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
[info]   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite.run(Suite.scala:1147)
[info]   at org.scalatest.Suite.run$(Suite.scala:1129)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:54)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:54)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   Cause: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.2.0
[info]   at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:286)
[info]   at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:187)
[info]   at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:119)

barrierPrefixes.exists(name.startsWith)

protected def classToPath(name: String): String =
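Barrier classes take their bytecode from Spark's own classpath rather than from the downloaded Hive client jars, so with this prefix added the ShimLoader in use is one that recognizes Hadoop 3.x, which matches the quoted failure. A minimal sketch of the prefix check in isolation (the wrapper object is illustrative; the prefixes are copied from the diff):

```scala
object BarrierClassCheck {
  // Prefix list as in IsolatedClientLoader after this change.
  private val barrierPrefixes = Seq(
    "org.apache.spark.sql.hive.client.HiveClientImpl",
    "org.apache.spark.sql.hive.client.Shim",
    "org.apache.hadoop.hive.shims.ShimLoader") // newly added in this PR

  def isBarrierClass(name: String): Boolean =
    barrierPrefixes.exists(name.startsWith)

  def main(args: Array[String]): Unit = {
    println(isBarrierClass("org.apache.hadoop.hive.shims.ShimLoader"))        // true
    println(isBarrierClass("org.apache.hadoop.hive.ql.session.SessionState")) // false
  }
}
```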
@@ -63,6 +63,9 @@ object TestHive
// SPARK-8910
.set(UI_ENABLED, false)
.set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
// Hive changed the default of hive.metastore.disallow.incompatible.col.type.changes
// from false to true. For details, see the JIRA HIVE-12320 and HIVE-17764.
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false")
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
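The spark.hadoop. prefix is what carries this key into the Hadoop configuration that the Hive client reads; a standalone sketch to confirm the propagation (the session settings here are illustrative, not part of the suite):

```scala
import org.apache.spark.sql.SparkSession

object HadoopConfPropagation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false")
      .getOrCreate()

    // spark.hadoop.* keys are copied (with the prefix stripped) into the Hadoop Configuration.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    println(hadoopConf.get("hive.metastore.disallow.incompatible.col.type.changes")) // false
    spark.stop()
  }
}
```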
@@ -116,8 +119,10 @@ class TestHiveContext(
@transient override val sparkSession: TestHiveSparkSession)
extends SQLContext(sparkSession) {

val HIVE_CONTRIB_JAR: String = "hive-contrib-0.13.1.jar"
val HIVE_HCATALOG_CORE_JAR: String = "hive-hcatalog-core-0.13.1.jar"
val HIVE_CONTRIB_JAR: String =
if (HiveUtils.isHive23) "hive-contrib-2.3.4.jar" else "hive-contrib-0.13.1.jar"
val HIVE_HCATALOG_CORE_JAR: String =
if (HiveUtils.isHive23) "hive-hcatalog-core-2.3.4.jar" else "hive-hcatalog-core-0.13.1.jar"

/**
* If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
Binary file not shown.
Binary file not shown.
@@ -58,11 +58,19 @@ class ClasspathDependenciesSuite extends SparkFunSuite {
}

test("shaded Protobuf") {
assertLoads("org.apache.hive.com.google.protobuf.ServiceException")
if (HiveUtils.isHive23) {
assertLoads("com.google.protobuf.ServiceException")
} else {
assertLoads("org.apache.hive.com.google.protobuf.ServiceException")
}
}

test("shaded Kryo") {
assertLoads("org.apache.hive.com.esotericsoftware.kryo.Kryo")
if (HiveUtils.isHive23) {
assertLoads("com.esotericsoftware.kryo.Kryo")
} else {
assertLoads("org.apache.hive.com.esotericsoftware.kryo.Kryo")
}
}

test("hive-common") {
@@ -81,7 +89,12 @@
}

test("parquet-hadoop-bundle") {
assertLoads("parquet.hadoop.ParquetOutputFormat")
assertLoads("parquet.hadoop.ParquetInputFormat")
if (HiveUtils.isHive23) {
assertLoads("org.apache.parquet.hadoop.ParquetOutputFormat")
assertLoads("org.apache.parquet.hadoop.ParquetInputFormat")
} else {
assertLoads("parquet.hadoop.ParquetOutputFormat")
assertLoads("parquet.hadoop.ParquetInputFormat")
}
}
}
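The branches reflect that Hive 2.3 no longer relocates these dependencies under org.apache.hive.*. A quick probe to see which variant a given classpath actually provides (a hypothetical helper, not the suite's own assertLoads):

```scala
object ShadedClassProbe {
  def main(args: Array[String]): Unit = {
    // Try to load a class without initializing it; report whether it is present.
    def loads(name: String): Boolean =
      try { Class.forName(name, false, getClass.getClassLoader); true }
      catch { case _: ClassNotFoundException => false }

    // Relocated name used by the pre-2.3 Hive fork vs. the plain name used by Hive 2.3.
    println("shaded protobuf:   " + loads("org.apache.hive.com.google.protobuf.ServiceException"))
    println("unshaded protobuf: " + loads("com.google.protobuf.ServiceException"))
  }
}
```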
@@ -180,6 +180,8 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
"--master", "local[2]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", "spark.sql.hive.metastore.version=1.2.1",
"--conf", "spark.sql.hive.metastore.jars=maven",
"--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
"--conf", s"spark.sql.test.version.index=$index",
"--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
@@ -197,6 +199,8 @@
"--master", "local[2]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", "spark.sql.hive.metastore.version=1.2.1",
"--conf", "spark.sql.hive.metastore.jars=maven",
"--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
"--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
unusedJar.toString)
@@ -206,7 +206,13 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))

checkAnswer(table("t"), testDF)
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
if (HiveUtils.isHive23) {
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
Seq("1.100\t1", "2.100\t2"))
} else {
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
Seq("1.1\t1", "2.1\t2"))
}
}
}

@@ -238,8 +244,13 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))

checkAnswer(table("t"), testDF)
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
Seq("1.1\t1", "2.1\t2"))
if (HiveUtils.isHive23) {
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
Seq("1.100\t1", "2.100\t2"))
} else {
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
Seq("1.1\t1", "2.1\t2"))
}
}
}
}
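The padded expected strings appear to come from Hive 2.3 printing DECIMAL(10, 3) values at their declared scale, so 1.1 becomes 1.100. A one-line check of just the scale behaviour (not the Hive code path):

```scala
object DecimalScaleCheck {
  def main(args: Array[String]): Unit = {
    // DECIMAL(10, 3): the value 1.1 rendered at scale 3.
    val v = new java.math.BigDecimal("1.1").setScale(3)
    println(v.toPlainString) // 1.100
  }
}
```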
@@ -35,10 +35,18 @@ class HiveShimSuite extends SparkFunSuite {

// test when READ_COLUMN_NAMES_CONF_STR is empty
HiveShim.appendReadColumns(conf, ids, names)
assert(names.asJava === ColumnProjectionUtils.getReadColumnNames(conf))
if (HiveUtils.isHive23) {
assert(names === ColumnProjectionUtils.getReadColumnNames(conf))
} else {
assert(names.asJava === ColumnProjectionUtils.getReadColumnNames(conf))
}

// test when READ_COLUMN_NAMES_CONF_STR is non-empty
HiveShim.appendReadColumns(conf, moreIds, moreNames)
assert((names ++ moreNames).asJava === ColumnProjectionUtils.getReadColumnNames(conf))
if (HiveUtils.isHive23) {
assert((names ++ moreNames) === ColumnProjectionUtils.getReadColumnNames(conf))
} else {
assert((names ++ moreNames).asJava === ColumnProjectionUtils.getReadColumnNames(conf))
}
}
}
@@ -100,8 +100,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
.asInstanceOf[HiveTableRelation]

val properties = relation.tableMeta.ignoredProperties
assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
assert(properties("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
if (HiveUtils.isHive23) {
// Since HIVE-6727, Hive fixes the issue that table-level stats for external tables are incorrect.
assert(properties("totalSize").toLong == 6)
assert(properties.get("rawDataSize").isEmpty)
} else {
assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
assert(properties("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
}

val sizeInBytes = relation.stats.sizeInBytes
assert(sizeInBytes === BigInt(file1.length() + file2.length()))
@@ -865,17 +871,25 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val totalSize = extractStatsPropValues(describeResult, "totalSize")
assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")

// ALTER TABLE SET/UNSET TBLPROPERTIES invalidates some Hive specific statistics, but not
// Spark specific statistics. This is triggered by the Hive alterTable API.
val numRows = extractStatsPropValues(describeResult, "numRows")
assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")

if (analyzedBySpark) {
if (HiveUtils.isHive23) {
// Since HIVE-15653 (Hive 2.3.0), Hive fixes the issue that some ALTER TABLE commands drop table stats.
assert(numRows.isDefined && numRows.get == 500)
Member:
Finally, Hive fixed this annoying bug. The statistics are very important for us too. Please add this to the migration guide and document the behavior difference when different Hadoop profiles are used.

val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
assert(rawDataSize.isDefined && rawDataSize.get == 5312)
checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
} else {
checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None)
// ALTER TABLE SET/UNSET TBLPROPERTIES invalidates some Hive specific statistics, but not
// Spark specific statistics. This is triggered by the Hive alterTable API.
assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")

if (analyzedBySpark) {
checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
} else {
checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None)
}
}
}
}