@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
   // Each Python worker gets an equal part of the allocation. The worker pool will grow to the
   // number of concurrent tasks, which is determined by the number of cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
+    // Windows currently does not have the 'resource' Python module that worker.py requires.
+    None
+  } else {
+    conf.get(PYSPARK_EXECUTOR_MEMORY)
+      .map(_ / conf.getInt("spark.executor.cores", 1))
+  }

   // All the Python functions should have the same exec, version and envvars.
   protected val envVars = funcs.head.funcs.head.envVars
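For intuition, the per-worker figure the Scala change computes is just the total PySpark allocation divided evenly across the executor's cores, since the worker pool grows to one worker per concurrent task. A minimal Python sketch of that arithmetic (the helper name is hypothetical, not Spark code):

# Hypothetical helper mirroring the allocation logic above: split the total
# PySpark memory allocation evenly across executor cores, and apply no
# limit at all on Windows.
def per_worker_memory_mb(pyspark_memory_mb, executor_cores=1, is_windows=False):
    if is_windows or pyspark_memory_mb is None:
        return None  # no 'resource' module on Windows, so no cap is applied
    return pyspark_memory_mb // executor_cores

# Example: a 2048 MiB allocation on a 4-core executor caps each worker at 512 MiB.
assert per_worker_memory_mb(2048, executor_cores=4) == 512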
docs/configuration.md (2 changes: 1 addition & 1 deletion)
@@ -189,7 +189,7 @@ of the most common options to set are:
 limited to this amount. If not set, Spark will not limit Python's memory use
 and it is up to the application to avoid exceeding the overhead memory space
 shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
-is added to executor resource requests.
+is added to executor resource requests. This configuration is not supported on Windows.
 </td>
 </tr>
 <tr>
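Assuming the table row being edited documents spark.executor.pyspark.memory (the key name itself sits outside this hunk), a typical way to set it from application code:

# Hedged usage example: assumes the documented key is spark.executor.pyspark.memory.
# Per the change above, the worker ignores the resulting limit on Windows.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.executor.pyspark.memory", "512m")
spark = SparkSession.builder.config(conf=conf).getOrCreate()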
python/pyspark/worker.py (21 changes: 14 additions & 7 deletions)
@@ -22,7 +22,12 @@
 import os
 import sys
 import time
-import resource
+# 'resource' is a Unix-specific module, so guard the import for Windows.
+has_resource_package = True
+try:
+    import resource
+except ImportError:
+    has_resource_package = False
 import socket
 import traceback
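The guarded import above is the usual pattern for optional, platform-specific modules. A standalone sketch of the same idea, runnable on any platform:

# Same optional-import pattern in isolation: 'resource' exists only on Unix.
try:
    import resource
    has_resource = True
except ImportError:  # e.g. Windows
    has_resource = False

if has_resource:
    # RLIMIT_AS is the address-space (virtual memory) limit for this process;
    # getrlimit returns (soft, hard), where RLIM_INFINITY means unlimited.
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    print("address-space limits:", soft, hard)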

@@ -268,9 +273,11 @@ def main(infile, outfile):

     # set up memory limits
     memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
-    total_memory = resource.RLIMIT_AS
-    try:
-        if memory_limit_mb > 0:
+    # 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows because it depends on
+    # the 'resource' module, which is Unix-specific.
+    if memory_limit_mb > 0 and has_resource_package:
+        total_memory = resource.RLIMIT_AS
+        try:
             (soft_limit, hard_limit) = resource.getrlimit(total_memory)
             msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
             print(msg, file=sys.stderr)
@@ -283,9 +290,9 @@
             print(msg, file=sys.stderr)
             resource.setrlimit(total_memory, (new_limit, new_limit))

-    except (resource.error, OSError, ValueError) as e:
-        # not all systems support resource limits, so warn instead of failing
-        print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
+        except (resource.error, OSError, ValueError) as e:
+            # not all systems support resource limits, so warn instead of failing
+            print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)

     # initialize global state
     taskContext = None
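Putting the pieces together, here is a self-contained sketch (illustrative only, not worker.py itself; the function name apply_memory_limit is invented) of what the guarded block does on a Unix system: read the current address-space limit, convert the configured megabytes to bytes, tighten the limit only if it would actually shrink, and warn rather than fail on platforms that reject it:

import sys

# Illustrative sketch of worker.py's memory-limit mechanism,
# assuming a Unix system where the 'resource' module is available.
try:
    import resource
    has_resource_package = True
except ImportError:  # e.g. Windows
    has_resource_package = False

def apply_memory_limit(memory_limit_mb):
    if memory_limit_mb <= 0 or not has_resource_package:
        return  # limit unset, or platform does not support it
    try:
        soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_AS)
        new_limit = memory_limit_mb * 1024 * 1024  # MiB -> bytes
        # Only tighten the limit; never raise it above the current soft limit.
        if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
            resource.setrlimit(resource.RLIMIT_AS, (new_limit, new_limit))
    except (resource.error, OSError, ValueError) as e:
        # not all systems support resource limits, so warn instead of failing
        print("WARN: Failed to set memory limit: {0}".format(e), file=sys.stderr)

apply_memory_limit(512)  # cap this process at roughly 512 MiB of address space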