@@ -19,11 +19,10 @@ package org.apache.spark.sql.hive.thriftserver

import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{ArrayList => JArrayList, List => JList, Locale}
import java.util.{Locale, ArrayList => JArrayList, List => JList}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import jline.console.ConsoleReader
import jline.console.history.FileHistory
import org.apache.commons.lang3.StringUtils
@@ -38,14 +37,13 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.log4j.Level
import org.apache.thrift.transport.TSocket
import sun.misc.{Signal, SignalHandler}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
import org.apache.spark.util.ShutdownHookManager
import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
* This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver
@@ -336,6 +334,68 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
console.printInfo(s"Spark master: $master, Application Id: $appId")
}


/**
* Creates a formatted representation like the one produced by the Dataset.show implementation.
*
* @param resultRows rows to render, one Seq of cell values per row
* @param header Seq of column names (optional)
* @return the formatted result table as a String
*/
def showQueryResults(resultRows: Seq[Seq[String]],
header: Seq[String] = Nil): String = {


val sb = new StringBuilder

// If neither headers nor data are provided, an empty String will be returned.
val numCols =
if (resultRows.nonEmpty) {
resultRows.head.length
} else if (header.nonEmpty) {
header.length
} else {
0
}

val rows = if (header.nonEmpty && numCols == header.length) header +: resultRows else resultRows


// We enforce a minimum column width of 3.
val minimumColWidth = 3
val colWidths = Array.fill(numCols)(minimumColWidth)

for (row <- rows) {
for ((cell, i) <- row.zipWithIndex) {
colWidths(i) = math.max(colWidths(i), Utils.stringHalfWidth(cell))
}
}

val paddedRows = rows.map { row =>
row.zipWithIndex.map { case (cell, i) =>
StringUtils.leftPad(cell, colWidths(i) - Utils.stringHalfWidth(cell) + cell.length)
}
}

// Create the separator line; addString also appends it to sb as the top border.
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()

if (header.nonEmpty) {
// column names
paddedRows.head.addString(sb, "|", "|", "|\n")
sb.append(sep)

// data
paddedRows.tail.foreach(_.addString(sb, "|", "|", "|\n"))
sb.append(sep)
} else {
// Only data, no headers.
for (elem <- paddedRows) elem.addString(sb, "|", "|", "|\n")
sb.append(sep)
}
sb.toString()
}

override def processCmd(cmd: String): Int = {
val cmd_trimmed: String = cmd.trim()
val cmd_lower = cmd_trimmed.toLowerCase(Locale.ROOT)
@@ -398,15 +458,24 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
}
}

var counter = 0
// Getting query results as Seq[Seq[String]]
val tmpRows = driver.getResultsAsScala

try {
while (!out.checkError() && driver.getResults(res)) {
res.asScala.foreach { l =>
counter += 1
out.println(l)
// Get column names if "hive.cli.print.header" is set to true.
val headers: Option[Seq[String]] =
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) {
Some(driver.getSchema.getFieldSchemas.asScala.map(x => x.getName).asInstanceOf[Seq[String]])
} else {
None
}
res.clear()
}

// Flush the print stream.
out.checkError()
// Print results only if a header or data is present.
if (headers.getOrElse(Nil) != Nil || tmpRows.nonEmpty)
out.println(showQueryResults(tmpRows, headers.getOrElse(Nil)))

} catch {
case e: IOException =>
console.printError(
@@ -422,8 +491,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
}

var responseMsg = s"Time taken: $timeTaken seconds"
if (counter != 0) {
responseMsg += s", Fetched $counter row(s)"
if (tmpRows.nonEmpty) {
responseMsg += s", Fetched ${tmpRows.length} row(s)"
}
console.printInfo(responseMsg, null)
// Destroy the driver to release all the locks.
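As a quick illustration of the formatting above, here is a minimal sketch (not part of the diff; the sample header, rows, and the `cli` instance are made up) of what showQueryResults produces for plain-ASCII cells, where Utils.stringHalfWidth equals the character count:

// `cli` stands for any initialized SparkSQLCLIDriver.
val header = Seq("key", "value")
val rows = Seq(Seq("1", "one"), Seq("2", "two"))
println(cli.showQueryResults(rows, header))
// +---+-----+
// |key|value|
// +---+-----+
// |  1|  one|
// |  2|  two|
// +---+-----+

The layout mirrors Dataset.show: a separator line, the optional header row, and right-aligned cells padded to the column width (minimum three characters).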
@@ -46,7 +46,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
val analyzed = query.analyzed
logDebug(s"Result Schema: ${analyzed.output}")
if (analyzed.output.isEmpty) {
new Schema(Arrays.asList(new FieldSchema("Response code", "string", "")), null)
new Schema(Arrays.asList(), null)
} else {
val fieldSchemas = analyzed.output.map { attr =>
new FieldSchema(attr.name, attr.dataType.catalogString, "")
@@ -92,6 +92,20 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
}
}

/**
* Returns the driver results as a Scala Seq of rows, where each row is a Seq of column values.
* @return Seq[Seq[String]], rows of columns
*/
def getResultsAsScala: Seq[Seq[String]] = {
if (hiveResponse == null) {
Nil
} else {
val result: Seq[Seq[String]] = hiveResponse.map(_.split("\t").toSeq)
hiveResponse = null
result
}
}

override def getSchema: Schema = tableSchema

override def destroy(): Unit = {
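A rough sketch of how the new accessor is consumed on the CLI side (illustrative only; `driver`, `conf`, and `out` are the values already in scope in processCmd, and the file's existing JavaConverters and HiveConf imports are assumed):

// Hive returns rows as tab-separated strings; getResultsAsScala splits them
// and drains the internal buffer, so a second call yields Nil.
val rows: Seq[Seq[String]] = driver.getResultsAsScala // e.g. Seq(Seq("1", "one"))

// Column names are attached only when hive.cli.print.header is enabled.
val headers: Seq[String] =
  if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) {
    driver.getSchema.getFieldSchemas.asScala.map(_.getName).toSeq
  } else {
    Nil
  }

// Nothing is printed for statements that return neither a schema nor rows.
if (headers.nonEmpty || rows.nonEmpty) {
  out.println(showQueryResults(rows, headers))
}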
@@ -180,7 +180,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
}

test("Single command with -e") {
runCliWithin(2.minute, Seq("-e", "SHOW DATABASES;"))("" -> "")
runCliWithin(2.minute, Seq("-e", "SHOW DATABASES;"))("" -> "default")
}

test("Single command with --database") {
@@ -195,8 +195,8 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
-> "hive_test"
)

runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
"" -> "hive_test"
runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "INSERT INTO hive_test VALUES (1, 'test');"))(
"SELECT * FROM hive_test;" -> "1"
)
}

@@ -339,7 +339,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
Seq("--jars", s"$jarFile"))(
"CREATE TEMPORARY FUNCTION testjar AS" +
" 'org.apache.spark.sql.hive.execution.UDTFStack';" -> "",
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar\t28840"
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar|28840"
)
}

@@ -352,7 +352,7 @@
s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
"CREATE TEMPORARY FUNCTION testjar AS" +
" 'org.apache.spark.sql.hive.execution.UDTFStack';" -> "",
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar\t28840",
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar|28840",
"CREATE TEMPORARY FUNCTION example_max AS " +
"'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax';" -> "",
"SELECT concat_ws(',', 'First', example_max(1234321), 'Third');" -> "First,1234321,Third"
@@ -387,9 +387,9 @@

test("SPARK-26321 Should not split semicolon within quoted string literals") {
runCliWithin(3.minute)(
"""select 'Test1', "^;^";""" -> "Test1\t^;^",
"""select 'Test2', "\";";""" -> "Test2\t\";",
"""select 'Test3', "\';";""" -> "Test3\t';",
"""select 'Test1', "^;^";""" -> "Test1|^;^",
"""select 'Test2', "\";";""" -> "Test2| \";",
"""select 'Test3', "\';";""" -> "Test3| ';",
Member:

I think the output was matched to Hive's intentionally.

Contributor (author):

Do you mean the extra space? The minimum column width is 3.

"select concat('Test4', ';');" -> "Test4;"
)
}
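On the review question about the extra space, a small worked example of the padding arithmetic (assuming Utils.stringHalfWidth counts these ASCII cells as their plain character length):

// Each column is padded to max(minimumColWidth = 3, widest cell in the column).
// The second cell of the Test2 row is "\";" (two characters), so leftPad brings
// it up to the 3-character minimum, which is where the leading space comes from:
StringUtils.leftPad("\";", 3 - 2 + 2) // => " \";", hence the expected "Test2| \";"
// "^;^" is already three characters wide, so "Test1|^;^" needs no padding.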