@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetTypeInfoOperation
*
* @param sqlContext SQLContext to use
* @param parentSession a HiveSession from SessionManager
*/
private[hive] class SparkGetTypeInfoOperation(
    sqlContext: SQLContext,
    parentSession: HiveSession)
  extends GetTypeInfoOperation(parentSession) with Logging {

  private var statementId: String = _

  override def close(): Unit = {
    super.close()
    HiveThriftServer2.listener.onOperationClosed(statementId)
  }

  override def runInternal(): Unit = {
    statementId = UUID.randomUUID().toString
    val logMsg = "Listing type info"
    logInfo(s"$logMsg with $statementId")
    setState(OperationState.RUNNING)
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    if (isAuthV2Enabled) {
      authorizeMetaGets(HiveOperationType.GET_TYPEINFO, null)
    }

    HiveThriftServer2.listener.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      logMsg,
      statementId,
      parentSession.getUsername)

    try {
      ThriftserverShimUtils.supportedType().foreach(typeInfo => {
        val rowData = Array[AnyRef](
          typeInfo.getName, // TYPE_NAME
          typeInfo.toJavaSQLType.asInstanceOf[AnyRef], // DATA_TYPE
          typeInfo.getMaxPrecision.asInstanceOf[AnyRef], // PRECISION
          typeInfo.getLiteralPrefix, // LITERAL_PREFIX
          typeInfo.getLiteralSuffix, // LITERAL_SUFFIX
          typeInfo.getCreateParams, // CREATE_PARAMS
          typeInfo.getNullable.asInstanceOf[AnyRef], // NULLABLE
          typeInfo.isCaseSensitive.asInstanceOf[AnyRef], // CASE_SENSITIVE
          typeInfo.getSearchable.asInstanceOf[AnyRef], // SEARCHABLE
          typeInfo.isUnsignedAttribute.asInstanceOf[AnyRef], // UNSIGNED_ATTRIBUTE
          typeInfo.isFixedPrecScale.asInstanceOf[AnyRef], // FIXED_PREC_SCALE
          typeInfo.isAutoIncrement.asInstanceOf[AnyRef], // AUTO_INCREMENT
          typeInfo.getLocalizedName, // LOCAL_TYPE_NAME
          typeInfo.getMinimumScale.asInstanceOf[AnyRef], // MINIMUM_SCALE
          typeInfo.getMaximumScale.asInstanceOf[AnyRef], // MAXIMUM_SCALE
          null, // SQL_DATA_TYPE, unused
          null, // SQL_DATETIME_SUB, unused
          typeInfo.getNumPrecRadix // NUM_PREC_RADIX
        )
        rowSet.addRow(rowData)
      })
      setState(OperationState.FINISHED)
    } catch {
      case e: HiveSQLException =>
        setState(OperationState.ERROR)
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw e
    }
    HiveThriftServer2.listener.onStatementFinish(statementId)
  }
}
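For context, the rows assembled above are what a JDBC client receives from the standard DatabaseMetaData.getTypeInfo() call, which the test further down exercises. A minimal client-side sketch, assuming a Thrift server is reachable at the hypothetical URL below:

import java.sql.DriverManager

// Hypothetical connection; adjust the URL and credentials to your deployment.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
try {
  val rs = conn.getMetaData.getTypeInfo
  while (rs.next()) {
    // TYPE_NAME, DATA_TYPE and PRECISION are the first three columns
    // populated by SparkGetTypeInfoOperation.runInternal above.
    val name = rs.getString("TYPE_NAME")
    val jdbcType = rs.getInt("DATA_TYPE")
    val precision = rs.getInt("PRECISION")
    println(s"$name: jdbcType=$jdbcType, precision=$precision")
  }
} finally {
  conn.close()
}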
@@ -140,4 +140,15 @@ private[thriftserver] class SparkSQLOperationManager()
logDebug(s"Created GetFunctionsOperation with session=$parentSession.")
operation
}

override def newGetTypeInfoOperation(
parentSession: HiveSession): GetTypeInfoOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
" initialized or had already closed.")
val operation = new SparkGetTypeInfoOperation(sqlContext, parentSession)
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created GetTypeInfoOperation with session=$parentSession.")
operation
}
}
@@ -231,4 +231,20 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
assert(!rs.next())
}
}

test("GetTypeInfo Thrift API") {
def checkResult(rs: ResultSet, typeNames: Seq[String]): Unit = {
for (i <- typeNames.indices) {
assert(rs.next())
assert(rs.getString("TYPE_NAME") === typeNames(i))
}
// Make sure there are no more elements
assert(!rs.next())
}

withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
checkResult(metaData.getTypeInfo, ThriftserverShimUtils.supportedType().map(_.getName))
}
}
}
@@ -73,7 +73,7 @@ public class GetTypeInfoOperation extends MetadataOperation {
.addPrimitiveColumn("NUM_PREC_RADIX", Type.INT_TYPE,
"Usually 2 or 10");

private final RowSet rowSet;
protected final RowSet rowSet;

protected GetTypeInfoOperation(HiveSession parentSession) {
super(parentSession, OperationType.GET_TYPE_INFO);
@@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema, Type}
import org.apache.hive.service.cli.Type._
import org.apache.hive.service.cli.thrift.TProtocolVersion._

/**
@@ -51,6 +52,14 @@ private[thriftserver] object ThriftserverShimUtils {

  private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType

  private[thriftserver] def supportedType(): Seq[Type] = {
    Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE,
      TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,
      FLOAT_TYPE, DOUBLE_TYPE, DECIMAL_TYPE,
      DATE_TYPE, TIMESTAMP_TYPE,
      ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE)
  }

  private[thriftserver] def addToClassPath(
      loader: ClassLoader,
      auxJars: Array[String]): ClassLoader = {
@@ -73,7 +73,7 @@ public class GetTypeInfoOperation extends MetadataOperation {
.addPrimitiveColumn("NUM_PREC_RADIX", Type.INT_TYPE,
"Usually 2 or 10");

private final RowSet rowSet;
protected final RowSet rowSet;

protected GetTypeInfoOperation(HiveSession parentSession) {
super(parentSession, OperationType.GET_TYPE_INFO);
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.hive.ql.exec.AddToClassPathAction
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.thrift.Type
import org.apache.hadoop.hive.serde2.thrift.Type._
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema}
import org.apache.hive.service.rpc.thrift.TProtocolVersion._
import org.slf4j.LoggerFactory
@@ -56,6 +57,14 @@ private[thriftserver] object ThriftserverShimUtils {

  private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType

  private[thriftserver] def supportedType(): Seq[Type] = {
    Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE,
      TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,
      FLOAT_TYPE, DOUBLE_TYPE, DECIMAL_TYPE,
      DATE_TYPE, TIMESTAMP_TYPE,
      ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE)
  }
Member:
Why do we skip ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE and USER_DEFINED_TYPE?

Contributor Author (@AngersZhuuuu), Sep 8, 2019:

> Why do we skip ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE and USER_DEFINED_TYPE?

These types are supported by converting them to strings for display. I should add them.

Member:
I think we should add these types. Hive-3.1.2 also converted these types to strings.
@juliuszsompolski What do you think?

Contributor (@juliuszsompolski):
How does the client handle it?
If you do

val stmt = conn.prepareStatement("SELECT array, map, struct, interval FROM table")
val rs = stmt.executeQuery()
val md = rs.getMetaData()

Then what does md.getColumnType(i) return for each of these columns?
What type of rs.getXXX call should the user use for each of these columns? For the array column, should it be rs.getArray(i) or rs.getString(i)?
What is the mapping between the types returned by md.getColumnType(i) and the getters that should be used for them in rs.getXXX(i)?

Contributor Author (@AngersZhuuuu):
For the result set metadata, it does indeed report ARRAY, MAP, and STRUCT.
The returned content itself is rendered by the HiveResult.toHiveString() method according to each column's DataType.
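To illustrate what that means on the client side, here is a rough sketch; the connection URL is hypothetical and the rendered strings in the comments are only indicative of what toHiveString produces:

import java.sql.DriverManager

// Hypothetical connection to a local Spark Thrift server.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
val rs = conn.createStatement().executeQuery(
  "SELECT array(1, 2, 3) AS a, map('k', 1) AS m, named_struct('x', 1) AS s")
rs.next()
// The metadata reports complex JDBC types (e.g. java.sql.Types.ARRAY for column a),
// but the values themselves arrive already rendered as strings by the server.
println(rs.getString(1)) // column a, e.g. [1,2,3]
println(rs.getString(2)) // column m, e.g. {"k":1}
println(rs.getString(3)) // column s, e.g. {"x":1}
conn.close()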

Member:
OK. We cannot fully support these types. Please remove them, @AngersZhuuuu.
Thanks @juliuszsompolski for your example.

Contributor (@juliuszsompolski):
Actually, thanks for explaining it @AngersZhuuuu, and you convinced me that ARRAY, MAP and STRUCT must be included.

scala> md.getColumnType(1)
res11: Int = 2003 (== java.sql.Types.ARRAY)

but then

scala> md.getColumnClassName(1)
res10: String = java.lang.String

so that tells the client that the value is actually returned as a String, and I should retrieve it as such, either with rs.getObject(1).asInstanceOf[String] or, as a convenient shorthand, with rs.getString(1).
It would actually be incorrect not to include Array, Map, and Struct, because we do return them in the ResultSet schema (through SparkExecuteStatement.getTableSchema), so the client can get these types returned, and for any type that can be returned to the client there should be an entry in GetTypeInfo.
We therefore should not include INTERVAL (because we explicitly turn it into a String return type after #25277), and should not include UNIONTYPE or USER_DEFINED because they don't have any Spark equivalent, but ARRAY, MAP and STRUCT should be there.
Thank you for the explanation 👍
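Putting those observations together, a small sketch of the lookup pattern a client would follow; the URL and query are hypothetical, and the expected values follow the REPL output quoted above:

import java.sql.{DriverManager, Types}

// Hypothetical connection; any complex-typed column behaves the same way.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
val rs = conn.createStatement().executeQuery("SELECT array(1, 2, 3) AS a")
val md = rs.getMetaData

// The schema advertises the logical JDBC type of the column...
assert(md.getColumnType(1) == Types.ARRAY) // 2003, as in the REPL output above
// ...while the column class name says the value is materialized as a String,
assert(md.getColumnClassName(1) == "java.lang.String")
// ...so rs.getString(1) (or rs.getObject(1).asInstanceOf[String]) is the getter to use.
rs.next()
val rendered: String = rs.getString(1)
conn.close()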

Contributor Author (@AngersZhuuuu):
@juliuszsompolski

      case Types.OTHER:
      case Types.JAVA_OBJECT: {
        switch (hiveType) {
          case INTERVAL_YEAR_MONTH_TYPE:
            return HiveIntervalYearMonth.class.getName();
          case INTERVAL_DAY_TIME_TYPE:
            return HiveIntervalDayTime.class.getName();
          default:
            return String.class.getName();
        }
      }

USER_DEFINED maps to java.sql.Types.OTHER, and in the conversion above it is also turned into a String, the same as ARRAY, MAP, and STRUCT.
Maybe we should add USER_DEFINED as well.

Contributor (@juliuszsompolski):
But Spark will never return a USER_DEFINED type.
The current implementation of org.apache.spark.sql.types.UserDefinedType returns the underlying sqlType.simpleString as its catalogString, so Thriftserver queries will return the underlying type in the schema.
Hence for USER_DEFINED (and UNIONTYPE) the argument is not that they wouldn't potentially work, but that Spark does not use them.

Contributor Author (@AngersZhuuuu):
> But Spark will never return a USER_DEFINED type.
> The current implementation of org.apache.spark.sql.types.UserDefinedType returns the underlying sqlType.simpleString as its catalogString, so Thriftserver queries will return the underlying type in the schema.
> Hence for USER_DEFINED (and UNIONTYPE) the argument is not that they wouldn't potentially work, but that Spark does not use them.

Removed it and resolved the conflicts.


private[thriftserver] def addToClassPath(
Contributor:
nit: empty line between functions.

Contributor Author (@AngersZhuuuu):
Added it.

loader: ClassLoader,
auxJars: Array[String]): ClassLoader = {