scalastyle
juliuszsompolski committed Jul 13, 2023
commit 2e6effa703155ca72d378969e6515a40156d1eea
CachedStreamResponse.scala

@@ -18,9 +18,8 @@
 package org.apache.spark.sql.connect.execution

 private[connect] case class CachedStreamResponse[T](
-  // the actual cached response
-  response: T,
-  // index of the response in the response stream.
-  // responses produced in the stream are numbered consecutively starting from 1.
-  streamIndex: Long
-)
+    // the actual cached response
+    response: T,
+    // index of the response in the response stream.
+    // responses produced in the stream are numbered consecutively starting from 1.
+    streamIndex: Long)
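The two comment lines above pin down the stream's indexing contract: responses are numbered consecutively starting from 1, so index 0 can safely mean "nothing consumed yet". A minimal sketch of a producer honoring that contract; the cache helper is hypothetical, while lastProducedIndex mirrors the field of the same name in ExecuteResponseObserver below:

  private var lastProducedIndex: Long = 0 // first response will have index 1

  private def cache[T](response: T): CachedStreamResponse[T] = {
    lastProducedIndex += 1 // indexes run 1, 2, 3, ... with no gaps
    CachedStreamResponse(response, lastProducedIndex)
  }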
ExecuteGrpcResponseSender.scala

@@ -22,21 +22,21 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.internal.Logging

 /**
- * ExecuteGrpcResponseSender sends responses to the GRPC stream.
- * It runs on the RPC thread, and gets notified by ExecuteResponseObserver about available
- * responses.
- * It notifies the ExecuteResponseObserver back about cached responses that can be removed
- * after being sent out.
- * @param responseObserver the GRPC request StreamObserver
+ * ExecuteGrpcResponseSender sends responses to the GRPC stream. It runs on the RPC thread, and
+ * gets notified by ExecuteResponseObserver about available responses. It notifies the
+ * ExecuteResponseObserver back about cached responses that can be removed after being sent out.
+ * @param responseObserver
+ *   the GRPC request StreamObserver
  */
-private[connect] class ExecuteGrpcResponseSender[T](
-    grpcObserver: StreamObserver[T]) extends Logging {
+private[connect] class ExecuteGrpcResponseSender[T](grpcObserver: StreamObserver[T])
+    extends Logging {

   private var detached = false

-  /** Detach this sender from executionObserver.
-   * Called only from executionObserver that this sender is attached to.
-   * executionObserver holds lock, and needs to notify after this call. */
+  /**
+   * Detach this sender from executionObserver. Called only from executionObserver that this
+   * sender is attached to. executionObserver holds lock, and needs to notify after this call.
+   */
   def detach(): Unit = {
     if (detached == true) {
       throw new IllegalStateException("ExecuteGrpcResponseSender already detached!")

@@ -46,13 +46,14 @@ private[connect] class ExecuteGrpcResponseSender[T](

   /**
    * Attach to the executionObserver, consume responses from it, and send them to grpcObserver.
-   * @param lastConsumedStreamIndex the last index that was already consumed and sent.
-   *   This sender will start from index after that.
-   *   0 means start from beginning (since first response has index 1)
+   * @param lastConsumedStreamIndex
+   *   the last index that was already consumed and sent. This sender will start from index after
+   *   that. 0 means start from beginning (since first response has index 1)
    *
-   * @return true if the execution was detached before stream completed.
-   *   The caller needs to finish the grpcObserver stream
-   *   false if stream was finished. In this case, grpcObserver stream is already completed.
+   * @return
+   *   true if the execution was detached before stream completed. The caller needs to finish the
+   *   grpcObserver stream false if stream was finished. In this case, grpcObserver stream is
+   *   already completed.
    */
   def run(
       executionObserver: ExecuteResponseObserver[T],

@@ -72,7 +73,7 @@
     executionObserver.synchronized {
       logDebug(s"Acquired lock.")
       while (!detached && response.isEmpty &&
-          executionObserver.getLastIndex().forall(nextIndex <= _)) {
+        executionObserver.getLastIndex().forall(nextIndex <= _)) {
         logDebug(s"Try to get response with index=$nextIndex from observer.")
         response = executionObserver.getResponse(nextIndex)
         logDebug(s"Response index=$nextIndex from observer: ${response.isDefined}")

@@ -85,8 +86,9 @@
         logDebug(s"Reacquired lock after waiting.")
       }
     }
-    logDebug(s"Exiting loop: detached=$detached, response=$response," +
-      s"lastIndex=${executionObserver.getLastIndex()}")
+    logDebug(
+      s"Exiting loop: detached=$detached, response=$response," +
+        s"lastIndex=${executionObserver.getLastIndex()}")
   }

   // Send next available response.
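The last two hunks show the core of the consumer loop in run: the sender takes the observer's monitor, polls getResponse(nextIndex), and wait()s until the producer's notifyAll() or a detach. A condensed sketch of the whole pattern, assuming only the signatures visible in this diff (the actual send and cleanup code sits outside the hunks shown):

  var nextIndex = lastConsumedStreamIndex + 1
  var finished = false
  while (!finished) {
    var response: Option[CachedStreamResponse[T]] = None
    executionObserver.synchronized {
      // Block while there is no response at nextIndex and the stream may still grow.
      while (!detached && response.isEmpty &&
          executionObserver.getLastIndex().forall(nextIndex <= _)) {
        response = executionObserver.getResponse(nextIndex)
        if (response.isEmpty && !detached) {
          executionObserver.wait() // woken by the producer's notifyAll()
        }
      }
    }
    response match {
      case Some(r) =>
        grpcObserver.onNext(r.response) // send outside the lock, as the real code does
        nextIndex += 1
      case None =>
        finished = true // either detached, or past the final produced index
    }
  }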
ExecuteResponseObserver.scala

@@ -24,47 +24,52 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.internal.Logging

 /**
- * This StreamObserver is running on the execution thread. Execution pushes responses to it,
- * it caches them.
- * ExecuteResponseGRPCSender is the consumer of the responses ExecuteResponseObserver "produces".
- * It waits on the monitor of ExecuteResponseObserver. New produced responses notify the monitor.
- * The consumer must consume the responses consecutively,
- * @see getResponse.
+ * This StreamObserver is running on the execution thread. Execution pushes responses to it, it
+ * caches them. ExecuteResponseGRPCSender is the consumer of the responses ExecuteResponseObserver
+ * "produces". It waits on the monitor of ExecuteResponseObserver. New produced responses notify
+ * the monitor. The consumer must consume the responses consecutively,
+ * @see
+ *   getResponse.
  *
  * ExecuteResponseObserver controls how responses stay cached after being returned to consumer,
- * @see removeCachedResponses.
+ * @see
+ *   removeCachedResponses.
  *
- * A single ExecuteResponseGRPCSender can be attached to the ExecuteResponseObserver.
- * Attaching a new one will notify an existing one that it was detached.
+ * A single ExecuteResponseGRPCSender can be attached to the ExecuteResponseObserver. Attaching a
+ * new one will notify an existing one that it was detached.
  *
  * @param responseObserver
  */
-private[connect] class ExecuteResponseObserver[T]()
-  extends StreamObserver[T]
-  with Logging {
+private[connect] class ExecuteResponseObserver[T]() extends StreamObserver[T] with Logging {

-  /** Cached responses produced by the execution.
-   * Map from response index -> response.
-   * Response indexes are numbered consecutively starting from 1. */
+  /**
+   * Cached responses produced by the execution. Map from response index -> response. Response
+   * indexes are numbered consecutively starting from 1.
+   */
   private val responses: mutable.Map[Long, CachedStreamResponse[T]] =
     new mutable.HashMap[Long, CachedStreamResponse[T]]()

   /** Cached error of the execution, if an error was thrown. */
   private var error: Option[Throwable] = None

-  /** If execution stream is finished (completed or with error), the index of the final response. */
+  /**
+   * If execution stream is finished (completed or with error), the index of the final response.
+   */
   private var finalProducedIndex: Option[Long] = None // index of final response before completed.

   /** The index of the last response produced by execution. */
   private var lastProducedIndex: Long = 0 // first response will have index 1

-  /** Highest response index that was consumed.
-   * Keeps track of it to decide which responses needs to be cached,
-   * and to assert that all responses are consumed. */
+  /**
+   * Highest response index that was consumed. Keeps track of it to decide which responses needs
+   * to be cached, and to assert that all responses are consumed.
+   */
   private var highestConsumedIndex: Long = 0

-  /** Consumer that waits for available responses.
-   * There can be only one at a time, @see attachConsumer. */
+  /**
+   * Consumer that waits for available responses. There can be only one at a time, @see
+   * attachConsumer.
+   */
   private var responseSender: Option[ExecuteGrpcResponseSender[T]] = None

   def onNext(r: T): Unit = synchronized {

@@ -97,8 +102,7 @@ private[connect] class ExecuteResponseObserver[T]()
   }

   /** Attach a new consumer (ExecuteResponseGRPCSender). */
-  def attachConsumer(
-      newSender: ExecuteGrpcResponseSender[T]): Unit = synchronized {
+  def attachConsumer(newSender: ExecuteGrpcResponseSender[T]): Unit = synchronized {
     // detach the current sender before attaching new one
     // this.synchronized() needs to be held while detaching a sender, and the detached sender
     // needs to be notified with notifyAll() afterwards.

@@ -121,7 +125,7 @@ private[connect] class ExecuteResponseObserver[T]()
     ret
   }

-  /** Get the stream error if there is one, otherwise None. */
+  /** Get the stream error if there is one, otherwise None. */
   def getError(): Option[Throwable] = synchronized {
     error
   }

@@ -141,9 +145,11 @@
     notifyAll()
   }

-  /** Remove cached responses after response with lastReturnedIndex is returned from getResponse.
-   * Remove according to caching policy:
-   * - if query is not reattachable, remove all responses up to and including highestConsumedIndex.
+  /**
+   * Remove cached responses after response with lastReturnedIndex is returned from getResponse.
+   * Remove according to caching policy:
+   *   - if query is not reattachable, remove all responses up to and including
+   *     highestConsumedIndex.
    */
   private def removeCachedResponses() = {
     var i = highestConsumedIndex
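The class doc above specifies a single-producer, single-consumer protocol built on this object's monitor. The diff truncates right at def onNext, so the following is an assumed sketch of the two sides of that protocol, built only from the fields and comments visible here; the real method bodies may differ:

  // Producer side: cache the response under the next consecutive index, wake the consumer.
  def onNext(r: T): Unit = synchronized {
    lastProducedIndex += 1
    responses(lastProducedIndex) = CachedStreamResponse(r, lastProducedIndex)
    notifyAll() // wakes an ExecuteGrpcResponseSender blocked in wait()
  }

  // Only one consumer at a time: attaching a new sender detaches any existing one,
  // holding this lock while detaching and calling notifyAll() afterwards.
  def attachConsumer(newSender: ExecuteGrpcResponseSender[T]): Unit = synchronized {
    responseSender.foreach(_.detach())
    responseSender = Some(newSender)
    notifyAll() // the detached sender wakes, observes its state, and exits its loop
  }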
ExecuteThreadRunner.scala

@@ -31,12 +31,12 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
 import org.apache.spark.sql.connect.utils.ErrorUtils
 import org.apache.spark.util.Utils

-/** This class launches the actual execution in an execution thread.
- * The execution pushes the responses to a ExecuteResponseObserver in executeHolder.
- * ExecuteResponseObserver holds the responses that can be consumed by the RPC thread.
+/**
+ * This class launches the actual execution in an execution thread. The execution pushes the
+ * responses to a ExecuteResponseObserver in executeHolder. ExecuteResponseObserver holds the
+ * responses that can be consumed by the RPC thread.
  */
-private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder)
-  extends Logging {
+private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends Logging {

   // The newly created thread will inherit all InheritableThreadLocals used by Spark,
   // e.g. SparkContext.localProperties. If considering implementing a threadpool,

@@ -171,4 +171,3 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder)
     }
   }
 }
-
SparkConnectPlanExecution.scala

@@ -61,10 +61,10 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
       Dataset.ofRows(sessionHolder.session, planner.transformRelation(request.getPlan.getRoot))
     responseObserver.onNext(createSchemaResponse(request.getSessionId, dataframe.schema))
     processAsArrowBatches(request.getSessionId, dataframe, responseObserver)
-    responseObserver.onNext(MetricGenerator.createMetricsResponse(request.getSessionId, dataframe))
+    responseObserver.onNext(
+      MetricGenerator.createMetricsResponse(request.getSessionId, dataframe))
     if (dataframe.queryExecution.observedMetrics.nonEmpty) {
-      responseObserver.onNext(
-        createObservedMetricsResponse(request.getSessionId, dataframe))
+      responseObserver.onNext(createObservedMetricsResponse(request.getSessionId, dataframe))
     }
     responseObserver.onCompleted()
   }
ExecuteHolder.scala

@@ -27,7 +27,8 @@ import org.apache.spark.sql.connect.execution.{ExecuteGrpcResponseSender, Execut
 private[connect] class ExecuteHolder(
     val request: proto.ExecutePlanRequest,
     val operationId: String,
-    val sessionHolder: SessionHolder) extends Logging {
+    val sessionHolder: SessionHolder)
+    extends Logging {

   val jobTag =
     s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"

@@ -39,24 +40,28 @@ private[connect] class ExecuteHolder(

   var runner: ExecuteThreadRunner = new ExecuteThreadRunner(this)

-  /** Start the execution.
-   * The execution is started in a background thread in ExecuteThreadRunner.
-   * Responses are produced and cached in ExecuteResponseObserver.
-   * A GRPC thread consumes the responses by attaching an ExecuteGrpcResponseSender,
-   * @see attachAndRunGrpcResponseSender.
+  /**
+   * Start the execution. The execution is started in a background thread in ExecuteThreadRunner.
+   * Responses are produced and cached in ExecuteResponseObserver. A GRPC thread consumes the
+   * responses by attaching an ExecuteGrpcResponseSender,
+   * @see
+   *   attachAndRunGrpcResponseSender.
    */
   def start(): Unit = {
     runner.start()
   }

-  /** Attach an ExecuteGrpcResponseSender that will consume responses from the query and
-   * send them out on the Grpc response stream.
-   * @param responseSender the ExecuteGrpcResponseSender
-   * @param lastConsumedStreamIndex the last index that was already consumed.
-   *   The consumer will start from index after that.
-   *   0 means start from beginning (since first response has index 1)
-   * @return true if the sender got detached without completing the stream.
-   *   false if the executing stream was completely sent out.
+  /**
+   * Attach an ExecuteGrpcResponseSender that will consume responses from the query and send them
+   * out on the Grpc response stream.
+   * @param responseSender
+   *   the ExecuteGrpcResponseSender
+   * @param lastConsumedStreamIndex
+   *   the last index that was already consumed. The consumer will start from index after that. 0
+   *   means start from beginning (since first response has index 1)
+   * @return
+   *   true if the sender got detached without completing the stream. false if the executing
+   *   stream was completely sent out.
    */
   def attachAndRunGrpcResponseSender(
       responseSender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse],
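The @param and @return contract above is enough to sketch how an RPC handler would drive a (re)attach. This is a hypothetical usage example, not code from this PR; grpcObserver stands for the StreamObserver of the current ExecutePlan RPC:

  val sender = new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](grpcObserver)
  val detached = executeHolder.attachAndRunGrpcResponseSender(
    sender,
    lastConsumedStreamIndex = 0) // 0 = from the beginning; the first response has index 1
  if (detached) {
    // Another sender took over before the stream completed: this RPC must finish
    // its own grpcObserver. Otherwise the stream was already completely sent out.
  }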
ErrorUtils.scala

@@ -61,7 +61,8 @@ private[connect] object ErrorUtils extends Logging {
           .newBuilder()
           .setReason(st.getClass.getName)
           .setDomain("org.apache.spark")
-          .putMetadata("classes",
+          .putMetadata(
+            "classes",
             JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))

       lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))

@@ -88,21 +89,21 @@
   }

   /**
-   * Common exception handling function for RPC methods.
-   * Closes the stream after the error has been sent.
+   * Common exception handling function for RPC methods. Closes the stream after the error has
+   * been sent.
    *
    * @param opType
-   *     String value indicating the operation type (analysis, execution)
+   *   String value indicating the operation type (analysis, execution)
    * @param observer
-   *     The GRPC response observer.
+   *   The GRPC response observer.
    * @tparam V
    * @return
    */
   def handleError[V](
-    opType: String,
-    observer: StreamObserver[V],
-    userId: String,
-    sessionId: String): PartialFunction[Throwable, Unit] = {
+      opType: String,
+      observer: StreamObserver[V],
+      userId: String,
+      sessionId: String): PartialFunction[Throwable, Unit] = {
     val session =
       SparkConnectService
         .getOrCreateIsolatedSession(userId, sessionId)
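Since handleError returns a PartialFunction[Throwable, Unit], an RPC method can install it directly as its catch handler (Scala accepts a partial-function expression after catch). A hypothetical usage sketch; the argument values are illustrative:

  try {
    // ... run the analysis/execution and push responses to `observer` ...
  } catch ErrorUtils.handleError("execute", observer, userId, sessionId)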
MetricGenerator.scala

@@ -54,8 +54,8 @@ private[connect] object MetricGenerator extends AdaptiveSparkPlanHelper {
   }

   private def transformPlan(
-    p: SparkPlan,
-    parentId: Int): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
+      p: SparkPlan,
+      parentId: Int): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
     val mv = p.metrics.map(m =>
       m._1 -> ExecutePlanResponse.Metrics.MetricValue.newBuilder
         .setName(m._2.name.getOrElse(""))