Merged

sync #10

30 changes: 29 additions & 1 deletion bin/docker-image-tool.sh
@@ -19,6 +19,8 @@
# This script builds and pushes docker images when run from a release of Spark
# with Kubernetes support.

set -x

function error {
echo "$@" 1>&2
exit 1
@@ -172,13 +174,19 @@ function build {
local BASEDOCKERFILE=${BASEDOCKERFILE:-"kubernetes/dockerfiles/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-false}
local RDOCKERFILE=${RDOCKERFILE:-false}
local ARCHS=${ARCHS:-"--platform linux/amd64,linux/arm64"}

(cd $(img_ctx_dir base) && docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$BASEDOCKERFILE" .)
if [ $? -ne 0 ]; then
error "Failed to build Spark JVM Docker image, please refer to Docker build output for details."
fi
if [ "${CROSS_BUILD}" != "false" ]; then
(cd $(img_ctx_dir base) && docker buildx build $ARCHS $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$BASEDOCKERFILE" .)
fi

if [ "${PYDOCKERFILE}" != "false" ]; then
(cd $(img_ctx_dir pyspark) && docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
@@ -187,6 +195,11 @@ function build {
if [ $? -ne 0 ]; then
error "Failed to build PySpark Docker image, please refer to Docker build output for details."
fi
if [ "${CROSS_BUILD}" != "false" ]; then
(cd $(img_ctx_dir pyspark) && docker buildx build $ARCHS $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .)
fi
fi

if [ "${RDOCKERFILE}" != "false" ]; then
@@ -196,6 +209,11 @@ function build {
if [ $? -ne 0 ]; then
error "Failed to build SparkR Docker image, please refer to Docker build output for details."
fi
if [ "${CROSS_BUILD}" != "false" ]; then
(cd $(img_ctx_dir sparkr) && docker buildx build $ARCHS $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-r) \
-f "$RDOCKERFILE" .)
fi
fi
}

@@ -227,6 +245,8 @@ Options:
-n Build docker image with --no-cache
-u uid UID to use in the USER directive to set the user the main Spark process runs as inside the
resulting container
-X Use docker buildx to cross build. Automatically pushes.
See https://docs.docker.com/buildx/working-with-buildx/ for steps to setup buildx.
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.

@@ -252,6 +272,12 @@ Examples:
- Build and push JDK11-based image with tag "v3.0.0" to docker.io/myrepo
$0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-slim build
$0 -r docker.io/myrepo -t v3.0.0 push

- Build and push JDK11-based image for multiple archs to docker.io/myrepo
$0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-slim build
# Note: buildx, which does cross building, needs to do the push during build
# So there is no separate push step with -X

EOF
}
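
A note on what -X implies in practice: docker buildx needs a builder that can target both linux/amd64 and linux/arm64, and on an amd64 host that usually means QEMU emulation. The following is only a sketch of that one-time setup plus an invocation, assuming Docker 19.03+ with the buildx plugin; it is not part of this script:

# Sketch: one-time buildx setup before using -X (assumes Docker 19.03+ with the buildx plugin).
# Register QEMU emulators so arm64 layers can be built on an amd64 host; tonistiigi/binfmt is
# the commonly used helper image, adjust to your environment.
docker run --privileged --rm tonistiigi/binfmt --install all

# Create and select a builder capable of multi-platform builds.
docker buildx create --name spark-cross --use
docker buildx inspect --bootstrap

# Cross-build (and push, since buildx pushes during the build) a JDK11-based image.
./bin/docker-image-tool.sh -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-slim build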

@@ -268,7 +294,8 @@ RDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
SPARK_UID=
while getopts f:p:R:mr:t:nb:u: option
CROSS_BUILD="false"
while getopts f:p:R:mr:t:Xnb:u: option
do
case "${option}"
in
@@ -279,6 +306,7 @@ do
t) TAG=${OPTARG};;
n) NOCACHEARG="--no-cache";;
b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
X) CROSS_BUILD=1;;
m)
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
@@ -216,7 +216,7 @@ function renderDagVizForJob(svgContainer) {
var dot = metadata.select(".dot-file").text();
var stageId = metadata.attr("stage-id");
var containerId = VizConstants.graphPrefix + stageId;
var isSkipped = metadata.attr("skipped") == "true";
var isSkipped = metadata.attr("skipped") === "true";
var container;
if (isSkipped) {
container = svgContainer
@@ -225,11 +225,8 @@
.attr("skipped", "true");
} else {
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
// Use the link from the stage table so it also works for the history server
var attemptId = 0;
var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
.select("a.name-link")
.attr("href");
var stageLink = uiRoot + appBasePath + "/stages/stage/?id=" + stageId + "&attempt=" + attemptId;
container = svgContainer
.append("a")
.attr("xlink:href", stageLink)
7 changes: 6 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -16,11 +16,16 @@
*/

var uiRoot = "";
var appBasePath = "";

function setUIRoot(val) {
uiRoot = val;
}

function setAppBasePath(path) {
appBasePath = path;
}

function collapseTablePageLoad(name, table){
if (window.localStorage.getItem(name) == "true") {
// Set it to false so that the click function can revert it
@@ -33,7 +38,7 @@ function collapseTable(thisName, table){
var status = window.localStorage.getItem(thisName) == "true";
status = !status;

var thisClass = '.' + thisName
var thisClass = '.' + thisName;

// Expand the list of additional metrics.
var tableDiv = $(thisClass).parent().find('.' + table);
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -292,6 +292,7 @@ private[spark] object UIUtils extends Logging {
<html>
<head>
{commonHeaderNodes(request)}
<script>setAppBasePath('{activeTab.basePath}')</script>
{if (showVisualization) vizHeaderNodes(request) else Seq.empty}
{if (useDataTables) dataTablesHeaderNodes(request) else Seq.empty}
<link rel="shortcut icon"
@@ -78,6 +78,28 @@ abstract class RealBrowserUISeleniumSuite(val driverProp: String)
}
}

test("SPARK-31882: Link URL for Stage DAGs should not depend on paged table.") {
withSpark(newSparkContext()) { sc =>
sc.parallelize(1 to 100).map(v => (v, v)).repartition(10).reduceByKey(_ + _).collect

eventually(timeout(10.seconds), interval(50.microseconds)) {
val pathWithPagedTable =
"/jobs/job/?id=0&completedStage.page=2&completedStage.sort=Stage+Id&" +
"completedStage.desc=true&completedStage.pageSize=1#completed"
goToUi(sc, pathWithPagedTable)

// Open DAG Viz.
webDriver.findElement(By.id("job-dag-viz")).click()
val stages = webDriver.findElements(By.cssSelector("svg[class='job'] > a"))
stages.size() should be (3)

stages.get(0).getAttribute("href") should include ("/stages/stage/?id=0&attempt=0")
stages.get(1).getAttribute("href") should include ("/stages/stage/?id=1&attempt=0")
stages.get(2).getAttribute("href") should include ("/stages/stage/?id=2&attempt=0")
}
}
}

/**
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned here.
14 changes: 11 additions & 3 deletions dev/create-release/do-release.sh
@@ -17,6 +17,8 @@
# limitations under the License.
#

set -e

SELF=$(cd $(dirname $0) && pwd)
. "$SELF/release-util.sh"

@@ -52,9 +54,6 @@ function should_build {
if should_build "tag" && [ $SKIP_TAG = 0 ]; then
run_silent "Creating release tag $RELEASE_TAG..." "tag.log" \
"$SELF/release-tag.sh"
echo "It may take some time for the tag to be synchronized to github."
echo "Press enter when you've verified that the new tag ($RELEASE_TAG) is available."
read
else
echo "Skipping tag creation for $RELEASE_TAG."
fi
@@ -79,3 +78,12 @@ if should_build "publish"; then
else
echo "Skipping publish step."
fi

if should_build "tag" && [ $SKIP_TAG = 0 ]; then
git push origin $RELEASE_TAG
if [[ $RELEASE_TAG != *"preview"* ]]; then
git push origin HEAD:$GIT_BRANCH
else
echo "It's preview release. We only push $RELEASE_TAG to remote."
fi
fi
7 changes: 5 additions & 2 deletions dev/create-release/release-build.sh
@@ -92,9 +92,12 @@ BASE_DIR=$(pwd)
init_java
init_maven_sbt

rm -rf spark
git clone "$ASF_REPO"
# Only clone repo fresh if not present, otherwise use checkout from the tag step
if [ ! -d spark ]; then
git clone "$ASF_REPO"
fi
cd spark
git fetch
git checkout $GIT_REF
git_hash=`git rev-parse --short HEAD`
echo "Checked out Spark git hash $git_hash"
16 changes: 3 additions & 13 deletions dev/create-release/release-tag.sh
@@ -25,6 +25,7 @@ function exit_with_usage {
cat << EOF
usage: $NAME
Tags a Spark release on a particular branch.
You must push the tags after.

Inputs are specified with the following environment variables:
ASF_USERNAME - Apache Username
@@ -105,19 +106,8 @@ sed -i".tmp7" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION"

git commit -a -m "Preparing development version $NEXT_VERSION"

if ! is_dry_run; then
# Push changes
git push origin $RELEASE_TAG
if [[ $RELEASE_VERSION != *"preview"* ]]; then
git push origin HEAD:$GIT_BRANCH
else
echo "It's preview release. We only push $RELEASE_TAG to remote."
fi

cd ..
rm -rf spark
else
cd ..
cd ..
if is_dry_run; then
mv spark spark.tag
echo "Clone with version changes and tag available as spark.tag in the output directory."
fi
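
With the push moved out of release-tag.sh, the tag only exists locally until the final step of do-release.sh (see its diff above). If that step is skipped, for example after a dry run, the push would have to be done by hand; roughly, assuming the standard origin remote and the same RELEASE_TAG/GIT_BRANCH variables these scripts use:

# Sketch: manual push of a locally created release tag (variables as used by the release scripts).
cd spark
git push origin "$RELEASE_TAG"
# Non-preview releases also push the version-bump commits back to the release branch.
git push origin "HEAD:$GIT_BRANCH"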
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -208,4 +208,4 @@ xmlenc/0.52//xmlenc-0.52.jar
xz/1.5//xz-1.5.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper/3.4.14//zookeeper-3.4.14.jar
zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar
zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -222,4 +222,4 @@ xmlenc/0.52//xmlenc-0.52.jar
xz/1.5//xz-1.5.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper/3.4.14//zookeeper-3.4.14.jar
zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar
zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -236,4 +236,4 @@ xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
xz/1.5//xz-1.5.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper/3.4.14//zookeeper-3.4.14.jar
zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar
zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar
18 changes: 18 additions & 0 deletions docs/monitoring.md
@@ -544,6 +544,24 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<td><code>/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id]</code></td>
<td>Details of the given operation and given batch.</td>
</tr>
<tr>
<td><code>/applications/[app-id]/sql</code></td>
<td>A list of all queries for a given application.
<br>
<code>?details=[true (default) | false]</code> lists/hides details of Spark plan nodes.
<br>
<code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand when Physical Plan size is high.
<br>
<code>?offset=[offset]&length=[len]</code> lists queries in the given range.
</tr>
<tr>
<td><code>/applications/[app-id]/sql/[execution-id]</code></td>
<td>Details for the given query.
<br>
<code>?details=[true (default) | false]</code> lists/hides metric details in addition to given query details.
<br>
<code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand for the given query when Physical Plan size is high.
</tr>
<tr>
<td><code>/applications/[app-id]/environment</code></td>
<td>Environment details of the given application.</td>
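
For illustration, the new SQL endpoints can be exercised with curl against a running driver UI. The app id, execution id, and port 4040 below are placeholders; the api/v1 prefix is the REST API root described earlier in this page:

# List the first 10 queries of an application, hiding plan-node details.
curl "http://localhost:4040/api/v1/applications/app-20200601120000-0001/sql?details=false&offset=0&length=10"
# Fetch a single query, including metric details and the physical plan description.
curl "http://localhost:4040/api/v1/applications/app-20200601120000-0001/sql/3?details=true&planDescription=true"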
2 changes: 1 addition & 1 deletion docs/sql-ref-ansi-compliance.md
@@ -95,7 +95,7 @@ CREATE TABLE t (v INT);
-- `spark.sql.storeAssignmentPolicy=ANSI`
INSERT INTO t VALUES ('1');
org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table '`default`.`t`':
- Cannot safely cast 'v': StringType to IntegerType;
- Cannot safely cast 'v': string to int;

-- `spark.sql.storeAssignmentPolicy=LEGACY` (This is a legacy behaviour until Spark 2.x)
INSERT INTO t VALUES ('1');
2 changes: 1 addition & 1 deletion pom.xml
@@ -665,7 +665,7 @@
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.4.4-3</version>
<version>1.4.5-2</version>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
35 changes: 24 additions & 11 deletions python/pyspark/sql/dataframe.py
@@ -276,6 +276,8 @@ def explain(self, extended=None, mode=None):
"""Prints the (logical and physical) plans to the console for debugging purpose.

:param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
When this is a string without specifying the ``mode``, it works as the mode is
specified.
:param mode: specifies the expected output format of plans.

* ``simple``: Print only a physical plan.
@@ -306,31 +308,40 @@ def explain(self, extended=None, mode=None):
Output [2]: [age#0, name#1]
...

>>> df.explain("cost")
== Optimized Logical Plan ==
...Statistics...
...

.. versionchanged:: 3.0.0
Added optional argument `mode` to specify the expected output format of plans.
"""

if extended is not None and mode is not None:
raise Exception("extended and mode can not be specified simultaneously")
raise Exception("extended and mode should not be set together.")

# For the no argument case: df.explain()
is_no_argument = extended is None and mode is None

# For the cases below:
# explain(True)
# explain(extended=False)
is_extended_case = extended is not None and isinstance(extended, bool)
is_extended_case = isinstance(extended, bool) and mode is None

# For the mode specified: df.explain(mode="formatted")
is_mode_case = mode is not None and isinstance(mode, basestring)
# For the case when extended is mode:
# df.explain("formatted")
is_extended_as_mode = isinstance(extended, basestring) and mode is None

if not is_no_argument and not (is_extended_case or is_mode_case):
if extended is not None:
err_msg = "extended (optional) should be provided as bool" \
", got {0}".format(type(extended))
else: # For mode case
err_msg = "mode (optional) should be provided as str, got {0}".format(type(mode))
raise TypeError(err_msg)
# For the mode specified:
# df.explain(mode="formatted")
is_mode_case = extended is None and isinstance(mode, basestring)

if not (is_no_argument or is_extended_case or is_extended_as_mode or is_mode_case):
argtypes = [
str(type(arg)) for arg in [extended, mode] if arg is not None]
raise TypeError(
"extended (optional) and mode (optional) should be a string "
"and bool; however, got [%s]." % ", ".join(argtypes))

# Sets an explain mode depending on a given argument
if is_no_argument:
@@ -339,6 +350,8 @@ def explain(self, extended=None, mode=None):
explain_mode = "extended" if extended else "simple"
elif is_mode_case:
explain_mode = mode
elif is_extended_as_mode:
explain_mode = extended

print(self._sc._jvm.PythonSQLUtils.explainString(self._jdf.queryExecution(), explain_mode))

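
To see the new explain() argument handling end to end, the call forms below can be tried from a PySpark shell; this is only a sketch, assuming a local Spark checkout with ./bin/pyspark available at the repo root:

# Sketch: exercising the new explain() argument handling (assumes a local Spark checkout).
./bin/pyspark <<'PYSPARK_EOF'
df = spark.range(10)
df.explain()                  # no argument: simple physical plan
df.explain(True)              # boolean: extended plans
df.explain("cost")            # positional string is now accepted as a mode
df.explain(mode="formatted")  # explicit mode keyword
PYSPARK_EOF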