
Commit 059b47a

Author: Yuval Itzchakov
Merge remote-tracking branch 'origin/master'
2 parents: 46af335 + 53ca975

File tree: 52 files changed (+1542 −212 lines)


core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 57 additions & 3 deletions
@@ -17,26 +17,80 @@
 
 package org.apache.spark
 
+import java.util.Properties
+
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
 
 /** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-trait BarrierTaskContext extends TaskContext {
+class BarrierTaskContext(
+    override val stageId: Int,
+    override val stageAttemptNumber: Int,
+    override val partitionId: Int,
+    override val taskAttemptId: Long,
+    override val attemptNumber: Int,
+    override val taskMemoryManager: TaskMemoryManager,
+    localProperties: Properties,
+    @transient private val metricsSystem: MetricsSystem,
+    // The default value is only used in tests.
+    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
+    taskMemoryManager, localProperties, metricsSystem, taskMetrics) {
 
   /**
    * :: Experimental ::
    * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
    * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same
    * stage have reached this routine.
+   *
+   * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
+   * possible code branches. Otherwise, you may get the job hanging or a SparkException after
+   * timeout. Some examples of misuses listed below:
+   * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it
+   * shall lead to timeout of the function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { (iter, context) =>
+   *       if (context.partitionId() == 0) {
+   *           // Do nothing.
+   *       } else {
+   *           context.barrier()
+   *       }
+   *       iter
+   *   }
+   * }}}
+   *
+   * 2. Include barrier() function in a try-catch code block, this may lead to timeout of the
+   * second function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { (iter, context) =>
+   *       try {
+   *           // Do something that might throw an Exception.
+   *           doSomething()
+   *           context.barrier()
+   *       } catch {
+   *           case e: Exception => logWarning("...", e)
+   *       }
+   *       context.barrier()
+   *       iter
+   *   }
+   * }}}
    */
   @Experimental
   @Since("2.4.0")
-  def barrier(): Unit
+  def barrier(): Unit = {
+    // TODO SPARK-24817 implement global barrier.
+  }
 
   /**
    * :: Experimental ::
    * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId.
    */
   @Experimental
   @Since("2.4.0")
-  def getTaskInfos(): Array[BarrierTaskInfo]
+  def getTaskInfos(): Array[BarrierTaskInfo] = {
+    val addressesStr = localProperties.getProperty("addresses", "")
+    addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
+  }
 }
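For contrast with the misuse examples in the Scaladoc above, here is a minimal usage sketch (not part of this commit, and assuming the (iter, context) form of RDDBarrier.mapPartitions shown there): every task calls barrier() exactly once on every code path, so no task is left waiting at the barrier.

  // Hypothetical sketch: all tasks, in all branches, reach the single barrier() call below.
  val prepared = rdd.barrier().mapPartitions { (iter, context) =>
    if (context.partitionId() == 0) {
      // Partition 0 could do some one-off setup work here.
    }
    context.barrier()  // reached exactly once by every task in the stage
    iter
  }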

core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala

Lines changed: 0 additions & 49 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 0 deletions
@@ -181,6 +181,7 @@ private[spark] class SparkSubmit extends Logging {
     if (args.isStandaloneCluster && args.useRest) {
       try {
         logInfo("Running Spark using the REST application submission protocol.")
+        doRunMain()
       } catch {
         // Fail over to use the legacy submission gateway
         case e: SubmitRestConnectionException =>

core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
 
   /**
    * :: Experimental ::
-   * Maps partitions together with a provided BarrierTaskContext.
+   * Maps partitions together with a provided [[org.apache.spark.BarrierTaskContext]].
    *
    * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
    * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ private[spark] abstract class Task[T](
     // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
     // the stage is barrier.
     context = if (isBarrier) {
-      new BarrierTaskContextImpl(
+      new BarrierTaskContext(
         stageId,
         stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
         partitionId,

dev/deps/spark-deps-hadoop-2.6

Lines changed: 1 addition & 1 deletion
@@ -191,7 +191,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.6.3.jar
+univocity-parsers-2.7.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm6-shaded-4.8.jar
 xercesImpl-2.9.1.jar

dev/deps/spark-deps-hadoop-2.7

Lines changed: 1 addition & 1 deletion
@@ -192,7 +192,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.6.3.jar
+univocity-parsers-2.7.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm6-shaded-4.8.jar
 xercesImpl-2.9.1.jar

dev/deps/spark-deps-hadoop-3.1

Lines changed: 1 addition & 1 deletion
@@ -212,7 +212,7 @@ stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
 token-provider-1.0.1.jar
-univocity-parsers-2.6.3.jar
+univocity-parsers-2.7.3.jar
 validation-api-1.1.0.Final.jar
 woodstox-core-5.0.3.jar
 xbean-asm6-shaded-4.8.jar

docs/sql-programming-guide.md

Lines changed: 1 addition & 0 deletions
@@ -1876,6 +1876,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 
+  - Since Spark 2.4, Spark will evaluate the set operations referenced in a query by following a precedence rule as per the SQL standard. If the order is not specified by parentheses, set operations are performed from left to right with the exception that all INTERSECT operations are performed before any UNION, EXCEPT or MINUS operations. The old behaviour of giving equal precedence to all the set operations is preserved under a newly added configuration `spark.sql.legacy.setopsPrecedence.enabled` with a default value of `false`. When this property is set to `true`, Spark will evaluate the set operators from left to right as they appear in the query, given no explicit ordering is enforced by usage of parentheses.
   - Since Spark 2.4, Spark will display table description column Last Access value as UNKNOWN when the value was Jan 01 1970.
   - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
   - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
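A quick illustration of the new set-operation precedence rule (a hypothetical example, not part of this commit; it assumes a SparkSession `spark` and existing tables t1, t2 and t3): the INTERSECT below now binds tighter than the UNION, and the legacy flag restores plain left-to-right evaluation.

  // Under the SQL-standard rule this is read as: t1 UNION (t2 INTERSECT t3).
  spark.sql("SELECT c FROM t1 UNION SELECT c FROM t2 INTERSECT SELECT c FROM t3")

  // With the legacy flag set, the same query is evaluated left to right: (t1 UNION t2) INTERSECT t3.
  spark.conf.set("spark.sql.legacy.setopsPrecedence.enabled", "true")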

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 13 additions & 2 deletions
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.generic._
 import org.apache.avro.util.Utf8
@@ -86,8 +87,18 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
     case (LONG, LongType) => (updater, ordinal, value) =>
       updater.setLong(ordinal, value.asInstanceOf[Long])
 
-    case (LONG, TimestampType) => (updater, ordinal, value) =>
-      updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+    case (LONG, TimestampType) => avroType.getLogicalType match {
+      case _: TimestampMillis => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+      case _: TimestampMicros => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+      case null => (updater, ordinal, value) =>
+        // For backward compatibility, if the Avro type is Long and it is not logical type,
+        // the value is processed as timestamp type with millisecond precision.
+        updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+      case other => throw new IncompatibleSchemaException(
+        s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
+    }
 
     case (LONG, DateType) => (updater, ordinal, value) =>
       updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeUtils.MILLIS_PER_DAY).toInt)
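The change hinges on a unit conversion: Catalyst's TimestampType stores microseconds since the epoch, so an Avro timestamp-millis value has to be scaled by 1000 while a timestamp-micros value is stored unchanged. A small sketch of that arithmetic (illustrative values, not from the commit):

  // 2018-08-01 00:00:00.123 UTC expressed in the two Avro logical types.
  val avroMillis = 1533081600123L         // timestamp-millis: milliseconds since the epoch
  val avroMicros = 1533081600123000L      // timestamp-micros: microseconds since the epoch

  // Catalyst TimestampType expects microseconds, so only the millis value is scaled.
  val catalystFromMillis = avroMillis * 1000  // 1533081600123000L
  val catalystFromMicros = avroMicros         // already in microseconds: 1533081600123000L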
