From 8b1626c6f26b4fde670834c0e77baa66ab9d2081 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Thu, 9 Jun 2022 16:00:43 -0400
Subject: [PATCH 1/2] Fix incorrect Hadoop Downloader range

* This fixes a ValueError that happens when reading an HDFS file larger
  than the buffer size
* Added a related unit test
---
 .../python/apache_beam/io/hadoopfilesystem.py |  5 ++--
 .../apache_beam/io/hadoopfilesystem_test.py   | 25 +++++++++++++++++--
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 825e9ca8326b..9a153c0984f8 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -71,9 +71,8 @@ def size(self):
     return self._size
 
   def get_range(self, start, end):
-    with self._hdfs_client.read(self._path,
-                                offset=start,
-                                length=end - start + 1) as reader:
+    with self._hdfs_client.read(self._path, offset=start,
+                                length=end - start) as reader:
       return reader.read()
 
 
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index 2486b1c8717d..49ef44ddb68d 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -114,8 +114,11 @@ def read(self, path, offset=0, length=None):
     # old_file is closed and can't be operated upon. Return a copy instead.
     new_file = FakeFile(path, 'rb')
     if old_file.saved_data:
-      new_file.write(old_file.saved_data)
-      new_file.seek(0)
+      if length is None:
+        new_file.write(old_file.saved_data)
+      else:
+        new_file.write(old_file.saved_data[:offset + length])
+      new_file.seek(offset)
     return new_file
 
   def list(self, path, status=False):
@@ -386,6 +389,24 @@ def test_create_write_read_compressed(self):
       self.assertEqual(data, read_data)
     handle.close()
 
+  def test_random_read_large_file(self):
+    # this tests HdfsDownloader.get_range() works property with
+    # filesystemio.readinto() when reading a file of size larger than buffer.
+    url = self.fs.join(self.tmpdir, 'read_length')
+    handle = self.fs.create(url)
+    data = b'test' * 10_000_000
+    handle.write(data)
+    handle.close()
+
+    handle = self.fs.open(url)
+    handle.seek(100)
+    # read 3 bytes
+    read_data = handle.read(3)
+    self.assertEqual(data[100:103], read_data)
+    # read 4 bytes
+    read_data = handle.read(4)
+    self.assertEqual(data[103:107], read_data)
+
   def test_open(self):
     url = self.fs.join(self.tmpdir, 'old_file1')
     handle = self.fs.open(url)

From 912e5bca7f1e373bdf1885c5d5e960f28d396266 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Thu, 9 Jun 2022 16:27:56 -0400
Subject: [PATCH 2/2] Fix typo in comment

---
 sdks/python/apache_beam/io/hadoopfilesystem_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index 49ef44ddb68d..8c21effc8823 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -390,8 +390,8 @@ def test_create_write_read_compressed(self):
     handle.close()
 
   def test_random_read_large_file(self):
-    # this tests HdfsDownloader.get_range() works property with
-    # filesystemio.readinto() when reading a file of size larger than buffer.
+    # this tests HdfsDownloader.get_range() works properly with
+    # filesystemio.readinto when reading a file of size larger than the buffer.
     url = self.fs.join(self.tmpdir, 'read_length')
     handle = self.fs.create(url)
     data = b'test' * 10_000_000
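
Note (not part of the patches above): a minimal standalone sketch of the off-by-one that PATCH 1/2 removes. It assumes, consistent with the fix and the new test, that get_range(start, end) is meant to return the half-open byte range [start, end), so the underlying HDFS client should be asked for end - start bytes; the old end - start + 1 requested one byte too many. FakeReader below is a hypothetical stand-in for the hdfs client used by HdfsDownloader, not Beam code.

# Toy illustration of the range fix; FakeReader is a hypothetical stand-in
# for the hdfs client, not part of Apache Beam.
class FakeReader:
  def __init__(self, data):
    self._data = data

  def read(self, offset=0, length=None):
    # Mimics hdfs_client.read(path, offset=..., length=...): returns at most
    # `length` bytes starting at `offset`.
    end = None if length is None else offset + length
    return self._data[offset:end]

data = b'test' * 10_000_000
reader = FakeReader(data)

start, end = 100, 103  # caller wants the half-open range [100, 103): 3 bytes
assert reader.read(start, end - start) == data[100:103]      # fixed: 3 bytes
assert reader.read(start, end - start + 1) == data[100:104]  # old: one byte extra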