[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically
As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that.

Notes:
* We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server.
* I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request.
* I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match that of the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete.
* Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo.
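
For local use, here is a minimal sketch (assuming the scripts land in `dev/` as in this diff and are run from the repository root with execute permission) of how a contributor might run the new checks before pushing:

```bash
# Run the new linters individually...
./dev/lint-python   # downloads pep8 1.5.7, checks ./python, and cleans up after itself
./dev/lint-scala    # delegates to the existing dev/scalastyle check

# ...or run the full test script, which now calls both before the unit tests.
./dev/run-tests
```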

Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>

Closes #1744 from nchammas/master and squashes the following commits:

274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
983d963 [nchammas] Merge pull request #5 from apache/master
1db5314 [nchammas] Merge pull request #4 from apache/master
0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
6db9a44 [nchammas] Merge pull request #3 from apache/master
7b4750e [Nicholas Chammas] merge upstream changes
91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
9da347f [nchammas] Merge pull request #2 from apache/master
aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
fe57ed0 [Nicholas Chammas] removing merge conflict backups
9c01d4c [nchammas] Merge pull request #1 from apache/master
9a66cb0 [Nicholas Chammas] resolving merge conflicts
a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
723ed39 [Nicholas Chammas] always delete the report file
0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
75ad552 [Nicholas Chammas] make check output style consistent

(cherry picked from commit d614967)
Signed-off-by: Reynold Xin <rxin@apache.org>
nchammas authored and rxin committed Aug 6, 2014
1 parent cf8e7fd commit 4c19614
Showing 32 changed files with 348 additions and 136 deletions.
60 changes: 60 additions & 0 deletions dev/lint-python
@@ -0,0 +1,60 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"
PEP8_REPORT_PATH="$SPARK_ROOT_DIR/dev/pep8-report.txt"

cd $SPARK_ROOT_DIR

# Get pep8 at runtime so that we don't rely on it being installed on the build server.
#+ See: https://github.com/apache/spark/pull/1744#issuecomment-50982162
#+ TODOs:
#+ - Dynamically determine latest release version of pep8 and use that.
#+ - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?))
PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py"
PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"

curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH"
curl_status=$?

if [ $curl_status -ne 0 ]; then
    echo "Failed to download pep8.py from \"$PEP8_SCRIPT_REMOTE_PATH\"."
    exit $curl_status
fi


# There is no need to write this output to a file
#+ first, but we do so so that the check status can
#+ be output before the report, like with the
#+ scalastyle and RAT checks.
python $PEP8_SCRIPT_PATH ./python > "$PEP8_REPORT_PATH"
pep8_status=${PIPESTATUS[0]} #$?

if [ $pep8_status -ne 0 ]; then
    echo "PEP 8 checks failed."
    cat "$PEP8_REPORT_PATH"
else
    echo "PEP 8 checks passed."
fi

rm -f "$PEP8_REPORT_PATH"
rm "$PEP8_SCRIPT_PATH"

exit $pep8_status
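
As a hedged aside (not part of this diff), the same pinned pep8 1.5.7 release that `dev/lint-python` fetches can be pointed at a single module to reproduce a specific report locally; `/tmp/pep8.py` below is just an arbitrary scratch path:

```bash
# Fetch the same pep8 release the script above uses, then check one file.
curl --silent -o /tmp/pep8.py \
    "https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"
python /tmp/pep8.py python/pyspark/accumulators.py   # prints one line per violation found
```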
23 changes: 23 additions & 0 deletions dev/lint-scala
@@ -0,0 +1,23 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"

"$SCRIPT_DIR/scalastyle"
13 changes: 12 additions & 1 deletion dev/run-tests
@@ -66,16 +66,25 @@ fi
set -e
set -o pipefail

echo ""
echo "========================================================================="
echo "Running Apache RAT checks"
echo "========================================================================="
dev/check-license

echo ""
echo "========================================================================="
echo "Running Scala style checks"
echo "========================================================================="
dev/scalastyle
dev/lint-scala

echo ""
echo "========================================================================="
echo "Running Python style checks"
echo "========================================================================="
dev/lint-python

echo ""
echo "========================================================================="
echo "Running Spark unit tests"
echo "========================================================================="
@@ -89,11 +98,13 @@ fi
echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \
grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"

echo ""
echo "========================================================================="
echo "Running PySpark tests"
echo "========================================================================="
./python/run-tests

echo ""
echo "========================================================================="
echo "Detecting binary incompatibilites with MiMa"
echo "========================================================================="
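For context, `dev/run-tests` sets `set -e` and `set -o pipefail` at the top of this hunk, so a non-zero exit from either linter aborts the build before the unit tests start. A minimal sketch of that failure mode (a hypothetical standalone script, not part of this diff):

```bash
#!/usr/bin/env bash
# Sketch of the behavior wired up in dev/run-tests above: with `set -e`,
# any check that exits with a non-zero status stops the script immediately.
set -e
set -o pipefail

dev/lint-scala    # aborts here if the Scala style checks fail
dev/lint-python   # aborts here if the PEP 8 checks fail
echo "Style checks passed; continuing to unit tests..."
```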
2 changes: 1 addition & 1 deletion dev/scalastyle
@@ -30,5 +30,5 @@ if test ! -z "$ERRORS"; then
    echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
    exit 1
else
    echo -e "Scalastyle checks passed.\n"
    echo -e "Scalastyle checks passed."
fi
7 changes: 7 additions & 0 deletions python/pyspark/accumulators.py
@@ -110,6 +110,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param):


class Accumulator(object):

"""
A shared variable that can be accumulated, i.e., has a commutative and associative "add"
operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
@@ -166,6 +167,7 @@ def __repr__(self):


class AccumulatorParam(object):

"""
Helper object that defines how to accumulate values of a given type.
"""
@@ -186,6 +188,7 @@ def addInPlace(self, value1, value2):


class AddingAccumulatorParam(AccumulatorParam):

"""
An AccumulatorParam that uses the + operators to add values. Designed for simple types
such as integers, floats, and lists. Requires the zero value for the underlying type
@@ -210,6 +213,7 @@ def addInPlace(self, value1, value2):


class _UpdateRequestHandler(SocketServer.StreamRequestHandler):

"""
This handler will keep polling updates from the same socket until the
server is shutdown.
@@ -228,7 +232,9 @@ def handle(self):
# Write a byte in acknowledgement
self.wfile.write(struct.pack("!b", 1))


class AccumulatorServer(SocketServer.TCPServer):

"""
A simple TCP server that intercepts shutdown() in order to interrupt
our continuous polling on the handler.
@@ -239,6 +245,7 @@ def shutdown(self):
self.server_shutdown = True
SocketServer.TCPServer.shutdown(self)


def _start_update_server():
"""Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)
1 change: 1 addition & 0 deletions python/pyspark/broadcast.py
@@ -45,6 +45,7 @@ def _from_id(bid):


class Broadcast(object):

"""
A broadcast variable created with
L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
1 change: 1 addition & 0 deletions python/pyspark/conf.py
@@ -56,6 +56,7 @@


class SparkConf(object):

"""
Configuration for a Spark application. Used to set various Spark
parameters as key-value pairs.
25 changes: 14 additions & 11 deletions python/pyspark/context.py
@@ -47,6 +47,7 @@


class SparkContext(object):

"""
Main entry point for Spark functionality. A SparkContext represents the
connection to a Spark cluster, and can be used to create L{RDD}s and
@@ -213,7 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):

if instance:
if (SparkContext._active_spark_context and
SparkContext._active_spark_context != instance):
SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite
@@ -406,7 +407,7 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
keyConverter, valueConverter, minSplits, batchSize)
keyConverter, valueConverter, minSplits, batchSize)
return RDD(jrdd, self, ser)

def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -437,7 +438,8 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf, batchSize)
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -465,7 +467,8 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf, batchSize)
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -496,7 +499,8 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf, batchSize)
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -523,8 +527,9 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
jconf = self._dictToJavaMap(conf)
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
keyConverter, valueConverter, jconf, batchSize)
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def _checkpointFile(self, name, input_deserializer):
@@ -555,8 +560,7 @@ def union(self, rdds):
first = rdds[0]._jrdd
rest = [x._jrdd for x in rdds[1:]]
rest = ListConverter().convert(rest, self._gateway._gateway_client)
return RDD(self._jsc.union(first, rest), self,
rdds[0]._jrdd_deserializer)
return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)

def broadcast(self, value):
"""
@@ -568,8 +572,7 @@ def broadcast(self, value):
pickleSer = PickleSerializer()
pickled = pickleSer.dumps(value)
jbroadcast = self._jsc.broadcast(bytearray(pickled))
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)

def accumulator(self, value, accum_param=None):
"""
5 changes: 2 additions & 3 deletions python/pyspark/daemon.py
@@ -43,7 +43,7 @@ def worker(sock):
"""
# Redirect stdout to stderr
os.dup2(2, 1)
sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1

signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
@@ -134,8 +134,7 @@ def handle_sigchld(*args):
try:
os.kill(worker_pid, signal.SIGKILL)
except OSError:
pass # process already died

pass # process already died

if listen_sock in ready_fds:
sock, addr = listen_sock.accept()
1 change: 1 addition & 0 deletions python/pyspark/files.py
@@ -19,6 +19,7 @@


class SparkFiles(object):

"""
Resolves paths to files added through
L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}.
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
@@ -65,6 +65,7 @@ def preexec_func():
# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):

def __init__(self, stream):
Thread.__init__(self)
self.daemon = True
5 changes: 3 additions & 2 deletions python/pyspark/mllib/_common.py
@@ -72,9 +72,9 @@
# Python interpreter must agree on what endian the machine is.


DENSE_VECTOR_MAGIC = 1
DENSE_VECTOR_MAGIC = 1
SPARSE_VECTOR_MAGIC = 2
DENSE_MATRIX_MAGIC = 3
DENSE_MATRIX_MAGIC = 3
LABELED_POINT_MAGIC = 4


@@ -443,6 +443,7 @@ def _serialize_rating(r):


class RatingDeserializer(Serializer):

def loads(self, stream):
length = struct.unpack("!i", stream.read(4))[0]
ba = stream.read(length)