Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added possibility to setup NB configuration from file at VPP-Agent start #1769

Merged
merged 11 commits into from
Dec 21, 2020
Merged
8 changes: 4 additions & 4 deletions client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.ligato.io/cn-infra/v2/db/keyval"

"go.ligato.io/vpp-agent/v3/pkg/models"
orch "go.ligato.io/vpp-agent/v3/plugins/orchestrator"
"go.ligato.io/vpp-agent/v3/plugins/orchestrator/contextdecorator"
"go.ligato.io/vpp-agent/v3/proto/ligato/generic"
)

Expand Down Expand Up @@ -64,7 +64,7 @@ func (c *client) ResyncConfig(items ...proto.Message) error {
}

ctx := context.Background()
ctx = orch.DataSrcContext(ctx, "localclient")
ctx = contextdecorator.DataSrcContext(ctx, "localclient")
return txn.Commit(ctx)
}

Expand Down Expand Up @@ -121,9 +121,9 @@ func (r *changeRequest) Send(ctx context.Context) error {
if r.err != nil {
return r.err
}
_, withDataSrc := orch.DataSrcFromContext(ctx)
_, withDataSrc := contextdecorator.DataSrcFromContext(ctx)
if !withDataSrc {
ctx = orch.DataSrcContext(ctx, "localclient")
ctx = contextdecorator.DataSrcContext(ctx, "localclient")
}
return r.txn.Commit(ctx)
}
Expand Down
5 changes: 2 additions & 3 deletions clientv2/vpp/dbadapter/data_change_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"context"

"go.ligato.io/cn-infra/v2/db/keyval"

"go.ligato.io/vpp-agent/v3/pkg/models"
"go.ligato.io/vpp-agent/v3/plugins/orchestrator/contextdecorator"

vppclient "go.ligato.io/vpp-agent/v3/clientv2/vpp"
orch "go.ligato.io/vpp-agent/v3/plugins/orchestrator"
abf "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/abf"
acl "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/acl"
intf "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/interfaces"
Expand Down Expand Up @@ -73,7 +72,7 @@ func (dsl *DataChangeDSL) Delete() vppclient.DeleteDSL {
// Send propagates requested changes to the plugins.
func (dsl *DataChangeDSL) Send() vppclient.Reply {
ctx := context.Background()
ctx = orch.DataSrcContext(ctx, "localclient")
ctx = contextdecorator.DataSrcContext(ctx, "localclient")
err := dsl.txn.Commit(ctx)
return &Reply{err}
}
Expand Down
5 changes: 2 additions & 3 deletions clientv2/vpp/dbadapter/data_resync_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"context"

"go.ligato.io/cn-infra/v2/db/keyval"

"go.ligato.io/vpp-agent/v3/pkg/models"
"go.ligato.io/vpp-agent/v3/plugins/orchestrator/contextdecorator"

vppclient "go.ligato.io/vpp-agent/v3/clientv2/vpp"
orch "go.ligato.io/vpp-agent/v3/plugins/orchestrator"
abf "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/abf"
acl "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/acl"
intf "go.ligato.io/vpp-agent/v3/proto/ligato/vpp/interfaces"
Expand Down Expand Up @@ -367,7 +366,7 @@ func (dsl *DataResyncDSL) Send() vppclient.Reply {
}

ctx := context.Background()
ctx = orch.DataSrcContext(ctx, "localclient")
ctx = contextdecorator.DataSrcContext(ctx, "localclient")
err := dsl.txn.Commit(ctx)

return &Reply{err: err}
Expand Down
17 changes: 10 additions & 7 deletions cmd/vpp-agent/app/vpp_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package app

import (
"github.com/go-errors/errors"
"go.ligato.io/cn-infra/v2/datasync"
"go.ligato.io/cn-infra/v2/datasync/kvdbsync"
"go.ligato.io/cn-infra/v2/datasync/kvdbsync/local"
"go.ligato.io/cn-infra/v2/datasync/msgsync"
"go.ligato.io/cn-infra/v2/datasync/resync"
"go.ligato.io/cn-infra/v2/db/keyval/consul"
"go.ligato.io/cn-infra/v2/db/keyval/etcd"
"go.ligato.io/cn-infra/v2/db/keyval/redis"
Expand All @@ -28,14 +28,15 @@ import (
"go.ligato.io/cn-infra/v2/infra"
"go.ligato.io/cn-infra/v2/logging/logmanager"
"go.ligato.io/cn-infra/v2/messaging/kafka"

"go.ligato.io/vpp-agent/v3/plugins/configurator"
linux_ifplugin "go.ligato.io/vpp-agent/v3/plugins/linux/ifplugin"
linux_iptablesplugin "go.ligato.io/vpp-agent/v3/plugins/linux/iptablesplugin"
linux_l3plugin "go.ligato.io/vpp-agent/v3/plugins/linux/l3plugin"
linux_nsplugin "go.ligato.io/vpp-agent/v3/plugins/linux/nsplugin"
"go.ligato.io/vpp-agent/v3/plugins/netalloc"
"go.ligato.io/vpp-agent/v3/plugins/orchestrator"
"go.ligato.io/vpp-agent/v3/plugins/orchestrator/localregistry"
"go.ligato.io/vpp-agent/v3/plugins/orchestrator/watcher"
"go.ligato.io/vpp-agent/v3/plugins/restapi"
"go.ligato.io/vpp-agent/v3/plugins/telemetry"
"go.ligato.io/vpp-agent/v3/plugins/vpp/abfplugin"
Expand Down Expand Up @@ -100,12 +101,14 @@ func New() *VPPAgent {
)

// Set watcher for KVScheduler.
watchers := datasync.KVProtoWatchers{
initFileRegistry := localregistry.NewInitFileRegistryPlugin()
watchers := watcher.NewPlugin(watcher.UseWatchers(
local.DefaultRegistry,
initFileRegistry,
etcdDataSync,
consulDataSync,
redisDataSync,
}
))
orchestrator.DefaultPlugin.Watcher = watchers
orchestrator.DefaultPlugin.StatusPublisher = writers
orchestrator.EnabledGrpcMetrics()
Expand Down Expand Up @@ -157,9 +160,9 @@ func (a *VPPAgent) Init() error {

// AfterInit executes resync.
func (a *VPPAgent) AfterInit() error {
// manually start resync after all plugins started
resync.DefaultPlugin.DoResync()
//orchestrator.DefaultPlugin.InitialSync()
if err := orchestrator.DefaultPlugin.InitialSync(); err != nil {
return errors.Errorf("failure in initial sync: %v", err)
}
a.StatusCheck.ReportStateChange(a.PluginName, statuscheck.OK, nil)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,5 @@ func (m *LocallyKnownModel) InstanceName(x interface{}) (string, error) {
if m.nameFunc == nil {
return "", nil
}
return m.nameFunc(x)
return m.nameFunc(x, m.goType)
}
34 changes: 34 additions & 0 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package models

import (
"path"
"reflect"
"strings"

"github.com/go-errors/errors"
"github.com/golang/protobuf/proto"
protoV2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/dynamicpb"
)

// Register registers model in DefaultRegistry.
Expand Down Expand Up @@ -136,3 +139,34 @@ func keyPrefix(modelSpec Spec, hasTemplateName bool) string {
}
return keyPrefix
}

// dynamicMessageToGeneratedMessage converts proto dynamic message to corresponding generated proto message
// (identified by go type).
// This conversion method should help handling dynamic proto messages in mostly protoc-generated proto message
// oriented codebase (i.e. help for type conversions to named, help handle missing data fields as seen
// in generated proto messages,...)
func dynamicMessageToGeneratedMessage(dynamicMessage *dynamicpb.Message,
goTypeOfGeneratedMessage reflect.Type) (proto.Message, error) {

// create empty proto message of the same type as it was used for registration
var registeredGoType interface{}
if goTypeOfGeneratedMessage.Kind() == reflect.Ptr {
registeredGoType = reflect.New(goTypeOfGeneratedMessage.Elem()).Interface()
} else {
registeredGoType = reflect.Zero(goTypeOfGeneratedMessage).Interface()
}
message, isProtoV1 := registeredGoType.(proto.Message)
if !isProtoV1 {
messageV2, isProtoV2 := registeredGoType.(protoV2.Message)
if !isProtoV2 {
return nil, errors.Errorf("registered go type(%T) is not proto.Message", registeredGoType)
}
message = proto.MessageV1(messageV2)
}

// fill empty proto message with data from its dynamic proto message counterpart
// (alternative approach to this is marshalling dynamicMessage to json and unmarshalling it back to message)
proto.Merge(message, dynamicMessage)

return message, nil
}
20 changes: 18 additions & 2 deletions pkg/models/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package models

import (
"net"
"reflect"
"strings"
"text/template"

"google.golang.org/protobuf/types/dynamicpb"
)

type modelOptions struct {
Expand All @@ -15,7 +18,10 @@ type modelOptions struct {
type ModelOption func(*modelOptions)

// NameFunc represents function which can name model instance.
type NameFunc func(obj interface{}) (string, error)
// To properly handle also dynamic Messages (dynamicpb.Message)
// as model instances, the go type of corresponding generated
// proto message must be given.
type NameFunc func(obj interface{}, messageGoType reflect.Type) (string, error)

// WithNameTemplate returns option for models which sets function
// for generating name of instances using custom template.
Expand All @@ -36,7 +42,17 @@ func NameTemplate(t string) NameFunc {
tmpl := template.Must(
template.New("name").Funcs(funcMap).Option("missingkey=error").Parse(t),
)
return func(obj interface{}) (string, error) {
return func(obj interface{}, messageGoType reflect.Type) (string, error) {
// handling dynamic messages (they don't have data fields as generated proto messages)
if dynMessage, ok := obj.(*dynamicpb.Message); ok {
var err error
obj, err = dynamicMessageToGeneratedMessage(dynMessage, messageGoType)
if err != nil {
return "", err
}
}

// execute name template on generated proto message
var s strings.Builder
if err := tmpl.Execute(&s, obj); err != nil {
return "", err
Expand Down
75 changes: 61 additions & 14 deletions pkg/models/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@ var (
// LocalRegistry defines model registry for managing registered local models. Local models are locally compiled into
// the program binary and hence some additional information in compare to remote models, i.e. go type.
type LocalRegistry struct {
registeredTypes map[reflect.Type]*LocallyKnownModel
modelNames map[string]*LocallyKnownModel
ordered []reflect.Type
proxied *RemoteRegistry
registeredModelsByGoType map[reflect.Type]*LocallyKnownModel
registeredModelsByProtoName map[string]*LocallyKnownModel
modelNames map[string]*LocallyKnownModel
ordered []reflect.Type
proxied *RemoteRegistry
}

// NewRegistry returns initialized Registry.
func NewRegistry() *LocalRegistry {
return &LocalRegistry{
registeredTypes: make(map[reflect.Type]*LocallyKnownModel),
modelNames: make(map[string]*LocallyKnownModel),
proxied: NewRemoteRegistry(),
registeredModelsByGoType: make(map[reflect.Type]*LocallyKnownModel),
registeredModelsByProtoName: make(map[string]*LocallyKnownModel),
modelNames: make(map[string]*LocallyKnownModel),
proxied: NewRemoteRegistry(),
}
}

Expand All @@ -66,22 +68,59 @@ func (r *LocalRegistry) GetModel(name string) (KnownModel, error) {

// GetModelFor returns registered model for the given proto message.
func (r *LocalRegistry) GetModelFor(x interface{}) (KnownModel, error) {
// find model by Go type
t := reflect.TypeOf(x)
model, found := r.registeredTypes[t]
model, found := r.registeredModelsByGoType[t]
if !found {
// check remotely retrieved models registered in local registry
if proxModel, err := r.proxied.GetModelFor(x); err == nil {
return proxModel, nil
}

// find model by Proto name
// (useful when using dynamically generated config instead of configurator.Config => go type of proto
// messages is in such case always dynamicpb.Message and never the go type of registered (generated)
// proto message)
if len(r.registeredModelsByProtoName) == 0 && len(r.registeredModelsByGoType) > 0 {
r.lazyInitRegisteredTypesByProtoName()
}
var protoName string
if pb, ok := x.(protoreflect.ProtoMessage); ok {
protoName = string(pb.ProtoReflect().Descriptor().FullName())
} else if v1, ok := x.(proto.Message); ok {
protoName = string(proto.MessageV2(v1).ProtoReflect().Descriptor().FullName())
}
if protoName != "" {
if model, found = r.registeredModelsByProtoName[protoName]; found {
return model, nil
}
}

// find model by checking proto options
if model = r.checkProtoOptions(x); model == nil {
return &LocallyKnownModel{}, fmt.Errorf("no model registered for type %v", t)
}
}
return model, nil
}

// lazyInitRegisteredTypesByProtoName performs lazy initialization of registeredModelsByProtoName. The reason
// why initialization can't happen while registration (call of func Register(...)) is that some proto reflect
// functionality is not available during this time. The registration happens as variable initialization, but
// the reflection is initialized in init() func and that happens after variable initialization.
//
// Alternative solution would be to change when the models are registered (VPP-Agent have it like described
// above and 3rd party model are probably copying the same behaviour). So to not break anything, the lazy
// initialization seems like the best solution for now.
func (r *LocalRegistry) lazyInitRegisteredTypesByProtoName() {
for _, model := range r.registeredModelsByGoType {
r.registeredModelsByProtoName[model.ProtoName()] = model // ProtoName() == ProtoReflect().Descriptor().FullName()
}
}

// GetModelForKey returns registered model for the given key or error.
func (r *LocalRegistry) GetModelForKey(key string) (KnownModel, error) {
for _, model := range r.registeredTypes {
for _, model := range r.registeredModelsByGoType {
if model.IsKeyValid(key) {
return model, nil
}
Expand All @@ -96,7 +135,7 @@ func (r *LocalRegistry) GetModelForKey(key string) (KnownModel, error) {
func (r *LocalRegistry) RegisteredModels() []KnownModel {
var models []KnownModel
for _, typ := range r.ordered {
models = append(models, r.registeredTypes[typ])
models = append(models, r.registeredModelsByGoType[typ])
}
models = append(models, r.proxied.RegisteredModels()...)
return models
Expand All @@ -109,7 +148,7 @@ func (r *LocalRegistry) MessageTypeRegistry() *protoregistry.Types {
typeRegistry.RegisterMessage(dynamicpb.NewMessageType(model.proto.ProtoReflect().Descriptor()))
}
proxiedTypes := r.proxied.MessageTypeRegistry()
proxiedTypes.RangeMessages(func (mt protoreflect.MessageType) bool {
proxiedTypes.RangeMessages(func(mt protoreflect.MessageType) bool {
typeRegistry.RegisterMessage(mt)
return true
})
Expand All @@ -135,7 +174,7 @@ func (r *LocalRegistry) Register(x interface{}, spec Spec, opts ...ModelOption)

goType := reflect.TypeOf(x)
// Check go type duplicate registration
if m, ok := r.registeredTypes[goType]; ok {
if m, ok := r.registeredModelsByGoType[goType]; ok {
return nil, fmt.Errorf("go type %v already registered for model %v", goType, m.Name())
}

Expand Down Expand Up @@ -173,7 +212,15 @@ func (r *LocalRegistry) Register(x interface{}, spec Spec, opts ...ModelOption)

// Use GetName as fallback for generating name
if _, ok := x.(named); ok {
model.nameFunc = func(obj interface{}) (s string, e error) {
model.nameFunc = func(obj interface{}, messageGoType reflect.Type) (s string, e error) {
// handling dynamic messages (they don't implement named interface)
if dynMessage, ok := obj.(*dynamicpb.Message); ok {
obj, e = dynamicMessageToGeneratedMessage(dynMessage, messageGoType)
if e != nil {
return "", e
}
}
// handling other proto message
return obj.(named).GetName(), nil
}
model.nameTemplate = namedTemplate
Expand All @@ -184,7 +231,7 @@ func (r *LocalRegistry) Register(x interface{}, spec Spec, opts ...ModelOption)
opt(&model.modelOptions)
}

r.registeredTypes[goType] = model
r.registeredModelsByGoType[goType] = model
r.modelNames[model.Name()] = model
r.ordered = append(r.ordered, goType)

Expand Down
4 changes: 2 additions & 2 deletions pkg/models/remote_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)

// RemotelyKnownModel represents a registered remnote model (remote model has only information about model
// RemotelyKnownModel represents a registered remote model (remote model has only information about model
// from remote source, i.e. missing go type because VPP-Agent meta service doesn't provide it)
type RemotelyKnownModel struct {
model *ModelInfo
Expand Down Expand Up @@ -155,7 +155,7 @@ func (m *RemotelyKnownModel) InstanceName(x interface{}) (string, error) {
return "", errors.Errorf("can't load json of marshalled "+
"message to generic map due to: %v (json=%v)", err, jsonData)
}
name, err := NameTemplate(nameTemplate)(mapData)
name, err := NameTemplate(nameTemplate)(mapData, nil)
if err != nil {
return "", errors.Errorf("can't compute name from name template by applying generic map "+
"due to: %v (name template=%v, generic map=%v)", err, nameTemplate, mapData)
Expand Down
Loading