Skip to content

Commit

Permalink
Merge pull request #1344 from stgraber/cluster
Browse files Browse the repository at this point in the history
Improve cluster instance placement
  • Loading branch information
tych0 authored Oct 31, 2024
2 parents fbb0d9e + 52f9140 commit 2cfb95c
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 88 deletions.
18 changes: 5 additions & 13 deletions cmd/incusd/api_cluster_evacuation.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,20 +607,12 @@ func evacuateClusterSelectTarget(ctx context.Context, s *state.State, inst insta

// If target member not specified yet, then find the least loaded cluster member which
// supports the instance's architecture.
if targetMemberInfo == nil {
var err error

err = s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
targetMemberInfo, err = tx.GetNodeWithLeastInstances(ctx, candidateMembers)
if err != nil {
return err
}
if targetMemberInfo == nil && len(candidateMembers) > 0 {
targetMemberInfo = &candidateMembers[0]
}

return nil
})
if err != nil {
return nil, nil, err
}
if targetMemberInfo == nil {
return nil, nil, fmt.Errorf("Couldn't find a cluster member for the instance")
}

return sourceMemberInfo, targetMemberInfo, nil
Expand Down
10 changes: 4 additions & 6 deletions cmd/incusd/instance_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,11 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
}
}

err := s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
targetMemberInfo, err = tx.GetNodeWithLeastInstances(ctx, filteredCandidateMembers)
return err
})
if err != nil {
return response.SmartError(err)
if len(filteredCandidateMembers) == 0 {
return response.InternalError(fmt.Errorf("Couldn't find a cluster member for the instance"))
}

targetMemberInfo = &filteredCandidateMembers[0]
}

if targetMemberInfo.IsOffline(s.GlobalConfig.OfflineThreshold()) {
Expand Down
12 changes: 5 additions & 7 deletions cmd/incusd/instances_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,14 +1130,12 @@ func instancesPost(d *Daemon, r *http.Request) response.Response {
}

// If no target member was selected yet, pick the member with the least number of instances.
if targetMemberInfo == nil && len(candidateMembers) > 0 {
targetMemberInfo = &candidateMembers[0]
}

if targetMemberInfo == nil {
err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
targetMemberInfo, err = tx.GetNodeWithLeastInstances(ctx, candidateMembers)
return err
})
if err != nil {
return response.SmartError(err)
}
return response.InternalError(fmt.Errorf("Couldn't find a cluster member for the instance"))
}
}

Expand Down
4 changes: 4 additions & 0 deletions doc/api-extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2627,3 +2627,7 @@ Allow for creating new LVM cluster pools by setting the `source` to the shared b
## `network_ovn_external_interfaces`

This adds support for `bridge.external_interfaces` on OVN networks.

## `instances_scriptlet_get_instances_count`

This allows the instance scriptlet to fetch the count instances given an optional Project or Location filter as well as including pending instances.
1 change: 1 addition & 0 deletions doc/explanation/clustering.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ The following functions are available to the scriptlet (in addition to those pro
- `get_cluster_member_state(member_name)`: Get the cluster member's state. Returns an object with the cluster member's state in the form of [`api.ClusterMemberState`](https://pkg.go.dev/github.com/lxc/incus/shared/api#ClusterMemberState). `member_name` is the name of the cluster member to get the state for.
- `get_instance_resources()`: Get information about the resources the instance will require. Returns an object with the resource information in the form of [`scriptlet.InstanceResources`](https://pkg.go.dev/github.com/lxc/incus/shared/api/scriptlet/#InstanceResources).
- `get_instances(location, project)`: Get a list of instances based on project and/or location filters. Returns the list of instances in the form of [`[]api.Instance`](https://pkg.go.dev/github.com/lxc/incus/shared/api#Instance).
- `get_instances_count(location, project, pending)`: Get a count of the instances based on project and/or location filters. The count may include instances currently being created for which no database record exists yet..
- `get_cluster_members(group)`: Get a list of cluster members based on the cluster group. Returns the list of cluster members in the form of [`[]api.ClusterMember`](https://pkg.go.dev/github.com/lxc/incus/shared/api#ClusterMember).
- `get_project(name)`: Get a project object based on the project name. Returns a project object in the form of [`api.Project`](https://pkg.go.dev/github.com/lxc/incus/shared/api#Project).

Expand Down
100 changes: 100 additions & 0 deletions internal/server/db/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

internalInstance "github.com/lxc/incus/v6/internal/instance"
"github.com/lxc/incus/v6/internal/server/db/cluster"
"github.com/lxc/incus/v6/internal/server/db/operationtype"
"github.com/lxc/incus/v6/internal/server/db/query"
deviceConfig "github.com/lxc/incus/v6/internal/server/device/config"
"github.com/lxc/incus/v6/internal/server/instance/instancetype"
Expand Down Expand Up @@ -1103,3 +1104,102 @@ func UpdateInstance(tx *sql.Tx, id int, description string, architecture int, ep

return nil
}

// GetInstancesCount returns the number of instances with possible filtering for project or location.
// It also supports looking for instances currently being created.
func (c *ClusterTx) GetInstancesCount(ctx context.Context, projectName string, locationName string, includePending bool) (int, error) {
var err error

// Load the project ID if needed.
projectID := int64(-1)
if projectName != "" {
projectID, err = cluster.GetProjectID(ctx, c.Tx(), projectName)
if err != nil {
return -1, err
}
}

// Load the cluster member ID if needed.
nodeID := int64(-1)
if locationName != "" {
nodeID, err = cluster.GetNodeID(ctx, c.Tx(), locationName)
if err != nil {
return -1, err
}
}

// Count the instances.
var count int

if projectID != -1 && nodeID != -1 {
// Count for specified project and cluster member.
created, err := query.Count(ctx, c.tx, "instances", "project_id=? AND node_id=?", projectID, nodeID)
if err != nil {
return -1, fmt.Errorf("Failed to get instances count: %w", err)
}

count += created

if includePending {
pending, err := query.Count(ctx, c.tx, "operations", "project_id=? AND node_id=? AND type=?", projectID, nodeID, operationtype.InstanceCreate)
if err != nil {
return -1, fmt.Errorf("Failed to get pending instances count: %w", err)
}

count += pending
}
} else if projectID != -1 {
// Count for specified project.
created, err := query.Count(ctx, c.tx, "instances", "project_id=?", projectID)
if err != nil {
return -1, fmt.Errorf("Failed to get instances count: %w", err)
}

count += created

if includePending {
pending, err := query.Count(ctx, c.tx, "operations", "project_id=? AND type=?", projectID, operationtype.InstanceCreate)
if err != nil {
return -1, fmt.Errorf("Failed to get pending instances count: %w", err)
}

count += pending
}
} else if nodeID != -1 {
// Count for specified cluster member.
created, err := query.Count(ctx, c.tx, "instances", "node_id=?", nodeID)
if err != nil {
return -1, fmt.Errorf("Failed to get instances count: %w", err)
}

count += created

if includePending {
pending, err := query.Count(ctx, c.tx, "operations", "node_id=? AND type=?", nodeID, operationtype.InstanceCreate)
if err != nil {
return -1, fmt.Errorf("Failed to get pending instances count: %w", err)
}

count += pending
}
} else {
// Count everything.
created, err := query.Count(ctx, c.tx, "instances", "")
if err != nil {
return -1, fmt.Errorf("Failed to get instances count: %w", err)
}

count += created

if includePending {
pending, err := query.Count(ctx, c.tx, "operations", "type=?", operationtype.InstanceCreate)
if err != nil {
return -1, fmt.Errorf("Failed to get pending instances count: %w", err)
}

count += pending
}
}

return count, nil
}
40 changes: 7 additions & 33 deletions internal/server/db/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"fmt"
"net/http"
"slices"
"sort"
"strconv"
"strings"
"time"

"github.com/lxc/incus/v6/internal/server/db/cluster"
"github.com/lxc/incus/v6/internal/server/db/operationtype"
"github.com/lxc/incus/v6/internal/server/db/query"
localUtil "github.com/lxc/incus/v6/internal/server/util"
"github.com/lxc/incus/v6/internal/version"
Expand Down Expand Up @@ -1135,40 +1135,14 @@ func (c *ClusterTx) GetCandidateMembers(ctx context.Context, allMembers []NodeIn
}
}

return candidateMembers, nil
}

// GetNodeWithLeastInstances returns the name of the member with the least number of instances that are either
// already created or being created with an operation.
func (c *ClusterTx) GetNodeWithLeastInstances(ctx context.Context, members []NodeInfo) (*NodeInfo, error) {
var member *NodeInfo
var lowestInstanceCount = -1

for i := range members {
// Fetch the number of instances already created on this member.
created, err := query.Count(ctx, c.tx, "instances", "node_id=?", members[i].ID)
if err != nil {
return nil, fmt.Errorf("Failed to get instances count: %w", err)
}
sort.Slice(candidateMembers, func(i int, j int) bool {
iCount, _ := c.GetInstancesCount(ctx, "", candidateMembers[i].Name, true)
jCount, _ := c.GetInstancesCount(ctx, "", candidateMembers[j].Name, true)

// Fetch the number of instances currently being created on this member.
pending, err := query.Count(ctx, c.tx, "operations", "node_id=? AND type=?", members[i].ID, operationtype.InstanceCreate)
if err != nil {
return nil, fmt.Errorf("Failed to get pending instances count: %w", err)
}

memberInstanceCount := created + pending
if lowestInstanceCount == -1 || memberInstanceCount < lowestInstanceCount {
lowestInstanceCount = memberInstanceCount
member = &members[i]
}
}

if member == nil {
return nil, api.StatusErrorf(http.StatusNotFound, "No suitable cluster member could be found")
}
return iCount < jCount
})

return member, nil
return candidateMembers, nil
}

// SetNodeVersion updates the schema and API version of the node with the
Expand Down
30 changes: 10 additions & 20 deletions internal/server/db/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ INSERT INTO storage_volumes(name, storage_pool_id, node_id, type, project_id, de

// If there are 2 online nodes, return the address of the one with the least
// number of instances.
func TestGetNodeWithLeastInstances(t *testing.T) {
func TestGetCandidateMembers(t *testing.T) {
tx, cleanup := db.NewTestClusterTx(t)
defer cleanup()

Expand All @@ -325,14 +325,12 @@ INSERT INTO instances (id, node_id, name, architecture, type, project_id, descri
require.NoError(t, err)
require.Len(t, members, 2)

member, err := tx.GetNodeWithLeastInstances(context.Background(), members)
require.NoError(t, err)
assert.Equal(t, "buzz", member.Name)
assert.Equal(t, "buzz", members[0].Name)
}

// If there are nodes, and one of them is offline, return the name of the
// online node, even if the offline one has more instances.
func TestGetNodeWithLeastInstances_OfflineNode(t *testing.T) {
func TestGetCandidateMembers_OfflineNode(t *testing.T) {
tx, cleanup := db.NewTestClusterTx(t)
defer cleanup()

Expand All @@ -356,14 +354,12 @@ INSERT INTO instances (id, node_id, name, architecture, type, project_id, descri
require.NoError(t, err)
require.Len(t, members, 1)

member, err := tx.GetNodeWithLeastInstances(context.Background(), members)
require.NoError(t, err)
assert.Equal(t, "buzz", member.Name)
assert.Equal(t, "buzz", members[0].Name)
}

// If there are 2 online nodes, and an instance is pending on one of them,
// return the address of the other one number of instances.
func TestGetNodeWithLeastInstances_Pending(t *testing.T) {
func TestGetCandidateMembers_Pending(t *testing.T) {
tx, cleanup := db.NewTestClusterTx(t)
defer cleanup()

Expand All @@ -383,14 +379,12 @@ INSERT INTO operations (id, uuid, node_id, type, project_id) VALUES (1, 'abc', 1
require.NoError(t, err)
require.Len(t, members, 2)

member, err := tx.GetNodeWithLeastInstances(context.Background(), members)
require.NoError(t, err)
assert.Equal(t, "buzz", member.Name)
assert.Equal(t, "buzz", members[0].Name)
}

// If specific architectures were selected, return only nodes with those
// architectures.
func TestGetNodeWithLeastInstances_Architecture(t *testing.T) {
func TestGetCandidateMembers_Architecture(t *testing.T) {
tx, cleanup := db.NewTestClusterTx(t)
defer cleanup()

Expand Down Expand Up @@ -419,9 +413,7 @@ INSERT INTO instances (id, node_id, name, architecture, type, project_id, descri
require.Len(t, members, 1)

// The local member is returned despite it has more instances.
member, err := tx.GetNodeWithLeastInstances(context.Background(), members)
require.NoError(t, err)
assert.Equal(t, "none", member.Name)
assert.Equal(t, "none", members[0].Name)
}

func TestUpdateNodeFailureDomain(t *testing.T) {
Expand Down Expand Up @@ -452,7 +444,7 @@ func TestUpdateNodeFailureDomain(t *testing.T) {
assert.Equal(t, map[string]uint64{"0.0.0.0": 0, "1.2.3.4:666": 0}, domains)
}

func TestGetNodeWithLeastInstances_DefaultArch(t *testing.T) {
func TestGetCandidateMembers_DefaultArch(t *testing.T) {
tx, cleanup := db.NewTestClusterTx(t)
defer cleanup()

Expand Down Expand Up @@ -480,7 +472,5 @@ INSERT INTO instances (id, node_id, name, architecture, type, project_id, descri
require.NoError(t, err)
require.Len(t, members, 1)

member, err := tx.GetNodeWithLeastInstances(context.Background(), members)
require.NoError(t, err)
assert.Equal(t, "buzz", member.Name)
assert.Equal(t, "buzz", members[0].Name)
}
Loading

0 comments on commit 2cfb95c

Please sign in to comment.