Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Supported ConstantInspector for UDAF
Fixed HiveUdaf wrap object issue.
  • Loading branch information
gvramana committed Nov 22, 2014
commit cb7c61e286b3339fbd216aac4c317c3279aefeaa
26 changes: 18 additions & 8 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,13 @@ private[hive] case class HiveGenericUdaf(

@transient
protected lazy val objectInspector = {
resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray,false,false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, It is other way round. GenericUDAFResolver and its method GenericUDAFEvaluator getEvaluator(TypeInfo[] info) is deprecated and replaced by GenericUDAFResolver2. AbstractGenericUDAFResolver is for migration.
UDAF function like percentile_approx no longer supports the deprecated interface. So i have changed this to use AbstractGenericUDAFResolver(GenericUDAFParameterInfo).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it. thanks.
Nit: spaces after ,.

resolver.getEvaluator(parameterInfo)
.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
}

@transient
protected lazy val inspectors = children.map(_.dataType).map(toInspector)
protected lazy val inspectors = children.map(toInspector)

def dataType: DataType = inspectorToDataType(objectInspector)

Expand Down Expand Up @@ -233,7 +234,7 @@ private[hive] case class HiveUdaf(
}

@transient
protected lazy val inspectors = children.map(_.dataType).map(toInspector)
protected lazy val inspectors = children.map(ex => toInspector(ex.dataType))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

children.map(toInspector) instead, we should always get the ObjectInspector via expression. Sorry it's a legacy bug in my PR.


def dataType: DataType = inspectorToDataType(objectInspector)

Expand Down Expand Up @@ -266,7 +267,7 @@ private[hive] case class HiveGenericUdtf(
protected lazy val function: GenericUDTF = createFunction()

@transient
protected lazy val inputInspectors = children.map(_.dataType).map(toInspector)
protected lazy val inputInspectors = children.map( ex => toInspector(ex.dataType))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove the space before ex

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

children.map(toInspector) instead, we should always get the ObjectInspector via expression. Sorry it's a legacy bug in my PR.


@transient
protected lazy val outputInspector = function.initialize(inputInspectors.toArray)
Expand Down Expand Up @@ -341,9 +342,15 @@ private[hive] case class HiveUdafFunction(
createFunction[AbstractGenericUDAFResolver]()
}

private val inspectors = exprs.map(_.dataType).map(toInspector).toArray

private val function = resolver.getEvaluator(exprs.map(_.dataType.toTypeInfo).toArray)

private val inspectors =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have to distinguish the isUDAFBridgeRequired? Can we just use the exprs.map(toInspector).toArray?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously flag added to not disturb UDAFBridge path. Fixed and tested the same.

if(isUDAFBridgeRequired) exprs.map(ex => toInspector(ex.dataType)).toArray
else exprs.map(toInspector).toArray

private val function = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo)` is deprecated. We'd better keep the previous implementation.
https://github.com/apache/hive/blob/b8250ac2f30539f6b23ce80a20a9e338d3d31458/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDAFResolver.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, It is other way round. GenericUDAFResolver and its method GenericUDAFEvaluator getEvaluator(TypeInfo[] info) is deprecated and replaced by GenericUDAFResolver2. AbstractGenericUDAFResolver is for migration.
UDAF function like percentile_approx no longer supports the deprecated interface. So i have changed this to use AbstractGenericUDAFResolver(GenericUDAFParameterInfo).

val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors,false,false)
resolver.getEvaluator(parameterInfo)
}

private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)

Expand All @@ -356,8 +363,11 @@ private[hive] case class HiveUdafFunction(
@transient
val inputProjection = new InterpretedProjection(exprs)

@transient
protected lazy val cached = new Array[AnyRef](exprs.length)

def update(input: Row): Unit = {
val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
function.iterate(buffer, inputs)
function.iterate(buffer, wrap(inputs,inspectors,cached))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spaces after ,

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,19 @@ class HiveUdfSuite extends QueryTest {
test("SPARK-2693 udaf aggregates test") {
checkAnswer(sql("SELECT percentile(key,1) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src").collect().toSeq)

checkAnswer(sql("SELECT percentile(key,array(1,1)) FROM src LIMIT 1"),
sql("SELECT array(max(key),max(key)) FROM src").collect().toSeq)
}

test("Generic UDAF aggregates") {
checkAnswer(sql("SELECT ceiling(percentile_approx(key,0.99999)) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq)

checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9,0.9)) FROM src LIMIT 1"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space after ,

sql("SELECT array(100,100) FROM src LIMIT 1").collect().toSeq)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A tab after }?


test("UDFIntegerToString") {
val testData = TestHive.sparkContext.parallelize(
IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil)
Expand Down