Introduce a worker global cache
Introduce a global_cache to reduce downloads from GitHub and the net server when
multiple workers sharing a filesystem are run by one user. This should allow a
fleet to run without overloading GitHub.

If the user specifies the path of the global cache, networks and source zipballs
are cached there. Cached copies are used when available and are downloaded (and
stored in the cache) only when they are not.
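
Both download paths (networks and source zipballs) follow the same cache-first
pattern; a condensed, illustrative sketch using the cache_read/cache_write helpers
added in worker/games.py below (fetch_with_cache itself is not part of the code):

def fetch_with_cache(global_cache, name, download):
    # Return the bytes for `name`, preferring the shared on-disk cache.
    content = cache_read(global_cache, name)      # None on a miss, or if caching is disabled
    if content is None:
        content = download()                      # e.g. requests_get(url, ...).content
        cache_write(global_cache, name, content)  # populate the cache for the next worker
    return content

The cache location comes from the new global_cache parameter (command line option
-g/--global_cache, persisted in the worker config); it must be an existing path,
and an empty string disables caching.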

Data is stored in the cache in such a way that it becomes available atomically
on most filesystems.
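
Concretely, cache_write() first writes the blob to a NamedTemporaryFile inside the
cache directory, flushes and fsyncs it, and only then publishes it under its final
name with os.link(), which either creates the name in a single step or fails if it
already exists. An illustrative check of the resulting race semantics (path and
file name are placeholders):

cache = "/shared/fishtest-cache"                      # hypothetical shared directory
cache_write(cache, "nn-1111111111aa.nnue", b"net-a")  # worker A publishes its copy
cache_write(cache, "nn-1111111111aa.nnue", b"net-b")  # worker B loses the race; its OSError is swallowed
assert cache_read(cache, "nn-1111111111aa.nnue") == b"net-a"

Readers therefore never observe a partially written file, and concurrent workers
caching the same blob cannot corrupt each other's copy.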
vondele authored and ppigazzini committed Jul 3, 2024
1 parent 61ec81b commit 2655607
Showing 4 changed files with 107 additions and 19 deletions.
2 changes: 1 addition & 1 deletion server/fishtest/api.py
@@ -34,7 +34,7 @@
 according to the route/URL mapping defined in `__init__.py`.
 """

-WORKER_VERSION = 239
+WORKER_VERSION = 240


 @exception_view_config(HTTPException)
97 changes: 82 additions & 15 deletions worker/games.py
@@ -135,6 +135,41 @@ def send_sigint(p):
     p.send_signal(signal.SIGINT)


+def cache_read(cache, name):
+    """Read a binary blob of data from a global cache on disk, None if not available"""
+    if cache == "":
+        return None
+
+    try:
+        return (Path(cache) / name).read_bytes()
+    except Exception as e:
+        return None
+
+
+def cache_write(cache, name, data):
+    """Write a binary blob of data to a global cache on disk in an atomic way, skip if not available"""
+    if cache == "":
+        return
+
+    try:
+        temp_file = tempfile.NamedTemporaryFile(dir=cache, delete=False)
+        temp_file.write(data)
+        temp_file.flush()
+        os.fsync(temp_file.fileno())  # Ensure data is written to disk
+        temp_file.close()
+
+        # try linking, which is atomic, and will fail if the file exists
+        try:
+            os.link(temp_file.name, Path(cache) / name)
+        except OSError:
+            pass
+
+        # Remove the temporary file
+        os.remove(temp_file.name)
+    except Exception as e:
+        return
+
+
 # See https://stackoverflow.com/questions/16511337/correct-way-to-try-except-using-python-requests-module
 # for background.
 # It may be useful to introduce more refined http exception handling in the future.
@@ -286,25 +321,32 @@ def required_nets_from_source():
     return nets


-def download_net(remote, testing_dir, net):
-    url = remote + "/api/nn/" + net
-    print("Downloading {}".format(net))
-    r = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT)
-    (testing_dir / net).write_bytes(r.content)
+def download_net(remote, testing_dir, net, global_cache):
+    content = cache_read(global_cache, net)
+
+    if content is None:
+        url = remote + "/api/nn/" + net
+        print("Downloading {}".format(net))
+        content = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT).content
+        cache_write(global_cache, net, content)
+    else:
+        print("Using {} from global cache".format(net))
+
+    (testing_dir / net).write_bytes(content)


 def validate_net(testing_dir, net):
     hash = hashlib.sha256((testing_dir / net).read_bytes()).hexdigest()
     return hash[:12] == net[3:15]


-def establish_validated_net(remote, testing_dir, net):
+def establish_validated_net(remote, testing_dir, net, global_cache):
     if not (testing_dir / net).exists() or not validate_net(testing_dir, net):
         attempt = 0
         while True:
             try:
                 attempt += 1
-                download_net(remote, testing_dir, net)
+                download_net(remote, testing_dir, net, global_cache)
                 if not validate_net(testing_dir, net):
                     raise WorkerException(
                         "Failed to validate the network: {}".format(net)
@@ -666,22 +708,37 @@ def find_arch(compiler):


 def setup_engine(
-    destination, worker_dir, testing_dir, remote, sha, repo_url, concurrency, compiler
+    destination,
+    worker_dir,
+    testing_dir,
+    remote,
+    sha,
+    repo_url,
+    concurrency,
+    compiler,
+    global_cache,
 ):
     """Download and build sources in a temporary directory then move exe to destination"""
     tmp_dir = Path(tempfile.mkdtemp(dir=worker_dir))

     try:
-        item_url = github_api(repo_url) + "/zipball/" + sha
-        print("Downloading {}".format(item_url))
-        blob = requests_get(item_url).content
+        blob = cache_read(global_cache, sha + ".zip")
+
+        if blob is None:
+            item_url = github_api(repo_url) + "/zipball/" + sha
+            print("Downloading {}".format(item_url))
+            blob = requests_get(item_url).content
+            cache_write(global_cache, sha + ".zip", blob)
+        else:
+            print("Using {} from global cache".format(sha + ".zip"))
+
         file_list = unzip(blob, tmp_dir)
         prefix = os.path.commonprefix([n.filename for n in file_list])
         os.chdir(tmp_dir / prefix / "src")

         for net in required_nets_from_source():
             print("Build uses default net:", net)
-            establish_validated_net(remote, testing_dir, net)
+            establish_validated_net(remote, testing_dir, net, global_cache)
             shutil.copyfile(testing_dir / net, net)

         arch = find_arch(compiler)
@@ -1182,7 +1239,15 @@ def launch_cutechess(


 def run_games(
-    worker_info, current_state, password, remote, run, task_id, pgn_file, clear_binaries
+    worker_info,
+    current_state,
+    password,
+    remote,
+    run,
+    task_id,
+    pgn_file,
+    clear_binaries,
+    global_cache,
 ):
     # This is the main cutechess-cli driver.
     # It is ok, and even expected, for this function to
@@ -1316,6 +1381,7 @@ def parse_options(s):
             repo_url,
             worker_info["concurrency"],
             worker_info["compiler"],
+            global_cache,
         )
         if not base_engine.exists():
             setup_engine(
@@ -1327,6 +1393,7 @@ def parse_options(s):
             repo_url,
             worker_info["concurrency"],
             worker_info["compiler"],
+            global_cache,
         )

     os.chdir(testing_dir)
@@ -1360,11 +1427,11 @@ def parse_options(s):
     # Add EvalFile* with full path to cutechess options, and download the networks if missing.
     for option, net in required_nets(base_engine).items():
         base_options.append("option.{}={}".format(option, net))
-        establish_validated_net(remote, testing_dir, net)
+        establish_validated_net(remote, testing_dir, net, global_cache)

     for option, net in required_nets(new_engine).items():
         new_options.append("option.{}={}".format(option, net))
-        establish_validated_net(remote, testing_dir, net)
+        establish_validated_net(remote, testing_dir, net, global_cache)

     # PGN files output setup.
     pgn_name = "results-" + worker_info["unique_key"] + ".pgn"
2 changes: 1 addition & 1 deletion worker/sri.txt
@@ -1 +1 @@
-{"__version": 239, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "+ubGHk3rIV0ILVhg1dxpsOkRF8GUaO5otPVnAyw8kKKq9Rqzksv02xj6wjYpSTmA", "games.py": "6vKH51UtL56oNvA539hLXRzgE1ADXy3QZNJohoK94RntM72+iMancSJZHaNjEb5+"}
+{"__version": 240, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "UfgMIw5Cx6YM7wplqw+zQ6fqQQnaaUmFDG+1AQ4pY5+dw7fnAvUjzbw4i8lmooUc", "games.py": "9dFaa914vpqT7q4LLx2LlDdYwK6QFVX3h7+XRt18ATX0lt737rvFeBIiqakkttNC"}
25 changes: 23 additions & 2 deletions worker/worker.py
@@ -62,7 +62,7 @@
 MIN_CLANG_MAJOR = 8
 MIN_CLANG_MINOR = 0

-WORKER_VERSION = 239
+WORKER_VERSION = 240
 FILE_LIST = ["updater.py", "worker.py", "games.py"]
 HTTP_TIMEOUT = 30.0
 INITIAL_RETRY_TIME = 15.0
@@ -661,6 +661,7 @@ def setup_parameters(worker_dir):
         ("parameters", "uuid_prefix", "_hw", _alpha_numeric, None),
         ("parameters", "min_threads", "1", int, None),
         ("parameters", "fleet", "False", _bool, None),
+        ("parameters", "global_cache", "", str, None),
         ("parameters", "compiler", default_compiler, compiler_names, None),
         ("private", "hw_seed", str(random.randint(0, 0xFFFFFFFF)), int, None),
     ]
@@ -748,6 +749,17 @@ def _get_help_string(self, action):
         choices=[False, True],  # useful for usage message
         help="if 'True', quit in case of errors or if no task is available",
     )
+    parser.add_argument(
+        "-g",
+        "--global_cache",
+        dest="global_cache",
+        default=config.get("parameters", "global_cache"),
+        type=str,
+        help="""Useful only when running multiple workers concurrently:
+            an existing absolute path to be used to globally cache on disk
+            certain downloads, reducing load on github or net server.
+            An empty string ("") disables using a cache.""",
+    )
     parser.add_argument(
         "-C",
         "--compiler",
@@ -869,6 +881,7 @@ def my_error(e):
     )
     config.set("parameters", "min_threads", str(options.min_threads))
     config.set("parameters", "fleet", str(options.fleet))
+    config.set("parameters", "global_cache", str(options.global_cache))
     config.set("parameters", "compiler", options.compiler_)

     with open(config_file, "w") as f:
@@ -1345,7 +1358,13 @@ def verify_worker_version(remote, username, password):


 def fetch_and_handle_task(
-    worker_info, password, remote, lock_file, current_state, clear_binaries
+    worker_info,
+    password,
+    remote,
+    lock_file,
+    current_state,
+    clear_binaries,
+    global_cache,
 ):
     # This function should normally not raise exceptions.
     # Unusual conditions are handled by returning False.
@@ -1443,6 +1462,7 @@ def fetch_and_handle_task(
             task_id,
             pgn_file,
             clear_binaries,
+            global_cache,
         )
         success = True
     except FatalException as e:
@@ -1675,6 +1695,7 @@ def worker():
             lock_file,
             current_state,
             clear_binaries,
+            options.global_cache,
         )
         if not current_state["alive"]:  # the user may have pressed Ctrl-C...
             break
