Skip to content

Commit

Permalink
clean up codes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Aug 18, 2014
1 parent 9fa249b commit aeaf8a5
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 17 deletions.
4 changes: 2 additions & 2 deletions bin/spark-submit
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ DEPLOY_MODE=${DEPLOY_MODE:-"client"}
# This will be removed after pyprint is moved to PythonDStream.
# Problem is that print function is in (Scala)DStream.
# Whenever python code is executed, we call PythonDStream which passes
# pythonExec(which python Spark should execute).
# pythonExec(which python Spark should execute). pythonExec is used to call python.
# Since pyprint is located in DStream, Spark does not know which python should use.
# In that case, get python path from PYSPARK_PYTHON, environmental variable.
# This fix is ongoing in print branch in https://github.com/giwa/spark/tree/print.

# Figure out which Python executable to use
if [[ -z "$PYSPARK_PYTHON" ]]; then
PYSPARK_PYTHON="python"
fi
export PYSPARK_PYTHON


if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
fi
Expand Down
7 changes: 1 addition & 6 deletions examples/src/main/python/streaming/network_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,15 @@
exit(-1)
conf = SparkConf()
conf.setAppName("PythonStreamingNetworkWordCount")
conf.set("spark.default.parallelism", 1)
ssc = StreamingContext(conf=conf, duration=Seconds(1))

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
fm_lines = lines.flatMap(lambda x: x.split(" "))
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))
reduced_lines = mapped_lines.reduce(add)
counted_lines = reduced_lines.count()
reduced_lines = mapped_lines.reduceByKey(add)

fm_lines.pyprint()
filtered_lines.pyprint()
mapped_lines.pyprint()
reduced_lines.pyprint()
counted_lines.pyprint()
ssc.start()
ssc.awaitTermination()
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
fm_lines = lines.flatMap(lambda x: x.split(" "))
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))
reduced_lines = mapped_lines.reduce(add)
reduced_lines = mapped_lines.reduceByKey(add)

fm_lines.pyprint()
filtered_lines.pyprint()
Expand Down
14 changes: 7 additions & 7 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ def count(self):
"""
"""
#TODO make sure count implementation, thiis different from what pyspark does
return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
pass
#TODO: make sure count implementation, thiis different from what pyspark does
#return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))

def _sum(self):
"""
"""
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
pass
#return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

def print_(self):
"""
Expand Down Expand Up @@ -85,7 +87,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
return PipelinedDStream(self, f, preservesPartitioning)


def reduceByKey(self, func, numPartitions=None):
"""
Merge the value for each key using an associative reduce function.
Expand Down Expand Up @@ -121,7 +122,7 @@ def combineLocally(iterator):
else:
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
locally_combined = self.mapPartitions(combineLocally)
locally_combined = self._mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)
def _mergeCombiners(iterator):
combiners = {}
Expand All @@ -131,12 +132,11 @@ def _mergeCombiners(iterator):
else:
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
return shuffled._mapPartitions(_mergeCombiners)

def partitionBy(self, numPartitions, partitionFunc=None):
"""
Return a copy of the DStream partitioned using the specified partitioner.
"""
if numPartitions is None:
numPartitions = self.ctx._defaultReducePartitions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ abstract class DStream[T: ClassTag] (
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

//TODO move pyprint to PythonDStream and executed by py4j call back function
//TODO: move pyprint to PythonDStream and executed by py4j call back function
/**
* Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
* operator, so this PythonDStream will be registered as an output stream and there materialized.
Expand All @@ -647,6 +647,7 @@ abstract class DStream[T: ClassTag] (

// pythonExec should be passed from python. Move pyprint to PythonDStream
val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")

val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
// Call python script to deserialize and print result in stdout
val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
Expand Down

0 comments on commit aeaf8a5

Please sign in to comment.