From 435ccfff44995ca5ad487e77128b2cae4ff1cfd5 Mon Sep 17 00:00:00 2001
From: Li Jin
Date: Tue, 22 May 2018 11:09:44 -0400
Subject: [PATCH 1/3] Remove the Arrow allocation cleanup routine in TaskCompletionListener to avoid race condition

---
 .../execution/python/ArrowPythonRunner.scala | 26 +++++++++++--------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 5fcdcddca7d5..7c7e1f7ddeb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -70,19 +70,13 @@ class ArrowPythonRunner(
         val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
         val allocator = ArrowUtils.rootAllocator.newChildAllocator(
           s"stdout writer for $pythonExec", 0, Long.MaxValue)
-        val root = VectorSchemaRoot.create(arrowSchema, allocator)
-        val arrowWriter = ArrowWriter.create(root)
-
-        context.addTaskCompletionListener { _ =>
-          root.close()
-          allocator.close()
-        }
-
-        val writer = new ArrowStreamWriter(root, null, dataOut)
-        writer.start()
 
         Utils.tryWithSafeFinally {
+          val arrowWriter = ArrowWriter.create(root)
+          val writer = new ArrowStreamWriter(root, null, dataOut)
+          writer.start()
+
           while (inputIterator.hasNext) {
             val nextBatch = inputIterator.next()
 
@@ -94,8 +88,18 @@ class ArrowPythonRunner(
             writer.writeBatch()
             arrowWriter.reset()
           }
-        } {
           writer.end()
+        } {
+          // If we close root and allocator in TaskCompletionListener, there could be a race
+          // condition where the writer thread keeps writing to the VectorSchemaRoot while
+          // it's being closed by the TaskCompletion listener.
+          // Closing root and allocator here is cleaner because root and allocator are owned
+          // by the writer thread and are only visible to the writer thread.
+          //
+          // If the writer thread is interrupted by TaskCompletionListener, it should either
+          // (1) be in the try block, in which case it will get an InterruptedException when
+          // performing IO and go into the finally block, or (2) be in the finally block,
+          // in which case it will ignore the interruption and close the resources.
           root.close()
           allocator.close()
         }
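The race described in the commit message above is easiest to see with the Arrow details stripped away. The following standalone Scala sketch uses a hypothetical FakeRoot and thread names (it is not Spark or Arrow code) to contrast the old shape, where a completion callback on another thread closes a non-thread-safe resource out from under the writer, with the patched shape, where the writer thread owns the resource and closes it in its own finally block.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a non-thread-safe resource such as a VectorSchemaRoot:
// it must only be closed by the thread that owns it.
final class FakeRoot {
  private val closed = new AtomicBoolean(false)

  def write(i: Int): Unit = {
    if (closed.get()) throw new IllegalStateException(s"write($i) after close")
    // pretend to buffer the value
  }

  def close(): Unit = closed.set(true)
}

object CompletionListenerRaceSketch {
  def main(args: Array[String]): Unit = {
    // Old shape: cleanup runs on a different thread (a stand-in for the
    // TaskCompletionListener) and races with the thread that is still writing.
    val sharedRoot = new FakeRoot
    val writerThread = new Thread(() => {
      try {
        (1 to 1000000).foreach(sharedRoot.write)
      } catch {
        case e: IllegalStateException => println(s"race observed: ${e.getMessage}")
      }
    })
    val completionListener = new Thread(() => sharedRoot.close())

    writerThread.start()
    completionListener.start() // the "task completion" callback fires while writes are in flight
    writerThread.join()
    completionListener.join()

    // Patched shape: the writer thread owns the resource and closes it in its
    // own finally block, so no other thread ever touches it.
    val ownedRoot = new FakeRoot
    try {
      (1 to 1000).foreach(ownedRoot.write)
    } finally {
      ownedRoot.close()
    }
    println("owned root written and closed on the writer thread")
  }
}

Because FakeRoot only flips a flag, the sketch fails softly when the interleaving goes wrong; with a real VectorSchemaRoot the same interleaving can release Arrow buffers that the writer is still filling, which is the failure mode the patch removes.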
From 69c91043981056a732328b474986fe4128936c62 Mon Sep 17 00:00:00 2001
From: Li Jin
Date: Tue, 22 May 2018 15:21:56 -0400
Subject: [PATCH 2/3] Add unit test

---
 python/pyspark/sql/tests.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a1b6db71782b..196889d720f7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4931,6 +4931,30 @@ def foo3(key, pdf):
         expected4 = udf3.func((), pdf)
         self.assertPandasEqual(expected4, result4)
 
+    # Regression test for SPARK-24334
+    def test_memory_leak(self):
+        from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode
+
+        # Have all data in a single executor thread so it can trigger the race condition more easily
+        with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
+            df = self.spark.range(0, 1000)
+            df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
+                .withColumn('id', explode(col('id'))) \
+                .withColumn('v', array([lit(i) for i in range(0, 1000)]))
+
+            @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
+            def foo(pdf):
+                # Throw an exception in the UDF ('xxx' is intentionally undefined)
+                xxx
+                return pdf
+
+            result = df.groupby('id').apply(foo)
+
+            with QuietTest(self.sc):
+                with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
+                    result.count()
+                self.assertTrue('Memory leaked' not in str(context.exception))
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
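The undefined name xxx makes every task die mid-stream with a NameError, and the final assertion leans on Arrow's allocator accounting: if the child allocator created for the stdout writer is closed while batches are still allocated, Arrow raises an error whose message reports the leaked bytes, and Spark surfaces it in the Py4JJavaError that count() propagates. The following is a minimal Scala sketch of that mechanism, assuming Arrow's Java memory API (org.apache.arrow.memory.RootAllocator) on the classpath; the exact message wording varies across Arrow versions.

import org.apache.arrow.memory.RootAllocator

// Deliberately "leaks" one buffer to show the message the test greps for;
// the sketch relies on JVM exit for the actual cleanup.
object AllocatorLeakSketch {
  def main(args: Array[String]): Unit = {
    val root = new RootAllocator(Long.MaxValue)
    val child = root.newChildAllocator("stdout writer sketch", 0, Long.MaxValue)

    child.buffer(64) // allocated but never released, i.e. leaked

    try {
      child.close() // the allocator notices 64 outstanding bytes and throws
    } catch {
      case e: IllegalStateException =>
        // Typically something like "Memory was leaked by query. Memory leaked: (64) ..."
        println(e.getMessage)
    }
  }
}

The test passes only when no such leak report appears in the task failure, which is exactly what closing root and allocator on the writer thread guarantees.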
From 756a73aea843e8d5d90994d127c0d9d4c357c67b Mon Sep 17 00:00:00 2001
From: Li Jin
Date: Sun, 27 May 2018 09:19:46 -0400
Subject: [PATCH 3/3] Remove tests. Add comments

---
 python/pyspark/sql/tests.py                  | 24 -------------------
 .../execution/python/ArrowPythonRunner.scala |  3 +++
 2 files changed, 3 insertions(+), 24 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 196889d720f7..a1b6db71782b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4931,30 +4931,6 @@ def foo3(key, pdf):
         expected4 = udf3.func((), pdf)
         self.assertPandasEqual(expected4, result4)
 
-    # Regression test for SPARK-24334
-    def test_memory_leak(self):
-        from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode
-
-        # Have all data in a single executor thread so it can trigger the race condition more easily
-        with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
-            df = self.spark.range(0, 1000)
-            df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
-                .withColumn('id', explode(col('id'))) \
-                .withColumn('v', array([lit(i) for i in range(0, 1000)]))
-
-            @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
-            def foo(pdf):
-                # Throw an exception in the UDF ('xxx' is intentionally undefined)
-                xxx
-                return pdf
-
-            result = df.groupby('id').apply(foo)
-
-            with QuietTest(self.sc):
-                with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
-                    result.count()
-                self.assertTrue('Memory leaked' not in str(context.exception))
-
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 7c7e1f7ddeb9..01e19bddbfb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -88,6 +88,9 @@ class ArrowPythonRunner(
             writer.writeBatch()
             arrowWriter.reset()
           }
+          // end writes the footer to the output stream and doesn't clean up any resources.
+          // It could throw an exception if the output stream is closed, so it should be
+          // in the try block.
           writer.end()
         } {
           // If we close root and allocator in TaskCompletionListener, there could be a race
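The three added comment lines codify the split that Utils.tryWithSafeFinally enforces in the code above: work that can fail, including writer.end() which still writes the stream footer, stays in the body, while the finally block is reserved for unconditional cleanup on the owning thread. Below is a rough, hypothetical re-implementation of that helper, written only to make the exception flow explicit; it is a sketch, not Spark's actual Utils.tryWithSafeFinally.

object SafeFinallySketch {
  // Run `body`, then always run `cleanup`; if both throw, keep the body's
  // exception as the primary failure and attach the cleanup failure to it.
  def tryWithSafeFinallySketch[T](body: => T)(cleanup: => Unit): T = {
    var primary: Throwable = null
    try {
      body
    } catch {
      case t: Throwable =>
        primary = t
        throw t
    } finally {
      try {
        cleanup
      } catch {
        case t: Throwable if primary != null =>
          primary.addSuppressed(t) // don't let a cleanup failure mask the real cause
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Shape of the patched writer: the footer write can fail (for example when the
    // stream was closed after an interrupt), so it belongs in the body; closing the
    // root and allocator is unconditional, so it belongs in the cleanup block.
    val rows = Iterator(1, 2, 3)
    val written = tryWithSafeFinallySketch {
      var count = 0
      rows.foreach { _ => count += 1 } // stand-in for writing each batch
      // a real writer would call writer.end() here; it may throw if the stream is closed
      count
    } {
      println("closing root and allocator (stand-ins); always runs, on this thread")
    }
    println(s"wrote $written rows")
  }
}

If the body throws, the cleanup still runs and its own failure, if any, is attached as a suppressed exception, so the task failure reported to the driver names the original write error rather than a secondary close() error.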