use broadcastTimeout
Davies Liu committed Feb 12, 2016
commit 016c36c316e0fe2a544646291ba3e4573aa4bb4c
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._

import org.apache.spark.Logging
import org.apache.spark.rdd.{RDD, RDDOperationScope}
@@ -139,10 +139,19 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

children.foreach(_.prepare())

+    val timeout: Duration = {
+      val timeoutValue = sqlContext.conf.broadcastTimeout
+      if (timeoutValue < 0) {
+        Duration.Inf
+      } else {
+        timeoutValue.seconds
+      }
+    }
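The semantics of the added block can be sketched in isolation (`toTimeout` below is a hypothetical stand-in for reading `sqlContext.conf.broadcastTimeout`, not Spark code): a negative config value means wait indefinitely, any other value is interpreted as seconds.

```scala
import scala.concurrent.duration._

// Stand-in for sqlContext.conf.broadcastTimeout: a negative value means
// "no limit", any non-negative value is a number of seconds.
def toTimeout(timeoutValue: Int): Duration =
  if (timeoutValue < 0) Duration.Inf else timeoutValue.seconds

val unlimited = toTimeout(-1)   // Duration.Inf
val bounded = toTimeout(300)    // 300.seconds
```

Passing `Duration.Inf` to `Await.result` blocks without a deadline, which matches the old behavior the diff replaces.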

// fill in the result of subqueries
queryResults.foreach {
Contributor:
we should move the blocking phase into execute, otherwise if multiple nodes have subqueries, it becomes blocking.

Contributor:
ah ok you can't have a general execute.

I guess this is why some query engines have init and then prepare.

Contributor:
or a subquery is now blocking broadcasting ...

Contributor (Author):
This is called after doPrepare(), and after prepare() of its children, so it will NOT block broadcasting (they happen at the same time).

Contributor:
what if there is a broadcast join after this?

Contributor (Author):
Broadcast will be issued before this.

case (e, futureResult) =>
-        val rows = Await.result(futureResult, Duration.Inf)
+        val rows = Await.result(futureResult, timeout)
Contributor:
This timeout is kind of weird, right? The max total wait here is numberOfSubqueries * timeout?

Contributor (Author):
All the subqueries are submitted at the same time (at the beginning of prepare()), so the total wait time should be at most the timeout.

Should we create another config for subquery or rename the broadcast one?
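The author's point can be sketched with plain Scala futures (the sleeps and values below are illustrative only, not Spark code): if every future is already running before the first `Await`, waiting on them one by one takes roughly the time of the slowest one, not the sum.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Three "subqueries" that each take ~100 ms. All futures start running
// as soon as they are created, before any await.
val futures = Seq.fill(3)(Future { Thread.sleep(100); 1 })

val start = System.nanoTime()
// Awaiting them sequentially still finishes in ~100 ms total, not ~300 ms,
// because the remaining futures keep running while we wait on the first.
val results = futures.map(f => Await.result(f, 10.seconds))
val elapsedMs = (System.nanoTime() - start) / 1000000
```

So a single shared timeout bounds the whole blocking phase, as long as submission happens up front.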

if (rows.length > 1) {
sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +
Contributor:
we can use postgres' error message: "more than one row returned by a subquery used as an expression"

Contributor (Author):
I had never thought of matching PostgreSQL's error message exactly; that's great.

Contributor (Author):
The current error message has more information than Postgres's; should we change it?

Contributor:
Not 100% sure. Maybe it's better to just say "more than one", so we don't need to run the whole plan (e.g. I'm thinking maybe we should inject a limit of 2 into the subquery).

Contributor (Author):
Good idea; changed to call executeTake(2).

s"${e.query.treeString}")
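The executeTake(2) idea can be sketched as follows. `executeTake` is a real SparkPlan method (returning at most n result rows), but the stand-in below uses a plain `Seq[Int]` so the sketch is self-contained; the error text is the Postgres message quoted above.

```scala
// Hypothetical stand-in for the subquery's physical plan: in Spark the
// real call would be plan.executeTake(2) on a SparkPlan.
def executeTake(allRows: Seq[Int], n: Int): Seq[Int] = allRows.take(n)

// Fetching at most two rows is enough: seeing a second row proves the
// subquery returned more than one, without materializing the full result.
def checkScalarSubquery(allRows: Seq[Int]): Int = {
  val rows = executeTake(allRows, 2)
  if (rows.length > 1) {
    sys.error("more than one row returned by a subquery used as an expression")
  }
  rows.head
}
```

The design point is the limit itself: the check is O(2) rows rather than O(result size), which is what injecting "a limit of 2" into the subquery buys.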