Skip to content

Commit

Permalink
feat: provide stable symlinks in disk resources
Browse files Browse the repository at this point in the history
This allows to grab various `/dev/disk` symlinks,
including in maintenance mode when `talosctl ls` is not allowed.

Samle output:

```yaml
node: 172.20.0.5
metadata:
    namespace: runtime
    type: Disks.block.talos.dev
    id: nvme0n2
    version: 2
    owner: block.DisksController
    phase: running
    created: 2025-01-23T12:57:08Z
    updated: 2025-01-23T12:57:09Z
spec:
    dev_path: /dev/nvme0n2
    size: 5368709120
    pretty_size: 5.4 GB
    io_size: 512
    sector_size: 512
    readonly: false
    cdrom: false
    model: QEMU NVMe Ctrl
    serial: deadbeef
    wwid: nvme.1b36-6465616462656566-51454d55204e564d65204374726c-00000002
    bus_path: /pci0000:00/0000:00:08.0/nvme
    sub_system: /sys/class/block
    transport: nvme
    symlinks:
        - /dev/disk/by-diskseq/11
        - /dev/disk/by-id/nvme-QEMU_NVMe_Ctrl_deadbeef_2
        - /dev/disk/by-id/nvme-nvme.1b36-6465616462656566-51454d55204e564d65204374726c-00000002
        - /dev/disk/by-path/pci-0000:00:08.0-nvme-2
```

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Jan 24, 2025
1 parent f407c88 commit 75673b6
Show file tree
Hide file tree
Showing 17 changed files with 1,012 additions and 237 deletions.
6 changes: 6 additions & 0 deletions api/resource/definitions/block/block.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ message DiskSpec {
string pretty_size = 15;
repeated string secondary_disks = 16;
string uuid = 17;
repeated string symlinks = 18;
}

// EncryptionKey is the spec for volume encryption key.
Expand Down Expand Up @@ -133,6 +134,11 @@ message ProvisioningSpec {
FilesystemSpec filesystem_spec = 4;
}

// SymlinkSpec is the spec for Symlinks resource.
message SymlinkSpec {
repeated string paths = 1;
}

// SystemDiskSpec is the spec for SystemDisks resource.
message SystemDiskSpec {
string disk_id = 1;
Expand Down
3 changes: 2 additions & 1 deletion internal/app/machined/pkg/controllers/block/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/block/internal/inotify"
"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/block/internal/kobject"
Expand Down Expand Up @@ -217,7 +218,7 @@ func (ctrl *DevicesController) processEvent(ctx context.Context, r controller.Ru
return fmt.Errorf("failed to modify device %q: %w", id, err)
}

if err := inotifyWatcher.Add(devPath); err != nil {
if err := inotifyWatcher.Add(devPath, unix.IN_CLOSE_WRITE); err != nil {
return fmt.Errorf("failed to add inotify watch for %q: %w", devPath, err)
}
case kobject.ActionRemove:
Expand Down
51 changes: 50 additions & 1 deletion internal/app/machined/pkg/controllers/block/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/xslices"
blkdev "github.com/siderolabs/go-blockdevice/v2/block"
"go.uber.org/zap"
Expand All @@ -34,6 +35,11 @@ func (ctrl *DisksController) Inputs() []controller.Input {
Type: block.DeviceType,
Kind: controller.InputWeak,
},
{
Namespace: block.NamespaceName,
Type: block.SymlinkType,
Kind: controller.InputWeak,
},
}
}

Expand Down Expand Up @@ -82,8 +88,13 @@ func (ctrl *DisksController) Run(ctx context.Context, r controller.Runtime, logg
continue
}

// always update symlinks, but skip if the disk hasn't been created yet
if err = ctrl.updateSymlinks(ctx, r, device); err != nil {
return err
}

if lastObserved, ok := lastObservedGenerations[device.Metadata().ID()]; ok && device.TypedSpec().Generation == lastObserved {
// ignore disks which have some generation as before (don't query them once again)
// ignore disks which have same generation as before (don't query them once again)
touchedDisks[device.Metadata().ID()] = struct{}{}

continue
Expand Down Expand Up @@ -115,6 +126,33 @@ func (ctrl *DisksController) Run(ctx context.Context, r controller.Runtime, logg
}
}

func (ctrl *DisksController) updateSymlinks(ctx context.Context, r controller.Runtime, device *block.Device) error {
symlinks, err := safe.ReaderGetByID[*block.Symlink](ctx, r, device.Metadata().ID())
if err != nil {
if state.IsNotFoundError(err) {
return nil
}

return err
}

_, err = safe.ReaderGetByID[*block.Disk](ctx, r, device.Metadata().ID())
if err != nil {
if state.IsNotFoundError(err) {
// don't create disk entries even if we have symlinks, let analyze handle it
return nil
}

return err
}

return safe.WriterModify(ctx, r, block.NewDisk(block.NamespaceName, device.Metadata().ID()), func(d *block.Disk) error {
d.TypedSpec().Symlinks = symlinks.TypedSpec().Paths

return nil
})
}

//nolint:gocyclo
func (ctrl *DisksController) analyzeBlockDevice(
ctx context.Context, r controller.Runtime, logger *zap.Logger, device *block.Device, touchedDisks map[string]struct{}, allBlockdevices safe.List[*block.Device],
Expand Down Expand Up @@ -172,6 +210,11 @@ func (ctrl *DisksController) analyzeBlockDevice(
return devID
})

symlinks, err := safe.ReaderGetByID[*block.Symlink](ctx, r, device.Metadata().ID())
if err != nil && !state.IsNotFoundError(err) {
return err
}

touchedDisks[device.Metadata().ID()] = struct{}{}

return safe.WriterModify(ctx, r, block.NewDisk(block.NamespaceName, device.Metadata().ID()), func(d *block.Disk) error {
Expand All @@ -195,6 +238,12 @@ func (ctrl *DisksController) analyzeBlockDevice(

d.TypedSpec().SecondaryDisks = secondaryDisks

if symlinks != nil {
d.TypedSpec().Symlinks = symlinks.TypedSpec().Paths
} else {
d.TypedSpec().Symlinks = nil
}

return nil
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (w *Watcher) Run() (<-chan string, <-chan error) {
}

// Send the events that are not ignored on the events channel
if mask&unix.IN_IGNORED == 0 && mask&unix.IN_CLOSE_WRITE != 0 {
if mask&unix.IN_IGNORED == 0 && mask&^uint32(unix.IN_DELETE_SELF) != 0 {
eventCh <- name
}

Expand All @@ -224,8 +224,8 @@ func (w *Watcher) Run() (<-chan string, <-chan error) {
}

// Add a watch to the inotify watcher.
func (w *Watcher) Add(name string) error {
var flags uint32 = unix.IN_CLOSE_WRITE | unix.IN_DELETE_SELF
func (w *Watcher) Add(name string, flags uint32) error {
flags |= unix.IN_DELETE_SELF

return w.watches.updatePath(name, func(existing *watch) (*watch, error) {
if existing != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,37 @@ import (
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/block/internal/inotify"
)

//nolint:gocyclo
func TestWatcher(t *testing.T) {
func assertEvent(t *testing.T, watchCh <-chan string, errCh <-chan error, expected string) {
t.Helper()

select {
case path := <-watchCh:
require.Equal(t, expected, path)
case err := <-errCh:
require.FailNow(t, "unexpected error", "%s", err)
case <-time.After(time.Second):
require.FailNow(t, "timeout")
}
}

func assertNoEvent(t *testing.T, watchCh <-chan string, errCh <-chan error) {
t.Helper()

select {
case path := <-watchCh:
require.FailNow(t, "unexpected path", "%s", path)
case err := <-errCh:
require.FailNow(t, "unexpected error", "%s", err)
case <-time.After(100 * time.Millisecond):
}
}

func TestWatcherCloseWrite(t *testing.T) {
watcher, err := inotify.NewWatcher()
require.NoError(t, err)

Expand All @@ -25,61 +50,79 @@ func TestWatcher(t *testing.T) {
require.NoError(t, os.WriteFile(filepath.Join(d, "file1"), []byte("test1"), 0o644))
require.NoError(t, os.WriteFile(filepath.Join(d, "file2"), []byte("test2"), 0o644))

require.NoError(t, watcher.Add(filepath.Join(d, "file1")))
require.NoError(t, watcher.Add(filepath.Join(d, "file1"), unix.IN_CLOSE_WRITE))

watchCh, errCh := watcher.Run()

require.NoError(t, watcher.Add(filepath.Join(d, "file2")))
require.NoError(t, watcher.Add(filepath.Join(d, "file2"), unix.IN_CLOSE_WRITE))

select {
case path := <-watchCh:
require.FailNow(t, "unexpected path", "%s", path)
case err = <-errCh:
require.FailNow(t, "unexpected error", "%s", err)
case <-time.After(100 * time.Millisecond):
}
assertNoEvent(t, watchCh, errCh)

// open file1 for writing, should get inotify event
f1, err := os.OpenFile(filepath.Join(d, "file1"), os.O_WRONLY, 0)
require.NoError(t, err)

require.NoError(t, f1.Close())

select {
case path := <-watchCh:
require.Equal(t, filepath.Join(d, "file1"), path)
case err = <-errCh:
require.FailNow(t, "unexpected error", "%s", err)
case <-time.After(time.Second):
require.FailNow(t, "timeout")
}
assertEvent(t, watchCh, errCh, filepath.Join(d, "file1"))

// open file2 for reading, should not get inotify event
f2, err := os.OpenFile(filepath.Join(d, "file2"), os.O_RDONLY, 0)
require.NoError(t, err)

require.NoError(t, f2.Close())

select {
case path := <-watchCh:
require.FailNow(t, "unexpected path", "%s", path)
case err = <-errCh:
require.FailNow(t, "unexpected error", "%s", err)
case <-time.After(100 * time.Millisecond):
}
assertNoEvent(t, watchCh, errCh)

// remove file2
require.NoError(t, os.Remove(filepath.Join(d, "file2")))

select {
case path := <-watchCh:
require.FailNow(t, "unexpected path", "%s", path)
case err = <-errCh:
require.FailNow(t, "unexpected error", "%s", err)
case <-time.After(100 * time.Millisecond):
}
assertNoEvent(t, watchCh, errCh)

require.NoError(t, watcher.Remove(filepath.Join(d, "file2")))

require.NoError(t, watcher.Close())
}

func TestWatcherDirectory(t *testing.T) {
watcher, err := inotify.NewWatcher()
require.NoError(t, err)

d := t.TempDir()

require.NoError(t, os.Mkdir(filepath.Join(d, "dir1"), 0o755))

require.NoError(t, os.Symlink("a1", filepath.Join(d, "dir1", "link1")))
require.NoError(t, os.Symlink("a2", filepath.Join(d, "dir1", "link2")))

require.NoError(t, watcher.Add(d, unix.IN_CREATE|unix.IN_DELETE|unix.IN_MOVE))
require.NoError(t, watcher.Add(filepath.Join(d, "dir1"), unix.IN_CREATE|unix.IN_DELETE|unix.IN_MOVE))

watchCh, errCh := watcher.Run()

assertNoEvent(t, watchCh, errCh)

require.NoError(t, os.Remove(filepath.Join(d, "dir1", "link1")))

assertEvent(t, watchCh, errCh, filepath.Join(d, "dir1", "link1"))

require.NoError(t, os.Mkdir(filepath.Join(d, "dir2"), 0o755))

assertEvent(t, watchCh, errCh, filepath.Join(d, "dir2"))

require.NoError(t, os.Symlink("a3", filepath.Join(d, "dir1", "#.link3")))

assertEvent(t, watchCh, errCh, filepath.Join(d, "dir1", "#.link3"))

require.NoError(t, os.Rename(filepath.Join(d, "dir1", "#.link3"), filepath.Join(d, "dir1", "link3")))

assertEvent(t, watchCh, errCh, filepath.Join(d, "dir1", "#.link3"))
assertEvent(t, watchCh, errCh, filepath.Join(d, "dir1", "link3"))

// no more events
assertNoEvent(t, watchCh, errCh)

require.NoError(t, watcher.Remove(d))

require.NoError(t, watcher.Close())
}
Loading

0 comments on commit 75673b6

Please sign in to comment.