-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-9393] [SQL] Fix several error-handling bugs in ScriptTransform operator #7710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
fa18d26
b43e4ec
bd4c948
4ee36a2
8b162b6
88278de
b31258d
323bb2b
494cde0
6a06a8c
983f200
16c44e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution | |
|
|
||
| import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader} | ||
| import java.util.Properties | ||
| import javax.annotation.Nullable | ||
|
|
||
| import scala.collection.JavaConversions._ | ||
|
|
||
|
|
@@ -68,7 +69,11 @@ case class ScriptTransformation( | |
| val errorStream = proc.getErrorStream | ||
| val reader = new BufferedReader(new InputStreamReader(inputStream)) | ||
|
|
||
| val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output) | ||
| // This nullability is a performance optimization in order to avoid an Option.foreach() call | ||
| // inside of a loop | ||
| @Nullable val (outputSerde, outputSoi) = { | ||
| ioschema.initOutputSerDe(output).getOrElse((null, null)) | ||
| } | ||
|
|
||
| val iterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { | ||
| var cacheRow: InternalRow = null | ||
|
|
@@ -146,7 +151,9 @@ case class ScriptTransformation( | |
| } | ||
| } | ||
|
|
||
| val (inputSerde, inputSoi) = ioschema.initInputSerDe(input) | ||
| // This nullability is a performance optimization in order to avoid an Option.foreach() call | ||
| // inside of a loop | ||
| @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) | ||
| val dataOutputStream = new DataOutputStream(outputStream) | ||
| val outputProjection = new InterpretedProjection(input, child.output) | ||
|
|
||
|
|
@@ -200,33 +207,43 @@ private[hive] | |
| case class HiveScriptIOSchema ( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The changes in this class are purely code cleanup, mostly related to making more variables private and performing some cleanup related to nullability. |
||
| inputRowFormat: Seq[(String, String)], | ||
| outputRowFormat: Seq[(String, String)], | ||
| inputSerdeClass: String, | ||
| outputSerdeClass: String, | ||
| inputSerdeClass: Option[String], | ||
| outputSerdeClass: Option[String], | ||
| inputSerdeProps: Seq[(String, String)], | ||
| outputSerdeProps: Seq[(String, String)], | ||
| schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors { | ||
|
|
||
| val defaultFormat = Map(("TOK_TABLEROWFORMATFIELD", "\t"), | ||
| ("TOK_TABLEROWFORMATLINES", "\n")) | ||
| private val defaultFormat = Map( | ||
| ("TOK_TABLEROWFORMATFIELD", "\t"), | ||
| ("TOK_TABLEROWFORMATLINES", "\n") | ||
| ) | ||
|
|
||
| val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k)) | ||
| val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k)) | ||
|
|
||
|
|
||
| def initInputSerDe(input: Seq[Expression]): (AbstractSerDe, ObjectInspector) = { | ||
| val (columns, columnTypes) = parseAttrs(input) | ||
| val serde = initSerDe(inputSerdeClass, columns, columnTypes, inputSerdeProps) | ||
| (serde, initInputSoi(serde, columns, columnTypes)) | ||
| def initInputSerDe(input: Seq[Expression]): Option[(AbstractSerDe, ObjectInspector)] = { | ||
| inputSerdeClass.map { serdeClass => | ||
| val (columns, columnTypes) = parseAttrs(input) | ||
| val serde = initSerDe(serdeClass, columns, columnTypes, inputSerdeProps) | ||
| val fieldObjectInspectors = columnTypes.map(toInspector) | ||
| val objectInspector = ObjectInspectorFactory | ||
| .getStandardStructObjectInspector(columns, fieldObjectInspectors) | ||
| .asInstanceOf[ObjectInspector] | ||
| (serde, objectInspector) | ||
| } | ||
| } | ||
|
|
||
| def initOutputSerDe(output: Seq[Attribute]): (AbstractSerDe, StructObjectInspector) = { | ||
| val (columns, columnTypes) = parseAttrs(output) | ||
| val serde = initSerDe(outputSerdeClass, columns, columnTypes, outputSerdeProps) | ||
| (serde, initOutputputSoi(serde)) | ||
| def initOutputSerDe(output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = { | ||
| outputSerdeClass.map { serdeClass => | ||
| val (columns, columnTypes) = parseAttrs(output) | ||
| val serde = initSerDe(serdeClass, columns, columnTypes, outputSerdeProps) | ||
| val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector] | ||
| (serde, structObjectInspector) | ||
| } | ||
| } | ||
|
|
||
| def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = { | ||
|
|
||
| private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = { | ||
| val columns = attrs.map { | ||
| case aref: AttributeReference => aref.name | ||
| case e: NamedExpression => e.name | ||
|
|
@@ -242,52 +259,29 @@ case class HiveScriptIOSchema ( | |
| (columns, columnTypes) | ||
| } | ||
|
|
||
| def initSerDe(serdeClassName: String, columns: Seq[String], | ||
| columnTypes: Seq[DataType], serdeProps: Seq[(String, String)]): AbstractSerDe = { | ||
| private def initSerDe( | ||
| serdeClassName: String, | ||
| columns: Seq[String], | ||
| columnTypes: Seq[DataType], | ||
| serdeProps: Seq[(String, String)]): AbstractSerDe = { | ||
|
|
||
| val serde: AbstractSerDe = if (serdeClassName != "") { | ||
| val serde: AbstractSerDe = { | ||
| val trimed_class = serdeClassName.split("'")(1) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm going to fix this in a commit in a few minutes. This is really messy: it would be better to perform this work in the parser rather than doing it here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not only is this messy, I think it's also wrong:
|
||
| Utils.classForName(trimed_class) | ||
| .newInstance.asInstanceOf[AbstractSerDe] | ||
| } else { | ||
| null | ||
| } | ||
|
|
||
| if (serde != null) { | ||
| val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",") | ||
| val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",") | ||
|
|
||
| var propsMap = serdeProps.map(kv => { | ||
| (kv._1.split("'")(1), kv._2.split("'")(1)) | ||
| }).toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(",")) | ||
| propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames) | ||
| var propsMap = serdeProps.map(kv => { | ||
| (kv._1.split("'")(1), kv._2.split("'")(1)) | ||
| }).toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(",")) | ||
| propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames) | ||
|
|
||
| val properties = new Properties() | ||
| properties.putAll(propsMap) | ||
| serde.initialize(null, properties) | ||
| } | ||
| val properties = new Properties() | ||
| properties.putAll(propsMap) | ||
| serde.initialize(null, properties) | ||
|
|
||
| serde | ||
| } | ||
|
|
||
| def initInputSoi(inputSerde: AbstractSerDe, columns: Seq[String], columnTypes: Seq[DataType]) | ||
| : ObjectInspector = { | ||
|
|
||
| if (inputSerde != null) { | ||
| val fieldObjectInspectors = columnTypes.map(toInspector(_)) | ||
| ObjectInspectorFactory | ||
| .getStandardStructObjectInspector(columns, fieldObjectInspectors) | ||
| .asInstanceOf[ObjectInspector] | ||
| } else { | ||
| null | ||
| } | ||
| } | ||
|
|
||
| def initOutputputSoi(outputSerde: AbstractSerDe): StructObjectInspector = { | ||
| if (outputSerde != null) { | ||
| outputSerde.getObjectInspector().asInstanceOf[StructObjectInspector] | ||
| } else { | ||
| null | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it was confusing to use an empty string to represent a missing value, hence this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use `null` here without changing the type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could but I feel that's a bit less clear and more error-prone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to this change.