From bb7ccf3a103644b5701b8c667397130d807ce7db Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 15:40:42 -0700 Subject: [PATCH] remove unused import in python --- python/pyspark/streaming/context.py | 9 ------ python/pyspark/streaming/dstream.py | 30 +++---------------- python/pyspark/streaming/duration.py | 17 ++++++++++- python/pyspark/streaming/jtime.py | 24 ++++++++++++++- python/pyspark/streaming/pyprint.py | 19 ++++++++++++ .../streaming/api/python/PythonDStream.scala | 2 +- 6 files changed, 63 insertions(+), 38 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 40e9d98942e2e..d3ff16fca764f 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -15,15 +15,6 @@ # limitations under the License. # -import os -import shutil -import sys -from threading import Lock -from tempfile import NamedTemporaryFile - -from pyspark import accumulators -from pyspark.accumulators import Accumulator -from pyspark.broadcast import Broadcast from pyspark.conf import SparkConf from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 3df6e5e09b0c1..5766cca39bdee 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -1,28 +1,8 @@ -from base64 import standard_b64encode as b64enc -import copy from collections import defaultdict -from collections import namedtuple from itertools import chain, ifilter, imap -import operator -import os -import sys -import shlex -import traceback -from subprocess import Popen, PIPE -from tempfile import NamedTemporaryFile -from threading import Thread -import warnings -import heapq -from random import Random - -from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long -from pyspark.join import 
python_join, python_left_outer_join, \ - python_right_outer_join, python_cogroup -from pyspark.statcounter import StatCounter -from pyspark.rddsampler import RDDSampler -from pyspark.storagelevel import StorageLevel -#from pyspark.resultiterable import ResultIterable + +from pyspark.serializers import NoOpSerializer,\ + BatchedSerializer, CloudPickleSerializer, pack_long from pyspark.rdd import _JavaStackTrace from py4j.java_collections import ListConverter, MapConverter @@ -47,7 +27,7 @@ def generatedRDDs(self): def print_(self): """ """ - # print is a resrved name of Python. We cannot give print to function name + # print is a reserved name of Python. We cannot give print to function name getattr(self._jdstream, "print")() def pyprint(self): @@ -55,7 +35,6 @@ def pyprint(self): """ self._jdstream.pyprint() - def filter(self, f): """ """ @@ -140,7 +119,6 @@ def add_shuffle_key(split, iterator): keyed._bypass_serializer = True with _JavaStackTrace(self.ctx) as st: #JavaDStream - #pairRDD = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairRDD() pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream() partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, id(partitionFunc)) diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py index ef1b4f6cef237..5982146e69026 100644 --- a/python/pyspark/streaming/duration.py +++ b/python/pyspark/streaming/duration.py @@ -1,4 +1,19 @@ -__author__ = 'ktakagiw' +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# from pyspark.streaming import utils diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py index 41670af659ea3..32ef741051283 100644 --- a/python/pyspark/streaming/jtime.py +++ b/python/pyspark/streaming/jtime.py @@ -1,8 +1,30 @@ -__author__ = 'ktakagiw' +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# from pyspark.streaming import utils from pyspark.streaming.duration import Duration +""" +This file is named jtime rather than time because naming it time would +shadow Python's built-in time module: "import time" would then import +this module instead of the standard library package. +""" + + class Time(object): """ Time for Spark Streaming application. 
Used to set Time diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py index 6e87c985a57e3..1aeb8e50375ed 100644 --- a/python/pyspark/streaming/pyprint.py +++ b/python/pyspark/streaming/pyprint.py @@ -1,5 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + import sys from itertools import chain + from pyspark.serializers import PickleSerializer def collect(binary_file_path): diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 0f4a58eef4a78..cecb10fc30eb4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -55,7 +55,7 @@ class PythonDStream[T: ClassTag]( case None => None } } - + val asJavaDStream = JavaDStream.fromDStream(this) /**