Skip to content

Commit

Permalink
feat: searcher plugin change return params (#844)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Dec 1, 2021
1 parent eb06464 commit 9b90440
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 45 deletions.
6 changes: 3 additions & 3 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *DaemonOption) Validate() error {
}

if p.Scheduler.Manager.Enable {
if p.Scheduler.Manager.Addr == "" {
if len(p.Scheduler.NetAddrs) == 0 {
return errors.New("manager addr is not specified")
}

Expand All @@ -140,8 +140,8 @@ type SchedulerOption struct {
type ManagerOption struct {
// Enable get configuration from manager
Enable bool `mapstructure:"enable" yaml:"enable"`
// Addr is manager addresse
Addr string `mapstructure:"addr" yaml:"addr"`
// NetAddrs is manager addresses.
NetAddrs []dfnet.NetAddr `mapstructure:"netAddrs" yaml:"netAddrs"`
// RefreshInterval is the refresh interval
RefreshInterval time.Duration `mapstructure:"refreshInterval" yaml:"refreshInterval"`
}
Expand Down
9 changes: 7 additions & 2 deletions client/config/peerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,13 @@ func TestPeerHostOption_Load(t *testing.T) {
KeepStorage: false,
Scheduler: SchedulerOption{
Manager: ManagerOption{
Enable: false,
Addr: "127.0.0.1:65003",
Enable: false,
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:65003",
},
},
RefreshInterval: 5 * time.Minute,
},
NetAddrs: []dfnet.NetAddr{
Expand Down
4 changes: 3 additions & 1 deletion client/config/testdata/config/daemon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ keepStorage: false
scheduler:
manager:
enable: false
addr: "127.0.0.1:65003"
netAddrs:
- type: tcp
addr: 127.0.0.1:65003
refreshInterval: 5m
netAddrs:
- type: tcp
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func New(opt *config.DaemonOption) (Daemon, error) {
var dynconfig config.Dynconfig
if opt.Scheduler.Manager.Enable == true {
// New manager client
managerClient, err := managerclient.New(opt.Scheduler.Manager.Addr)
managerClient, err := managerclient.NewWithAddrs(opt.Scheduler.Manager.NetAddrs)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions docs/en/deployment/configuration/dfget.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ scheduler:
manager:
# get scheduler list dynamically from manager
enable: false
# manager service address
addr: 127.0.0.1:65003
# manager service addresses
netAddrs:
- type: tcp
addr: 127.0.0.1:65003
# scheduler list refresh interval
refreshInterval: 5m
# schedule timeout
Expand Down
4 changes: 3 additions & 1 deletion docs/zh-CN/deployment/configuration/dfget.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ scheduler:
# 通过 manager 接口动态获取 scheduler 列表
enable: false
# manager 服务地址
addr: 127.0.0.1:65003
netAddrs:
- type: tcp
addr: 127.0.0.1:65003
# scheduler 列表刷新时间
refreshInterval: 5m
# 调度超时
Expand Down
20 changes: 13 additions & 7 deletions manager/searcher/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package searcher

import (
"context"
"errors"
"fmt"
"strings"

"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -73,7 +75,7 @@ type Scopes struct {
}

type Searcher interface {
FindSchedulerCluster(context.Context, []model.SchedulerCluster, *manager.ListSchedulersRequest) (model.SchedulerCluster, bool)
FindSchedulerCluster(context.Context, []model.SchedulerCluster, *manager.ListSchedulersRequest) (model.SchedulerCluster, error)
}

type searcher struct{}
Expand All @@ -89,10 +91,14 @@ func New() Searcher {
return s
}

func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, bool) {
func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, error) {
conditions := client.HostInfo
if len(schedulerClusters) <= 0 || len(conditions) <= 0 {
return model.SchedulerCluster{}, false
if len(conditions) <= 0 {
return model.SchedulerCluster{}, errors.New("empty conditions")
}

if len(schedulerClusters) <= 0 {
return model.SchedulerCluster{}, errors.New("empty scheduler clusters")
}

// If there are security domain conditions, match clusters of the same security domain.
Expand Down Expand Up @@ -121,10 +127,10 @@ func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters [
switch len(clusters) {
case 0:
// If the security domain does not match, there is no cluster available
return model.SchedulerCluster{}, false
return model.SchedulerCluster{}, fmt.Errorf("security domain %s does not match", securityDomain)
case 1:
// If only one cluster matches the security domain, return the cluster directly
return clusters[0], true
return clusters[0], nil
default:
// If there are multiple clusters matching the security domain,
// select the schuelder cluster with a higher score
Expand All @@ -143,7 +149,7 @@ func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters [
result = cluster
}
}
return result, true
return result, nil
}
}

Expand Down
71 changes: 50 additions & 21 deletions manager/searcher/searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,53 @@ func TestSchedulerCluster(t *testing.T) {
name string
schedulerClusters []model.SchedulerCluster
conditions map[string]string
expect func(t *testing.T, data model.SchedulerCluster, ok bool)
expect func(t *testing.T, data model.SchedulerCluster, err error)
}{
{
name: "conditions is empty",
schedulerClusters: []model.SchedulerCluster{{Name: "foo"}},
conditions: map[string]string{},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(ok, false)
assert.EqualError(err, "empty conditions")
},
},
{
name: "scheduler clusters is empty",
schedulerClusters: []model.SchedulerCluster{},
conditions: map[string]string{"location": "foo"},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(ok, false)
assert.EqualError(err, "empty scheduler clusters")
},
},
{
name: "security_domain does not match",
schedulerClusters: []model.SchedulerCluster{
{
Name: "foo",
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "domain-2",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "foo",
State: "active",
},
},
},
{
Name: "bar",
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.EqualError(err, "security domain domain-1 does not match")
},
},
{
Expand All @@ -75,10 +104,10 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -107,10 +136,10 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"location": "location-1"},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -139,10 +168,10 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"idc": "idc-1"},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -171,10 +200,10 @@ func TestSchedulerCluster(t *testing.T) {
},
},
conditions: map[string]string{"net_topology": "net-topology-1"},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -207,10 +236,10 @@ func TestSchedulerCluster(t *testing.T) {
"location": "location-1",
"idc": "idc-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -249,10 +278,10 @@ func TestSchedulerCluster(t *testing.T) {
"security_domain": "domain-1",
"location": "location-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -291,10 +320,10 @@ func TestSchedulerCluster(t *testing.T) {
"security_domain": "domain-1",
"idc": "idc-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
{
Expand Down Expand Up @@ -335,10 +364,10 @@ func TestSchedulerCluster(t *testing.T) {
"idc": "idc-1",
"location": "location-1",
},
expect: func(t *testing.T, data model.SchedulerCluster, ok bool) {
expect: func(t *testing.T, data model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data.Name, "foo")
assert.Equal(ok, true)
assert.NoError(err)
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions manager/searcher/testdata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func main() {
os.Exit(1)
}

cluster, ok := s.FindSchedulerCluster(context.Background(), []model.SchedulerCluster{}, &manager.ListSchedulersRequest{})
if !ok {
cluster, err := s.FindSchedulerCluster(context.Background(), []model.SchedulerCluster{}, &manager.ListSchedulersRequest{})
if err != nil {
fmt.Println("scheduler cluster not found")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions manager/searcher/testdata/plugin/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

type searcher struct{}

func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, bool) {
return model.SchedulerCluster{Name: "foo"}, true
func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, error) {
return model.SchedulerCluster{Name: "foo"}, nil
}

func DragonflyPluginInit(option map[string]string) (interface{}, map[string]string, error) {
Expand Down
6 changes: 3 additions & 3 deletions manager/service/service_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe

// Search optimal scheduler cluster
log.Infof("list scheduler clusters %+v with hostInfo %+v", schedulerClusters, req.HostInfo)
schedulerCluster, ok := s.searcher.FindSchedulerCluster(ctx, schedulerClusters, req)
if !ok {
log.Errorf("can not matching scheduler cluster")
schedulerCluster, err := s.searcher.FindSchedulerCluster(ctx, schedulerClusters, req)
if err != nil {
log.Errorf("can not matching scheduler cluster %v", err)
return nil, status.Error(codes.NotFound, "scheduler cluster not found")
}
log.Infof("find matching scheduler cluster %v", schedulerCluster)
Expand Down
18 changes: 18 additions & 0 deletions pkg/rpc/manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"context"
"errors"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -27,6 +28,8 @@ import (
"google.golang.org/grpc/backoff"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/reachable"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
)

Expand Down Expand Up @@ -91,6 +94,21 @@ func New(target string) (Client, error) {
}, nil
}

func NewWithAddrs(netAddrs []dfnet.NetAddr) (Client, error) {
for _, netAddr := range netAddrs {
ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr})
if err := ipReachable.Check(); err != nil {
logger.Warnf("%s address can not reachable", netAddr.Addr)
continue
}

logger.Infof("use %s address for manager grpc client", netAddr.Addr)
return New(netAddr.Addr)
}

return nil, errors.New("can not find available addresses")
}

func (c *client) GetScheduler(req *manager.GetSchedulerRequest) (*manager.Scheduler, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()
Expand Down

0 comments on commit 9b90440

Please sign in to comment.