SPARK-25004: Fix problems from review.
rdblue committed Aug 23, 2018
commit 5288f5b1c5b5845f5bd6a2d62082176e3d4b0bda
17 changes: 9 additions & 8 deletions python/pyspark/worker.py
@@ -269,17 +269,18 @@ def main(infile, outfile):
     memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
     total_memory = resource.RLIMIT_AS
     try:
-        (total_memory_limit, max_total_memory) = resource.getrlimit(total_memory)
-        msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory)
-        print(msg, file=sys.stderr)
+        if memory_limit_mb > 0:
+            (soft_limit, hard_limit) = resource.getrlimit(total_memory)
+            msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
+            print(msg, file=sys.stderr)
 
-        if memory_limit_mb > 0 and total_memory_limit == resource.RLIM_INFINITY:
             # convert to bytes
-            total_memory_limit = memory_limit_mb * 1024 * 1024
+            new_limit = memory_limit_mb * 1024 * 1024
 
-            msg = "Setting mem to {0} of max {1}\n".format(total_memory_limit, max_total_memory)
-            print(msg, file=sys.stderr)
-            resource.setrlimit(total_memory, (total_memory_limit, total_memory_limit))
+            if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
+                msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
+                print(msg, file=sys.stderr)
+                resource.setrlimit(total_memory, (new_limit, new_limit))
 
     except (resource.error, OSError, ValueError) as e:
         # not all systems support resource limits, so warn instead of failing
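For readers who want to see the patched logic in isolation, here is a minimal standalone sketch (not part of the patch) of the same getrlimit/setrlimit pattern the new worker.py code follows; the helper name and the 512 MB figure are illustrative assumptions.

import resource
import sys


def set_address_space_limit(memory_limit_mb):
    # Same idea as the worker.py hunk above: only lower the soft RLIMIT_AS
    # limit, never raise it, and tolerate platforms without resource limits.
    try:
        (soft_limit, hard_limit) = resource.getrlimit(resource.RLIMIT_AS)
        new_limit = memory_limit_mb * 1024 * 1024  # convert MiB to bytes
        if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
            resource.setrlimit(resource.RLIMIT_AS, (new_limit, new_limit))
    except (resource.error, OSError, ValueError) as e:
        # not all systems support resource limits, so warn instead of failing
        print("could not set memory limit: {0}".format(e), file=sys.stderr)


set_address_space_limit(512)  # illustrative 512 MB cap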
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -140,7 +140,7 @@ case class AggregateInPandasExec(
         aggInputSchema,
         sessionLocalTimeZone,
         pythonRunnerConf,
-        sparkContext.conf).compute(projectedRowIter, context.partitionId(), context)
+        SparkEnv.get.conf).compute(projectedRowIter, context.partitionId(), context)
 
       val joinedAttributes =
         groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute)
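The Scala change above swaps sparkContext.conf for SparkEnv.get.conf, presumably because this code runs inside a task on the executor, where SparkEnv.get is available but the driver-side SparkContext is not. As a hedged usage sketch (assuming spark.executor.pyspark.memory is the conf key this PR wires through to PYSPARK_EXECUTOR_MEMORY_MB; the app name and the 512m value are illustrative), the limit would be set when the session is created:

from pyspark.sql import SparkSession

# Assumption: spark.executor.pyspark.memory is the setting introduced by
# SPARK-25004; executors export it to the Python worker, which then applies
# the RLIMIT_AS cap shown in worker.py above.
spark = (
    SparkSession.builder
    .appName("pyspark-memory-limit-demo")  # illustrative name
    .config("spark.executor.pyspark.memory", "512m")
    .getOrCreate()
)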