Skip to content

Commit 42bf232

Browse files
committed
Merge remote-tracking branch 'origin/master' into type-framework-v1
2 parents 8e1cdbd + 589141e commit 42bf232

File tree

1,275 files changed

+18022
-4838
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,275 files changed

+18022
-4838
lines changed

.github/workflows/build_and_test.yml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ jobs:
362362
- name: Install Python packages (Python 3.11)
363363
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') || contains(matrix.modules, 'yarn')
364364
run: |
365-
python3.11 -m pip install 'numpy>=1.22' pyarrow pandas scipy unittest-xml-reporting 'lxml==4.9.4' 'grpcio==1.67.0' 'grpcio-status==1.67.0' 'protobuf==5.29.1'
365+
python3.11 -m pip install 'numpy>=1.22' pyarrow pandas pyyaml scipy unittest-xml-reporting 'lxml==4.9.4' 'grpcio==1.67.0' 'grpcio-status==1.67.0' 'protobuf==5.29.1'
366366
python3.11 -m pip list
367367
# Run the tests.
368368
- name: Run tests
@@ -499,7 +499,8 @@ jobs:
499499
if: (!cancelled()) && (fromJson(needs.precondition.outputs.required).pyspark == 'true' || fromJson(needs.precondition.outputs.required).pyspark-pandas == 'true')
500500
name: "Build modules: ${{ matrix.modules }}"
501501
runs-on: ubuntu-latest
502-
timeout-minutes: 120
502+
# TODO(SPARK-53605): Restore pyspark execution timeout to 2 hours after fixing test_pandas_transform_with_state
503+
timeout-minutes: 150
503504
container:
504505
image: ${{ needs.precondition.outputs.image_pyspark_url_link }}
505506
strategy:
@@ -947,7 +948,7 @@ jobs:
947948
- uses: actions/setup-java@v4
948949
with:
949950
distribution: zulu
950-
java-version: 25-ea
951+
java-version: 25
951952
- name: Build with Maven
952953
run: |
953954
export MAVEN_OPTS="-Xss64m -Xmx4g -Xms4g -XX:ReservedCodeCacheSize=128m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
@@ -1323,9 +1324,9 @@ jobs:
13231324
sudo apt update
13241325
sudo apt-get install r-base
13251326
- name: Start Minikube
1326-
uses: medyagh/setup-minikube@v0.0.19
1327+
uses: medyagh/setup-minikube@v0.0.20
13271328
with:
1328-
kubernetes-version: "1.33.0"
1329+
kubernetes-version: "1.34.0"
13291330
# Github Action limit cpu:2, memory: 6947MB, limit to 2U6G for better resource statistic
13301331
cpus: 2
13311332
memory: 6144m

LICENSE-binary

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,6 @@ dev.ludovic.netlib:blas
479479
dev.ludovic.netlib:arpack
480480
dev.ludovic.netlib:lapack
481481
net.razorvine:pickle
482-
org.bouncycastle:bcprov-jdk18on
483482
org.checkerframework:checker-qual
484483
org.typelevel:algebra_2.13:jar
485484
org.typelevel:cats-kernel_2.13

assembly/pom.xml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,6 @@
136136
<artifactId>guava</artifactId>
137137
<scope>${hadoop.deps.scope}</scope>
138138
</dependency>
139-
140-
<!--
141-
SPARK-51311: HDFS-15098 (3.4.0) adds hard dependency on bcprov-jdk18on, Spark fails to submit
142-
to Kerberized cluster without this dependency, until HADOOP-19152 (3.5.0, unreleased)
143-
-->
144-
<dependency>
145-
<groupId>org.bouncycastle</groupId>
146-
<artifactId>bcprov-jdk18on</artifactId>
147-
<scope>${hadoop.deps.scope}</scope>
148-
</dependency>
149139
</dependencies>
150140

151141
<build>

bin/spark-pipelines

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,11 @@ fi
3030
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
3131
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"
3232

33-
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines "$@"
33+
SDP_CLI_PY_FILE_PATH=$("${PYSPARK_PYTHON}" - <<'EOF'
34+
import pyspark, os
35+
from pathlib import Path
36+
print(Path(os.path.dirname(pyspark.__file__)) / "pipelines" / "cli.py")
37+
EOF
38+
)
39+
40+
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines "$SDP_CLI_PY_FILE_PATH" "$@"

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -642,9 +642,13 @@ public UTF8String substring(final int start, final int until) {
642642
}
643643

644644
int j = i;
645-
while (i < numBytes && c < until) {
646-
i += numBytesForFirstByte(getByte(i));
647-
c += 1;
645+
if (until == Integer.MAX_VALUE) {
646+
i = numBytes;
647+
} else {
648+
while (i < numBytes && c < until) {
649+
i += numBytesForFirstByte(getByte(i));
650+
c += 1;
651+
}
648652
}
649653

650654
if (i > j) {
@@ -663,9 +667,8 @@ public UTF8String substringSQL(int pos, int length) {
663667
// refers to element i-1 in the sequence. If a start index i is less than 0, it refers
664668
// to the -ith element before the end of the sequence. If a start index i is 0, it
665669
// refers to the first element.
666-
int len = numChars();
667670
// `len + pos` does not overflow as `len >= 0`.
668-
int start = (pos > 0) ? pos -1 : ((pos < 0) ? len + pos : 0);
671+
int start = (pos > 0) ? pos -1 : ((pos < 0) ? numChars() + pos : 0);
669672

670673
int end;
671674
if ((long) start + length > Integer.MAX_VALUE) {

common/utils/src/main/java/org/apache/spark/SparkThrowable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ default boolean isInternalError() {
6060
return SparkThrowableHelper.isInternalError(this.getCondition());
6161
}
6262

63+
// If null, the error message is not for a breaking change
64+
default BreakingChangeInfo getBreakingChangeInfo() {
65+
return SparkThrowableHelper.getBreakingChangeInfo(
66+
this.getCondition()).getOrElse(() -> null);
67+
}
68+
6369
default Map<String, String> getMessageParameters() {
6470
return new HashMap<>();
6571
}

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,11 @@
368368
"The change log writer version cannot be <version>."
369369
]
370370
},
371+
"INVALID_CHECKPOINT_LINEAGE" : {
372+
"message" : [
373+
"Invalid checkpoint lineage: <lineage>. <message>"
374+
]
375+
},
371376
"KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
372377
"message" : [
373378
"<msg>"
@@ -2737,6 +2742,12 @@
27372742
],
27382743
"sqlState" : "42001"
27392744
},
2745+
"INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : {
2746+
"message" : [
2747+
"Expression type must be string type but got <exprType>."
2748+
],
2749+
"sqlState" : "42K09"
2750+
},
27402751
"INVALID_EXTERNAL_TYPE" : {
27412752
"message" : [
27422753
"The external type <externalType> is not valid for the type <type> at the expression <expr>."
@@ -3914,12 +3925,6 @@
39143925
},
39153926
"sqlState" : "42K0M"
39163927
},
3917-
"INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : {
3918-
"message" : [
3919-
"Variable type must be string type but got <varType>."
3920-
],
3921-
"sqlState" : "42K09"
3922-
},
39233928
"INVALID_VARIANT_CAST" : {
39243929
"message" : [
39253930
"The variant value `<value>` cannot be cast into `<dataType>`. Please use `try_variant_get` instead."
@@ -4915,6 +4920,12 @@
49154920
],
49164921
"sqlState" : "42601"
49174922
},
4923+
"REMAINDER_BY_ZERO" : {
4924+
"message" : [
4925+
"Remainder by zero. Use `try_mod` to tolerate divisor being 0 and return NULL instead. If necessary set <config> to \"false\" to bypass this error."
4926+
],
4927+
"sqlState" : "22012"
4928+
},
49184929
"RENAME_SRC_PATH_NOT_FOUND" : {
49194930
"message" : [
49204931
"Failed to rename as <sourcePath> was not found."
@@ -5162,6 +5173,12 @@
51625173
],
51635174
"sqlState" : "42802"
51645175
},
5176+
"STATE_STORE_CHECKPOINT_IDS_NOT_SUPPORTED" : {
5177+
"message" : [
5178+
"<msg>"
5179+
],
5180+
"sqlState" : "KD002"
5181+
},
51655182
"STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY" : {
51665183
"message" : [
51675184
"The checkpoint location <checkpointLocation> should be empty on batch 0",
@@ -5407,6 +5424,14 @@
54075424
},
54085425
"sqlState" : "42616"
54095426
},
5427+
"STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED" : {
5428+
"message" : [
5429+
"Reading state across different checkpoint format versions is not supported.",
5430+
"startBatchId=<startBatchId>, endBatchId=<endBatchId>.",
5431+
"startFormatVersion=<startFormatVersion>, endFormatVersion=<endFormatVersion>."
5432+
],
5433+
"sqlState" : "KD002"
5434+
},
54105435
"STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE" : {
54115436
"message" : [
54125437
"The state does not have any partition. Please double check that the query points to the valid state. options: <sourceOptions>"

common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
7575
matches.map(m => m.stripSuffix(">").stripPrefix("<"))
7676
}
7777

78+
def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = {
79+
val errorClasses = errorClass.split('.')
80+
errorClasses match {
81+
case Array(mainClass) =>
82+
errorInfoMap.get(mainClass).flatMap(_.breakingChangeInfo)
83+
case Array(mainClass, subClass) =>
84+
errorInfoMap.get(mainClass).flatMap{
85+
errorInfo =>
86+
errorInfo.subClass.flatMap(_.get(subClass))
87+
.flatMap(_.breakingChangeInfo)
88+
.orElse(errorInfo.breakingChangeInfo)
89+
}
90+
case _ => None
91+
}
92+
}
93+
7894
def getMessageTemplate(errorClass: String): String = {
7995
val errorClasses = errorClass.split("\\.")
8096
assert(errorClasses.length == 1 || errorClasses.length == 2)
@@ -128,7 +144,7 @@ private object ErrorClassesJsonReader {
128144
val map = mapper.readValue(url, new TypeReference[Map[String, ErrorInfo]]() {})
129145
val errorClassWithDots = map.collectFirst {
130146
case (errorClass, _) if errorClass.contains('.') => errorClass
131-
case (_, ErrorInfo(_, Some(map), _)) if map.keys.exists(_.contains('.')) =>
147+
case (_, ErrorInfo(_, Some(map), _, _)) if map.keys.exists(_.contains('.')) =>
132148
map.keys.collectFirst { case s if s.contains('.') => s }.get
133149
}
134150
if (errorClassWithDots.isEmpty) {
@@ -147,28 +163,59 @@ private object ErrorClassesJsonReader {
147163
* @param subClass SubClass associated with this class.
148164
* @param message Message format with optional placeholders (e.g. &lt;parm&gt;).
149165
* The error message is constructed by concatenating the lines with newlines.
166+
* @param breakingChangeInfo Additional metadata if the error is due to a breaking change.
150167
*/
151168
private case class ErrorInfo(
152169
message: Seq[String],
153170
subClass: Option[Map[String, ErrorSubInfo]],
154-
sqlState: Option[String]) {
171+
sqlState: Option[String],
172+
breakingChangeInfo: Option[BreakingChangeInfo] = None) {
155173
// For compatibility with multi-line error messages
156174
@JsonIgnore
157-
val messageTemplate: String = message.mkString("\n")
175+
val messageTemplate: String = message.mkString("\n") +
176+
breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("")
158177
}
159178

160179
/**
161180
* Information associated with an error subclass.
162181
*
163182
* @param message Message format with optional placeholders (e.g. &lt;parm&gt;).
164183
* The error message is constructed by concatenating the lines with newlines.
184+
* @param breakingChangeInfo Additional metadata if the error is due to a breaking change.
165185
*/
166-
private case class ErrorSubInfo(message: Seq[String]) {
186+
private case class ErrorSubInfo(
187+
message: Seq[String],
188+
breakingChangeInfo: Option[BreakingChangeInfo] = None) {
167189
// For compatibility with multi-line error messages
168190
@JsonIgnore
169-
val messageTemplate: String = message.mkString("\n")
191+
val messageTemplate: String = message.mkString("\n") +
192+
breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("")
170193
}
171194

195+
/**
196+
* Additional information if the error was caused by a breaking change.
197+
*
198+
* @param migrationMessage A message explaining how the user can migrate their job to work
199+
* with the breaking change.
200+
* @param mitigationConfig A spark config flag that can be used to mitigate the
201+
* breaking change.
202+
* @param needsAudit If true, the breaking change should be inspected manually.
203+
* If false, the spark job should be retried by setting the
204+
* mitigationConfig.
205+
*/
206+
case class BreakingChangeInfo(
207+
migrationMessage: Seq[String],
208+
mitigationConfig: Option[MitigationConfig] = None,
209+
needsAudit: Boolean = true
210+
)
211+
212+
/**
213+
* A spark config flag that can be used to mitigate a breaking change.
214+
* @param key The spark config key.
215+
* @param value The spark config value that mitigates the breaking change.
216+
*/
217+
case class MitigationConfig(key: String, value: String)
218+
172219
/**
173220
* Information associated with an error state / SQLSTATE.
174221
*

common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ private[spark] object SparkThrowableHelper {
7373
errorReader.getMessageParameters(errorClass)
7474
}
7575

76+
def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = {
77+
if (errorClass == null) {
78+
None
79+
} else {
80+
errorReader.getBreakingChangeInfo(errorClass)
81+
}
82+
}
83+
7684
def isInternalError(errorClass: String): Boolean = {
7785
errorClass != null && errorClass.startsWith("INTERNAL_ERROR")
7886
}
@@ -99,6 +107,19 @@ private[spark] object SparkThrowableHelper {
99107
g.writeStringField("errorClass", errorClass)
100108
if (format == STANDARD) {
101109
g.writeStringField("messageTemplate", errorReader.getMessageTemplate(errorClass))
110+
errorReader.getBreakingChangeInfo(errorClass).foreach { breakingChangeInfo =>
111+
g.writeObjectFieldStart("breakingChangeInfo")
112+
g.writeStringField("migrationMessage",
113+
breakingChangeInfo.migrationMessage.mkString("\n"))
114+
breakingChangeInfo.mitigationConfig.foreach { mitigationConfig =>
115+
g.writeObjectFieldStart("mitigationConfig")
116+
g.writeStringField("key", mitigationConfig.key)
117+
g.writeStringField("value", mitigationConfig.value)
118+
g.writeEndObject()
119+
}
120+
g.writeBooleanField("needsAudit", breakingChangeInfo.needsAudit)
121+
g.writeEndObject()
122+
}
102123
}
103124
val sqlState = e.getSqlState
104125
if (sqlState != null) g.writeStringField("sqlState", sqlState)

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ private[kafka010] class KafkaMicroBatchStream(
9191

9292
private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
9393

94+
private var isTriggerAvailableNow: Boolean = false
95+
9496
/**
9597
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
9698
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -126,8 +128,14 @@ private[kafka010] class KafkaMicroBatchStream(
126128
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
127129

128130
// Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled.
129-
latestPartitionOffsets = if (allDataForTriggerAvailableNow != null) {
130-
allDataForTriggerAvailableNow
131+
latestPartitionOffsets = if (isTriggerAvailableNow) {
132+
if (allDataForTriggerAvailableNow != null) {
133+
allDataForTriggerAvailableNow
134+
} else {
135+
allDataForTriggerAvailableNow =
136+
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
137+
allDataForTriggerAvailableNow
138+
}
131139
} else {
132140
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
133141
}
@@ -359,8 +367,7 @@ private[kafka010] class KafkaMicroBatchStream(
359367
}
360368

361369
override def prepareForTriggerAvailableNow(): Unit = {
362-
allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
363-
Some(getOrCreateInitialPartitionOffsets()))
370+
isTriggerAvailableNow = true
364371
}
365372
}
366373

0 commit comments

Comments
 (0)