add ability to perform rbd pool migration during cold boot
This uses Ceph's RBD live migration and is therefore quite fast; the
flattening is continuously driven forward during subsequent ensure()
calls. Existing snapshots are kept.
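
For context, a minimal sketch of the generic RBD live-migration flow
(prepare / execute / commit) via the rbd CLI; pool and image names are
placeholders, and this is not the code added by this commit:

# Rough sketch of the generic RBD live-migration flow (rbd CLI,
# Ceph Nautilus or newer). Pool and image names are placeholders,
# not this repository's actual configuration keys.
import subprocess

def migrate_rbd_image(image: str, src_pool: str, dst_pool: str) -> None:
    # Link the target image to the source; the data stays readable
    # while blocks are still being copied.
    subprocess.run(
        ["rbd", "migration", "prepare",
         f"{src_pool}/{image}", f"{dst_pool}/{image}"],
        check=True,
    )
    # Copy (flatten) the remaining blocks into the target pool. This
    # is the step that can be driven forward again on later runs.
    subprocess.run(
        ["rbd", "migration", "execute", f"{dst_pool}/{image}"],
        check=True,
    )
    # Detach from the source once all data has arrived; snapshots
    # are carried over to the target image.
    subprocess.run(
        ["rbd", "migration", "commit", f"{dst_pool}/{image}"],
        check=True,
    )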

This includes a large refactoring to make the Ceph abstraction code
easier to understand with the different requirements we now juggle.

We basically have a relatively abstract "Ceph" object that is the
entry point to the cluster and provides some convenience APIs.
"Volumes" are specific RBD volumes and their API, and "VolumeSpecs"
capture the logic we have around Root/Tmp/Swap volumes and their details.

They are embedded into a streamlined process of
starting/ensuring/stopping so the agent and Ceph manager objects
can handle them fairly "blindly". They remain reachable under
well-known names, though, to make the few special cases in the
higher layers easier to handle.
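
For illustration only, a hypothetical skeleton of that structure; names
are approximated from the diff below (Ceph(cfg, enc), ceph.ensure(),
ceph.volumes["root"]) and do not reflect the actual implementation:

# Hypothetical skeleton, not the code in this commit.
class VolumeSpec:
    """Policy for one well-known volume (root/tmp/swap): sizing,
    creation, and keeping an in-flight pool migration moving."""

    def __init__(self, name: str, size: int):
        self.name = name
        self.size = size

    def ensure(self, ceph: "Ceph") -> None:
        ...  # create if missing, resize, continue migration/flattening


class Ceph:
    """Entry point to the cluster plus convenience APIs."""

    def __init__(self, cfg: dict, enc: dict):
        self.specs = {
            "root": VolumeSpec("root", cfg["root_size"]),
            "swap": VolumeSpec("swap", cfg["swap_size"]),
            "tmp": VolumeSpec("tmp", cfg["tmp_size"]),
        }
        self.volumes = {}  # well-known names -> RBD volume objects

    def start(self, enc, binary_generation):
        self.ensure()

    def ensure(self):
        # Called from the agent's ensure_online_local(); volumes are
        # handled "blindly" here, while special cases stay reachable
        # via their well-known names (e.g. self.volumes["root"]).
        for spec in self.specs.values():
            spec.ensure(self)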

re PL-131857
ctheune committed Oct 17, 2024
1 parent 2576ca5 commit 482bd55
Showing 4 changed files with 501 additions and 222 deletions.
59 changes: 36 additions & 23 deletions src/fc/qemu/agent.py
@@ -898,13 +898,15 @@ def _update_from_enc(self):
# reflect back into the config file.
self.cfg = copy.copy(self.enc["parameters"])
self.cfg["name"] = self.enc["name"]
self.cfg["root_size"] = self.cfg["disk"] * (1024**3)
self.cfg["swap_size"] = swap_size(self.cfg["memory"])
self.cfg["tmp_size"] = tmp_size(self.cfg["disk"])
self.cfg["ceph_id"] = self.ceph_id
self.cfg["cpu_model"] = self.cfg.get("cpu_model", "qemu64")
self.cfg["binary_generation"] = self.binary_generation
self.consul_generation = self.enc["consul-generation"]
self.qemu = Qemu(self.cfg)
self.ceph = Ceph(self.cfg)
self.ceph = Ceph(self.cfg, self.enc)
self.contexts = [self.qemu, self.ceph]
for attr in ["migration_ctl_address"]:
setattr(self, attr, getattr(self, attr).format(**self.cfg))
@@ -1080,6 +1082,7 @@ def ensure_online_local(self):
self.ensure_online_disk_size()
self.ensure_online_disk_throttle()
self.ensure_watchdog()
self.ceph.ensure()
# Be aggressive/opportunistic about re-acquiring locks in case
# they were taken away.
self.ceph.lock()
@@ -1117,19 +1120,20 @@ def mark_qemu_binary_generation(self):

def ensure_online_disk_size(self):
"""Trigger block resize action for the root disk."""
target_size = self.cfg["disk"] * (1024**3)
if self.ceph.root.size >= target_size:
target_size = self.cfg["root_size"]
current_size = self.ceph.volumes["root"].size
if current_size >= target_size:
self.log.info(
"check-disk-size",
wanted=target_size,
found=self.ceph.root.size,
found=current_size,
action="none",
)
return
self.log.info(
"check-disk-size",
wanted=target_size,
found=self.ceph.root.size,
found=current_size,
action="resize",
)
self.qemu.resize_root(target_size)
@@ -1204,8 +1208,8 @@ def consul_deregister(self):
@locked()
@running(False)
def start(self):
self.ceph.start()
self.generate_config()
self.ceph.start(self.enc, self.binary_generation)
self.qemu.start()
self.consul_register()
self.ensure_online_disk_throttle()
@@ -1262,42 +1266,50 @@ def snapshot(self, snapshot, keep=0):
if keep:
until = util.today() + datetime.timedelta(days=keep)
snapshot = snapshot + "-keep-until-" + until.strftime("%Y%m%d")
if snapshot in [x.snapname for x in self.ceph.root.snapshots]:
if snapshot in [
x.snapname for x in self.ceph.volumes["root"].snapshots
]:
self.log.info("snapshot-exists", snapshot=snapshot)
return
self.log.info("snapshot-create", name=snapshot)
with self.frozen_vm() as frozen:
if frozen:
self.ceph.root.snapshots.create(snapshot)
self.ceph.volumes["root"].snapshots.create(snapshot)
else:
self.log.error("snapshot-ignore", reason="not frozen")
raise RuntimeError("VM not frozen, not making snapshot.")

# This must be locked because we're going to use the
# QMP socket and that only supports talking to one person at a time.
# Alternatively we'd have to connect/disconnect and do weird things
# for every single command ...
@locked()
def status(self):
"""Determine status of the VM."""
if self.qemu.is_running():
status = 0
self.log.info("vm-status", result="online")
for device in list(self.qemu.block_info().values()):
self.log.info(
"disk-throttle",
device=device["device"],
iops=device["inserted"]["iops"],
)
else:
status = 1
self.log.info("vm-status", result="offline")
for volume in self.ceph.volumes:
locker = volume.lock_status()
self.log.info("rbd-status", volume=volume.fullname, locker=locker)
try:
if self.qemu.is_running():
status = 0
self.log.info("vm-status", result="online")
for device in list(self.qemu.block_info().values()):
self.log.info(
"disk-throttle",
device=device["device"],
iops=device["inserted"]["iops"],
)
else:
status = 1
self.log.info("vm-status", result="offline")
except VMStateInconsistent:
self.log.exception("vm-status", result="inconsistent")
self.ceph.status()
consul = locate_live_service(self.consul, "qemu-" + self.name)
if consul:
self.log.info(
"consul", service=consul["Service"], address=consul["Address"]
)
else:
self.log.info("consul", service="<not registered>")

return status

def telnet(self):
@@ -1526,5 +1538,6 @@ def generate_config(self):
accelerator=accelerator,
machine_type=machine_type,
network="".join(netconfig),
ceph=self.ceph,
**self.cfg,
)