Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change view to get mix of agents #915

Merged
merged 5 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions agent/cloud_config/cloud_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (cc *cloudConfigManager) autoProvision(apiAddress string, token string) (co
}

type AgentReq struct {
Name string `json:"name"`
Name string `json:"name"`
AgentTags map[string]string `json:"agent_tags"`
}

aname := cc.config.OrbAgent.Cloud.Config.AgentName
Expand All @@ -133,7 +134,7 @@ func (cc *cloudConfigManager) autoProvision(apiAddress string, token string) (co
aname = hostname
}

agentReq := AgentReq{Name: strings.Replace(aname, ".", "-", -1)}
agentReq := AgentReq{Name: strings.Replace(aname, ".", "-", -1), AgentTags: cc.config.OrbAgent.Tags}
body, err := json.Marshal(agentReq)
if err != nil {
return config.MQTTConfig{}, err
Expand Down
3 changes: 2 additions & 1 deletion fleet/agent_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ var (
)

func (svc fleetService) addAgentToAgentGroupChannels(token string, a Agent) error {
// first we get the agent group to connect the new agent to the correct group channel
groupList, err := svc.agentGroupRepository.RetrieveAllByAgent(context.Background(), a)
if err != nil {
return err
}

if len(groupList) == 0 {
return nil
}
Expand Down Expand Up @@ -160,6 +160,7 @@ func (svc fleetService) CreateAgent(ctx context.Context, token string, a Agent)
// TODO should we roll back?
svc.logger.Error("failed to add agent to a existing group channel", zap.String("agent_id", a.MFThingID), zap.Error(err))
}

return a, nil
}

Expand Down
1 change: 1 addition & 0 deletions fleet/api/http/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func addAgentEndpoint(svc fleet.Service) endpoint.Endpoint {
agent := fleet.Agent{
Name: nID,
OrbTags: req.OrbTags,
AgentTags: req.AgentTags,
}
saved, err := svc.CreateAgent(c, req.token, agent)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions fleet/api/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ func (req updateAgentGroupReq) validate() error {
}

type addAgentReq struct {
token string
Name string `json:"name,omitempty"`
OrbTags types.Tags `json:"orb_tags,omitempty"`
token string
Name string `json:"name,omitempty"`
OrbTags types.Tags `json:"orb_tags,omitempty"`
AgentTags types.Tags `json:"agent_tags,omitempty"`
}

func (req addAgentReq) validate() error {
Expand Down
14 changes: 12 additions & 2 deletions fleet/postgres/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ func (r agentRepository) RetrieveByIDWithChannel(ctx context.Context, thingID st
}

func (r agentRepository) Save(ctx context.Context, agent fleet.Agent) error {

tx, err := r.db.BeginTxx(ctx, nil)
dscabral marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
q := `INSERT INTO agents (name, mf_thing_id, mf_owner_id, mf_channel_id, orb_tags, agent_tags, agent_metadata, state)
VALUES (:name, :mf_thing_id, :mf_owner_id, :mf_channel_id, :orb_tags, :agent_tags, :agent_metadata, :state)`

Expand All @@ -328,20 +331,27 @@ func (r agentRepository) Save(ctx context.Context, agent fleet.Agent) error {
return errors.Wrap(db.ErrSaveDB, err)
}

_, err = r.db.NamedExecContext(ctx, q, dba)
_, err = tx.NamedExecContext(ctx, q, dba)
if err != nil {
pqErr, ok := err.(*pq.Error)
if ok {
switch pqErr.Code.Name() {
case db.ErrInvalid, db.ErrTruncation:
tx.Rollback()
return errors.Wrap(errors.ErrMalformedEntity, err)
case db.ErrDuplicate:
tx.Rollback()
return errors.Wrap(errors.ErrConflict, err)
}
}
tx.Rollback()
return errors.Wrap(db.ErrSaveDB, err)
}

if err = tx.Commit(); err != nil {
return err
}

return nil

}
Expand Down
4 changes: 2 additions & 2 deletions fleet/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func migrateDB(db *sqlx.DB) error {
)`,
`CREATE INDEX ON agent_groups (mf_owner_id)`,
`CREATE INDEX ON agent_groups USING gin (tags)`,
`CREATE VIEW agent_group_membership(agent_groups_id, agent_groups_name, agent_mf_thing_id, agent_mf_channel_id, group_mf_channel_id, mf_owner_id, agent_state) as
`CREATE or REPLACE VIEW agent_group_membership(agent_groups_id, agent_groups_name, agent_mf_thing_id, agent_mf_channel_id, group_mf_channel_id, mf_owner_id, agent_state) as
dscabral marked this conversation as resolved.
Show resolved Hide resolved
SELECT agent_groups.id,
agent_groups.name,
agents.mf_thing_id,
Expand All @@ -93,7 +93,7 @@ func migrateDB(db *sqlx.DB) error {
FROM agents,
agent_groups
WHERE agent_groups.mf_owner_id = agents.mf_owner_id
AND (agent_groups.tags <@ agents.agent_tags OR agent_groups.tags <@ agents.orb_tags)`,
AND (agent_groups.tags <@ coalesce(agents.agent_tags || agents.orb_tags, agents.agent_tags, agents.orb_tags))`,
},
Down: []string{
"DROP TABLE agents",
Expand Down