Merged
169 commits
fb41b90
[SPARK-34361][K8S] In case of downscaling avoid killing of executors …
attilapiros Mar 3, 2021
b8b6f88
[SPARK-34948][K8S] Add ownerReference to executor configmap to fix le…
dongjoon-hyun Apr 3, 2021
e852a3c
[SPARK-35482][K8S] Use `spark.blockManager.port` not the wrong `spark…
yaooqinn May 21, 2021
5625c45
[SPARK-35493][K8S] make `spark.blockManager.port` fallback for `spark…
yaooqinn May 23, 2021
7c3e411
[SPARK-32975][K8S] Add config for driver readiness timeout before exe…
cchriswu Jun 4, 2021
38ec106
[SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
yaooqinn Jun 10, 2021
ee142c0
[MINOR][K8S] Print the driver pod name instead of Some(name) if absent
yaooqinn Jun 13, 2021
fd14c30
[SPARK-37049][K8S] executorIdleTimeout should check `creationTimestam…
yangwwei Oct 20, 2021
8e57cfe
[SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapsho…
dongjoon-hyun Dec 1, 2021
3c35c38
[SPARK-34949][CORE] Prevent BlockManager reregister when Executor is …
sumeetgajjar Apr 5, 2021
c352398
[SPARK-34674][CORE][K8S] Close SparkContext after the Main method has…
kotlovs Apr 22, 2021
a004fb6
[SPARK-36193][CORE] Recover SparkSubmit.runMain not to stop SparkCont…
dongjoon-hyun Jul 19, 2021
a92ed12
[SPARK-35879][CORE][SHUFFLE] Fix performance regression caused by col…
yaooqinn Jun 26, 2021
d6b5c39
[SPARK-34473][SQL] Avoid NPE in DataFrameReader.schema(StructType)
cloud-fan Feb 22, 2021
866e683
[SPARK-34490][SQL] Analysis should fail if the view refers a dropped …
linhongliu-db Feb 23, 2021
6003f7c
[SPARK-34515][SQL] Fix NPE if InSet contains null value during getPar…
ulysses-you Feb 24, 2021
eee08d7
[SPARK-34436][SQL] DPP support LIKE ANY/ALL expression
wangyum Feb 25, 2021
91f2a9e
[SPARK-34550][SQL] Skip InSet null value during push filter to Hive m…
ulysses-you Feb 26, 2021
d78ae65
[SPARK-34556][SQL] Checking duplicate static partition columns should…
zsxwing Mar 1, 2021
5399700
[SPARK-34417][SQL] org.apache.spark.sql.DataFrameNaFunctions.fillMap …
amandeep-sharma Mar 2, 2021
59bd127
[SPARK-34547][SQL] Only use metadata columns for resolution as last r…
karenfeng Mar 2, 2021
bcf662e
[SPARK-34596][SQL] Use Utils.getSimpleName to avoid hitting Malformed…
rednaxelafx Mar 3, 2021
0ba7767
[SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch…
Mar 3, 2021
b3500ce
[SPARK-34555][SQL] Resolve metadata output from DataFrame
karenfeng Mar 3, 2021
c41b543
[SPARK-34584][SQL] Static partition should also follow StoreAssignmen…
cloud-fan Mar 4, 2021
463e130
[SPARK-34599][SQL] Fix the issue that INSERT INTO OVERWRITE doesn't s…
zsxwing Mar 4, 2021
f3fdc8d
[SPARK-34567][SQL] CreateTableAsSelect should update metrics too
AngersZhuuuu Mar 4, 2021
c9700c9
[SPARK-34482][SS] Correct the active SparkSession for StreamExecution…
Ngone51 Mar 4, 2021
db5fe1d
[SPARK-32924][WEBUI] Make duration column in master UI sorted in the …
Mar 4, 2021
4e48ab4
[SPARK-34613][SQL] Fix view does not capture disable hint config
ulysses-you Mar 5, 2021
81d34b4
[SPARK-34607][SQL][3.1] Add `Utils.isMemberClass` to fix a malformed …
maropu Mar 5, 2021
e0ef75a
[SPARK-34681][SQL] Fix bug for full outer shuffled hash join when bui…
c21 Mar 10, 2021
b82a653
[SPARK-34682][SQL] Fix regression in canonicalization error check in …
andygrove Mar 10, 2021
32624a7
[SPARK-34682][SQL] Use PrivateMethodTester instead of reflection
andygrove Mar 10, 2021
6f94a88
[MINOR][SQL] Remove unnecessary extend from BroadcastHashJoinExec
c21 Mar 11, 2021
f5474b5
[SPARK-34713][SQL] Fix group by CreateStruct with ExtractValue
cloud-fan Mar 11, 2021
b52570a
[SPARK-34724][SQL] Fix Interpreted evaluation by using getMethod inst…
dongjoon-hyun Mar 12, 2021
52395a7
[SPARK-34723][SQL] Correct parameter type for subexpression eliminati…
viirya Mar 13, 2021
cf311da
[SPARK-34727][SQL] Fix discrepancy in casting float to timestamp
MaxGekk Mar 14, 2021
bd2e15c
[SPARK-34504][SQL] Avoid unnecessary resolving of SQL temp views for …
cloud-fan Mar 17, 2021
a91c0d8
[SPARK-34770][SQL] InMemoryCatalog.tableExists should not fail if dat…
cloud-fan Mar 17, 2021
ff792c4
[SPARK-34749][SQL][3.1] Simplify ResolveCreateNamedStruct
cloud-fan Mar 17, 2021
d2b4abf
[SPARK-34737][SQL][3.1] Cast input float to double in `TIMESTAMP_SECO…
MaxGekk Mar 18, 2021
acfe003
[SPARK-34731][CORE] Avoid ConcurrentModificationException when redact…
bersprockets Mar 18, 2021
24369bd
[SPARK-34776][SQL] Nested column pruning should not prune Window prod…
viirya Mar 19, 2021
f3c14c5
[SPARK-34796][SQL][3.1] Initialize counter variable for LIMIT code-ge…
c21 Mar 21, 2021
1d45ac7
[SPARK-34811][CORE] Redact fs.s3a.access.key like secret and token
dongjoon-hyun Mar 21, 2021
20be38f
[SPARK-34225][CORE] Don't encode further when a URI form string is pa…
sarutak Mar 22, 2021
519a6fa
[SPARK-34790][CORE] Disable fetching shuffle blocks in batch when io …
hezuojiao Mar 22, 2021
5a22ad8
[SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check
peter-toth Mar 23, 2021
e139042
[SPARK-34833][SQL] Apply right-padding correctly for correlated subqu…
maropu Mar 24, 2021
544a035
[SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …
otterc Mar 25, 2021
975baa6
[SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places
cloud-fan Mar 26, 2021
dcb2bad
[SPARK-34829][SQL] Fix higher order function results
peter-toth Mar 28, 2021
31fa680
[SPARK-34876][SQL] Fill defaultResult of non-nullable aggregates
tanelk Mar 29, 2021
ee91028
[SPARK-34814][SQL] LikeSimplification should handle NULL
AngersZhuuuu Mar 29, 2021
891e873
[SPARK-34845][CORE] ProcfsMetricsGetter shouldn't return partial proc…
Mar 29, 2021
3a53e8c
[SPARK-34909][SQL] Fix conversion of negative to unsigned in conv()
timarmstrong Mar 31, 2021
d56aabe
[SPARK-34939][CORE] Throw fetch failure exception when unable to dese…
viirya Apr 4, 2021
21a727d
[SPARK-34923][SQL] Metadata output should be empty for more plans
karenfeng Apr 6, 2021
58af553
[SPARK-34922][SQL][3.1] Use a relative cost comparison function in th…
tanelk Apr 8, 2021
9ad6a06
[SPARK-34963][SQL] Fix nested column pruning for extracting case-inse…
viirya Apr 9, 2021
8715cf9
[SPARK-34926][SQL][3.1] PartitioningUtils.getPathFragment() should re…
AngersZhuuuu Apr 12, 2021
8accb94
[SPARK-35014] Fix the PhysicalAggregation pattern to not rewrite fold…
sigmod Apr 13, 2021
9e89876
[SPARK-34834][NETWORK] Fix a potential Netty memory leak in Transport…
weixiuli Apr 14, 2021
844beff
[SPARK-34225][CORE][FOLLOWUP] Replace Hadoop's Path with Utils.resolv…
sarutak Apr 15, 2021
8afe6b2
[SPARK-35136] Remove initial null value of LiveStage.info
sander-goos Apr 19, 2021
363a5bd
[SPARK-35117][UI] Change progress bar back to highlight ratio of task…
Kimahriman Apr 20, 2021
95acf41
[SPARK-35080][SQL] Only allow a subset of correlated equality predica…
allisonwang-db Apr 20, 2021
a41cebe
[SPARK-34639][SQL][3.1] RelationalGroupedDataset.alias should not cre…
cloud-fan Apr 21, 2021
06e5e2a
[SPARK-35096][SQL] SchemaPruning should adhere spark.sql.caseSensitiv…
sandeep-katta Apr 21, 2021
a657b0a
[SPARK-35127][UI] When we switch between different stage-detail pages…
echohlne Apr 22, 2021
60867dd
[SPARK-34897][SQL][3.1] Support reconcile schemas based on index afte…
wangyum Apr 23, 2021
b1e56bd
[SPARK-35168][SQL] mapred.reduce.tasks should be shuffle.partitions n…
yaooqinn Apr 25, 2021
bc789f2
[SPARK-35087][UI] Some columns in table Aggregated Metrics by Executo…
echohlne Apr 26, 2021
7b26b0c
[SPARK-35213][SQL] Keep the correct ordering of nested structs in cha…
Kimahriman Apr 26, 2021
a6e8f53
[SPARK-35244][SQL] Invoke should throw the original exception
cloud-fan Apr 28, 2021
5d4ab0e
[SPARK-35278][SQL] Invoke should find the method with correct number …
viirya May 1, 2021
4e2198c
[SPARK-34794][SQL] Fix lambda variable name issues in nested DataFram…
May 5, 2021
80b41db
[SPARK-35288][SQL] StaticInvoke should find the method without exact …
viirya May 7, 2021
da70670
[SPARK-35359][SQL] Insert data with char/varchar datatype will fail w…
fhygh May 17, 2021
53c98d8
[SPARK-35411][SQL] Add essential information while serializing TreeNo…
ivoson May 18, 2021
18c468b
[SPARK-35106][CORE][SQL] Avoid failing rename caused by destination d…
May 19, 2021
fd47c5d
[SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking…
andygrove May 19, 2021
84c53cb
[SPARK-35287][SQL] Allow RemoveRedundantProjects to preserve ProjectE…
sarutak May 24, 2021
81ad2a6
[SPARK-35449][SQL][3.1] Only extract common expressions from CaseWhen…
Kimahriman May 24, 2021
3e01a01
[SPARK-35454][SQL][3.1] One LogicalPlan can match multiple dataset ids
Ngone51 May 28, 2021
eeb5e9c
[SPARK-35411][SQL][FOLLOWUP] Handle Currying Product while serializin…
ivoson May 31, 2021
5465d29
[SPARK-35610][CORE] Fix the memory leak introduced by the Executor's …
attilapiros Jun 2, 2021
6953e0c
[SPARK-35589][CORE][3.1] BlockManagerMasterEndpoint should not ignore…
dongjoon-hyun Jun 3, 2021
d40b540
[SPARK-35679][SQL] instantToMicros overflow
dchvn Jun 10, 2021
c387856
[SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in sub…
cfmcgrady Jun 10, 2021
d1523a2
[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails f…
Jun 10, 2021
f8e19bc
[SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetri…
sarutak Jun 10, 2021
c42759c
[SPARK-35652][SQL] joinWith on two table generated from same one
dchvn Jun 11, 2021
f1a1e2e
[SPARK-35695][SQL] Collect observed metrics from cached and adaptive …
tanelk Jun 11, 2021
f87ad51
[SPARK-35746][UI] Fix taskid in the stage page task event timeline
shahidki31 Jun 12, 2021
e934d57
[SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
wankunde Jun 13, 2021
763c27c
[SPARK-35767][SQL] Avoid executing child plan twice in CoalesceExec
andygrove Jun 15, 2021
6ae88cf
[SPARK-35791][SQL] Release on-going map properly for NULL-aware ANTI …
c21 Jun 17, 2021
bd86d35
[SPARK-34766][SQL][3.1] Do not capture maven config for views
ulysses-you Mar 18, 2021
07802bb
[SPARK-35792][SQL] View should not capture configs used in `RelationC…
linhongliu-db Jun 17, 2021
c33950e
[SPARK-35391] Fix memory leak in ExecutorAllocationListener
VasilyKolpakov Jun 21, 2021
5fe864a
[SPARK-35695][SQL][FOLLOWUP] Use AQE helper to simplify the code in C…
cloud-fan Jun 23, 2021
d2cbf6e
[SPARK-35841][SQL] Casting string to decimal type doesn't work if the…
dchvn Jun 24, 2021
70ec5ed
[SPARK-35886][SQL][3.1] PromotePrecision should not overwrite genCode…
viirya Jun 27, 2021
e48023a
[SPARK-35898][SQL] Fix arrays and maps in RowToColumnConverter
tomvanbussel Jun 28, 2021
1c89ae7
[SPARK-35935][SQL][3.1][3.0] Prevent failure of `MSCK REPAIR TABLE` o…
MaxGekk Jun 30, 2021
7295568
[SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerW…
Ngone51 Jul 1, 2021
8ae868c
[SPARK-36020][SQL][3.1] Check logical link in remove redundant projects
ulysses-you Jul 7, 2021
ccfbc17
[SQL][MINOR] EquivalentExpressions.commonChildrenToRecurse should ski…
cloud-fan Jul 13, 2021
a27f021
[SPARK-36076][SQL][3.1] ArrayIndexOutOfBounds in Cast string to times…
dchvn Jul 13, 2021
2bbd9bf
[SPARK-36034][SQL][3.1] Rebase datetime in pushed down filters to par…
MaxGekk Jul 16, 2021
a151a0c
[SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAlia…
AngersZhuuuu Jul 16, 2021
8eef6e8
[SPARK-36093][SQL][3.1] RemoveRedundantAliases should not change Comm…
AngersZhuuuu Jul 19, 2021
e304be4
[SPARK-36079][SQL] Null-based filter estimate should always be in the…
karenfeng Jul 20, 2021
bcdfe8b
[SPARK-36210][SQL] Preserve column insertion order in Dataset.withCol…
koertkuipers Jul 20, 2021
c50043d
[SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
jhu-chang Jul 21, 2021
e031a6e
[SPARK-36020][SQL][FOLLOWUP] RemoveRedundantProjects should retain th…
cloud-fan Jul 21, 2021
51146da
[SPARK-28266][SQL] convertToLogicalRelation should not interpret `pat…
shardulm94 Jul 21, 2021
3f99be0
[SPARK-36213][SQL] Normalize PartitionSpec for Describe Table Command…
yaooqinn Jul 21, 2021
9c2f8b3
[SPARK-36242][CORE][3.1] Ensure spill file closed before set success …
LuciferYang Jul 27, 2021
7d1ba37
[SPARK-36354][CORE] EventLogFileReader should skip rolling event log …
dongjoon-hyun Aug 4, 2021
d2e9151
[SPARK-36086][SQL][3.1] CollapseProject project replace alias should …
AngersZhuuuu Aug 9, 2021
67d5aa9
[SPARK-36339][SQL][3.0] References to grouping that not part of aggre…
gaoyajun02 Aug 9, 2021
ca5113d
[SPARK-36464][CORE] Fix Underlying Size Variable Initialization in Ch…
kazuyukitanimura Aug 10, 2021
24c7d5e
[SPARK-36489][SQL] Aggregate functions over no grouping keys, on tabl…
IonutBoicuAms Aug 12, 2021
4688592
[SPARK-36353][SQL][3.1] RemoveNoopOperators should keep output schema
AngersZhuuuu Aug 13, 2021
6ad9939
[SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is inte…
jiangxb1987 Aug 13, 2021
83d8105
[SPARK-35876][SQL][3.1] ArraysZip should retain field names to avoid …
AngersZhuuuu Aug 24, 2021
8ffb3f0
[SPARK-36564][CORE] Fix NullPointerException in LiveRDDDistribution.t…
Ngone51 Aug 24, 2021
6d49a31
[SPARK-36352][SQL][3.1] Spark should check result plan's output schem…
AngersZhuuuu Aug 25, 2021
491e1ed
[SPARK-36509][CORE] Fix the issue that executors are never re-schedul…
sarutak Aug 28, 2021
f001d98
[SPARK-36614][CORE][UI] Correct executor loss reason caused by decomm…
Ngone51 Aug 30, 2021
cabf1bf
[SPARK-36639][SQL] Fix an issue that sequence builtin function causes…
sarutak Sep 3, 2021
051073b
[SPARK-36704][CORE] Expand exception handling to more Java 9 cases wh…
srowen Sep 11, 2021
589d0d5
[SPARK-36715][SQL] InferFiltersFromGenerate should not infer filter f…
cfmcgrady Sep 14, 2021
ebe5b88
[SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan
AngersZhuuuu Sep 14, 2021
29b38d3
[SPARK-36702][SQL][FOLLOWUP] ArrayUnion handle duplicated Double.NaN …
AngersZhuuuu Sep 15, 2021
402ca59
[SPARK-36755][SQL] ArraysOverlap should handle duplicated Double.NaN …
AngersZhuuuu Sep 15, 2021
8e3a919
[SPARK-36783][SQL] ScanOperation should not push Filter through nonde…
cloud-fan Sep 17, 2021
37a1871
[SPARK-36789][SQL] Use the correct constant type as the null value ho…
cloud-fan Sep 17, 2021
27ca66c
[SPARK-36741][SQL] ArrayDistinct handle duplicated Double.NaN and Flo…
AngersZhuuuu Sep 17, 2021
0b41e99
[SPARK-35985][SQL][3.1] push partitionFilters for empty readDataSchema
huaxingao Sep 19, 2021
43b00a5
[SPARK-36754][SQL] ArrayIntersect handle duplicated Double.NaN and Fl…
AngersZhuuuu Sep 20, 2021
6129ca4
[SPARK-36706][SQL][3.1] OverwriteByExpression conversion in DataSourc…
huaxingao Sep 20, 2021
401e641
[SPARK-36803][SQL] Fix ArrayType conversion when reading Parquet file…
sadikovi Sep 22, 2021
821dd6a
[SPARK-36753][SQL] ArrayExcept handle duplicated Double.NaN and Float…
AngersZhuuuu Sep 22, 2021
90a1cf9
[SPARK-36782][CORE] Avoid blocking dispatcher-BlockManagerMaster duri…
f-thiele Sep 23, 2021
b5eb1a4
[SPARK-36782][CORE][FOLLOW-UP] Only handle shuffle block in separate …
Ngone51 Sep 23, 2021
329d2ed
[SPARK-36792][SQL] InSet should handle NaN
AngersZhuuuu Sep 24, 2021
0498e2e
[SPARK-36874][SPARK-34634][SQL][3.1] ResolveReference.dedupRight shou…
sarutak Oct 7, 2021
74ce639
[SPARK-36717][CORE] Incorrect order of variable initialization may le…
Oct 8, 2021
650d951
[SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when…
JoshRosen Oct 14, 2021
640d88b
[SPARK-37098][SQL][3.1] Alter table properties should invalidate cache
ulysses-you Oct 26, 2021
fa3c90b
[SPARK-37203][SQL] Fix NotSerializableException when observe with Typ…
beliefer Nov 5, 2021
51ca7c0
[SPARK-37196][SQL] HiveDecimal enforcePrecisionScale failed return null
AngersZhuuuu Nov 8, 2021
86e8177
[SPARK-37388][SQL] Fix NPE in WidthBucket in WholeStageCodegenExec
tomvanbussel Nov 22, 2021
508d6f4
[SPARK-37452][SQL][3.1] Char and Varchar break backward compatibility…
yaooqinn Nov 30, 2021
04f31f4
[SPARK-37556][SQL] Deser void class fail with Java serialization
daijyc Dec 7, 2021
9157b18
[SPARK-37392][SQL] Fix the performance bug when inferring constraints…
cloud-fan Dec 8, 2021
a9e99bd
[SPARK-37451][3.1][SQL] Fix cast string type to decimal type if spark…
wangyum Dec 9, 2021
4c01c47
[SPARK-37060][CORE][3.1] Handle driver status response from backup ma…
testsgmr Dec 16, 2021
918c222
[SPARK-37654][SQL] Fix NPE in Row.getSeq when field is Null
huaxingao Dec 17, 2021
3f541d1
[SPARK-37784][SQL] Correctly handle UDTs in CodeGenerator.addBuffered…
JoshRosen Jan 4, 2022
d8dfbf7
[SPARK-35714][FOLLOW-UP][CORE] WorkerWatcher should run System.exit i…
Ngone51 Jan 5, 2022
dd437c0
[SPARK-37860][UI] Fix taskindex in the stage page task event timeline
Jan 11, 2022
4778af0
[SPARK-34555][SQL][FOLLOWUP] code format
fishcus Jan 12, 2022
5f31fe1
[SPARK-38075][SQL][3.1] Fix `hasNext` in `HiveScriptTransformationExe…
bersprockets Feb 1, 2022
[SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …
### What changes were proposed in this pull request?
This PR fixes bugs that cause corruption of push-merged blocks when a client terminates while pushing a block. `RemoteBlockPushResolver` was introduced in apache#30062 (SPARK-32916).

There are two scenarios in which the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the `onFailure` callback would be called just once per stream, but we observed that it is called twice when a client connection is reset. A reset triggers two events, in this order:
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`, and `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first invocation of `StreamCallback.onFailure()`.
 - `channelInactive`. Since the channel closes, the `channelInactive` event is triggered and likewise propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second invocation of `StreamCallback.onFailure()`.

2. The flag `isWriting` is set to true prematurely. This introduces an edge case in which a stream that is merging a duplicate block (created by a speculative task) can interfere with an active stream if the duplicate stream fails. (A simplified sketch of the corrected contract follows this list.)
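
To make the fix concrete, here is a minimal, hypothetical sketch of the corrected contract (the names echo `RemoteBlockPushResolver`'s `PushBlockStreamCallback`, but this is not the actual implementation): `isWriting` is flipped only at the moment bytes are actually written, and `onFailure` resets it, so a repeated invocation, or a failure on a duplicate stream that never wrote, becomes a no-op.

```java
// Hypothetical sketch only; simplified from the behavior described above.
class PushStreamCallbackSketch {
  private boolean isWriting = false; // true only while this stream has written bytes

  void onData(java.nio.ByteBuffer buf) {
    // ... checks for write permission and for duplicate blocks elided ...
    isWriting = true; // set immediately before the first real write, not earlier
    // ... positional write of buf into the merged file ...
  }

  void onFailure(Throwable cause) {
    // exceptionCaught and channelInactive can both route here for a single
    // connection reset, so this cleanup must be safe to run more than once.
    if (isWriting) {
      // Only a stream that actually wrote may release the partition for
      // another stream, e.g. by resetting the partition's current map index.
    }
    isWriting = false; // a second onFailure is now a no-op
  }
}
```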

This PR also includes additional changes that improve the code:

1. Use positional writes everywhere: this simplifies the code, and microbenchmarking showed no performance impact (a short sketch of the difference follows this list).
2. Additional minor changes suggested by mridulm during an internal review.
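
For illustration, a minimal sketch of the difference (a hypothetical helper mirroring the shape of `writeBuf` in the diff below, not the actual code): a relative `FileChannel` write depends on the channel's implicit position, which a failed stream can leave pointing mid-block, while a positional write derives the offset from the last committed position on every call.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class PositionalWriteSketch {
  // Relative write: relies on the channel's implicit position, which a failed
  // stream may have left pointing into a half-written block.
  static void relativeWrite(FileChannel ch, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      ch.write(buf);
    }
  }

  // Positional write: the offset is computed from the last committed position,
  // so a partially written block is simply overwritten on retry.
  static long positionalWrite(FileChannel ch, ByteBuffer buf, long committedPos)
      throws IOException {
    long written = 0;
    while (buf.hasRemaining()) {
      written += ch.write(buf, committedPos + written);
    }
    return written;
  }
}
```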

### Why are the changes needed?
These changes fix bugs and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in LinkedIn's internal fork on a cluster.

Co-authored-by: Chandni Singh chsingh@linkedin.com
Co-authored-by: Min Shen mshen@linkedin.com

Closes apache#31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <[email protected]>
Co-authored-by: Min Shen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d88212)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
2 people authored and fishcus committed Jan 12, 2022
commit 544a0353167a3b8601376da841ce3ac4cbf975e9
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -30,7 +30,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -445,9 +444,9 @@ static class PushBlockStreamCallback implements StreamCallbackWithID {
     private final AppShufflePartitionInfo partitionInfo;
     private int length = 0;
     // This indicates that this stream got the opportunity to write the blocks to the merged file.
-    // Once this is set to true and the stream encounters a failure then it will take necessary
-    // action to overwrite any partial written data. This is reset to false when the stream
-    // completes without any failures.
+    // Once this is set to true and the stream encounters a failure then it will unset the
+    // currentMapId of the partition so that another stream can start merging the blocks to the
+    // partition. This is reset to false when the stream completes.
     private boolean isWriting = false;
     // Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly
     private List<ByteBuffer> deferredBufs;
@@ -477,16 +476,11 @@ public String getID() {
      */
    private void writeBuf(ByteBuffer buf) throws IOException {
      while (buf.hasRemaining()) {
-        if (partitionInfo.isEncounteredFailure()) {
-          long updatedPos = partitionInfo.getDataFilePos() + length;
-          logger.debug(
-            "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}",
-            partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
-            partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
-          length += partitionInfo.dataChannel.write(buf, updatedPos);
-        } else {
-          length += partitionInfo.dataChannel.write(buf);
-        }
+        long updatedPos = partitionInfo.getDataFilePos() + length;
+        logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}",
+          partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
+          partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
+        length += partitionInfo.dataChannel.write(buf, updatedPos);
      }
    }
 
Expand Down Expand Up @@ -581,7 +575,6 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
}
// Check whether we can write to disk
if (allowedToWrite()) {
isWriting = true;
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
@@ -598,6 +591,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
 
        // If we got here, it's safe to write the block data to the merged shuffle file. We
        // first write any deferred block.
+        isWriting = true;
        try {
          if (deferredBufs != null && !deferredBufs.isEmpty()) {
            writeDeferredBufs();
@@ -609,16 +603,6 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
          // back to the client so the block could be retried.
          throw ioe;
        }
-        // If we got here, it means we successfully write the current chunk of block to merged
-        // shuffle file. If we encountered failure while writing the previous block, we should
-        // reset the file channel position and the status of partitionInfo to indicate that we
-        // have recovered from previous disk write failure. However, we do not update the
-        // position tracked by partitionInfo here. That is only updated while the entire block
-        // is successfully written to merged shuffle file.
-        if (partitionInfo.isEncounteredFailure()) {
-          partitionInfo.dataChannel.position(partitionInfo.getDataFilePos() + length);
-          partitionInfo.setEncounteredFailure(false);
-        }
      } else {
        logger.trace("{} shuffleId {} reduceId {} onData deferred",
          partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
@@ -639,7 +623,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
        // written to disk due to this reason. We thus decide to optimize for server
        // throughput and memory usage.
        if (deferredBufs == null) {
-          deferredBufs = new LinkedList<>();
+          deferredBufs = new ArrayList<>();
        }
        // Write the buffer to the in-memory deferred cache. Since buf is a slice of a larger
        // byte buffer, we cache only the relevant bytes not the entire large buffer to save
@@ -670,7 +654,6 @@ public void onComplete(String streamId) throws IOException {
      }
      // Check if we can commit this block
      if (allowedToWrite()) {
-        isWriting = true;
        // Identify duplicate block generated by speculative tasks. We respond success to
        // the client in cases of duplicate even though no data is written.
        if (isDuplicateBlock()) {
@@ -681,6 +664,7 @@ public void onComplete(String streamId) throws IOException {
        try {
          if (deferredBufs != null && !deferredBufs.isEmpty()) {
            abortIfNecessary();
+            isWriting = true;
            writeDeferredBufs();
          }
        } catch (IOException ioe) {
@@ -738,14 +722,14 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
          Map<Integer, AppShufflePartitionInfo> shufflePartitions =
            mergeManager.partitions.get(partitionInfo.appShuffleId);
          if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
-            logger.debug("{} shuffleId {} reduceId {} set encountered failure",
+            logger.debug("{} shuffleId {} reduceId {} encountered failure",
              partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
              partitionInfo.reduceId);
            partitionInfo.setCurrentMapIndex(-1);
-            partitionInfo.setEncounteredFailure(true);
          }
        }
      }
+      isWriting = false;
    }
 
    @VisibleForTesting
@@ -802,8 +786,6 @@ public static class AppShufflePartitionInfo {
    public FileChannel dataChannel;
    // Location offset of the last successfully merged block for this shuffle partition
    private long dataFilePos;
-    // Indicating whether failure was encountered when merging the previous block
-    private boolean encounteredFailure;
    // Track the map index whose block is being merged for this shuffle partition
    private int currentMapIndex;
    // Bitmap tracking which mapper's blocks have been merged for this shuffle partition
@@ -836,7 +818,6 @@ public static class AppShufflePartitionInfo {
      // Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex()
      updateChunkInfo(0L, -1);
      this.dataFilePos = 0;
-      this.encounteredFailure = false;
      this.mapTracker = new RoaringBitmap();
      this.chunkTracker = new RoaringBitmap();
    }
@@ -851,14 +832,6 @@ public void setDataFilePos(long dataFilePos) {
      this.dataFilePos = dataFilePos;
    }
 
-    boolean isEncounteredFailure() {
-      return encounteredFailure;
-    }
-
-    void setEncounteredFailure(boolean encounteredFailure) {
-      this.encounteredFailure = encounteredFailure;
-    }
-
    int getCurrentMapIndex() {
      return currentMapIndex;
    }
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -28,6 +28,7 @@
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -292,18 +293,32 @@ public void testTooLateArrival() throws IOException {
  @Test
  public void testIncompleteStreamsAreOverwritten() throws IOException {
    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    byte[] expectedBytes = new byte[4];
+    ThreadLocalRandom.current().nextBytes(expectedBytes);
+
    StreamCallbackWithID stream1 =
      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
-    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
+    byte[] data = new byte[10];
+    ThreadLocalRandom.current().nextBytes(data);
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(data));
    // There is a failure
    stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
    StreamCallbackWithID stream2 =
      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
-    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
+    ByteBuffer nextBuf = ByteBuffer.wrap(expectedBytes, 0, 2);
+    stream2.onData(stream2.getID(), nextBuf);
    stream2.onComplete(stream2.getID());
+    StreamCallbackWithID stream3 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+    nextBuf = ByteBuffer.wrap(expectedBytes, 2, 2);
+    stream3.onData(stream3.getID(), nextBuf);
+    stream3.onComplete(stream3.getID());
    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
-    validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{5}, new int[][]{{1}});
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{1, 2}});
+    FileSegmentManagedBuffer mb =
+      (FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0, 0, 0);
+    assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
  }
 
  @Test (expected = RuntimeException.class)
@@ -740,6 +755,72 @@ public void testFailureWhileTruncatingFiles() throws IOException {
    validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}});
  }
 
+  @Test
+  public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException {
+    StreamCallbackWithID stream1 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
+    StreamCallbackWithID stream2 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
+    // On failure on stream1 gets invoked again and should cause no interference
+    stream1.onFailure(stream1.getID(), new RuntimeException("2nd forced error"));
+    StreamCallbackWithID stream3 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 3, 0, 0));
+    // This should be deferred as stream 2 is still the active stream
+    stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
+    // Stream 2 writes more and completes
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
+    stream2.onComplete(stream2.getID());
+    stream3.onComplete(stream3.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {9, 2}, new int[][] {{1},{3}});
+    removeApplication(TEST_APP);
+  }
+
+  @Test (expected = RuntimeException.class)
+  public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException {
+    StreamCallbackWithID stream1 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    StreamCallbackWithID stream1Duplicate =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    stream1.onComplete(stream1.getID());
+    stream1Duplicate.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+
+    StreamCallbackWithID stream2 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
+    // Should not change the current map id of the reduce partition
+    stream1Duplicate.onFailure(stream2.getID(), new RuntimeException("forced error"));
+
+    StreamCallbackWithID stream3 =
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+    // This should be deferred as stream 2 is still the active stream
+    stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
+    RuntimeException failedEx = null;
+    try {
+      stream3.onComplete(stream3.getID());
+    } catch (RuntimeException re) {
+      assertEquals(
+        "Couldn't find an opportunity to write block shufflePush_0_2_0 to merged shuffle",
+        re.getMessage());
+      failedEx = re;
+    }
+    // Stream 2 writes more and completes
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
+    stream2.onComplete(stream2.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {11}, new int[][] {{0, 1}});
+    removeApplication(TEST_APP);
+    if (failedEx != null) {
+      throw failedEx;
+    }
+  }
+
  private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
    pushResolver = new RemoteBlockPushResolver(conf) {
      @Override