Skip to content

Commit

Permalink
Fix/1005 orb remove deleted group from dataset db (#1029)
Browse files Browse the repository at this point in the history
* fix(dataset): remove agent group from dataset  db when it gets invalid because of group deletion
  • Loading branch information
mclcavalcante authored and gpazuch committed Apr 12, 2022
1 parent c372573 commit f0e6802
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 11 deletions.
14 changes: 14 additions & 0 deletions policies/api/http/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,20 @@ func (l loggingMiddleware) DeleteSinkFromAllDatasetsInternal(ctx context.Context
return l.svc.DeleteSinkFromAllDatasetsInternal(ctx, sinkID, ownerID)
}

func (l loggingMiddleware) DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, token string) (err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: delete_agent_group_from_all_datasets",
zap.Error(err),
zap.Duration("duration", time.Since(begin)))
} else {
l.logger.Info("method call: delete_agent_group_from_all_datasets",
zap.Duration("duration", time.Since(begin)))
}
}(time.Now())
return l.svc.DeleteAgentGroupFromAllDatasets(ctx, groupID, token)
}

func NewLoggingMiddleware(svc policies.Service, logger *zap.Logger) policies.Service {
return &loggingMiddleware{logger, svc}
}
22 changes: 22 additions & 0 deletions policies/api/http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,28 @@ func (m metricsMiddleware) DeleteSinkFromAllDatasetsInternal(ctx context.Context
return m.svc.DeleteSinkFromAllDatasetsInternal(ctx, sinkID, ownerID)
}

func (m metricsMiddleware) DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, token string) error {
ownerID, err := m.identify(token)
if err != nil {
return err
}

defer func(begin time.Time) {
labels := []string{
"method", "deleteAgentGroupFromAllDatasets",
"owner_id", ownerID,
"policy_id", "",
"dataset_id", "",
}

m.counter.With(labels...).Add(1)
m.latency.With(labels...).Observe(float64(time.Since(begin).Microseconds()))

}(time.Now())

return m.svc.DeleteAgentGroupFromAllDatasets(ctx, groupID, token)
}

func (m metricsMiddleware) identify(token string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
11 changes: 11 additions & 0 deletions policies/mocks/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,14 @@ func (m *mockPoliciesRepository) RetrieveAllDatasetsByOwner(ctx context.Context,
}
return pageDataset, nil
}

func (m *mockPoliciesRepository) DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, ownerID string) error {
for _, ds := range m.ddb{
if ds.MFOwnerID == ownerID{
if ds.AgentGroupID == groupID{
ds.AgentGroupID = ""
}
}
}
return nil
}
6 changes: 6 additions & 0 deletions policies/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type Service interface {

// DeleteSinkFromAllDatasetsInternal removes a sink from a dataset
DeleteSinkFromAllDatasetsInternal(ctx context.Context, sinkID string, ownerID string) ([]Dataset, error)

// DeleteAgentGroupFromAllDatasets removes an agent group from a dataset
DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, token string) error
}

type Repository interface {
Expand Down Expand Up @@ -165,4 +168,7 @@ type Repository interface {

// ActivateDatasetByID Activate a dataset
ActivateDatasetByID(ctx context.Context, datasetID string, ownerID string) error

// DeleteAgentGroupFromAllDatasets removes agent group from a dataset
DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, ownerID string) error
}
22 changes: 20 additions & 2 deletions policies/policy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (s policiesService) ListDatasets(ctx context.Context, token string, pm Page
}

func (s policiesService) DeleteSinkFromAllDatasetsInternal(ctx context.Context, sinkID string, ownerID string) ([]Dataset, error) {
if sinkID == "" || ownerID == ""{
if sinkID == "" || ownerID == "" {
return []Dataset{}, ErrMalformedEntity
}

Expand All @@ -341,7 +341,7 @@ func (s policiesService) DeleteSinkFromAllDatasetsInternal(ctx context.Context,
}

func (s policiesService) InactivateDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) error {
if datasetID == "" || ownerID == ""{
if datasetID == "" || ownerID == "" {
return ErrMalformedEntity
}

Expand Down Expand Up @@ -401,5 +401,23 @@ func (s policiesService) validateDatasetAgentGroup(ctx context.Context, ownerID
if err != nil {
return errors.Wrap(errors.New("agent group id does not exist"), err)
}
return nil
}

func (s policiesService) DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, token string) error {
ownerID, err := s.identify(token)
if err != nil {
return err
}

if groupID == "" {
return ErrMalformedEntity
}

err = s.repo.DeleteAgentGroupFromAllDatasets(ctx, groupID, ownerID)
if err != nil {
return err
}

return nil
}
91 changes: 82 additions & 9 deletions policies/policy_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,19 +306,19 @@ func TestValidatePolicy(t *testing.T) {
svc := newService(users)

cases := map[string]struct {
policy policies.Policy
token string
err error
policy policies.Policy
token string
err error
}{
"validate a new policy": {
policy: policy,
token: token,
err: nil,
policy: policy,
token: token,
err: nil,
},
"validate a policy with a invalid token": {
policy: policy,
token: invalidToken,
err: policies.ErrUnauthorizedAccess,
policy: policy,
token: invalidToken,
err: policies.ErrUnauthorizedAccess,
},
}

Expand Down Expand Up @@ -1278,6 +1278,79 @@ func TestInactivateDatasetByID(t *testing.T) {
}
}

func TestDeleteAGroupFromDataset(t *testing.T) {
users := flmocks.NewAuthService(map[string]string{token: email})
svc := newService(users)

agentGroupID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))

policy := createPolicy(t, svc, "policy")

var total = 10

datasetsTest := make([]policies.Dataset, total)

sinkIDs := make([]string, 0)
for i := 0; i < total; i++ {
ID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))

for i := 0; i < 2; i++ {
sinkID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))
sinkIDs = append(sinkIDs, sinkID.String())
}

validName, err := types.NewIdentifier(fmt.Sprintf("dataset-%d", i))
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))

dataset := policies.Dataset{
ID: ID.String(),
Name: validName,
PolicyID: policy.ID,
AgentGroupID: agentGroupID.String(),
SinkIDs: sinkIDs,
}

ds, err := svc.AddDataset(context.Background(), token, dataset)
if err != nil {
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))
}

datasetsTest[i] = ds
}

cases := map[string]struct {
token string
aGroup string
err error
}{
"delete agent group of a set of datasets": {
aGroup: agentGroupID.String(),
token: token,
err: nil,
},
"delete agent group of a set of datasets with empty agent group ID": {
aGroup: "",
token: token,
err: policies.ErrMalformedEntity,
},
"delete agent group of a set of datasets with empty owner": {
aGroup: agentGroupID.String(),
token: "wrong",
err: policies.ErrUnauthorizedAccess,
},
}

for desc, tc := range cases {
t.Run(desc, func(t *testing.T) {
err := svc.DeleteAgentGroupFromAllDatasets(context.Background(), tc.aGroup, tc.token)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s", desc, tc.err, err))
})
}
}

func testSortPolicies(t *testing.T, pm policies.PageMetadata, ags []policies.Policy) {
t.Helper()
switch pm.Order {
Expand Down
105 changes: 105 additions & 0 deletions policies/postgres/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,111 @@ func TestActivateDatasetByID(t *testing.T) {
}
}

func TestDeleteAgentGroupFromDataset(t *testing.T) {
dbMiddleware := postgres.NewDatabase(db)
repo := postgres.NewPoliciesRepository(dbMiddleware, logger)

oID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

oID2, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

wrongAGroupID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

groupID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

policyID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

sinkIDs := make([]string, 2)
for i := 0; i < 2; i++ {
sinkID, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
sinkIDs[i] = sinkID.String()
}

nameID, err := types.NewIdentifier("mydataset")
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

nameID2, err := types.NewIdentifier("mydataset2")
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

dataset := policies.Dataset{
Name: nameID,
MFOwnerID: oID.String(),
Valid: true,
AgentGroupID: groupID.String(),
PolicyID: policyID.String(),
SinkIDs: sinkIDs,
Metadata: types.Metadata{"testkey": "testvalue"},
Created: time.Time{},
}

dataset2 := dataset
dataset2.Name = nameID2
dataset2.MFOwnerID = oID2.String()

dsID, err := repo.SaveDataset(context.Background(), dataset)
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))

dataset.ID = dsID

dsID2, err := repo.SaveDataset(context.Background(), dataset2)
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))

dataset2.ID = dsID2

cases := map[string]struct {
owner string
groupID string
contains bool
dataset policies.Dataset
err error
}{
"delete a agent group from existing dataset": {
owner: dataset.MFOwnerID,
groupID: dataset.AgentGroupID,
contains: false,
dataset: dataset,
err: nil,
},
"delete a non-existing agent group from a dataset": {
owner: dataset.MFOwnerID,
groupID: wrongAGroupID.String(),
contains: false,
dataset: dataset,
err: nil,
},
"delete a agent group from a dataset with an invalid ownerID": {
groupID: dataset2.AgentGroupID,
owner: "",
contains: true,
dataset: dataset2,
err: errors.ErrMalformedEntity,
},
}

for desc, tc := range cases {
t.Run(desc, func(t *testing.T) {
err := repo.DeleteAgentGroupFromAllDatasets(context.Background(), tc.groupID, tc.owner)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected '%s' got '%s'", desc, tc.err, err))

d, err := repo.RetrieveDatasetByID(context.Background(), tc.dataset.ID, tc.dataset.MFOwnerID)
require.Nil(t, err, fmt.Sprintf("Unexpected error: %s", err))

switch tc.contains {
case false:
assert.NotEqual(t, d.AgentGroupID, tc.groupID, fmt.Sprintf("%s: expected '%s' to not contains '%s'", desc, d.AgentGroupID, tc.groupID))
case true:
assert.Equal(t, d.AgentGroupID, tc.groupID, fmt.Sprintf("%s: expected '%s' to contains '%s'", desc, d.AgentGroupID, tc.groupID))
}
})
}
}

func testSortDataset(t *testing.T, pm policies.PageMetadata, ags []policies.Dataset) {
t.Helper()
switch pm.Order {
Expand Down
29 changes: 29 additions & 0 deletions policies/postgres/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,35 @@ func (r policiesRepository) ActivateDatasetByID(ctx context.Context, id string,
return nil
}

func (r policiesRepository) DeleteAgentGroupFromAllDatasets(ctx context.Context, groupID string, ownerID string) error {
q := `UPDATE datasets SET agent_group_id = null WHERE mf_owner_id = :mf_owner_id AND agent_group_id = :agent_group_id`

if ownerID == "" {
return errors.ErrMalformedEntity
}

params := map[string]interface{}{
"mf_owner_id": ownerID,
"agent_group_id": groupID,
}

res, err := r.db.NamedQueryContext(ctx, q, params)
if err != nil {
pqErr, ok := err.(*pq.Error)
if ok {
switch pqErr.Code.Name() {
case db.ErrInvalid, db.ErrTruncation:
return errors.Wrap(policies.ErrMalformedEntity, err)
}
}
return errors.Wrap(errors.ErrSelectEntity, err)
}

defer res.Close()

return nil
}

type dbPolicy struct {
ID string `db:"id"`
Name types.Identifier `db:"name"`
Expand Down
5 changes: 5 additions & 0 deletions policies/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (es eventStore) handleAgentGroupRemove(ctx context.Context, groupID string,
if err != nil {
return err
}

err = es.policiesService.DeleteAgentGroupFromAllDatasets(ctx, groupID, token)
if err != nil {
return err
}
return nil
}

Expand Down
Loading

0 comments on commit f0e6802

Please sign in to comment.