
Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Nov 30, 2021

What changes were proposed in this pull request?

This PR aims to improve Fallback Storage upload speed by randomizing the upload path in order to avoid S3 rate limiting.

Why are the changes needed?

Currently, Fallback Storage uses a single prefix per shuffle. This PR randomizes the upload prefixes even within a single shuffle to avoid S3 rate limiting.
- https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
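
As a rough illustration of the idea (a minimal sketch with hypothetical names, not the actual FallbackStorage code), each shuffle file gets an extra numeric path component derived from its name, so uploads for one shuffle fan out across many key prefixes:

```scala
// Hypothetical helper sketching the randomized-prefix idea; not Spark's
// FallbackStorage implementation. An extra numeric component per file lets
// uploads for a single shuffle spread across many S3 key prefixes.
object RandomizedFallbackPath {
  def pathFor(root: String, appId: String, shuffleId: Int, fileName: String): String = {
    // Mask with Int.MaxValue to keep the hash-derived sub-prefix non-negative.
    val prefix = fileName.hashCode & Int.MaxValue
    s"$root/$appId/$shuffleId/$prefix/$fileName"
  }
}

// e.g. pathFor("file:///tmp/fallback", "app-20211130135922-0001", 0, "shuffle_0_7_0.data")
// produces a layout like the tree shown below under "How was this patch tested?".
```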

Does this PR introduce any user-facing change?

No. This is used internally at runtime.

How was this patch tested?

Pass the CIs to verify read and write operations. To check the layout, inspect the uploaded paths manually with the following configs.

```
spark.decommission.enabled                          true
spark.storage.decommission.enabled                  true
spark.storage.decommission.shuffleBlocks.enabled    true
spark.storage.decommission.fallbackStorage.path     file:///tmp/fallback/
```

Start one master and worker. Connect with spark-shell and generate shuffle data.

```
scala> sc.parallelize(1 to 11, 10).map(x => (x % 3, 1)).reduceByKey(_ + _).count()
res0: Long = 3
```

Invoke decommissioning and check. Since there is only one worker, the shuffle data goes to the fallback storage directly, and each file lands under its own randomized numeric prefix directory.

```
$ kill -PWR <CoarseGrainedExecutorBackend JVM PID>
$ tree /tmp/fallback
/tmp/fallback
└── app-20211130135922-0001
    └── 0
        ├── 103417883
        │   └── shuffle_0_7_0.data
        ├── 1036881592
        │   └── shuffle_0_4_0.data
        ├── 1094002679
        │   └── shuffle_0_7_0.index
        ├── 1393510154
        │   └── shuffle_0_6_0.index
        ├── 1515275369
        │   └── shuffle_0_3_0.data
        ├── 1541340402
        │   └── shuffle_0_2_0.index
        ├── 1639392452
        │   └── shuffle_0_8_0.data
        ├── 1774061049
        │   └── shuffle_0_9_0.index
        ├── 1846228218
        │   └── shuffle_0_6_0.data
        ├── 1970345301
        │   └── shuffle_0_1_0.data
        ├── 2073568524
        │   └── shuffle_0_4_0.index
        ├── 227534966
        │   └── shuffle_0_2_0.data
        ├── 266114061
        │   └── shuffle_0_3_0.index
        ├── 413944309
        │   └── shuffle_0_5_0.index
        ├── 581811660
        │   └── shuffle_0_0_0.data
        ├── 705928743
        │   └── shuffle_0_5_0.data
        ├── 713451784
        │   └── shuffle_0_8_0.index
        ├── 861282032
        │   └── shuffle_0_0_0.index
        ├── 912764509
        │   └── shuffle_0_9_0.data
        └── 946172431
            └── shuffle_0_1_0.index
```

@github-actions github-actions bot added the CORE label Nov 30, 2021
@SparkQA

SparkQA commented Nov 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50249/

@SparkQA

SparkQA commented Nov 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50249/

@dongjoon-hyun
Member Author

Could you review this please, @viirya ?

@SparkQA

SparkQA commented Nov 30, 2021

Test build #145776 has finished for PR 34762 at commit 7cdcb44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Nov 30, 2021

lgtm

@dongjoon-hyun
Member Author

Thank you so much, @viirya !

@dongjoon-hyun dongjoon-hyun deleted the SPARK-37509 branch November 30, 2021 23:03
@steveloughran
Contributor

Only just seen this.
How much throttling were you actually seeing, and, assuming the s3a client, have you set directory marker retention to keep? It is often the attempted delete of directory markers which triggers the problem ... effectively it is a form of write amplification.

I'm actually thinking that a feature to go into s3a this year should be configurable rate limiting through the Guava RateLimiter; I'm using this in the abfs committer to keep committer IO below the limits where throttling starts to cause problems with renames. This is all per-process; things like random filenames are still going to be critical to spread load on S3.
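
For context, a minimal per-process sketch of that kind of client-side rate limiting (hypothetical names; not the actual s3a or abfs committer code) might look like:

```scala
import com.google.common.util.concurrent.RateLimiter

// Hypothetical per-process throttle: cap the number of store requests issued per
// second so a burst of shuffle-file uploads stays below the point where the
// object store starts throttling. Not the s3a/abfs implementation.
object ThrottledUploads {
  // Illustrative limit; a real value would be tuned per bucket and workload.
  private val limiter = RateLimiter.create(100.0) // permits per second

  def withPermit[T](request: => T): T = {
    limiter.acquire() // blocks until a permit is available
    request
  }
}
```

As the comment notes, this only limits a single process; spreading keys across prefixes is still needed to spread load on the server side.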

@dongjoon-hyun
Member Author

I didn't set any configurations. When a customer does not make a special request, the predefined throttling is applied by default (3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix). In the worst notebook scenario, 3,500 executors may start to decommission at the same time, which causes throttling.
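
For a concrete sense of scale (illustrative numbers on top of the limits quoted above): if 3,500 decommissioning executors each upload a .data and an .index file within the same second under one shared prefix, that is roughly 7,000 PUT requests per second against a 3,500 PUT/s-per-prefix limit, whereas spreading the same uploads across, say, 100 randomized prefixes drops the load to about 70 PUTs per second per prefix.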

@steveloughran
Contributor

OK. You should try enabling directory marker retention everywhere and see what that does to your work. The auditing support in Hadoop 3.3.2 will help you identify which jobs and operations are generating the IO.
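
For reference, a hedged example of how marker retention might be set through Spark's Hadoop configuration pass-through (whether this value suits a given deployment is an assumption):

```
spark.hadoop.fs.s3a.directory.marker.retention    keep
```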

@dongjoon-hyun
Member Author

They are an orthogonal dimension to this PR, aren't they?
This PR will give us benefit on top of whatever Apache Hadoop delivers, @steveloughran .

@steveloughran
Contributor

Yeah, you should do both. This is best as it will spread the load across all the shards a bucket has.

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
… S3 rate limiter (apache#1379)

Closes apache#34762 from dongjoon-hyun/SPARK-37509.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit ca25534)
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit c88b258)
Signed-off-by: Dongjoon Hyun <[email protected]>

Co-authored-by: Dongjoon Hyun <[email protected]>