flatten the combined values when dumping into disks
This reduces the memory used when merging many files together.
davies committed Aug 16, 2014
1 parent d05060d commit 250be4e
Showing 3 changed files with 30 additions and 3 deletions.
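The change targets groupByKey(): after local combining, each record is a (key, [v1, v2, ...]) pair, and a hot key can turn into one enormous list that must be pickled whole and held in memory while the external sorter merges spill files. Flattening splits each such pair into several (key, small-chunk) records before anything is written to disk. A minimal standalone sketch of that splitting (plain Python, not the Spark code; flatten_values is a hypothetical helper that mirrors what FlattedValuesSerializer._batched does below):

def flatten_values(pairs, batch=10):
    # yield (key, chunk) pairs with at most `batch` values per chunk
    for key, values in pairs:
        for i in range(0, len(values), batch):
            yield key, values[i:i + batch]

combined = [("a", list(range(25))), ("b", [1, 2])]
print(list(flatten_values(combined)))
# three small records for 'a' (10 + 10 + 5 values) and one for 'b',
# instead of a single record carrying all 25 values for 'a'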
10 changes: 8 additions & 2 deletions python/pyspark/rdd.py
@@ -36,7 +36,7 @@

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
    PickleSerializer, pack_long
    PickleSerializer, pack_long, FlattedValuesSerializer
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -1592,7 +1592,7 @@ def mergeCombiners(a, b):
            a.extend(b)
            return a

        serializer = self.ctx.serializer
        serializer = self._jrdd_deserializer
        spill = self._can_spill()
        memory = self._memory_limit()
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
@@ -1603,12 +1603,18 @@ def combineLocally(iterator):
            merger.mergeValues(iterator)
            return merger.iteritems()

        # combine them before shuffle could reduce the comparison later
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def groupByKey(it):
            if spill:
                # Flatten the combined values, so it will not consume huge
                # memory during merging sort.
                serializer = FlattedValuesSerializer(
                    BatchedSerializer(PickleSerializer(), 1024), 10)
                sorted = ExternalSorter(memory * 0.9, serializer).sorted

                it = sorted(it, key=operator.itemgetter(0))
            for k, v in GroupByKey(it):
                yield k, ResultIterable(v)
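Once the values are flattened, the stream that comes out of the external sort contains several (key, chunk) records per key, already ordered by key; GroupByKey (referenced above but not part of this diff) then stitches consecutive chunks of the same key back into one iterable, which ResultIterable wraps for the user. A rough sketch of that regrouping step, assuming only that equal keys arrive consecutively (itertools-based; the real GroupByKey may differ in detail):

from itertools import chain, groupby
from operator import itemgetter

def regroup(sorted_pairs):
    # sorted_pairs: (key, chunk) records with equal keys adjacent
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        # lazily chain the chunks back into one iterable per key
        yield key, chain.from_iterable(chunk for _, chunk in group)

pairs = [("a", [0, 1]), ("a", [2, 3]), ("b", [9])]
print([(k, list(vs)) for k, vs in regroup(pairs)])
# [('a', [0, 1, 2, 3]), ('b', [9])]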
20 changes: 20 additions & 0 deletions python/pyspark/serializers.py
@@ -208,6 +208,26 @@ def __str__(self):
return "BatchedSerializer<%s>" % str(self.serializer)


class FlattedValuesSerializer(BatchedSerializer):

"""
Serializes a stream of list of pairs, split the list of values
which contain more than a certain number of objects to make them
have similar sizes.
"""
def __init__(self, serializer, batchSize=10):
BatchedSerializer.__init__(self, serializer, batchSize)

def _batched(self, iterator):
n = self.batchSize
for key, values in iterator:
for i in xrange(0, len(values), n):
yield key, values[i:i + n]

def load_stream(self, stream):
return self.serializer.load_stream(stream)


class CartesianDeserializer(FramedSerializer):

"""
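FlattedValuesSerializer only overrides _batched(), so dump_stream() (inherited from BatchedSerializer) writes each (key, values) pair as several (key, chunk) records of at most batchSize values, while load_stream() delegates to the wrapped serializer and therefore yields the split records as-is; the regrouping happens later in rdd.py. A rough usage sketch, assuming a PySpark checkout that includes this commit is on the path (the serializers can be exercised without a SparkContext):

import io
from pyspark.serializers import (BatchedSerializer, PickleSerializer,
                                 FlattedValuesSerializer)

ser = FlattedValuesSerializer(BatchedSerializer(PickleSerializer(), 1024), 10)

buf = io.BytesIO()
ser.dump_stream([("a", list(range(25)))], buf)   # one pair in, three records out
buf.seek(0)
print(list(ser.load_stream(buf)))
# [('a', [0..9]), ('a', [10..19]), ('a', [20..24])]  -- chunks, not re-joined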
3 changes: 2 additions & 1 deletion python/pyspark/shuffle.py
@@ -479,7 +479,9 @@ def sorted(self, iterator, key=None, reverse=False):
                with open(path, 'w') as f:
                    self.serializer.dump_stream(current_chunk, f)
                chunks.append(self.serializer.load_stream(open(path)))
                os.unlink(path)  # data will be deleted after close
                current_chunk = []
                gc.collect()

            elif not chunks:
                batch = min(batch * 2, 10000)
@@ -490,7 +492,6 @@

        if current_chunk:
            chunks.append(iter(current_chunk))

        return heapq.merge(chunks, key=key, reverse=reverse)


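The shuffle.py change makes ExternalSorter.sorted() unlink each spill file right after opening a reader on it (so the operating system reclaims the space once the handle closes), reset the in-memory chunk, and force a gc pass; the final heapq.merge then streams from the spill readers instead of materializing everything. For reference, a self-contained external-sort sketch along the same lines (Python 3 standard library only, where heapq.merge accepts key=; this is not the Spark implementation):

import heapq
import os
import pickle
import tempfile

def _spill(records):
    # write the already-sorted records to a temp file, return a lazy reader
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as f:
        for rec in records:
            pickle.dump(rec, f)

    def read():
        with open(path, "rb") as f:
            try:
                while True:
                    yield pickle.load(f)
            except EOFError:
                pass
        os.unlink(path)   # delete the spill file once it has been drained
    return read()

def external_sort(iterator, key, chunk_size=1000):
    chunks, current = [], []
    for rec in iterator:
        current.append(rec)
        if len(current) >= chunk_size:
            current.sort(key=key)
            chunks.append(_spill(current))
            current = []
    current.sort(key=key)
    chunks.append(iter(current))
    return heapq.merge(*chunks, key=key)

print(list(external_sort([5, 3, 9, 1], key=lambda x: x, chunk_size=2)))
# [1, 3, 5, 9]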
