Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/orb 851 unlink removed sink from dataset and inactivate if in case #887

Merged
merged 11 commits into from
Mar 9, 2022
11 changes: 10 additions & 1 deletion cmd/policies/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func main() {
go startHTTPServer(tracer, svc, svcCfg, logger, errs)
go startGRPCServer(svc, tracer, policiesGRPCCfg, logger, errs)
go subscribeToFleetES(svc, esClient, esCfg, logger)
go subscribeToSinksES(svc, esClient, esCfg, logger)

go func() {
c := make(chan os.Signal)
Expand Down Expand Up @@ -275,7 +276,15 @@ func startGRPCServer(svc policies.Service, tracer opentracing.Tracer, cfg config
func subscribeToFleetES(svc policies.Service, client *r.Client, cfg config.EsConfig, logger *zap.Logger) {
eventStore := rediscon.NewEventStore(svc, client, cfg.Consumer, logger)
logger.Info("Subscribed to Redis Event Store for agent groups")
if err := eventStore.Subscribe(context.Background()); err != nil {
if err := eventStore.SubscribeToFleet(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
}
}

func subscribeToSinksES(svc policies.Service, client *r.Client, cfg config.EsConfig, logger *zap.Logger) {
eventStore := rediscon.NewEventStore(svc, client, cfg.Consumer, logger)
logger.Info("Subscribed to Redis Event Store for sinks")
if err := eventStore.SubscribeToSink(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
}
}
28 changes: 28 additions & 0 deletions policies/api/http/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,34 @@ func (l loggingMiddleware) ListDatasets(ctx context.Context, token string, pm po
return l.svc.ListDatasets(ctx, token, pm)
}

func (l loggingMiddleware) InactivateDatasetBySinkID(ctx context.Context, sinkID string, token string) (err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: inactivate_dataset_by_sink_id",
zap.Error(err),
zap.Duration("duration", time.Since(begin)))
} else {
l.logger.Info("method call: inactivate_dataset_by_sink_id",
zap.Duration("duration", time.Since(begin)))
}
}(time.Now())
return l.svc.InactivateDatasetBySinkID(ctx, sinkID, token)
}

func (l loggingMiddleware) DeleteSinkFromDataset(ctx context.Context, sinkID string, token string) (err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: delete_sink_from_dataset",
zap.Error(err),
zap.Duration("duration", time.Since(begin)))
} else {
l.logger.Info("method call: delete_sink_from_dataset",
zap.Duration("duration", time.Since(begin)))
}
}(time.Now())
return l.svc.DeleteSinkFromDataset(ctx, sinkID, token)
}

func NewLoggingMiddleware(svc policies.Service, logger *zap.Logger) policies.Service {
return &loggingMiddleware{logger, svc}
}
44 changes: 44 additions & 0 deletions policies/api/http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,50 @@ func (m metricsMiddleware) ListDatasets(ctx context.Context, token string, pm po
return m.svc.ListDatasets(ctx, token, pm)
}

func (m metricsMiddleware) InactivateDatasetBySinkID(ctx context.Context, sinkID string, token string) error {
ownerID, err := m.identify(token)
if err != nil {
return err
}

defer func(begin time.Time) {
labels := []string{
"method", "inactivateDatasetByGroupID",
"owner_id", ownerID,
"policy_id", "",
"dataset_id", "",
}

m.counter.With(labels...).Add(1)
m.latency.With(labels...).Observe(float64(time.Since(begin).Microseconds()))

}(time.Now())

return m.svc.InactivateDatasetBySinkID(ctx, sinkID, token)
}

func (m metricsMiddleware) DeleteSinkFromDataset(ctx context.Context, sinkID string, token string) error {
ownerID, err := m.identify(token)
if err != nil {
return err
}

defer func(begin time.Time) {
labels := []string{
"method", "inactivateDatasetByGroupID",
mclcavalcante marked this conversation as resolved.
Show resolved Hide resolved
"owner_id", ownerID,
"policy_id", "",
"dataset_id", "",
}

m.counter.With(labels...).Add(1)
m.latency.With(labels...).Observe(float64(time.Since(begin).Microseconds()))

}(time.Now())

return m.svc.DeleteSinkFromDataset(ctx, sinkID, token)
}

func (m metricsMiddleware) identify(token string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
41 changes: 41 additions & 0 deletions policies/mocks/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,47 @@ type mockPoliciesRepository struct {
gdb map[string][]policies.PolicyInDataset
}

func (m *mockPoliciesRepository) RetrieveAllDatasetsInternal(ctx context.Context, owner string) ([]policies.Dataset, error) {
var datasetList []policies.Dataset
id := uint64(0)
for _, d := range m.ddb {
if d.MFOwnerID == owner {
datasetList = append(datasetList, d)
}
id++
}

return datasetList, nil
}

func (m *mockPoliciesRepository) InactivateDatasetByID(ctx context.Context, sinkID string, ownerID string) error {
for _, ds := range m.ddb{
if ds.MFOwnerID == ownerID{
for _, sID := range ds.SinkIDs {
if sID == sinkID{
ds.Valid = false
}
}
}
}
return nil
}

func (m *mockPoliciesRepository) DeleteSinkFromDataset(ctx context.Context, sinkID string, ownerID string) error {
for _, ds := range m.ddb{
if ds.MFOwnerID == ownerID{
for i, sID := range ds.SinkIDs {
if sID == sinkID{
ds.SinkIDs[i] = ds.SinkIDs[len(ds.SinkIDs)-1]
ds.SinkIDs[len(ds.SinkIDs)-1] = ""
ds.SinkIDs = ds.SinkIDs[:len(ds.SinkIDs)-1]
}
}
}
}
return nil
}

func (m *mockPoliciesRepository) DeleteDataset(ctx context.Context, ownerID string, dsID string) error {
if _, ok := m.ddb[dsID]; ok {
if m.ddb[dsID].MFOwnerID != ownerID {
Expand Down
15 changes: 15 additions & 0 deletions policies/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ type Service interface {

// ListDatasets retrieve a list of Dataset by owner
ListDatasets(ctx context.Context, token string, pm PageMetadata) (PageDataset, error)

// InactivateDatasetBySinkID inactivate a dataset
InactivateDatasetBySinkID(ctx context.Context, sinkID string, token string) error

// DeleteSinkFromDataset removes a sink from a dataset
DeleteSinkFromDataset(ctx context.Context, sinkID string, token string) error
}

type Repository interface {
Expand Down Expand Up @@ -148,4 +154,13 @@ type Repository interface {

// RetrieveAllDatasetsByOwner retrieves the subset of Datasets owned by the specified user
RetrieveAllDatasetsByOwner(ctx context.Context, ownerID string, pm PageMetadata) (PageDataset, error)

// InactivateDatasetByID inactivate a dataset
InactivateDatasetByID(ctx context.Context, sinkID string, ownerID string) error
mclcavalcante marked this conversation as resolved.
Show resolved Hide resolved

// DeleteSinkFromDataset removes a sink from a dataset
DeleteSinkFromDataset(ctx context.Context, sinkID string, ownerID string) error

// RetrieveAllDatasetsInternal retrieves all datasets by owner
RetrieveAllDatasetsInternal(ctx context.Context, owner string) ([]Dataset, error)
}
40 changes: 40 additions & 0 deletions policies/policy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,43 @@ func (s policiesService) ListDatasets(ctx context.Context, token string, pm Page
}
return s.repo.RetrieveAllDatasetsByOwner(ctx, ownerID, pm)
}

func (s policiesService) InactivateDatasetBySinkID(ctx context.Context, sinkID string, token string) error {
ownerID, err := s.identify(token)
if err != nil {
return err
}

if sinkID == "" {
return ErrMalformedEntity
}

datasets, err := s.repo.RetrieveAllDatasetsInternal(ctx, ownerID)
mclcavalcante marked this conversation as resolved.
Show resolved Hide resolved
if err != nil{
return err
}

for _, ds := range datasets{
if len(ds.SinkIDs) == 1 && ds.SinkIDs[0] == sinkID{
err = s.repo.InactivateDatasetByID(ctx, ds.ID, ownerID)
}
if err != nil {
return err
}
}

return nil
}

func (s policiesService) DeleteSinkFromDataset(ctx context.Context, sinkID string, token string) error {
ownerID, err := s.identify(token)
if err != nil {
return err
}

if sinkID == "" {
return ErrMalformedEntity
}

return s.repo.DeleteSinkFromDataset(ctx, sinkID, ownerID)
}
Loading