From f552d49127d9e43799d5728f52682a1609fdedb8 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 19 May 2015 12:01:30 +0100 Subject: [PATCH 01/22] Catch non-zero exit from pipe commands This will allow problems with piped commands to be detected. This will also allow tasks to be retried where errors are rare (such as network problems in piped commands). --- python/pyspark/rdd.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 70db4bbe4cbc..f83945a105ef 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -704,7 +704,11 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) + result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) + pipe.wait() + if pipe.returncode: + raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode) ) + return result return self.mapPartitions(func) def foreach(self, f): From 5745d85db436238320055224a9754419c4fa5951 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Wed, 3 Jun 2015 15:38:59 +0100 Subject: [PATCH 02/22] Remove space to fix style --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f83945a105ef..5ff11de71112 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -707,7 +707,7 @@ def pipe_objs(out): result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) pipe.wait() if pipe.returncode: - raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode) ) + raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode)) return result return self.mapPartitions(func) From 45f4977fc8bf1971abc85b264a0ff4bae794abce Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 4 Jun 2015 09:53:16 +0100 Subject: [PATCH 03/22] fix line too long style error --- python/pyspark/rdd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 5ff11de71112..2b38289a1356 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -707,7 +707,8 @@ def pipe_objs(out): result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) pipe.wait() if pipe.returncode: - raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode)) + raise Exception("Pipe function `%s' exited" + "with error code %d" %(command, pipe.returncode)) return result return self.mapPartitions(func) From 0974f98b3f3c2fb28a67c9a93d2e60dd59cfe54a Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 4 Jun 2015 09:54:21 +0100 Subject: [PATCH 04/22] add space between words in multiline string --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 2b38289a1356..a86fae6d2edb 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -707,7 +707,7 @@ def pipe_objs(out): result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) pipe.wait() if pipe.returncode: - raise Exception("Pipe function `%s' exited" + raise Exception("Pipe function `%s' exited " "with error code %d" %(command, pipe.returncode)) return result return self.mapPartitions(func) From 1b3dc4eaa33b3126403a7d905b5cf31676837cf6 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 4 Jun 2015 10:16:40 +0100 Subject: [PATCH 05/22] fix missing space around operator style This is an error in PEP8 but not in pylint. --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a86fae6d2edb..da9d11f689ae 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -708,7 +708,7 @@ def pipe_objs(out): pipe.wait() if pipe.returncode: raise Exception("Pipe function `%s' exited " - "with error code %d" %(command, pipe.returncode)) + "with error code %d" % (command, pipe.returncode)) return result return self.mapPartitions(func) From 8db4073df2fc1f9d98771d273f5e38542aa8f31a Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Mon, 15 Jun 2015 16:47:19 +0100 Subject: [PATCH 06/22] Add a test for rdd pipe functions --- python/pyspark/tests.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index d8e319994cc9..e38872a9f9b4 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -873,6 +873,13 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): sizes = sort.glom().map(len).collect() for size in sizes: self.assertGreater(size, 0) + + def test_pipe_functions(self): + data = ['1','2','3'] + rdd = self.sc.parallelize(data) + self.assertRaises(Exception, rdd.pipe('cc').collect()) + result = rdd.pipe('cat').collect().sort() + [self.assertEqual(x, y) for x, y in zip(data, result)] class ProfilerTests(PySparkTestCase): From cc1a73daf24f579b2314b1d52099cae27995520e Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Mon, 15 Jun 2015 17:07:20 +0100 Subject: [PATCH 07/22] fix style issues in pipe test --- python/pyspark/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index e38872a9f9b4..0046e53a5fc3 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -875,7 +875,7 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): self.assertGreater(size, 0) def test_pipe_functions(self): - data = ['1','2','3'] + data = ['1', '2', '3'] rdd = self.sc.parallelize(data) self.assertRaises(Exception, rdd.pipe('cc').collect()) result = rdd.pipe('cat').collect().sort() From 3ab8c7a4e666fc0b9d60b1462e8f233b94ce783e Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 18 Jun 2015 12:08:54 +0100 Subject: [PATCH 08/22] remove whitespace for style --- python/pyspark/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 0046e53a5fc3..aba5606ab71b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -873,7 +873,7 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): sizes = sort.glom().map(len).collect() for size in sizes: self.assertGreater(size, 0) - + def test_pipe_functions(self): data = ['1', '2', '3'] rdd = self.sc.parallelize(data) From 3344a2171eeb54c07e9b8af036e327e4e4de143f Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 18 Jun 2015 15:55:24 +0100 Subject: [PATCH 09/22] wrap assertRaises with QuietTest --- python/pyspark/tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index aba5606ab71b..9e5095734a07 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -877,7 +877,8 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): def test_pipe_functions(self): data = ['1', '2', '3'] rdd = self.sc.parallelize(data) - self.assertRaises(Exception, rdd.pipe('cc').collect()) + with QuietTest(self.sc): + self.assertRaises(Exception, rdd.pipe('cc').collect()) result = rdd.pipe('cat').collect().sort() [self.assertEqual(x, y) for x, y in zip(data, result)] From 491d3fcc4b15ffa5c18b74896107afc8a27f20c8 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 18 Jun 2015 17:28:55 +0100 Subject: [PATCH 10/22] Pass a function handle to assertRaises Also be more specific about the Exception we expect to see --- python/pyspark/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 9e5095734a07..4c067f970ec7 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -878,7 +878,7 @@ def test_pipe_functions(self): data = ['1', '2', '3'] rdd = self.sc.parallelize(data) with QuietTest(self.sc): - self.assertRaises(Exception, rdd.pipe('cc').collect()) + self.assertRaises(Py4JJavaError, rdd.pipe('cc').collect) result = rdd.pipe('cat').collect().sort() [self.assertEqual(x, y) for x, y in zip(data, result)] From 4153b02fdb0ce1c691ec6893c6ad70e990a4e02b Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 18 Jun 2015 18:45:21 +0100 Subject: [PATCH 11/22] fix list.sort returns None --- python/pyspark/tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 4c067f970ec7..ca0fca297286 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -879,7 +879,8 @@ def test_pipe_functions(self): rdd = self.sc.parallelize(data) with QuietTest(self.sc): self.assertRaises(Py4JJavaError, rdd.pipe('cc').collect) - result = rdd.pipe('cat').collect().sort() + result = rdd.pipe('cat').collect() + result.sort() [self.assertEqual(x, y) for x, y in zip(data, result)] From 8ed89a672f5d701ac918ada3f76cad1d3bb5e757 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Fri, 19 Jun 2015 09:41:33 +0100 Subject: [PATCH 12/22] Chain generators to prevent potential deadlock --- python/pyspark/rdd.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index da9d11f689ae..45575f14fa79 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -704,12 +704,15 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b'')) - pipe.wait() - if pipe.returncode: - raise Exception("Pipe function `%s' exited " - "with error code %d" % (command, pipe.returncode)) - return result + def check_return_code(): + pipe.wait() + if pipe.returncode: + raise Exception("Pipe function `%s' exited " + "with error code %d" % (command, pipe.returncode)) + else: + return None + return (x.rstrip(b'\n').decode('utf-8') for x in + chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) return self.mapPartitions(func) def foreach(self, f): From 0486ae33a5b7117c471312b35521d32ca8c9b1aa Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Fri, 19 Jun 2015 10:27:43 +0100 Subject: [PATCH 13/22] style fixes --- python/pyspark/rdd.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 45575f14fa79..3093de75a8fe 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -704,15 +704,16 @@ def pipe_objs(out): out.write(s.encode('utf-8')) out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() + def check_return_code(): pipe.wait() if pipe.returncode: raise Exception("Pipe function `%s' exited " - "with error code %d" % (command, pipe.returncode)) + "with error code %d" % (command, pipe.returncode)) else: return None return (x.rstrip(b'\n').decode('utf-8') for x in - chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) + chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) return self.mapPartitions(func) def foreach(self, f): From 8a9ef9ce0127028ebcb9ef96f1281a0197b39998 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Wed, 24 Jun 2015 13:34:34 +0100 Subject: [PATCH 14/22] make check_return_code an iterator --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3093de75a8fe..33eb1c865c34 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -711,7 +711,7 @@ def check_return_code(): raise Exception("Pipe function `%s' exited " "with error code %d" % (command, pipe.returncode)) else: - return None + yield None return (x.rstrip(b'\n').decode('utf-8') for x in chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) return self.mapPartitions(func) From a0c0161e76aa8916ff6342dd87ecbebdb8ff6fb8 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Wed, 24 Jun 2015 16:05:36 +0100 Subject: [PATCH 15/22] fix generator issue --- python/pyspark/rdd.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 33eb1c865c34..95449eb43e58 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -711,9 +711,10 @@ def check_return_code(): raise Exception("Pipe function `%s' exited " "with error code %d" % (command, pipe.returncode)) else: - yield None + for i in range(0): + yield i return (x.rstrip(b'\n').decode('utf-8') for x in - chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None))) + chain(iter(pipe.stdout.readline, b''), check_return_code())) return self.mapPartitions(func) def foreach(self, f): From 34fcdc3430a3483dc4fa1a3b40acbb4c5b5c3e69 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 30 Jun 2015 23:46:20 +0100 Subject: [PATCH 16/22] add optional argument 'mode' for rdd.pipe permissive - do not check returncode strict - only allow returncode 0 grep - allow returncode 0 or 1 --- python/pyspark/rdd.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 95449eb43e58..10c9950051bb 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -687,13 +687,25 @@ def groupBy(self, f, numPartitions=None): return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) @ignore_unicode_prefix - def pipe(self, command, env={}): + def pipe(self, command, env={}, mode='permissive'): """ Return an RDD created by piping elements to a forked external process. >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() [u'1', u'2', u'', u'3'] """ + if mode == 'permissive': + def fail_condition(x): + return False + elif mode == 'strict': + def fail_condition(x): + return x == 0 + elif mode == 'grep': + def fail_condition(x): + return x == 0 or x == 1 + else: + raise ValueError("mode must be one of 'permissive', 'strict' or 'grep'.") + def func(iterator): pipe = Popen( shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) @@ -707,7 +719,7 @@ def pipe_objs(out): def check_return_code(): pipe.wait() - if pipe.returncode: + if fail_condition(pipe.returncode): raise Exception("Pipe function `%s' exited " "with error code %d" % (command, pipe.returncode)) else: From a307d13f69ad615c79a0f1efdf5377707828a5f8 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 30 Jun 2015 23:54:09 +0100 Subject: [PATCH 17/22] update rdd tests to test pipe modes --- python/pyspark/tests.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ca0fca297286..42a14bf6dd29 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -878,10 +878,14 @@ def test_pipe_functions(self): data = ['1', '2', '3'] rdd = self.sc.parallelize(data) with QuietTest(self.sc): - self.assertRaises(Py4JJavaError, rdd.pipe('cc').collect) + self.assertEqual([], rdd.pipe('cc').collect()) + self.assertRaises(Py4JJavaError, rdd.pipe('cc', mode='strict').collect) result = rdd.pipe('cat').collect() result.sort() [self.assertEqual(x, y) for x, y in zip(data, result)] + self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', mode='strict').collect) + self.assertEqual([], rdd.pipe('grep 4').collect()) + self.assertEqual([], rdd.pipe('grep 4', mode='grep').collect()) class ProfilerTests(PySparkTestCase): From eb4801c78e143698bc2c3ebaecce73f455786a1c Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Wed, 1 Jul 2015 00:33:49 +0100 Subject: [PATCH 18/22] fix fail_condition return True when an exception should be raised --- python/pyspark/rdd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 10c9950051bb..a56f061bef62 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -699,10 +699,10 @@ def fail_condition(x): return False elif mode == 'strict': def fail_condition(x): - return x == 0 + return x != 0 elif mode == 'grep': def fail_condition(x): - return x == 0 or x == 1 + return x != 0 and x != 1 else: raise ValueError("mode must be one of 'permissive', 'strict' or 'grep'.") From ab9a2e1db49071bf2d636960a65e93b589d681e0 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 7 Jul 2015 09:41:30 +0100 Subject: [PATCH 19/22] Update rdd pipe tests for checkCode (rather than mode) --- python/pyspark/tests.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 42a14bf6dd29..46368c20d44b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -879,13 +879,12 @@ def test_pipe_functions(self): rdd = self.sc.parallelize(data) with QuietTest(self.sc): self.assertEqual([], rdd.pipe('cc').collect()) - self.assertRaises(Py4JJavaError, rdd.pipe('cc', mode='strict').collect) + self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect) result = rdd.pipe('cat').collect() result.sort() [self.assertEqual(x, y) for x, y in zip(data, result)] - self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', mode='strict').collect) + self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect) self.assertEqual([], rdd.pipe('grep 4').collect()) - self.assertEqual([], rdd.pipe('grep 4', mode='grep').collect()) class ProfilerTests(PySparkTestCase): From 0c1e762bced68f0dfb8b3f7d386d1264fadfc5f4 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 7 Jul 2015 09:46:10 +0100 Subject: [PATCH 20/22] Update rdd pipe method for checkCode use boolean checkCode rather than more complicated mode optional argument. Also add param to docstring --- python/pyspark/rdd.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a56f061bef62..ce4cf3d5142b 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -687,24 +687,15 @@ def groupBy(self, f, numPartitions=None): return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) @ignore_unicode_prefix - def pipe(self, command, env={}, mode='permissive'): + def pipe(self, command, env={}, checkCode=False): """ Return an RDD created by piping elements to a forked external process. >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() [u'1', u'2', u'', u'3'] + + :param checkCode: whether or not to check the return value of the shell command. """ - if mode == 'permissive': - def fail_condition(x): - return False - elif mode == 'strict': - def fail_condition(x): - return x != 0 - elif mode == 'grep': - def fail_condition(x): - return x != 0 and x != 1 - else: - raise ValueError("mode must be one of 'permissive', 'strict' or 'grep'.") def func(iterator): pipe = Popen( @@ -719,7 +710,7 @@ def pipe_objs(out): def check_return_code(): pipe.wait() - if fail_condition(pipe.returncode): + if checkCode and pipe.returncode: raise Exception("Pipe function `%s' exited " "with error code %d" % (command, pipe.returncode)) else: From 98fa1016bf474ba11487a494fb21dcb542d806b4 Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Tue, 7 Jul 2015 09:53:08 +0100 Subject: [PATCH 21/22] fix blank line style error --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index ce4cf3d5142b..328e1d18d451 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -693,7 +693,7 @@ def pipe(self, command, env={}, checkCode=False): >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() [u'1', u'2', u'', u'3'] - + :param checkCode: whether or not to check the return value of the shell command. """ From 04ae1d5a8820afc3332a66f715b4fa0e154062ed Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Wed, 8 Jul 2015 12:56:19 +0100 Subject: [PATCH 22/22] Remove spurious empty line --- python/pyspark/rdd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 328e1d18d451..c3bcaf9f3de2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -696,7 +696,6 @@ def pipe(self, command, env={}, checkCode=False): :param checkCode: whether or not to check the return value of the shell command. """ - def func(iterator): pipe = Popen( shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)