Skip to content

Commit

Permalink
Feat/orb 851 unlink removed sink from dataset and inactivate if in ca…
Browse files Browse the repository at this point in the history
…se (#887)

* feat(dataset): invalidate dataset and unlink deleted sink

* feat(dataset): add unit tests to DeleteSinkFromDataset and InactivateDatasetBySinkID

* feat(dataset): fix dataset inactivate by sink ID

* feat(dataset): enhancement on invalidating datasets with no sinks linked

* feat(dataset): fix method name doesn't match in DeleteSinkFromAllDatasets metrics labels, fix method signature and name and add unit tests to inactivate dataset by id service

* feat(dataset): rename DeleteSinkFromAllDatasets to DeleteSinkFromAllDatasetsInternal method and InactivateDatasetById to InactivateDatasetByIdInternal method

* feat(dataset): fix order of InactivateDatasetByIDInternal arguments to be consistent with preexistings
  • Loading branch information
mclcavalcante authored Mar 9, 2022
1 parent b512980 commit b02295a
Show file tree
Hide file tree
Showing 14 changed files with 702 additions and 27 deletions.
11 changes: 10 additions & 1 deletion cmd/policies/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func main() {
go startHTTPServer(tracer, svc, svcCfg, logger, errs)
go startGRPCServer(svc, tracer, policiesGRPCCfg, logger, errs)
go subscribeToFleetES(svc, esClient, esCfg, logger)
go subscribeToSinksES(svc, esClient, esCfg, logger)

go func() {
c := make(chan os.Signal)
Expand Down Expand Up @@ -275,7 +276,15 @@ func startGRPCServer(svc policies.Service, tracer opentracing.Tracer, cfg config
func subscribeToFleetES(svc policies.Service, client *r.Client, cfg config.EsConfig, logger *zap.Logger) {
eventStore := rediscon.NewEventStore(svc, client, cfg.Consumer, logger)
logger.Info("Subscribed to Redis Event Store for agent groups")
if err := eventStore.Subscribe(context.Background()); err != nil {
if err := eventStore.SubscribeToFleet(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
}
}

func subscribeToSinksES(svc policies.Service, client *r.Client, cfg config.EsConfig, logger *zap.Logger) {
eventStore := rediscon.NewEventStore(svc, client, cfg.Consumer, logger)
logger.Info("Subscribed to Redis Event Store for sinks")
if err := eventStore.SubscribeToSink(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
}
}
28 changes: 28 additions & 0 deletions policies/api/http/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ type loggingMiddleware struct {
svc policies.Service
}

func (l loggingMiddleware) InactivateDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) (err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: inactivate_dataset_by_id_internal",
zap.Error(err),
zap.Duration("duration", time.Since(begin)))
} else {
l.logger.Info("method call: inactivate_dataset_by_id_internal",
zap.Duration("duration", time.Since(begin)))
}
}(time.Now())
return l.svc.InactivateDatasetByIDInternal(ctx, ownerID, datasetID)
}

func (l loggingMiddleware) ViewDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) (_ policies.Dataset, err error) {
defer func(begin time.Time) {
if err != nil {
Expand Down Expand Up @@ -256,6 +270,20 @@ func (l loggingMiddleware) ListDatasets(ctx context.Context, token string, pm po
return l.svc.ListDatasets(ctx, token, pm)
}

func (l loggingMiddleware) DeleteSinkFromAllDatasetsInternal(ctx context.Context, sinkID string, ownerID string) (ds []policies.Dataset, err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: delete_sink_from_all_datasets",
zap.Error(err),
zap.Duration("duration", time.Since(begin)))
} else {
l.logger.Info("method call: delete_sink_from_all_datasets",
zap.Duration("duration", time.Since(begin)))
}
}(time.Now())
return l.svc.DeleteSinkFromAllDatasetsInternal(ctx, sinkID, ownerID)
}

func NewLoggingMiddleware(svc policies.Service, logger *zap.Logger) policies.Service {
return &loggingMiddleware{logger, svc}
}
34 changes: 34 additions & 0 deletions policies/api/http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ type metricsMiddleware struct {
svc policies.Service
}

func (m metricsMiddleware) InactivateDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) error {
defer func(begin time.Time) {
labels := []string{
"method", "inactivateDatasetByIDInternal",
"owner_id", ownerID,
"policy_id", "",
"dataset_id", datasetID,
}

m.counter.With(labels...).Add(1)
m.latency.With(labels...).Observe(float64(time.Since(begin).Microseconds()))

}(time.Now())

return m.svc.InactivateDatasetByIDInternal(ctx, ownerID, datasetID)
}

func (m metricsMiddleware) ViewDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) (policies.Dataset, error) {
defer func(begin time.Time) {
labels := []string{
Expand Down Expand Up @@ -376,6 +393,23 @@ func (m metricsMiddleware) ListDatasets(ctx context.Context, token string, pm po
return m.svc.ListDatasets(ctx, token, pm)
}

func (m metricsMiddleware) DeleteSinkFromAllDatasetsInternal(ctx context.Context, sinkID string, ownerID string) ([]policies.Dataset, error) {
defer func(begin time.Time) {
labels := []string{
"method", "deleteSinkFromAllDatasetsInternal",
"owner_id", ownerID,
"policy_id", "",
"dataset_id", "",
}

m.counter.With(labels...).Add(1)
m.latency.With(labels...).Observe(float64(time.Since(begin).Microseconds()))

}(time.Now())

return m.svc.DeleteSinkFromAllDatasetsInternal(ctx, sinkID, ownerID)
}

func (m metricsMiddleware) identify(token string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
45 changes: 45 additions & 0 deletions policies/mocks/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,51 @@ type mockPoliciesRepository struct {
gdb map[string][]policies.PolicyInDataset
}

func (m *mockPoliciesRepository) RetrieveAllDatasetsInternal(ctx context.Context, owner string) ([]policies.Dataset, error) {
var datasetList []policies.Dataset
id := uint64(0)
for _, d := range m.ddb {
if d.MFOwnerID == owner {
datasetList = append(datasetList, d)
}
id++
}

return datasetList, nil
}

func (m *mockPoliciesRepository) InactivateDatasetByID(ctx context.Context, sinkID string, ownerID string) error {
for _, ds := range m.ddb{
if ds.MFOwnerID == ownerID{
for _, sID := range ds.SinkIDs {
if sID == sinkID{
ds.Valid = false
}
}
}
}
return nil
}

func (m *mockPoliciesRepository) DeleteSinkFromAllDatasets(ctx context.Context, sinkID string, ownerID string) ([]policies.Dataset, error) {
var datasets []policies.Dataset

for _, ds := range m.ddb{
if ds.MFOwnerID == ownerID{
for i, sID := range ds.SinkIDs {
if sID == sinkID{
ds.SinkIDs[i] = ds.SinkIDs[len(ds.SinkIDs)-1]
ds.SinkIDs[len(ds.SinkIDs)-1] = ""
ds.SinkIDs = ds.SinkIDs[:len(ds.SinkIDs)-1]

datasets = append(datasets, ds)
}
}
}
}
return datasets, nil
}

func (m *mockPoliciesRepository) DeleteDataset(ctx context.Context, ownerID string, dsID string) error {
if _, ok := m.ddb[dsID]; ok {
if m.ddb[dsID].MFOwnerID != ownerID {
Expand Down
12 changes: 12 additions & 0 deletions policies/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ type Service interface {

// ListDatasets retrieve a list of Dataset by owner
ListDatasets(ctx context.Context, token string, pm PageMetadata) (PageDataset, error)

// InactivateDatasetByIDInternal inactivate a dataset
InactivateDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) error

// DeleteSinkFromAllDatasetsInternal removes a sink from a dataset
DeleteSinkFromAllDatasetsInternal(ctx context.Context, sinkID string, ownerID string) ([]Dataset, error)
}

type Repository interface {
Expand Down Expand Up @@ -148,4 +154,10 @@ type Repository interface {

// RetrieveAllDatasetsByOwner retrieves the subset of Datasets owned by the specified user
RetrieveAllDatasetsByOwner(ctx context.Context, ownerID string, pm PageMetadata) (PageDataset, error)

// InactivateDatasetByID inactivate a dataset
InactivateDatasetByID(ctx context.Context, sinkID string, ownerID string) error

// DeleteSinkFromAllDatasets removes a sink from a dataset
DeleteSinkFromAllDatasets(ctx context.Context, sinkID string, ownerID string) ([]Dataset, error)
}
26 changes: 26 additions & 0 deletions policies/policy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,29 @@ func (s policiesService) ListDatasets(ctx context.Context, token string, pm Page
}
return s.repo.RetrieveAllDatasetsByOwner(ctx, ownerID, pm)
}

func (s policiesService) DeleteSinkFromAllDatasetsInternal(ctx context.Context, sinkID string, ownerID string) ([]Dataset, error) {
if sinkID == "" || ownerID == ""{
return []Dataset{}, ErrMalformedEntity
}

datasets, err := s.repo.DeleteSinkFromAllDatasets(ctx, sinkID, ownerID)
if err != nil {
return []Dataset{}, err
}

return datasets, nil
}

func (s policiesService) InactivateDatasetByIDInternal(ctx context.Context, ownerID string, datasetID string) error {
if datasetID == "" || ownerID == ""{
return ErrMalformedEntity
}

err := s.repo.InactivateDatasetByID(ctx, datasetID, ownerID)
if err != nil {
return errors.Wrap(ErrInactivateDataset, err)
}

return nil
}
Loading

0 comments on commit b02295a

Please sign in to comment.