washing test tubes and making coffee
dorx committed Jun 12, 2014
1 parent dc699f3 commit 1481b01
Showing 1 changed file with 55 additions and 45 deletions.
python/pyspark/rdd.py: 100 changes (55 additions & 45 deletions)
@@ -203,9 +203,9 @@ def cache(self):

def persist(self, storageLevel):
"""
Set this RDD's storage level to persist its values across operations after the first time
it is computed. This can only be used to assign a new storage level if the RDD does not
have a storage level set yet.
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
"""
self.is_cached = True
javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
@@ -214,7 +214,8 @@ def persist(self, storageLevel):

def unpersist(self):
"""
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
Mark the RDD as non-persistent, and remove all blocks for it from
memory and disk.
"""
self.is_cached = False
self._jrdd.unpersist()
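
A minimal sketch of the caching lifecycle these two methods describe, assuming a live SparkContext `sc` such as the one the doctests use:

    from pyspark import StorageLevel

    squares = sc.parallelize(range(1000)).map(lambda x: x * x)
    squares.persist(StorageLevel.MEMORY_ONLY)  # mark for caching; nothing is computed yet
    squares.count()                            # first action materializes and caches the partitions
    squares.sum()                              # served from the cached blocks
    squares.unpersist()                        # drop the blocks from memory and disk
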
@@ -358,7 +359,8 @@ def sample(self, withReplacement, fraction, seed=None):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
Return a fixed-size sampled subset of this RDD (currently requires numpy).
Return a fixed-size sampled subset of this RDD (currently requires
numpy).
>>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
[4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
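
A quick sketch of both sampling modes, assuming `sc` and, per the note above, that numpy is installed; only the sample sizes are deterministic enough to assert on:

    data = sc.parallelize(range(100))
    with_repl = data.takeSample(True, 10, 1)       # may contain repeated elements
    without_repl = data.takeSample(False, 10, 1)   # 10 distinct elements
    assert len(with_repl) == len(without_repl) == 10
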
@@ -401,20 +403,24 @@ def takeSample(self, withReplacement, num, seed=None):
@staticmethod
def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
"""
Returns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of
the time.
Returns a sampling rate that guarantees a sample of
size >= sampleSizeLowerBound 99.99% of the time.
How the sampling rate is determined:
Let p = num / total, where num is the sample size and total is the total number of
datapoints in the RDD. We're trying to compute q > p such that
- when sampling with replacement, we're drawing each datapoint with prob_i ~ Pois(q),
where we want to guarantee Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
total), i.e. the failure rate of not having a sufficiently large sample < 0.0001.
Setting q = p + 5 * sqrt(p/total) is sufficient to guarantee 0.9999 success rate for
num > 12, but we need a slightly larger q (9 empirically determined).
- when sampling without replacement, we're drawing each datapoint with prob_i
~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success
rate, where success rate is defined the same as in sampling with replacement.
Let p = num / total, where num is the sample size and total is the
total number of data points in the RDD. We're trying to compute
q > p such that
- when sampling with replacement, we're drawing each data point
with prob_i ~ Pois(q), where we want to guarantee
Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
total), i.e. the failure rate of not having a sufficiently large
sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
to guarantee 0.9999 success rate for num > 12, but we need a
slightly larger q (9 empirically determined).
- when sampling without replacement, we're drawing each data point
with prob_i ~ Binomial(total, fraction) and our choice of q
guarantees 1-delta, or 0.9999 success rate, where success rate is
defined the same as in sampling with replacement.
"""
fraction = float(sampleSizeLowerBound) / total
if withReplacement:
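
A worked instance of the q = p + 5 * sqrt(p/total) rule quoted above (illustration only, not the helper's exact branch logic; the numbers are made up for the example):

    from math import sqrt

    total = 1000000              # data points in the RDD
    num = 1000                   # required sample size
    p = float(num) / total       # exact rate: 0.001
    q = p + 5 * sqrt(p / total)  # inflated rate: ~0.0011581, i.e. ~1158 expected draws
    assert q > p                 # oversampling buys the 99.99% success guarantee
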
@@ -449,8 +455,8 @@ def union(self, other):

def intersection(self, other):
"""
Return the intersection of this RDD and another one. The output will not
contain any duplicate elements, even if the input RDDs did.
Return the intersection of this RDD and another one. The output will
not contain any duplicate elements, even if the input RDDs did.
Note that this method performs a shuffle internally.
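
A small illustration of the deduplicating behaviour, assuming `sc`:

    a = sc.parallelize([1, 2, 3, 3, 4])
    b = sc.parallelize([3, 4, 4, 5])
    sorted(a.intersection(b).collect())   # [3, 4]; duplicates from either side are dropped
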
@@ -692,8 +698,8 @@ def aggregate(self, zeroValue, seqOp, combOp):
modify C{t2}.
The first function (seqOp) can return a different result type, U, than
the type of this RDD. Thus, we need one operation for merging a T into an U
and one operation for merging two U
the type of this RDD. Thus, we need one operation for merging a T into
a U and one operation for merging two Us
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
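
The truncated doctest above builds a (sum, count) pair; a complete sketch of the same pattern, assuming `sc`:

    seqOp = lambda acc, x: (acc[0] + x, acc[1] + 1)    # fold one element (T) into the accumulator (U)
    combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])   # merge two accumulators from different partitions
    total, count = sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    mean = float(total) / count                        # (10, 4) -> 2.5
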
@@ -786,8 +792,8 @@ def stdev(self):

def sampleStdev(self):
"""
Compute the sample standard deviation of this RDD's elements (which corrects for bias in
estimating the standard deviation by dividing by N-1 instead of N).
Compute the sample standard deviation of this RDD's elements (which
corrects for bias in estimating the standard deviation by dividing by
N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleStdev()
1.0
@@ -796,8 +803,8 @@ def sampleStdev(self):

def sampleVariance(self):
"""
Compute the sample variance of this RDD's elements (which corrects for bias in
estimating the variance by dividing by N-1 instead of N).
Compute the sample variance of this RDD's elements (which corrects
for bias in estimating the variance by dividing by N-1 instead of N).
>>> sc.parallelize([1, 2, 3]).sampleVariance()
1.0
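
For contrast with the population versions, assuming `sc`: variance() and stdev() divide by N, while the sample variants divide by N - 1, hence the different results on the same three numbers:

    nums = sc.parallelize([1, 2, 3])
    nums.variance()        # ~0.6667 (divide by N = 3)
    nums.sampleVariance()  # 1.0     (divide by N - 1 = 2)
    nums.stdev()           # ~0.8165
    nums.sampleStdev()     # 1.0
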
@@ -849,8 +856,8 @@ def merge(a, b):

def takeOrdered(self, num, key=None):
"""
Get the N elements from a RDD ordered in ascending order or as specified
by the optional key function.
Get the N elements from a RDD ordered in ascending order or as
specified by the optional key function.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
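
A sketch of the optional key function, assuming `sc`; ordering follows the key, here descending:

    sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6, key=lambda x: -x)
    # [10, 9, 7, 6, 5, 4]
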
@@ -939,8 +946,8 @@ def first(self):

def saveAsPickleFile(self, path, batchSize=10):
"""
Save this RDD as a SequenceFile of serialized objects. The serializer used is
L{pyspark.serializers.PickleSerializer}, default batch size is 10.
Save this RDD as a SequenceFile of serialized objects. The serializer
used is L{pyspark.serializers.PickleSerializer}; the default batch size
is 10.
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
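
A round-trip sketch, assuming `sc`; the output location is illustrative and must not already exist:

    import tempfile

    path = tempfile.mkdtemp() + "/pickled"
    sc.parallelize(range(10)).saveAsPickleFile(path, batchSize=3)
    sorted(sc.pickleFile(path).collect())   # [0, 1, 2, ..., 9]
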
@@ -1208,9 +1216,10 @@ def _mergeCombiners(iterator):

def foldByKey(self, zeroValue, func, numPartitions=None):
"""
Merge the values for each key using an associative function "func" and a neutral "zeroValue"
which may be added to the result an arbitrary number of times, and must not change
the result (e.g., 0 for addition, or 1 for multiplication.).
Merge the values for each key using an associative function "func"
and a neutral "zeroValue" which may be added to the result an
arbitrary number of times, and must not change the result
(e.g., 0 for addition, or 1 for multiplication).
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
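
Completing the pattern started above, assuming `sc`:

    from operator import add

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    sorted(rdd.foldByKey(0, add).collect())   # [('a', 2), ('b', 1)]; 0 is the neutral zeroValue
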
@@ -1227,8 +1236,8 @@ def groupByKey(self, numPartitions=None):
Hash-partitions the resulting RDD into numPartitions partitions.
Note: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey will provide much better
performance.
sum or average) over each key, using reduceByKey will provide much
better performance.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
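
The note above recommends reduceByKey for aggregations; a side-by-side sketch, assuming `sc`:

    from operator import add

    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    sorted(pairs.reduceByKey(add).collect())                          # [('a', 2), ('b', 1)], combines map-side
    sorted([(k, sum(vs)) for k, vs in pairs.groupByKey().collect()])  # same result, but ships every value
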
@@ -1288,8 +1297,8 @@ def groupWith(self, other):
def cogroup(self, other, numPartitions=None):
"""
For each key k in C{self} or C{other}, return a resulting RDD that
contains a tuple with the list of values for that key in C{self} as well
as C{other}.
contains a tuple with the list of values for that key in C{self} as
well as C{other}.
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
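
Completing the truncated doctest in spirit, assuming `sc`; the grouped values are materialized with list() for display:

    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2)])
    [(k, (list(a), list(b))) for k, (a, b) in sorted(x.cogroup(y).collect())]
    # [('a', ([1], [2])), ('b', ([4], []))]
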
@@ -1300,8 +1309,8 @@ def cogroup(self, other, numPartitions=None):

def subtractByKey(self, other, numPartitions=None):
"""
Return each (key, value) pair in C{self} that has no pair with matching key
in C{other}.
Return each (key, value) pair in C{self} that has no pair with matching
key in C{other}.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
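
And the result of the call itself, assuming `sc`:

    x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    y = sc.parallelize([("a", 3), ("c", None)])
    sorted(x.subtractByKey(y).collect())   # [('b', 4), ('b', 5)]; every 'a' pair is dropped
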
@@ -1339,10 +1348,10 @@ def repartition(self, numPartitions):
"""
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses
a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
which can avoid performing a shuffle.
Can increase or decrease the level of parallelism in this RDD.
Internally, this uses a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider
using `coalesce`, which can avoid performing a shuffle.
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
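
A sketch of the shuffle versus no-shuffle distinction, assuming `sc`; partition counts are read back with glom(), as in the doctest:

    rdd = sc.parallelize(range(100), 8)
    len(rdd.repartition(16).glom().collect())   # 16, always shuffles
    len(rdd.coalesce(2).glom().collect())       # 2, merges existing partitions without a shuffle
    len(rdd.coalesce(16).glom().collect())      # still 8: coalesce without shuffle=True cannot grow
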
@@ -1367,9 +1376,10 @@ def coalesce(self, numPartitions, shuffle=False):

def zip(self, other):
"""
Zips this RDD with another one, returning key-value pairs with the first element in each RDD
second element in each RDD, etc. Assumes that the two RDDs have the same number of
partitions and the same number of elements in each partition (e.g. one was made through
Zips this RDD with another one, returning key-value pairs with the
first element in each RDD, second element in each RDD, etc. Assumes
that the two RDDs have the same number of partitions and the same
number of elements in each partition (e.g. one was made through
a map on the other).
>>> x = sc.parallelize(range(0,5))
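
Completing the truncated doctest pattern, assuming `sc`; both RDDs come from parallelize with the same element count and default partitioning, so the same-partitioning assumption holds:

    x = sc.parallelize(range(0, 5))
    y = sc.parallelize(range(1000, 1005))
    x.zip(y).collect()   # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
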