Skip to content

Commit

Permalink
Largely reworked how volumes are attached to servers to be more explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
XaverStiensmeier committed Nov 5, 2024
1 parent eda7ae9 commit 4a598ab
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 56 deletions.
52 changes: 28 additions & 24 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import threading
import traceback
from functools import partial
from itertools import count

import paramiko
import sympy
Expand Down Expand Up @@ -219,6 +220,7 @@ def start_vpn_or_master(self, configuration, provider): # pylint: disable=too-m
boot_volume=bool(boot_volume),
terminate_boot_volume=boot_volume.get("terminate", True),
volume_size=boot_volume.get("size", 50))
# description=instance.get("description", configuration.get("description")))
self.add_volume_device_info_to_instance(provider, server, instance)

configuration["private_v4"] = server["private_v4"]
Expand Down Expand Up @@ -267,7 +269,8 @@ def start_worker(self, worker, worker_count, configuration, provider): # pylint
boot_from_volume=boot_volume.get("name", False),
boot_volume=bool(boot_volume),
terminate_boot_volume=boot_volume.get("terminateBoot", True),
volume_size=boot_volume.get("size", 50))
volume_size=boot_volume.get("size", 50),
description=worker.get("description", configuration.get("description")))
self.add_volume_device_info_to_instance(provider, server, worker)

self.log.info(f"Worker {name} started on {provider.cloud_specification['identifier']}.")
Expand All @@ -294,32 +297,30 @@ def create_server_volumes(self, provider, instance, name):
"""
self.log.info("Creating volumes ...")
return_volumes = []

for i, volume in enumerate(instance.get("volumes", [])):
if volume.get("semiPermanent"):
base_volume_name = f"{name}-semiperm-{i}"
else:
base_volume_name = f"{name}-{i}"
if volume.get('snapshot'):
if not volume.get("name"):
volume["name"] = base_volume_name
if not volume.get("exists"):
if volume.get("semiPermanent"):
infix = "semiperm"
elif volume.get("permanent"):
infix = "perm"
else:
volume["name"] = f"{base_volume_name}-{volume['name']}"
return_volume = provider.create_volume_from_snapshot(volume['snapshot'], volume["name"])
if not return_volume:
raise ConfigurationException(f"Snapshot {volume['snapshot']} not found!")
else:
if volume.get('name'):
self.log.debug(f"Trying to find volume {volume['name']}")
return_volume = provider.get_volume_by_id_or_name(volume["name"])
infix = "tmp"
volume["name"] = f"{name}-{infix}-{i}-{volume.get('name')}"

self.log.debug(f"Trying to find volume {volume['name']}")
return_volume = provider.get_volume_by_id_or_name(volume["name"])
if not return_volume:
self.log.debug(f"Volume {volume['name']} not found.")
if volume.get('snapshot'):
self.log.debug("Creating volume from snapshot...")
return_volume = provider.create_volume_from_snapshot(volume['snapshot'], volume["name"])
if not return_volume:
volume["name"] = f"{base_volume_name}-{volume['name']}"
return_volume = provider.create_volume(size=volume.get("size", 50), name=volume['name'])
return_volume["name"] = volume["name"]
raise ConfigurationException(f"Snapshot {volume['snapshot']} not found!")
else:
volume["name"] = base_volume_name
self.log.debug(f"Creating volume {volume['name']}")
return_volume = provider.create_volume(size=volume.get("size", 50), name=volume['name'])
self.log.debug("Passed the point")
self.log.debug("Creating volume...")
return_volume = provider.create_volume(size=volume.get("size", 50), name=volume["name"],
description=f"Created for {name}")
return_volumes.append(return_volume)
return return_volumes

Expand All @@ -341,6 +342,9 @@ def add_volume_device_info_to_instance(self, provider, server, instance):
for volume in volumes:
server_volume = next((server_volume for server_volume in server_volumes if
server_volume["name"] == volume["name"]), None)
if not server_volume:
raise RuntimeError(
f"Created server {server['name']} doesn't have attached volume {volume['name']}.")
device = server_volume.get("device")
final_volumes.append({**volume, "device": device})

Expand All @@ -349,7 +353,7 @@ def add_volume_device_info_to_instance(self, provider, server, instance):
f"{volume.get('mountPoint')}")

ansible_configurator.write_yaml(os.path.join(a_rp.HOST_VARS_FOLDER, f"{server['name']}.yaml"),
{"volumes":final_volumes},
{"volumes": final_volumes},
self.log)

def prepare_vpn_or_master_args(self, configuration):
Expand Down
4 changes: 3 additions & 1 deletion bibigrid/core/actions/terminate.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,16 @@ def delete_tmp_volumes(provider, cluster_id, log):
log.info("Deleting tmp volumes on provider %s...", provider.cloud_specification['identifier'])
volume_list = provider.list_volumes()
cluster_volume_state = []
volume_regex = re.compile(fr"^bibigrid-(master-{cluster_id}|(worker|vpngtw)-{cluster_id}-(\d+))-(\d+|semiperm.*)$")
volume_regex = re.compile(
fr"^bibigrid-(master-{cluster_id}|(worker|vpngtw)-{cluster_id}-(\d+))-(semiperm|tmp)-\d+(-.+)?$")
for volume in volume_list:
if volume_regex.match(volume["name"]):
log.info("Trying to delete volume %s on cloud %s.", volume['name'], provider.cloud_specification[
'identifier'])
cluster_volume_state.append(provider.delete_volume(volume))
return cluster_volume_state


# pylint: disable=too-many-branches
def terminate_output(cluster_server_state, cluster_keypair_state, cluster_security_group_state, cluster_volume_state,
ac_state, cluster_id, log):
Expand Down
14 changes: 12 additions & 2 deletions bibigrid/core/utility/ansible_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py

pass_through(configuration, worker_dict, "waitForServices", "wait_for_services")
write_yaml(os.path.join(aRP.GROUP_VARS_FOLDER, f"{group_name}.yaml"), worker_dict, log)
if worker_dict["on_demand"]:
for worker_number in range(worker.get('count', 1)):
name = create.WORKER_IDENTIFIER(cluster_id=cluster_id, additional=worker_count + worker_number)
write_volumes = []
for i, volume in enumerate(worker.get("volumes")):
semiperm_infix = 'semiperm-' if volume.get("semiPermanent") else ''
write_volumes.append({**volume, "name":volume.get("name", f"{name}-{semiperm_infix}{i}")})
write_yaml(os.path.join(aRP.HOST_VARS_FOLDER, f"{name}.yaml"),
{"volumes": write_volumes},
log)
worker_count += worker.get('count', 1)

vpngtw = configuration.get("vpnInstance")
Expand Down Expand Up @@ -125,7 +135,6 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
"network_cidrs": configuration["subnet_cidrs"], "floating_ip": configuration["floating_ip"],
"flavor": flavor_dict, "private_v4": configuration["private_v4"],
"cloud_identifier": configuration["cloud_identifier"],
"volumes": configuration["masterInstance"]["volumes"],
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False),
"state": "UNKNOWN" if configuration.get("useMasterAsCompute", True) else "DRAINED",
"on_demand": False,
Expand Down Expand Up @@ -212,8 +221,9 @@ def generate_ansible_hosts_yaml(ssh_user, configurations, cluster_id, log): # p
@return: ansible_hosts yaml (dict)
"""
log.info("Generating ansible hosts file...")
master_name = create.MASTER_IDENTIFIER(cluster_id=cluster_id)
ansible_hosts_yaml = {"vpn": {"hosts": {},
"children": {"master": {"hosts": {"localhost": to_instance_host_dict(ssh_user)}},
"children": {"master": {"hosts": {master_name: to_instance_host_dict(ssh_user)}},
"vpngtw": {"hosts": {}}}}, "workers": {"hosts": {}, "children": {}}}
# vpngtw are handled like workers on this level
workers = ansible_hosts_yaml["workers"]
Expand Down
4 changes: 2 additions & 2 deletions bibigrid/openstack/openstack_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ def get_server(self, name_or_id):
"""
return self.conn.get_server(name_or_id)

def create_volume(self, name, size):
return self.conn.create_volume(size=size, name=name)
def create_volume(self, name, size, description=None):
    """Create a new block-storage volume via the underlying OpenStack connection.

    @param name: display name for the new volume
    @param size: volume size in GB
    @param description: optional human-readable description stored on the volume
    @return: the created volume object as returned by the connection
    """
    volume_args = {"size": size, "name": name, "description": description}
    return self.conn.create_volume(**volume_args)

def delete_volume(self, name_or_id):
    """Delete the volume identified by *name_or_id* via the underlying connection.

    @param name_or_id: volume name or id to delete
    @return: whatever the connection's delete_volume reports (truthy on success)
    """
    connection = self.conn
    return connection.delete_volume(name_or_id=name_or_id)
Expand Down
52 changes: 26 additions & 26 deletions resources/playbook/roles/bibigrid/files/slurm/create_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,46 +111,43 @@ def get_server_vars(name):
server_vars = yaml.safe_load(host_vars_file)
logging.info(f"Loaded Vars: {server_vars}")
else:
logging.info(f"No host vars exist (group vars still apply). Using {server_vars}")
logging.info(f"No host vars exist (group vars still apply). Using {server_vars}.")
return server_vars


def create_server_volumes(connection, host_vars, name):
logging.info("Creating volumes ...")
volumes = host_vars.get('volumes', [])
return_volumes = []
host_vars_path = f"/opt/playbook/host_vars/{name}.yaml"

with FileLock(f"{host_vars_path}.lock"):
logging.info(f"Instance Volumes {volumes}")
for i, volume in enumerate(volumes):
logging.info(f"Instance Volumes {volumes}")
for i, volume in enumerate(volumes):
if not volume.get("exists"):
if volume.get("semiPermanent"):
base_volume_name = f"{name}-semiperm-{i}"
infix = "semiperm"
elif volume.get("permanent"):
infix = "perm"
else:
base_volume_name = f"{name}-{i}"
infix = "tmp"
volume_name = f"{name}-{infix}-{i}-{volume.get('name')}"
else:
volume_name = volume["name"]

logging.debug(f"Trying to find volume {volume['name']}")
return_volume = connection.get_volume(volume_name)
if not return_volume:
logging.debug(f"Volume {volume['name']} not found.")

if volume.get('snapshot'):
if not volume.get("name"):
volume["name"] = base_volume_name
else:
volume["name"] = f"{base_volume_name}-{volume['name']}"
return_volume = create_volume_from_snapshot(connection, volume['snapshot'], volume["name"])
logging.debug("Creating volume from snapshot...")
return_volume = create_volume_from_snapshot(connection, volume['snapshot'], volume_name)
if not return_volume:
raise ConfigurationException(f"Snapshot {volume['snapshot']} not found!")
else:
if volume.get('name'):
logging.debug(f"Trying to find volume {volume['name']}")
return_volume = connection.get_volume(volume["name"])
if not return_volume:
volume["name"] = f"{base_volume_name}-{volume['name']}"
return_volume = connection.create_volume(size=volume.get("size", 50), name=volume['name'])
return_volume["name"] = volume["name"]
else:
volume["name"] = base_volume_name
logging.debug(f"Creating volume {volume['name']}")
return_volume = connection.create_volume(size=volume.get("size", 50), name=volume['name'])
return_volumes.append(return_volume)
with open(host_vars_path, mode="w+", encoding="utf-8") as host_vars_file:
yaml.dump(host_vars, host_vars_file)
logging.debug("Creating volume...")
return_volume = connection.create_volume(size=volume.get("size", 50), name=volume_name,
description=f"Created for {name}")
return_volumes.append(return_volume)
return return_volumes


Expand All @@ -175,6 +172,9 @@ def volumes_host_vars_update(connection, server, host_vars):
logging.info(f"Finding device for {volume['name']}.")
server_volume = next((server_volume for server_volume in server_attachment if
server_volume["name"] == volume["name"]), None)
if not server_volume:
raise RuntimeError(
f"Created server {server['name']} doesn't have attached volume {volume['name']}.")
volume["device"] = server_volume.get("device")

logging.debug(f"Added Configuration: Instance {server['name']} has volume {volume['name']} "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
result = connection.delete_server(terminate_worker, wait=True)
logging.info(f"Deleting Volumes")
volume_list = connection.list_volumes()
volume_regex = re.compile(fr"^{terminate_worker}-(\d+)$")
volume_regex = re.compile(fr"^{terminate_worker}-(tmp)-\d+(-.+)?$")
for volume in volume_list:
if volume_regex.match(volume["name"]):
logging.info(f"Trying to delete volume {volume['name']}: {connection.delete_volume(volume)}")
Expand Down

0 comments on commit 4a598ab

Please sign in to comment.