From a6796a979deb0fe3007cb708e1c3ce8a396e37d9 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Wed, 15 May 2024 15:54:19 -0700 Subject: [PATCH 01/22] Initial Modifications, adding min-max heap to track min and max reader performantly --- codalab/lib/beam/MultiReaderFileStream.py | 93 +++++++++++++++++++---- 1 file changed, 77 insertions(+), 16 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index bab64cc78..d8743bb79 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -1,21 +1,82 @@ from io import BytesIO from threading import Lock +import time from codalab.worker.un_gzip_stream import BytesBuffer +import heapq -class MultiReaderFileStream(BytesIO): +class MinMaxHeap: + def __init__(self): + self.heap = [] + self.item_index = {} # Dictionary to store indices of elements + + def push(self, item): + heapq.heappush(self.heap, item) + index = len(self.heap) - 1 + self.item_index[item] = index + + def pop(self): + if self.heap: + item = heapq.heappop(self.heap) + del self.item_index[item] + return item + else: + raise IndexError("pop from an empty heap") + + def update(self, old_item, new_item): + index = self.item_index.pop(old_item) + self.heap[index] = new_item + self.item_index[new_item] = index + heapq._siftup(self.heap, index) + heapq._siftdown(self.heap, 0, index) + + def min(self): + if self.heap: + return self.heap[0] + + def max(self): + if self.heap: + if len(self.heap) == 1: + return self.heap[0] + elif len(self.heap) == 2: + return self.heap[1] + else: + return max(self.heap[1], self.heap[2]) + + def min_index(self): + if self.heap: + return self.item_index[self.min()] + + def max_index(self): + if self.heap: + return self.item_index[self.max()] + + def get_at_index(self, index): + if index < len(self.heap): + return self.heap[index] + else: + raise IndexError("Index out of range") + +class MultiReaderFileStream2(BytesIO): """ - FileStream that support multiple readers + FileStream that support multiple readers and seeks backwards """ NUM_READERS = 2 + LOOKBACK_LENGTH = 33554432 + MAX_THRESHOLD = LOOKBACK_LENGTH * 4 def __init__(self, fileobj): - self._bufs = [BytesBuffer() for _ in range(0, self.NUM_READERS)] - self._pos = [0 for _ in range(0, self.NUM_READERS)] + self._buffer = bytes() + self._buffer_pos = 0 # position in the fileobj (min reader position - LOOKBACK LENGTH) + self._size = 0 # size of bytes (for convenience) + self._pos = MinMaxHeap() # position of each reader self._fileobj = fileobj - self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffers. + self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer. + for i in range(0, self.NUM_READERS): + self._pos.push(0) + assert self._pos.get_at_index(i) == 0 class FileStreamReader(BytesIO): def __init__(s, index): s._index = index @@ -28,22 +89,22 @@ def peek(s, num_bytes): self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] - def _fill_buf_bytes(self, index: int, num_bytes=None): + def _fill_buf_bytes(self, num_bytes=None): with self._lock: - while num_bytes is None or len(self._bufs[index]) < num_bytes: - s = self._fileobj.read(num_bytes) - if not s: - break - for i in range(0, self.NUM_READERS): - self._bufs[i].write(s) - - def read(self, index: int, num_bytes=None): # type: ignore + s = self._fileobj.read(num_bytes) + if not s: + return + self._buffer += s + + def read(self, index: int, num_bytes=0): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ self._fill_buf_bytes(index, num_bytes) - if num_bytes is None: - num_bytes = len(self._bufs[index]) + # if num_bytes is None: + # num_bytes = len(self._bufs[index]) + while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + time.sleep(.1) # 100 ms s = self._bufs[index].read(num_bytes) self._pos[index] += len(s) return s From 17449a4b708b560137e158f752ab7ab1188d1292 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Wed, 15 May 2024 19:41:59 -0700 Subject: [PATCH 02/22] Change update function --- codalab/lib/beam/MultiReaderFileStream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index d8743bb79..e3ad5e3e6 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -24,10 +24,11 @@ def pop(self): else: raise IndexError("pop from an empty heap") - def update(self, old_item, new_item): - index = self.item_index.pop(old_item) + def update(self, index, new_item): + old_item = self.heap[index] self.heap[index] = new_item self.item_index[new_item] = index + del self.item_index[old_item] heapq._siftup(self.heap, index) heapq._siftdown(self.heap, 0, index) From 6de3afb5a21e4984cb81193b0a3bc4929d006576 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 15 May 2024 22:25:41 -0700 Subject: [PATCH 03/22] Read done --- codalab/lib/beam/MultiReaderFileStream.py | 35 ++++++++++++++++++----- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index e3ad5e3e6..8c06a0bd3 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -1,4 +1,4 @@ -from io import BytesIO +from io import BytesIO, SEEK_SET, SEEK_END from threading import Lock import time @@ -53,13 +53,13 @@ def max_index(self): if self.heap: return self.item_index[self.max()] - def get_at_index(self, index): + def get_at_index(self, index: int): if index < len(self.heap): return self.heap[index] else: raise IndexError("Index out of range") -class MultiReaderFileStream2(BytesIO): +class MultiReaderFileStream(BytesIO): """ FileStream that support multiple readers and seeks backwards """ @@ -69,7 +69,7 @@ class MultiReaderFileStream2(BytesIO): def __init__(self, fileobj): self._buffer = bytes() - self._buffer_pos = 0 # position in the fileobj (min reader position - LOOKBACK LENGTH) + self._buffer_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH) self._size = 0 # size of bytes (for convenience) self._pos = MinMaxHeap() # position of each reader self._fileobj = fileobj @@ -87,6 +87,9 @@ def read(s, num_bytes=None): def peek(s, num_bytes): return self.peek(s._index, num_bytes) + + def seek(s, offset, whence): + return self.seek(s._index, offset, whence) self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] @@ -96,6 +99,7 @@ def _fill_buf_bytes(self, num_bytes=None): if not s: return self._buffer += s + self._size += len(s) def read(self, index: int, num_bytes=0): # type: ignore """Read the specified number of bytes from the associated file. @@ -104,10 +108,24 @@ def read(self, index: int, num_bytes=0): # type: ignore self._fill_buf_bytes(index, num_bytes) # if num_bytes is None: # num_bytes = len(self._bufs[index]) - while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + while (self._pos.get_at_index(index) + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: time.sleep(.1) # 100 ms - s = self._bufs[index].read(num_bytes) - self._pos[index] += len(s) + + old_position = self._pos.get_at_index(index) + s = self._buffer[old_position:old_position + num_bytes] + + # Modify position + new_position = old_position + len(s) + self._pos.update(index, new_position) + + # Update buffer if this reader is the minimum reader + diff = (self._pos.min() - self.LOOKBACK_LENGTH) - self._buffer_pos # calculated min position of buffer minus current min position of buffer + # NOTE: it's possible for diff < 0 if seek backwards occur + if diff > 0: + self._buffer = self._buffer[diff:] + self._buffer_pos += diff + self._size -= diff + return s def peek(self, index: int, num_bytes): # type: ignore @@ -115,5 +133,8 @@ def peek(self, index: int, num_bytes): # type: ignore s = self._bufs[index].peek(num_bytes) return s + def seek(self, index: int, offset: int, whence=SEEK_SET): + pass + def close(self): self.__input.close() From 0b109c2e88ea8302f99cda139cd3ca55d8d303d3 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 15 May 2024 22:28:52 -0700 Subject: [PATCH 04/22] Some seek --- codalab/lib/beam/MultiReaderFileStream.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 8c06a0bd3..9bcb51a31 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -134,7 +134,10 @@ def peek(self, index: int, num_bytes): # type: ignore return s def seek(self, index: int, offset: int, whence=SEEK_SET): - pass - + if whence == SEEK_END: + super().seek(offset, whence) + else: + pass + def close(self): self.__input.close() From 9490a012681ee691ff7c382b69d7eadecbcd6885 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 16 May 2024 00:09:38 -0700 Subject: [PATCH 05/22] Fixed race condition --- codalab/lib/beam/MultiReaderFileStream.py | 67 +++++++++++++---------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 9bcb51a31..5129d05e2 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -25,10 +25,8 @@ def pop(self): raise IndexError("pop from an empty heap") def update(self, index, new_item): - old_item = self.heap[index] self.heap[index] = new_item self.item_index[new_item] = index - del self.item_index[old_item] heapq._siftup(self.heap, index) heapq._siftdown(self.heap, 0, index) @@ -71,13 +69,14 @@ def __init__(self, fileobj): self._buffer = bytes() self._buffer_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH) self._size = 0 # size of bytes (for convenience) - self._pos = MinMaxHeap() # position of each reader + # self._pos = MinMaxHeap() # position of each reader + self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader self._fileobj = fileobj self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer. - for i in range(0, self.NUM_READERS): - self._pos.push(0) - assert self._pos.get_at_index(i) == 0 + # for i in range(0, self.NUM_READERS): + # self._pos.push(0) + # assert self._pos.get_at_index(i) == 0 class FileStreamReader(BytesIO): def __init__(s, index): s._index = index @@ -88,7 +87,7 @@ def read(s, num_bytes=None): def peek(s, num_bytes): return self.peek(s._index, num_bytes) - def seek(s, offset, whence): + def seek(s, offset, whence=SEEK_SET): return self.seek(s._index, offset, whence) self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] @@ -105,39 +104,51 @@ def read(self, index: int, num_bytes=0): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ - self._fill_buf_bytes(index, num_bytes) + # Calculate how many new bytes need to be read + print("READING HERE with num_bytes, index, self._buffer_pos, self._size: ", num_bytes, index, self._buffer_pos, self._size) + num_bytes -= max(self._pos) - self._pos[index] + if num_bytes > 0: + print("new num bytes: ", num_bytes) + self._fill_buf_bytes(num_bytes) # if num_bytes is None: # num_bytes = len(self._bufs[index]) - while (self._pos.get_at_index(index) + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + print("SLEEPING") time.sleep(.1) # 100 ms - old_position = self._pos.get_at_index(index) - s = self._buffer[old_position:old_position + num_bytes] - - # Modify position - new_position = old_position + len(s) - self._pos.update(index, new_position) - - # Update buffer if this reader is the minimum reader - diff = (self._pos.min() - self.LOOKBACK_LENGTH) - self._buffer_pos # calculated min position of buffer minus current min position of buffer - # NOTE: it's possible for diff < 0 if seek backwards occur - if diff > 0: - self._buffer = self._buffer[diff:] - self._buffer_pos += diff - self._size -= diff - + with self._lock: + old_position = self._pos[index] + s = self._buffer[old_position:old_position + num_bytes] + + # Modify position + self._pos[index] += len(s) + + # Update buffer if this reader is the minimum reader + diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos # calculated min position of buffer minus current min position of buffer + # NOTE: it's possible for diff < 0 if seek backwards occur + print("DIFF: ", diff) + if diff > 0: + self._buffer = self._buffer[diff:] + self._buffer_pos += diff + self._size -= diff + + print("Length of return: ", len(s)) return s def peek(self, index: int, num_bytes): # type: ignore - self._fill_buf_bytes(index, num_bytes) - s = self._bufs[index].peek(num_bytes) - return s + pass + # self._fill_buf_bytes(index, num_bytes) + # s = self._bufs[index].peek(num_bytes) + # return s def seek(self, index: int, offset: int, whence=SEEK_SET): + print("SEEKING SEEKING SEEKING") + print(index, offset) if whence == SEEK_END: super().seek(offset, whence) else: - pass + assert offset >= self._buffer_pos + self._pos[index] = offset def close(self): self.__input.close() From 035ba30ca81a1ce55792a4ce9e8cdf480d8787b9 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 16 May 2024 00:27:11 -0700 Subject: [PATCH 06/22] Implementation Done --- codalab/lib/beam/MultiReaderFileStream.py | 93 +++-------------------- 1 file changed, 12 insertions(+), 81 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 5129d05e2..631f82c51 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -1,62 +1,6 @@ from io import BytesIO, SEEK_SET, SEEK_END from threading import Lock import time - -from codalab.worker.un_gzip_stream import BytesBuffer - -import heapq - -class MinMaxHeap: - def __init__(self): - self.heap = [] - self.item_index = {} # Dictionary to store indices of elements - - def push(self, item): - heapq.heappush(self.heap, item) - index = len(self.heap) - 1 - self.item_index[item] = index - - def pop(self): - if self.heap: - item = heapq.heappop(self.heap) - del self.item_index[item] - return item - else: - raise IndexError("pop from an empty heap") - - def update(self, index, new_item): - self.heap[index] = new_item - self.item_index[new_item] = index - heapq._siftup(self.heap, index) - heapq._siftdown(self.heap, 0, index) - - def min(self): - if self.heap: - return self.heap[0] - - def max(self): - if self.heap: - if len(self.heap) == 1: - return self.heap[0] - elif len(self.heap) == 2: - return self.heap[1] - else: - return max(self.heap[1], self.heap[2]) - - def min_index(self): - if self.heap: - return self.item_index[self.min()] - - def max_index(self): - if self.heap: - return self.item_index[self.max()] - - def get_at_index(self, index: int): - if index < len(self.heap): - return self.heap[index] - else: - raise IndexError("Index out of range") - class MultiReaderFileStream(BytesIO): """ FileStream that support multiple readers and seeks backwards @@ -70,13 +14,9 @@ def __init__(self, fileobj): self._buffer_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH) self._size = 0 # size of bytes (for convenience) # self._pos = MinMaxHeap() # position of each reader - self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader + self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader in the fileobj self._fileobj = fileobj self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer. - - # for i in range(0, self.NUM_READERS): - # self._pos.push(0) - # assert self._pos.get_at_index(i) == 0 class FileStreamReader(BytesIO): def __init__(s, index): s._index = index @@ -93,31 +33,27 @@ def seek(s, offset, whence=SEEK_SET): self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] def _fill_buf_bytes(self, num_bytes=None): - with self._lock: - s = self._fileobj.read(num_bytes) - if not s: - return - self._buffer += s - self._size += len(s) + # with self._lock: + s = self._fileobj.read(num_bytes) + if not s: + return + self._buffer += s + self._size += len(s) def read(self, index: int, num_bytes=0): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ # Calculate how many new bytes need to be read - print("READING HERE with num_bytes, index, self._buffer_pos, self._size: ", num_bytes, index, self._buffer_pos, self._size) - num_bytes -= max(self._pos) - self._pos[index] - if num_bytes > 0: - print("new num bytes: ", num_bytes) - self._fill_buf_bytes(num_bytes) - # if num_bytes is None: - # num_bytes = len(self._bufs[index]) + with self._lock: + new_bytes_needed = num_bytes - (max(self._pos) - self._pos[index]) + if new_bytes_needed > 0: + self._fill_buf_bytes(new_bytes_needed) while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: - print("SLEEPING") time.sleep(.1) # 100 ms with self._lock: - old_position = self._pos[index] + old_position = self._pos[index] - self._buffer_pos s = self._buffer[old_position:old_position + num_bytes] # Modify position @@ -126,13 +62,10 @@ def read(self, index: int, num_bytes=0): # type: ignore # Update buffer if this reader is the minimum reader diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos # calculated min position of buffer minus current min position of buffer # NOTE: it's possible for diff < 0 if seek backwards occur - print("DIFF: ", diff) if diff > 0: self._buffer = self._buffer[diff:] self._buffer_pos += diff self._size -= diff - - print("Length of return: ", len(s)) return s def peek(self, index: int, num_bytes): # type: ignore @@ -142,8 +75,6 @@ def peek(self, index: int, num_bytes): # type: ignore # return s def seek(self, index: int, offset: int, whence=SEEK_SET): - print("SEEKING SEEKING SEEKING") - print(index, offset) if whence == SEEK_END: super().seek(offset, whence) else: From 1c73adc723c0850da8463d73f5e5cbd8aa0f79e5 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Tue, 21 May 2024 01:08:04 -0700 Subject: [PATCH 07/22] Added tests for tar gz --- tests/unit/lib/upload_manager_test.py | 45 +++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index 216612a49..b7ee7395c 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -18,7 +18,7 @@ from tests.unit.server.bundle_manager import TestBase urlopen_real = urllib.request.urlopen - +LARGE_FILE_SIZE = 16777216 #16MB class UploadManagerTestBase(TestBase): """A class that contains the base for an UploadManager test. Subclasses @@ -42,6 +42,13 @@ def check_file_equals_string(self, file_subpath: str, expected_contents: str): def listdir(self): """List the files in the current bundle location.""" raise NotImplementedError + + def check_file_size(self): + """Check the file sizes in the current bundle location""" + with FileSystems.open( + self.bundle_location, compression_type=CompressionTypes.UNCOMPRESSED + ) as f, tarfile.open(fileobj=f, mode='r:gz') as tf: + return [tarinfo.size for tarinfo in tf] @property def bundle_location(self): @@ -78,6 +85,36 @@ def test_fileobj_gz(self): self.do_upload(('source.gz', BytesIO(gzip_bytestring(b'testing')))) self.check_file_equals_string('', 'testing') + def test_fileobj_tar_gz(self): + source = os.path.join(self.temp_dir, 'source_dir') + os.mkdir(source) + self.write_file_of_size(10, os.path.join(source, 'file')) + self.assertEqual(['file'], sorted(self.listdir())) + self.assertEqual([10], self.check_file_size()) + + def test_large_fileobj_tar_gz(self): + """ + Large bundles should not cause issues + """ + source = os.path.join(self.temp_dir, 'source_dir') + os.mkdir(source) + self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile')) + self.write_string_to_file('testing', os.path.join(source, 'README')) + self.do_upload(('source.tar.gz', tar_gzip_directory(source))) + self.assertEqual(['bigfile', 'README'], sorted(self.listdir())) + + def test_large_fileobj_tar_gz2(self): + """ + Large bundles should not cause issues + """ + source = os.path.join(self.temp_dir, 'source_dir') + os.mkdir(source) + self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile')) + self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile2')) + self.do_upload(('source.tar.gz', tar_gzip_directory(source))) + self.assertEqual(['bigfile', 'bigfile2'], sorted(self.listdir())) + self.assertEqual([LARGE_FILE_SIZE, LARGE_FILE_SIZE], self.check_file_size()) + def test_fileobj_tar_gz_should_not_simplify_archives(self): source = os.path.join(self.temp_dir, 'source_dir') os.mkdir(source) @@ -108,7 +145,7 @@ def test_fileobj_tar_gz_with_dsstore_should_not_simplify_archive_2(self): self.write_string_to_file('testing', os.path.join(source, '.DS_Store')) self.do_upload(('source.tar.gz', tar_gzip_directory(source))) self.assertEqual(['.DS_Store', 'README', 'README2'], sorted(self.listdir())) - + def mock_url_source(self, fileobj, ext=""): """Returns a URL that is mocked to return the contents of fileobj. The URL will end in the extension "ext", if given. @@ -153,6 +190,10 @@ def write_string_to_file(self, string, file_path): with open(file_path, 'w') as f: f.write(string) + def write_file_of_size(self, size: int, file_path: str): + with open(file_path, "wb") as f: + f.seek(size - 1) + f.write(b"\0") class UploadManagerDiskStorageTest(UploadManagerTestBase, unittest.TestCase): """Tests for UploadManager that upload files to disk storage.""" From 8450e30a312c719bc19fb6ba9be0e0333e1cd184 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Wed, 22 May 2024 22:29:50 -0700 Subject: [PATCH 08/22] Fixed tar.gz test, added memory test --- tests/unit/lib/upload_manager_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index b7ee7395c..b42f1650b 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -19,6 +19,7 @@ urlopen_real = urllib.request.urlopen LARGE_FILE_SIZE = 16777216 #16MB +EXTRA_LARGE_FILE_SIZE = 134217728 #128MB for Memory Profiling Only class UploadManagerTestBase(TestBase): """A class that contains the base for an UploadManager test. Subclasses @@ -89,6 +90,7 @@ def test_fileobj_tar_gz(self): source = os.path.join(self.temp_dir, 'source_dir') os.mkdir(source) self.write_file_of_size(10, os.path.join(source, 'file')) + self.do_upload(('source.tar.gz', tar_gzip_directory(source))) self.assertEqual(['file'], sorted(self.listdir())) self.assertEqual([10], self.check_file_size()) @@ -186,6 +188,10 @@ def test_url_git(self): # change, then update this test. self.check_file_equals_string('testfile.md', '# test\nUsed for testing\n') + def test_upload_memory(self): + self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(self.temp_dir, 'bigfile')) + self.do_upload(('bigfile', os.path.join(self.temp_dir, 'bigfile'))) + def write_string_to_file(self, string, file_path): with open(file_path, 'w') as f: f.write(string) From 2e5e6ceec9993dbd2cd0bb1d7a295a1d5c54f000 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 22 May 2024 22:39:19 -0700 Subject: [PATCH 09/22] Added memory profiler --- requirements.txt | 1 + tests/unit/lib/upload_manager_test.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 6abd213e5..c9e45860a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,3 +45,4 @@ websockets==9.1 kubernetes==12.0.1 google-cloud-storage==2.0.0 httpio==0.3.0 +memory_profiler==0.61.0 diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index b42f1650b..fab22c503 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -10,6 +10,7 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems from io import BytesIO +from memory_profiler import memory_usage from typing import IO, cast from unittest.mock import MagicMock from urllib.response import addinfourl @@ -190,7 +191,12 @@ def test_url_git(self): def test_upload_memory(self): self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(self.temp_dir, 'bigfile')) - self.do_upload(('bigfile', os.path.join(self.temp_dir, 'bigfile'))) + mem_usage = memory_usage( + (self.do_upload(('bigfile', os.path.join(self.temp_dir, 'bigfile'))), ), + interval=0.1, + timeout=1 + ) + self.assertEqual(max(memory_usage) < 40000000, True) def write_string_to_file(self, string, file_path): with open(file_path, 'w') as f: From 28d642fc5986b8961f3886b8dbeba9e17f50c9c5 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 22 May 2024 23:19:26 -0700 Subject: [PATCH 10/22] Small test changes --- tests/unit/lib/upload_manager_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index fab22c503..65f09a8b3 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -93,7 +93,7 @@ def test_fileobj_tar_gz(self): self.write_file_of_size(10, os.path.join(source, 'file')) self.do_upload(('source.tar.gz', tar_gzip_directory(source))) self.assertEqual(['file'], sorted(self.listdir())) - self.assertEqual([10], self.check_file_size()) + self.assertEqual([0, 10], self.check_file_size()) def test_large_fileobj_tar_gz(self): """ @@ -104,7 +104,7 @@ def test_large_fileobj_tar_gz(self): self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile')) self.write_string_to_file('testing', os.path.join(source, 'README')) self.do_upload(('source.tar.gz', tar_gzip_directory(source))) - self.assertEqual(['bigfile', 'README'], sorted(self.listdir())) + self.assertEqual(['README', 'bigfile'], sorted(self.listdir())) def test_large_fileobj_tar_gz2(self): """ @@ -116,7 +116,7 @@ def test_large_fileobj_tar_gz2(self): self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile2')) self.do_upload(('source.tar.gz', tar_gzip_directory(source))) self.assertEqual(['bigfile', 'bigfile2'], sorted(self.listdir())) - self.assertEqual([LARGE_FILE_SIZE, LARGE_FILE_SIZE], self.check_file_size()) + self.assertEqual([0, LARGE_FILE_SIZE, LARGE_FILE_SIZE], self.check_file_size()) def test_fileobj_tar_gz_should_not_simplify_archives(self): source = os.path.join(self.temp_dir, 'source_dir') From f491e0e0a9955221b67b09dfdacc109b46eea271 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 23 May 2024 00:32:52 -0700 Subject: [PATCH 11/22] Modifying Chunk size --- codalab/lib/beam/MultiReaderFileStream.py | 2 +- codalab/lib/upload_manager.py | 2 +- tests/unit/lib/upload_manager_test.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 631f82c51..4cfec3f20 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -50,7 +50,7 @@ def read(self, index: int, num_bytes=0): # type: ignore if new_bytes_needed > 0: self._fill_buf_bytes(new_bytes_needed) while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: - time.sleep(.1) # 100 ms + time.sleep(10) # 100 ms with self._lock: old_position = self._pos[index] - self._buffer_pos diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index b2d1d2754..4efb1d358 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -255,7 +255,7 @@ def write_fileobj( conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '') os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str try: - CHUNK_SIZE = 16 * 1024 + CHUNK_SIZE = 1024 * 1024 def upload_file_content(): iteration = 0 diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index 65f09a8b3..ce2312324 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -196,7 +196,7 @@ def test_upload_memory(self): interval=0.1, timeout=1 ) - self.assertEqual(max(memory_usage) < 40000000, True) + self.assertEqual(max(memory_usage) < 90000000, True) def write_string_to_file(self, string, file_path): with open(file_path, 'w') as f: From 8b01add630d19fca8b1705f86404af54ab706479 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 00:05:22 -0700 Subject: [PATCH 12/22] Using getmembers to get tarinfo --- tests/unit/lib/upload_manager_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index ce2312324..c94a8c283 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -50,7 +50,7 @@ def check_file_size(self): with FileSystems.open( self.bundle_location, compression_type=CompressionTypes.UNCOMPRESSED ) as f, tarfile.open(fileobj=f, mode='r:gz') as tf: - return [tarinfo.size for tarinfo in tf] + return [tarinfo.size for tarinfo in tf.getmembers()] @property def bundle_location(self): From dedf6ca7e39a0b15d8d1cef24e692b081dc30b7f Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 01:11:12 -0700 Subject: [PATCH 13/22] Added multireaderfilestream tests --- codalab/lib/beam/MultiReaderFileStream.py | 2 +- tests/unit/beam/multireaderfilestream_test.py | 148 ++++++++++++++++++ tests/unit/lib/upload_manager_test.py | 2 +- 3 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 tests/unit/beam/multireaderfilestream_test.py diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 4cfec3f20..e4cc5fccd 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -7,7 +7,7 @@ class MultiReaderFileStream(BytesIO): """ NUM_READERS = 2 LOOKBACK_LENGTH = 33554432 - MAX_THRESHOLD = LOOKBACK_LENGTH * 4 + MAX_THRESHOLD = LOOKBACK_LENGTH * 2 def __init__(self, fileobj): self._buffer = bytes() diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py new file mode 100644 index 000000000..286c69aa7 --- /dev/null +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -0,0 +1,148 @@ +import tempfile +import time +import unittest + +from threading import Thread + +from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream + +FILESIZE = 100000000 +CHUNKSIZE = FILESIZE/10 + +class MultiReaderFileStreamTest(unittest.TestCase): + def write_file_of_size(self, size: int, file_path: str): + with open(file_path, "wb") as f: + f.seek(size - 1) + f.write(b"\0") + + + def test_reader_distance(self): + with tempfile.NamedTemporaryFile(delete=True) as f: + f.seek(FILESIZE - 1) + f.write(b"\0") + + m_stream = MultiReaderFileStream(f) + reader_1 = m_stream[0] + reader_2 = m_stream[1] + + def thread1(): + while True: + status = reader_1.read(CHUNKSIZE) + if not status: + break + + def thread2(): + # This reader will only read 4/10 of the file + for _ in range(4): + status = reader_2.read(CHUNKSIZE) + + t1 = Thread(target=thread1) + t2 = Thread(target=thread2) + + t1.start() + + # Sleep a little for thread 1 to start reading + time.sleep(3) + + # Assert that the first reader has not read past the Maximum threshold + self.assertGreater(70000000, m_stream._pos[0]) + + t2.start() + + # Sleep a little for thread 2 to start reading + time.sleep(3) + + # Assert that the first reader is at 100000000, second reader is at 40000000 + self.assertEqual(100000000, m_stream._pos[0]) + self.assertEqual(40000000, m_stream._pos[1]) + + # Assert that the buffer is at 6445568 (40000000 - LOOKBACK_LENGTH) + self.assertEqual(6445568, m_stream._buffer_pos) + + # Assert that the buffer is length 100000000 - 6445568 + self.assertEqual(93554432, m_stream._size) + + t1.join() + t2.join() + + def test_seek(self): + with tempfile.NamedTemporaryFile(delete=True) as f: + f.seek(FILESIZE - 1) + f.write(b"\0") + + m_stream = MultiReaderFileStream(f) + reader_1 = m_stream[0] + reader_2 = m_stream[1] + + result = None + + def thread1(): + while True: + status = reader_1.read(CHUNKSIZE) + if not status: + break + + def thread2(): + # This reader will only read 4/10 of the file, then seek to 10000000 and read another 4/10 of the file + for _ in range(4): + reader_2.read(CHUNKSIZE) + + try: + reader_2.seek(10000000) + except AssertionError as e: + result = e + + for _ in range(4): + reader_2.read(CHUNKSIZE) + + t1 = Thread(target=thread1) + t2 = Thread(target=thread2) + t1.start() + t2.start() + + t1.join() + t2.join() + + self.assertIsNone(result) + + # Check that reader 2 is at 50000000 and buffer position is correct + self.assertEqual(50000000, m_stream._pos[1]) + self.assertEqual(16445568, m_stream._buffer_pos) + + + def test_toofar_seek(self): + with tempfile.NamedTemporaryFile(delete=True) as f: + f.seek(FILESIZE - 1) + f.write(b"\0") + + m_stream = MultiReaderFileStream(f) + reader_1 = m_stream[0] + reader_2 = m_stream[1] + + result = None + + def thread1(): + while True: + status = reader_1.read(CHUNKSIZE) + if not status: + break + + def thread2(): + # This reader will only read 4/10 of the file, then seek to the beginning + for _ in range(4): + status = reader_2.read(CHUNKSIZE) + + try: + reader_2.seek(0) + except AssertionError as e: + result = e + + t1 = Thread(target=thread1) + t2 = Thread(target=thread2) + t1.start() + t2.start() + + t1.join() + t2.join() + + self.assertIsInstance(result, AssertionError) diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index c94a8c283..761544b0c 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -196,7 +196,7 @@ def test_upload_memory(self): interval=0.1, timeout=1 ) - self.assertEqual(max(memory_usage) < 90000000, True) + self.assertEqual(max(memory_usage) < 100000000, True) def write_string_to_file(self, string, file_path): with open(file_path, 'w') as f: From 04b524bd3f90ba99a42a50060de85531d89f1f83 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 01:12:58 -0700 Subject: [PATCH 14/22] Cleanup --- tests/unit/beam/multireaderfilestream_test.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py index 286c69aa7..8767408fc 100644 --- a/tests/unit/beam/multireaderfilestream_test.py +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -10,12 +10,6 @@ CHUNKSIZE = FILESIZE/10 class MultiReaderFileStreamTest(unittest.TestCase): - def write_file_of_size(self, size: int, file_path: str): - with open(file_path, "wb") as f: - f.seek(size - 1) - f.write(b"\0") - - def test_reader_distance(self): with tempfile.NamedTemporaryFile(delete=True) as f: f.seek(FILESIZE - 1) From 7f33c6616b149ea7b1ee8ef5817d2a1a8f5f4bbe Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 21:03:51 -0700 Subject: [PATCH 15/22] Test fixes --- tests/unit/beam/multireaderfilestream_test.py | 12 ++++++------ tests/unit/lib/upload_manager_test.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py index 8767408fc..77a653b4f 100644 --- a/tests/unit/beam/multireaderfilestream_test.py +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -16,8 +16,8 @@ def test_reader_distance(self): f.write(b"\0") m_stream = MultiReaderFileStream(f) - reader_1 = m_stream[0] - reader_2 = m_stream[1] + reader_1 = m_stream.readers[0] + reader_2 = m_stream.readers[1] def thread1(): while True: @@ -65,8 +65,8 @@ def test_seek(self): f.write(b"\0") m_stream = MultiReaderFileStream(f) - reader_1 = m_stream[0] - reader_2 = m_stream[1] + reader_1 = m_stream.readers[0] + reader_2 = m_stream.readers[1] result = None @@ -110,8 +110,8 @@ def test_toofar_seek(self): f.write(b"\0") m_stream = MultiReaderFileStream(f) - reader_1 = m_stream[0] - reader_2 = m_stream[1] + reader_1 = m_stream.readers[0] + reader_2 = m_stream.readers[1] result = None diff --git a/tests/unit/lib/upload_manager_test.py b/tests/unit/lib/upload_manager_test.py index 761544b0c..a357ff6fa 100644 --- a/tests/unit/lib/upload_manager_test.py +++ b/tests/unit/lib/upload_manager_test.py @@ -196,7 +196,7 @@ def test_upload_memory(self): interval=0.1, timeout=1 ) - self.assertEqual(max(memory_usage) < 100000000, True) + self.assertEqual(max(mem_usage) < 100000000, True) def write_string_to_file(self, string, file_path): with open(file_path, 'w') as f: From 8d2a3d63b5210184824ee0ae8bc529304ad0a848 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 22:12:03 -0700 Subject: [PATCH 16/22] Changes requested --- codalab/lib/beam/MultiReaderFileStream.py | 58 ++++++++++++------- tests/unit/beam/multireaderfilestream_test.py | 23 ++++++-- 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index e4cc5fccd..d39d732dd 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -3,19 +3,22 @@ import time class MultiReaderFileStream(BytesIO): """ - FileStream that support multiple readers and seeks backwards + FileStream that supports N readers with the following features and constraints: + - Each reader's postion is tracked + - A buffer of bytes() is stored which stores bytes from the position of the slowest reader + minus a lookback length of 32MiB to the fastest reader + - The fastest reader can be at most 64MiB ahead of the slowest reader, reads made + further than 64MiB will sleep until the slowest reader catches up """ NUM_READERS = 2 - LOOKBACK_LENGTH = 33554432 + LOOKBACK_LENGTH = 33554432 # 32 MiB MAX_THRESHOLD = LOOKBACK_LENGTH * 2 def __init__(self, fileobj): - self._buffer = bytes() + self._buffer = bytes() # Buffer of bytes read from the file object within the limits defined self._buffer_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH) - self._size = 0 # size of bytes (for convenience) - # self._pos = MinMaxHeap() # position of each reader self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader in the fileobj - self._fileobj = fileobj + self._fileobj = fileobj # The original file object the readers are reading from self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer. class FileStreamReader(BytesIO): def __init__(s, index): @@ -33,46 +36,57 @@ def seek(s, offset, whence=SEEK_SET): self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] def _fill_buf_bytes(self, num_bytes=None): - # with self._lock: + """ + Fills the buffer with bytes from the fileobj + """ s = self._fileobj.read(num_bytes) if not s: return self._buffer += s - self._size += len(s) def read(self, index: int, num_bytes=0): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ - # Calculate how many new bytes need to be read + while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + time.sleep(.1) # 100 ms + with self._lock: + # Calculate how many new bytes need to be read new_bytes_needed = num_bytes - (max(self._pos) - self._pos[index]) if new_bytes_needed > 0: self._fill_buf_bytes(new_bytes_needed) - while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: - time.sleep(10) # 100 ms - with self._lock: - old_position = self._pos[index] - self._buffer_pos - s = self._buffer[old_position:old_position + num_bytes] + # Get the bytes in the buffer that correspond to the read function call + buffer_index = self._pos[index] - self._buffer_pos + s = self._buffer[buffer_index:buffer_index + num_bytes] - # Modify position + # Modify reader position in fileobj self._pos[index] += len(s) - # Update buffer if this reader is the minimum reader - diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos # calculated min position of buffer minus current min position of buffer + # If this reader is the minimum reader, we can remove some bytes from the beginning of the buffer + # Calculated min position of buffer minus current min position of buffer + diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos # NOTE: it's possible for diff < 0 if seek backwards occur if diff > 0: self._buffer = self._buffer[diff:] self._buffer_pos += diff - self._size -= diff return s def peek(self, index: int, num_bytes): # type: ignore - pass - # self._fill_buf_bytes(index, num_bytes) - # s = self._bufs[index].peek(num_bytes) - # return s + while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + time.sleep(.1) # 100 ms + + with self._lock: + # Calculate how many new bytes need to be read + new_bytes_needed = num_bytes - (max(self._pos) - self._pos[index]) + if new_bytes_needed > 0: + self._fill_buf_bytes(new_bytes_needed) + + # Get the bytes in the buffer that correspond to the read function call + buffer_index = self._pos[index] - self._buffer_pos + s = self._buffer[buffer_index:buffer_index + num_bytes] + return s def seek(self, index: int, offset: int, whence=SEEK_SET): if whence == SEEK_END: diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py index 77a653b4f..41f0b59e4 100644 --- a/tests/unit/beam/multireaderfilestream_test.py +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -11,6 +11,10 @@ class MultiReaderFileStreamTest(unittest.TestCase): def test_reader_distance(self): + """ + This test verifies that both readers in the Multireaderfilestream + are within the limits defined in the class + """ with tempfile.NamedTemporaryFile(delete=True) as f: f.seek(FILESIZE - 1) f.write(b"\0") @@ -38,13 +42,13 @@ def thread2(): # Sleep a little for thread 1 to start reading time.sleep(3) - # Assert that the first reader has not read past the Maximum threshold + # Assert that the first reader has not read past the maximum threshold self.assertGreater(70000000, m_stream._pos[0]) t2.start() # Sleep a little for thread 2 to start reading - time.sleep(3) + time.sleep(1) # Assert that the first reader is at 100000000, second reader is at 40000000 self.assertEqual(100000000, m_stream._pos[0]) @@ -54,12 +58,16 @@ def thread2(): self.assertEqual(6445568, m_stream._buffer_pos) # Assert that the buffer is length 100000000 - 6445568 - self.assertEqual(93554432, m_stream._size) + self.assertEqual(93554432, len(m_stream._buffer)) t1.join() t2.join() - def test_seek(self): + def test_backwards_seek(self): + """ + This test verifies that a backwards seek within the lookback length + defined in the Multireaderfilestream class behaves as expected + """ with tempfile.NamedTemporaryFile(delete=True) as f: f.seek(FILESIZE - 1) f.write(b"\0") @@ -105,6 +113,11 @@ def thread2(): def test_toofar_seek(self): + """ + This test verifies that a backwards seek past the lookback length + defined in the Multireaderfilestream class behaves as expected with + an AssertionError + """ with tempfile.NamedTemporaryFile(delete=True) as f: f.seek(FILESIZE - 1) f.write(b"\0") @@ -124,7 +137,7 @@ def thread1(): def thread2(): # This reader will only read 4/10 of the file, then seek to the beginning for _ in range(4): - status = reader_2.read(CHUNKSIZE) + reader_2.read(CHUNKSIZE) try: reader_2.seek(0) From adf0f9e7f5a7a70431395a7aa21a9005af82a224 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 22:22:57 -0700 Subject: [PATCH 17/22] Changes for clarity --- codalab/lib/beam/MultiReaderFileStream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index d39d732dd..99c31b15b 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -53,7 +53,8 @@ def read(self, index: int, num_bytes=0): # type: ignore with self._lock: # Calculate how many new bytes need to be read - new_bytes_needed = num_bytes - (max(self._pos) - self._pos[index]) + new_pos = self._pos[index] + num_bytes + new_bytes_needed = new_pos - max(self._pos) if new_bytes_needed > 0: self._fill_buf_bytes(new_bytes_needed) From fae43356031554a8ad7f7a89106504a78ab9bc66 Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Mon, 27 May 2024 22:23:12 -0700 Subject: [PATCH 18/22] More changes for clarity --- codalab/lib/beam/MultiReaderFileStream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 99c31b15b..f4fa8ef92 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -80,7 +80,8 @@ def peek(self, index: int, num_bytes): # type: ignore with self._lock: # Calculate how many new bytes need to be read - new_bytes_needed = num_bytes - (max(self._pos) - self._pos[index]) + new_pos = self._pos[index] + num_bytes + new_bytes_needed = new_pos - max(self._pos) if new_bytes_needed > 0: self._fill_buf_bytes(new_bytes_needed) From 9571b3042f289a8b1a87869f22183ba890354b4e Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 30 May 2024 01:23:13 -0700 Subject: [PATCH 19/22] Better code structure + comments --- codalab/lib/beam/MultiReaderFileStream.py | 32 ++++++++----------- codalab/lib/upload_manager.py | 1 + tests/unit/beam/multireaderfilestream_test.py | 4 +-- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index f4fa8ef92..0d985ffaf 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -35,7 +35,7 @@ def seek(s, offset, whence=SEEK_SET): self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] - def _fill_buf_bytes(self, num_bytes=None): + def _fill_buf_bytes(self, num_bytes=0): """ Fills the buffer with bytes from the fileobj """ @@ -44,24 +44,17 @@ def _fill_buf_bytes(self, num_bytes=None): return self._buffer += s - def read(self, index: int, num_bytes=0): # type: ignore + + def read(self, index: int, num_bytes=None): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ - while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: - time.sleep(.1) # 100 ms - - with self._lock: - # Calculate how many new bytes need to be read - new_pos = self._pos[index] + num_bytes - new_bytes_needed = new_pos - max(self._pos) - if new_bytes_needed > 0: - self._fill_buf_bytes(new_bytes_needed) - - # Get the bytes in the buffer that correspond to the read function call - buffer_index = self._pos[index] - self._buffer_pos - s = self._buffer[buffer_index:buffer_index + num_bytes] + if num_bytes == None: + # Read remaining in buffer + num_bytes = (self._buffer_pos + len(self._buffer)) - self._pos[index] + s = self.peek(index, num_bytes) + with self._lock: # Modify reader position in fileobj self._pos[index] += len(s) @@ -74,20 +67,21 @@ def read(self, index: int, num_bytes=0): # type: ignore self._buffer_pos += diff return s - def peek(self, index: int, num_bytes): # type: ignore - while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD: + def peek(self, index: int, num_bytes: int): # type: ignore + new_pos = self._pos[index] + num_bytes + while (new_pos) - self._buffer_pos > self.MAX_THRESHOLD: time.sleep(.1) # 100 ms with self._lock: # Calculate how many new bytes need to be read - new_pos = self._pos[index] + num_bytes new_bytes_needed = new_pos - max(self._pos) if new_bytes_needed > 0: self._fill_buf_bytes(new_bytes_needed) - + # Get the bytes in the buffer that correspond to the read function call buffer_index = self._pos[index] - self._buffer_pos s = self._buffer[buffer_index:buffer_index + num_bytes] + return s def seek(self, index: int, offset: int, whence=SEEK_SET): diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index 4efb1d358..b45f3389e 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -255,6 +255,7 @@ def write_fileobj( conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '') os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str try: + # Chunk size set to 1MiB for performance CHUNK_SIZE = 1024 * 1024 def upload_file_content(): diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py index 41f0b59e4..5369ec09e 100644 --- a/tests/unit/beam/multireaderfilestream_test.py +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -40,7 +40,7 @@ def thread2(): t1.start() # Sleep a little for thread 1 to start reading - time.sleep(3) + time.sleep(.5) # Assert that the first reader has not read past the maximum threshold self.assertGreater(70000000, m_stream._pos[0]) @@ -48,7 +48,7 @@ def thread2(): t2.start() # Sleep a little for thread 2 to start reading - time.sleep(1) + time.sleep(.5) # Assert that the first reader is at 100000000, second reader is at 40000000 self.assertEqual(100000000, m_stream._pos[0]) From c3fd7ae96a914c188395146e951acd4877c8648b Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Thu, 30 May 2024 19:49:14 -0700 Subject: [PATCH 20/22] Even more changes for clarity --- codalab/lib/beam/MultiReaderFileStream.py | 29 +++++++++---------- tests/unit/beam/multireaderfilestream_test.py | 22 +++++++------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index 0d985ffaf..d3a9457a5 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -1,22 +1,21 @@ +import time + from io import BytesIO, SEEK_SET, SEEK_END from threading import Lock -import time class MultiReaderFileStream(BytesIO): """ - FileStream that supports N readers with the following features and constraints: + FileStream that takes an input stream fileobj, and supports N readers with the following features and constraints: - Each reader's postion is tracked - A buffer of bytes() is stored which stores bytes from the position of the slowest reader - minus a lookback length of 32MiB to the fastest reader - - The fastest reader can be at most 64MiB ahead of the slowest reader, reads made + minus a LOOKBACK_LENGTH (default 32 MiB) to the fastest reader + - The fastest reader can be at most MAX_THRESHOLD (default 64 MiB) ahead of the slowest reader, reads made further than 64MiB will sleep until the slowest reader catches up """ NUM_READERS = 2 - LOOKBACK_LENGTH = 33554432 # 32 MiB - MAX_THRESHOLD = LOOKBACK_LENGTH * 2 - def __init__(self, fileobj): + def __init__(self, fileobj, lookback_length=32*1024*1024): self._buffer = bytes() # Buffer of bytes read from the file object within the limits defined - self._buffer_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH) + self._buffer_start_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH) self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader in the fileobj self._fileobj = fileobj # The original file object the readers are reading from self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer. @@ -34,6 +33,8 @@ def seek(s, offset, whence=SEEK_SET): return self.seek(s._index, offset, whence) self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)] + self.LOOKBACK_LENGTH = lookback_length + self.MAX_THRESHOLD = self.LOOKBACK_LENGTH * 2 def _fill_buf_bytes(self, num_bytes=0): """ @@ -45,14 +46,10 @@ def _fill_buf_bytes(self, num_bytes=0): self._buffer += s - def read(self, index: int, num_bytes=None): # type: ignore + def read(self, index: int, num_bytes: int): # type: ignore """Read the specified number of bytes from the associated file. index: index that specifies which reader is reading. """ - if num_bytes == None: - # Read remaining in buffer - num_bytes = (self._buffer_pos + len(self._buffer)) - self._pos[index] - s = self.peek(index, num_bytes) with self._lock: # Modify reader position in fileobj @@ -60,7 +57,7 @@ def read(self, index: int, num_bytes=None): # type: ignore # If this reader is the minimum reader, we can remove some bytes from the beginning of the buffer # Calculated min position of buffer minus current min position of buffer - diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos + diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_start_pos # NOTE: it's possible for diff < 0 if seek backwards occur if diff > 0: self._buffer = self._buffer[diff:] @@ -69,7 +66,7 @@ def read(self, index: int, num_bytes=None): # type: ignore def peek(self, index: int, num_bytes: int): # type: ignore new_pos = self._pos[index] + num_bytes - while (new_pos) - self._buffer_pos > self.MAX_THRESHOLD: + while new_pos - self._buffer_start_pos > self.MAX_THRESHOLD: time.sleep(.1) # 100 ms with self._lock: @@ -88,7 +85,7 @@ def seek(self, index: int, offset: int, whence=SEEK_SET): if whence == SEEK_END: super().seek(offset, whence) else: - assert offset >= self._buffer_pos + assert offset >= self._buffer_start_pos self._pos[index] = offset def close(self): diff --git a/tests/unit/beam/multireaderfilestream_test.py b/tests/unit/beam/multireaderfilestream_test.py index 5369ec09e..cfeb372ce 100644 --- a/tests/unit/beam/multireaderfilestream_test.py +++ b/tests/unit/beam/multireaderfilestream_test.py @@ -12,8 +12,9 @@ class MultiReaderFileStreamTest(unittest.TestCase): def test_reader_distance(self): """ - This test verifies that both readers in the Multireaderfilestream - are within the limits defined in the class + This test verifies that both readers in the MultiReaderFileStream + are within the limits defined in the class: + - Slowest reader is at most MAX_THRESHOLD behind the fastest reader """ with tempfile.NamedTemporaryFile(delete=True) as f: f.seek(FILESIZE - 1) @@ -43,7 +44,7 @@ def thread2(): time.sleep(.5) # Assert that the first reader has not read past the maximum threshold - self.assertGreater(70000000, m_stream._pos[0]) + self.assertGreater(m_stream.MAX_THRESHOLD + 1, m_stream._pos[0]) t2.start() @@ -51,14 +52,15 @@ def thread2(): time.sleep(.5) # Assert that the first reader is at 100000000, second reader is at 40000000 - self.assertEqual(100000000, m_stream._pos[0]) + self.assertEqual(FILESIZE, m_stream._pos[0]) self.assertEqual(40000000, m_stream._pos[1]) # Assert that the buffer is at 6445568 (40000000 - LOOKBACK_LENGTH) - self.assertEqual(6445568, m_stream._buffer_pos) + calculated_buffer_start_pos = 40000000 - m_stream.LOOKBACK_LENGTH + self.assertEqual(calculated_buffer_start_pos, m_stream._buffer_start_pos) # Assert that the buffer is length 100000000 - 6445568 - self.assertEqual(93554432, len(m_stream._buffer)) + self.assertEqual(FILESIZE - calculated_buffer_start_pos, len(m_stream._buffer)) t1.join() t2.join() @@ -66,7 +68,7 @@ def thread2(): def test_backwards_seek(self): """ This test verifies that a backwards seek within the lookback length - defined in the Multireaderfilestream class behaves as expected + defined in the MultiReaderFileStream class behaves as expected """ with tempfile.NamedTemporaryFile(delete=True) as f: f.seek(FILESIZE - 1) @@ -109,13 +111,13 @@ def thread2(): # Check that reader 2 is at 50000000 and buffer position is correct self.assertEqual(50000000, m_stream._pos[1]) - self.assertEqual(16445568, m_stream._buffer_pos) + self.assertEqual(50000000 - m_stream.LOOKBACK_LENGTH, m_stream._buffer_start_pos) - def test_toofar_seek(self): + def test_too_far_seek(self): """ This test verifies that a backwards seek past the lookback length - defined in the Multireaderfilestream class behaves as expected with + defined in the MultiReaderFileStream class behaves as expected with an AssertionError """ with tempfile.NamedTemporaryFile(delete=True) as f: From 7984f8a8993dd0786af3e3a363a3617a9d25410a Mon Sep 17 00:00:00 2001 From: Daniel Ma Date: Thu, 30 May 2024 20:11:57 -0700 Subject: [PATCH 21/22] Lower iterations per disk check to match chunk size --- codalab/lib/upload_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index b45f3389e..c4e167436 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -260,7 +260,7 @@ def write_fileobj( def upload_file_content(): iteration = 0 - ITERATIONS_PER_DISK_CHECK = 2000 + ITERATIONS_PER_DISK_CHECK = 32 bytes_uploaded = 0 with FileSystems.create( From 3e584df296075ea2615fa2ac5d8ec5a7845b7d8d Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 30 May 2024 22:53:12 -0700 Subject: [PATCH 22/22] Change names --- codalab/lib/beam/MultiReaderFileStream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codalab/lib/beam/MultiReaderFileStream.py b/codalab/lib/beam/MultiReaderFileStream.py index d3a9457a5..923b4abf1 100644 --- a/codalab/lib/beam/MultiReaderFileStream.py +++ b/codalab/lib/beam/MultiReaderFileStream.py @@ -61,7 +61,7 @@ def read(self, index: int, num_bytes: int): # type: ignore # NOTE: it's possible for diff < 0 if seek backwards occur if diff > 0: self._buffer = self._buffer[diff:] - self._buffer_pos += diff + self._buffer_start_pos += diff return s def peek(self, index: int, num_bytes: int): # type: ignore @@ -76,7 +76,7 @@ def peek(self, index: int, num_bytes: int): # type: ignore self._fill_buf_bytes(new_bytes_needed) # Get the bytes in the buffer that correspond to the read function call - buffer_index = self._pos[index] - self._buffer_pos + buffer_index = self._pos[index] - self._buffer_start_pos s = self._buffer[buffer_index:buffer_index + num_bytes] return s