Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4931,6 +4931,30 @@ def foo3(key, pdf):
expected4 = udf3.func((), pdf)
self.assertPandasEqual(expected4, result4)

# Regression test for SPARK-24334
def test_memory_leak(self):
    """Verify a failing pandas UDF does not leak Arrow memory (SPARK-24334).

    Runs a GROUPED_MAP pandas UDF that deliberately raises inside the
    executor, then asserts that the resulting Py4J error does not contain
    the Arrow allocator's 'Memory leaked' diagnostic.
    """
    from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode

    # Have all data in a single executor thread so it can trigger the race
    # condition more easily.
    with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
        df = self.spark.range(0, 1000)
        df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
            .withColumn('id', explode(col('id'))) \
            .withColumn('v', array([lit(i) for i in range(0, 1000)]))

        @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
        def foo(pdf):
            # Intentionally raise an exception (NameError on an undefined
            # name) inside the UDF to exercise the executor cleanup path.
            xxx
            return pdf

        result = df.groupby('id').apply(foo)

        with QuietTest(self.sc):
            with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
                result.count()
            self.assertTrue('Memory leaked' not in str(context.exception))


@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,13 @@ class ArrowPythonRunner(
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"stdout writer for $pythonExec", 0, Long.MaxValue)

val root = VectorSchemaRoot.create(arrowSchema, allocator)
val arrowWriter = ArrowWriter.create(root)

context.addTaskCompletionListener { _ =>
root.close()
allocator.close()
}

val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()

Utils.tryWithSafeFinally {
val arrowWriter = ArrowWriter.create(root)
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()

while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()

Expand All @@ -94,8 +88,18 @@ class ArrowPythonRunner(
writer.writeBatch()
arrowWriter.reset()
}
} {
writer.end()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved out of the finally block because this function will throw an exception if the thread is interrupted. writer.end() also doesn't do any cleanup - it just writes some final bits to the output channel, so we don't need to call it during cleanup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically your change makes sure that root and allocator are safely closed in the finally block, and hence we don't need the safety belt in TaskCompletionListener?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The routine in TaskCompletionListener is not really a safety belt; it actually causes the race condition. But you are right, I believe putting the cleanup routine in the finally block is enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment to explain the same - that this should be outside of the finally?

} {
// If we close root and allocator in TaskCompletionListener, there could be a race
// condition where the writer thread keeps writing to the VectorSchemaRoot while
// it's being closed by the TaskCompletion listener.
// Closing root and allocator here is cleaner because root and allocator are owned
// by the writer thread and are only visible to the writer thread.
//
// If the writer thread is interrupted by TaskCompletionListener, it should either
// (1) be in the try block, in which case it will get an InterruptedException when
// performing io and go into the finally block, or (2) be in the finally block,
// in which case it will ignore the interruption and close the resources.
root.close()
allocator.close()
}
Expand Down