Skip to content

Commit

Permalink
add an option to use str in textFile()
Browse files Browse the repository at this point in the history
str is much efficient than unicode
  • Loading branch information
davies committed Aug 14, 2014
1 parent 13f54e2 commit a0295e1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
8 changes: 4 additions & 4 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def pickleFile(self, name, minPartitions=None):
return RDD(self._jsc.objectFile(name, minPartitions), self,
BatchedSerializer(PickleSerializer()))

def textFile(self, name, minPartitions=None):
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
Expand All @@ -329,9 +329,9 @@ def textFile(self, name, minPartitions=None):
"""
minPartitions = minPartitions or min(self.defaultParallelism, 2)
return RDD(self._jsc.textFile(name, minPartitions), self,
UTF8Deserializer())
UTF8Deserializer(use_unicode))

def wholeTextFiles(self, path, minPartitions=None):
def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
"""
Read a directory of text files from HDFS, a local file system
(available on all nodes), or any Hadoop-supported file system
Expand Down Expand Up @@ -369,7 +369,7 @@ def wholeTextFiles(self, path, minPartitions=None):
"""
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))

def _dictToJavaMap(self, d):
jm = self._jvm.java.util.HashMap()
Expand Down
23 changes: 15 additions & 8 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,25 @@ class UTF8Deserializer(Serializer):
Deserializes streams written by String.getBytes.
"""

def __init__(self, use_unicode=False):
self.use_unicode = use_unicode

def loads(self, stream):
length = read_int(stream)
return stream.read(length).decode('utf8')
return stream.read(length)

def load_stream(self, stream):
while True:
try:
yield self.loads(stream)
except struct.error:
return
except EOFError:
return
try:
if self.use_unicode:
while True:
yield self.loads(stream).decode("utf-8")
else:
while True:
yield self.loads(stream)
except struct.error:
return
except EOFError:
return


def read_long(stream):
Expand Down

0 comments on commit a0295e1

Please sign in to comment.