From 02d05751ea281d377ce52ad39ccd30e518d2ff5a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 10 Oct 2014 14:27:24 -0700 Subject: [PATCH] add wrapper for foreachRDD() --- python/pyspark/streaming/dstream.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 4533c5d541a51..5d0dface2f043 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -150,6 +150,9 @@ def foreachRDD(self, func): """ Apply a function to each RDD in this DStream. """ + if func.func_code.co_argcount == 1: + old_func = func + func = lambda t, rdd: old_func(rdd) jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) api = self._ssc._jvm.PythonDStream api.callForeachRDD(self._jdstream, jfunc)