Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions python/pyspark/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ def test_with_different_versions_of_python(self):
finally:
self.sc.pythonVer = version

def test_reuse_worker(self):
    """Verify that Python worker processes are reused across jobs.

    Runs the same RDD job twice and checks that every worker PID seen in
    the second run already appeared in the first run — i.e. no new worker
    processes were forked between jobs (regression test for SPARK-26549,
    where worker reuse was broken on Python 3).
    """
    def get_worker_pids(input_rdd):
        # Each task reports the PID of the Python worker it executed in.
        return input_rdd.map(lambda x: os.getpid()).collect()

    # 20 partitions -> 20 tasks, enough to exercise multiple workers.
    rdd = self.sc.parallelize(range(20), 20)
    # Set gives O(1) membership checks in the loop below.
    first_run_pids = set(get_worker_pids(rdd))
    second_run_pids = get_worker_pids(rdd)
    for pid in second_run_pids:
        # assertIn yields a clearer failure message than assertTrue(x in y).
        self.assertIn(pid, first_run_pids)


if __name__ == "__main__":
import unittest
Expand Down
7 changes: 6 additions & 1 deletion python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,12 @@ def process():
pickleSer._write_with_length((aid, accum._value), outfile)

# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
res = read_int(infile)
if sys.version >= '3' and res == SpecialLengths.END_OF_DATA_SECTION:
# skip the END_OF_DATA_SECTION for Python3, otherwise the worker reuse will take
# no effect, see SPARK-26549 for more details.
res = read_int(infile)
if res == SpecialLengths.END_OF_STREAM:
write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
# write a different value to tell JVM to not reuse this worker
Expand Down