Skip to content

Commit

Permalink
Expose an API to handle file offset tables (#1207)
Browse files Browse the repository at this point in the history
With this commit we encapsulate handling of file offset tables for data
files in a dedicated class. This class implements the necessary
low-level handling only exposing a higher-level API for consumers.

Closes #1204
  • Loading branch information
danielmitterdorfer authored Mar 15, 2021
1 parent beb543a commit 90198c7
Showing 1 changed file with 117 additions and 20 deletions.
137 changes: 117 additions & 20 deletions esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,109 @@ def has_extension(file_name, extension):
return ext == extension


class FileOffsetTable:
"""
The FileOffsetTable represents a persistent mapping from lines in a data file to their offset in bytes in the
data file. This helps bulk-indexing clients to advance quickly to a certain position in a large data file.
"""
def __init__(self, data_file_path, offset_table_path, mode):
"""
Creates a new FileOffsetTable instance. The constructor should not be called directly but instead the
respective factory methods should be used.
:param data_file_path: The absolute path to the data file. This file is assumed to exist at this point.
:param offset_table_path: The absolute path to the corresponding offset table file. Only required to exist
for read operations on the data file.
:param mode: The mode in which the file offset table should be opened.
"""
self.data_file_path = data_file_path
self.offset_table_path = offset_table_path
self.mode = mode
self.offset_file = None

def exists(self):
"""
:return: True iff the file offset table already exists.
"""
return os.path.exists(self.offset_table_path)

def is_valid(self):
"""
:return: True iff the file offset table exists and it is up-to-date.
"""
return self.exists() and os.path.getmtime(self.offset_table_path) >= os.path.getmtime(self.data_file_path)

def __enter__(self):
self.offset_file = open(self.offset_table_path, self.mode)
return self

def add_offset(self, line_number, offset):
"""
Adds a new offset mapping to the file offset table. This method has to be called inside a context-manager block.
:param line_number: A line number to add.
:param offset: The corresponding offset in bytes.
"""
print(f"{line_number};{offset}", file=self.offset_file)

def find_closest_offset(self, target_line_number):
"""
Determines the offset in bytes for the line L in the corresponding data file with the following properties:
* L <= target_line_number
* For any line M, where M != L and M <= target_line_number: M > L (i.e. L is the closest match)
:param target_line_number: A positive number representing a line number in the data file.
:return: A tuple of file offset in bytes to the line with the closest match and the number of lines that
still need to be skipped.
"""
prior_offset = 0
prior_remaining_lines = target_line_number

for line in self.offset_file:
line_number, offset_in_bytes = [int(i) for i in line.strip().split(";")]
if line_number <= target_line_number:
prior_offset = offset_in_bytes
prior_remaining_lines = target_line_number - line_number
else:
break

return prior_offset, prior_remaining_lines

def __exit__(self, exc_type, exc_val, exc_tb):
self.offset_file.close()
self.offset_file = None
return False

@classmethod
def create_for_data_file(cls, data_file_path):
"""
Factory method to create a new file offset table.
:param data_file_path: The absolute path to the data file for which a file offset table should be created.
"""
return cls(data_file_path, f"{data_file_path}.offset", "wt")

@classmethod
def read_for_data_file(cls, data_file_path):
"""
Factory method to read from an existing file offset table.
:param data_file_path: The absolute path to the data file for which the file offset table should be read.
"""
return cls(data_file_path, f"{data_file_path}.offset", "rt")

@staticmethod
def remove(data_file_path):
"""
Removes a file offset table for the provided data path.
:param data_file_path: The absolute path to the data file for which the file offset table should be deleted.
"""
os.remove(f"{data_file_path}.offset")


def prepare_file_offset_table(data_file_path):
"""
Creates a file that contains a mapping from line numbers to file offsets for the provided path. This file is used internally by
Expand All @@ -413,20 +516,19 @@ def prepare_file_offset_table(data_file_path):
:param data_file_path: The path to a text file that is readable by this process.
:return The number of lines read or ``None`` if it did not have to build the file offset table.
"""
offset_file_path = "%s.offset" % data_file_path
# recreate only if necessary as this can be time-consuming
if not os.path.exists(offset_file_path) or os.path.getmtime(offset_file_path) < os.path.getmtime(data_file_path):
file_offset_table = FileOffsetTable.create_for_data_file(data_file_path)
if not file_offset_table.is_valid():
console.info("Preparing file offset table for [%s] ... " % data_file_path, end="", flush=True)
line_number = 0
with open(offset_file_path, mode="wt", encoding="utf-8") as offset_file:
with file_offset_table:
with open(data_file_path, mode="rt", encoding="utf-8") as data_file:
while True:
line = data_file.readline()
if len(line) == 0:
break
line_number += 1
if line_number % 50000 == 0:
print("%d;%d" % (line_number, data_file.tell()), file=offset_file)
file_offset_table.add_offset(line_number, data_file.tell())
console.println("[OK]")
return line_number
else:
Expand All @@ -440,8 +542,7 @@ def remove_file_offset_table(data_file_path):
:param data_file_path: The path to a text file that is readable by this process.
"""
offset_file_path = "%s.offset" % data_file_path
os.remove(offset_file_path)
FileOffsetTable.remove(data_file_path)


def skip_lines(data_file_path, data_file, number_of_lines_to_skip):
Expand All @@ -455,24 +556,20 @@ def skip_lines(data_file_path, data_file, number_of_lines_to_skip):
if number_of_lines_to_skip == 0:
return

offset_file_path = "%s.offset" % data_file_path
offset = 0
remaining_lines = number_of_lines_to_skip
file_offset_table = FileOffsetTable.read_for_data_file(data_file_path)
# can we fast forward?
if os.path.exists(offset_file_path):
with open(offset_file_path, mode="rt", encoding="utf-8") as offsets:
for line in offsets:
line_number, offset_in_bytes = [int(i) for i in line.strip().split(";")]
if line_number <= number_of_lines_to_skip:
offset = offset_in_bytes
remaining_lines = number_of_lines_to_skip - line_number
else:
break
if file_offset_table.exists():
with file_offset_table:
offset, remaining_lines = file_offset_table.find_closest_offset(number_of_lines_to_skip)
else:
offset = 0
remaining_lines = number_of_lines_to_skip

# fast forward to the last known file offset
data_file.seek(offset)
# forward the last remaining lines if needed
if remaining_lines > 0:
for line in range(remaining_lines):
for _ in range(remaining_lines):
data_file.readline()


Expand Down

0 comments on commit 90198c7

Please sign in to comment.