Skip to content

Commit

Permalink
[SPARK-7735] [PYSPARK] Raise Exception on non-zero exit from pipe com…
Browse files Browse the repository at this point in the history
…mands

This will allow problems with piped commands to be detected.
This will also allow tasks to be retried where errors are rare (such as network problems in piped commands).

Author: Scott Taylor <github@megatron.me.uk>

Closes apache#6262 from megatron-me-uk/patch-2 and squashes the following commits:

04ae1d5 [Scott Taylor] Remove spurious empty line
98fa101 [Scott Taylor] fix blank line style error
574b564 [Scott Taylor] Merge pull request #2 from megatron-me-uk/patch-4
0c1e762 [Scott Taylor] Update rdd pipe method for checkCode
ab9a2e1 [Scott Taylor] Update rdd pipe tests for checkCode
eb4801c [Scott Taylor] fix fail_condition
b0ac3a4 [Scott Taylor] Merge pull request #1 from megatron-me-uk/megatron-me-uk-patch-1
a307d13 [Scott Taylor] update rdd tests to test pipe modes
34fcdc3 [Scott Taylor] add optional argument 'mode' for rdd.pipe
a0c0161 [Scott Taylor] fix generator issue
8a9ef9c [Scott Taylor] make check_return_code an iterator
0486ae3 [Scott Taylor] style fixes
8ed89a6 [Scott Taylor] Chain generators to prevent potential deadlock
4153b02 [Scott Taylor] fix list.sort returns None
491d3fc [Scott Taylor] Pass a function handle to assertRaises
3344a21 [Scott Taylor] wrap assertRaises with QuietTest
3ab8c7a [Scott Taylor] remove whitespace for style
cc1a73d [Scott Taylor] fix style issues in pipe test
8db4073 [Scott Taylor] Add a test for rdd pipe functions
1b3dc4e [Scott Taylor] fix missing space around operator style
0974f98 [Scott Taylor] add space between words in multiline string
45f4977 [Scott Taylor] fix line too long style error
5745d85 [Scott Taylor] Remove space to fix style
f552d49 [Scott Taylor] Catch non-zero exit from pipe commands
  • Loading branch information
megatron-me-uk authored and davies committed Jul 11, 2015

Verified

This commit was signed with the committer’s verified signature.
Vlatombe Vincent Latombe
1 parent 3363088 commit 6e1c7e2
Showing 2 changed files with 26 additions and 2 deletions.
16 changes: 14 additions & 2 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
@@ -700,12 +700,14 @@ def groupBy(self, f, numPartitions=None):
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

@ignore_unicode_prefix
def pipe(self, command, env={}):
def pipe(self, command, env={}, checkCode=False):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']
:param checkCode: whether or not to check the return value of the shell command.
"""
def func(iterator):
pipe = Popen(
@@ -717,7 +719,17 @@ def pipe_objs(out):
out.write(s.encode('utf-8'))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))

def check_return_code():
pipe.wait()
if checkCode and pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
else:
for i in range(0):
yield i
return (x.rstrip(b'\n').decode('utf-8') for x in
chain(iter(pipe.stdout.readline, b''), check_return_code()))
return self.mapPartitions(func)

def foreach(self, f):
12 changes: 12 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
@@ -885,6 +885,18 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
for size in sizes:
self.assertGreater(size, 0)

def test_pipe_functions(self):
data = ['1', '2', '3']
rdd = self.sc.parallelize(data)
with QuietTest(self.sc):
self.assertEqual([], rdd.pipe('cc').collect())
self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect)
result = rdd.pipe('cat').collect()
result.sort()
[self.assertEqual(x, y) for x, y in zip(data, result)]
self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
self.assertEqual([], rdd.pipe('grep 4').collect())


class ProfilerTests(PySparkTestCase):

0 comments on commit 6e1c7e2

Please sign in to comment.