Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f552d49
Catch non-zero exit from pipe commands
megatron-me-uk May 19, 2015
5745d85
Remove space to fix style
megatron-me-uk Jun 3, 2015
45f4977
fix line too long style error
megatron-me-uk Jun 4, 2015
0974f98
add space between words in multiline string
megatron-me-uk Jun 4, 2015
1b3dc4e
fix missing space around operator style
megatron-me-uk Jun 4, 2015
8db4073
Add a test for rdd pipe functions
megatron-me-uk Jun 15, 2015
cc1a73d
fix style issues in pipe test
megatron-me-uk Jun 15, 2015
3ab8c7a
remove whitespace for style
megatron-me-uk Jun 18, 2015
3344a21
wrap assertRaises with QuietTest
megatron-me-uk Jun 18, 2015
491d3fc
Pass a function handle to assertRaises
megatron-me-uk Jun 18, 2015
4153b02
fix list.sort returns None
megatron-me-uk Jun 18, 2015
8ed89a6
Chain generators to prevent potential deadlock
megatron-me-uk Jun 19, 2015
0486ae3
style fixes
megatron-me-uk Jun 19, 2015
8a9ef9c
make check_return_code an iterator
megatron-me-uk Jun 24, 2015
a0c0161
fix generator issue
megatron-me-uk Jun 24, 2015
34fcdc3
add optional argument 'mode' for rdd.pipe
megatron-me-uk Jun 30, 2015
a307d13
update rdd tests to test pipe modes
megatron-me-uk Jun 30, 2015
b0ac3a4
Merge pull request #1 from megatron-me-uk/megatron-me-uk-patch-1
megatron-me-uk Jun 30, 2015
eb4801c
fix fail_condition
megatron-me-uk Jun 30, 2015
ab9a2e1
Update rdd pipe tests for checkCode
megatron-me-uk Jul 7, 2015
0c1e762
Update rdd pipe method for checkCode
megatron-me-uk Jul 7, 2015
574b564
Merge pull request #2 from megatron-me-uk/patch-4
megatron-me-uk Jul 7, 2015
98fa101
fix blank line style error
megatron-me-uk Jul 7, 2015
04ae1d5
Remove spurious empty line
megatron-me-uk Jul 8, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Chain generators to prevent potential deadlock
  • Loading branch information
megatron-me-uk committed Jun 19, 2015
commit 8ed89a672f5d701ac918ada3f76cad1d3bb5e757
15 changes: 9 additions & 6 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,12 +704,15 @@ def pipe_objs(out):
out.write(s.encode('utf-8'))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
pipe.wait()
if pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
return result
def check_return_code():
pipe.wait()
if pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
else:
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make check_return_code a generator, for example:

def check_return_code():
      # check return code
      for x in range(0):
         yield x

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point I will change it.

return (x.rstrip(b'\n').decode('utf-8') for x in
chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None)))
return self.mapPartitions(func)

def foreach(self, f):
Expand Down