Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add failing test which demonstrates exception masking issue
  • Loading branch information
JoshRosen committed Jul 28, 2015
commit 8b162b6776f5cda19a0e80147e27a4fa53d1780f
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.execution

import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan, SparkPlanTest}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.types.StringType

Expand Down Expand Up @@ -71,4 +73,31 @@ class ScriptTransformationSuite extends SparkPlanTest {
)(TestHive),
rowsDf.collect())
}

test("script transformation should not swallow errors from upstream pipelined operators") {
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
val e = intercept[Exception] {
checkAnswer(
rowsDf,
(child: SparkPlan) => new ScriptTransformation(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = serdeIOSchema
)(TestHive),
rowsDf.collect())
}
assert(e.getMessage === "intentional exception")
}
}

private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNode {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that it was important to test both with and without a SerDe, since an earlier version of my fix missed one of the cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note that there's quite a bit of code redundancy in this test suite but I'm not super motivated to clean it up since I don't expect that it will ever need to change.

override protected def doExecute(): RDD[InternalRow] = {
child.execute().map { x =>
Thread.sleep(1000) // This sleep gives the external process time to start.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary in order to trigger certain timing-related bugs. This shouldn't lead to test flakiness, though, but it might hypothetically lead to false-negatives if someone were to break this code again in the future. The idea here is to give the external child process time to start so that it has a chance to hang.

throw new Exception("intentional exception")
}
}
override def output: Seq[Attribute] = child.output
}