From 1bfe364d65a4ea96c6b397bdaa567fa3ed8e4d59 Mon Sep 17 00:00:00 2001 From: miagilepner Date: Fri, 24 Jan 2025 16:06:01 +0100 Subject: [PATCH] VAULT-31907: Entity loading speedup (#29326) * perf improvements for loading entities in unseal * lint * changelog * abort on error * update to defer --- changelog/29326.txt | 3 + vault/identity_store_util.go | 176 +++++++++++++++++++++++------------ 2 files changed, 121 insertions(+), 58 deletions(-) create mode 100644 changelog/29326.txt diff --git a/changelog/29326.txt b/changelog/29326.txt new file mode 100644 index 000000000000..f0f38b847358 --- /dev/null +++ b/changelog/29326.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core/identity: Improve performance of loading entities when unsealing by batching updates, caching local alias storage reads, and doing more work in parallel. +``` diff --git a/vault/identity_store_util.go b/vault/identity_store_util.go index b5f83d4f49b8..9418b41d6fff 100644 --- a/vault/identity_store_util.go +++ b/vault/identity_store_util.go @@ -13,6 +13,7 @@ import ( "time" metrics "github.com/armon/go-metrics" + "github.com/golang/protobuf/ptypes" "github.com/hashicorp/errwrap" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-secure-stdlib/strutil" @@ -33,6 +34,7 @@ import ( var ( errCycleDetectedPrefix = "cyclic relationship detected for member group ID" tmpSuffix = ".tmp" + entityLoadingTxMaxSize = 1024 ) // loadIdentityStoreArtifacts is responsible for loading entities, groups, and aliases from storage into MemDB. @@ -393,10 +395,10 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { // create a slice of result channels, one for each bucket. We need each result // and err chan to be 1 buffered so we can leave a result there even if the // processing loop is blocking on an earlier bucket still. - results := make([]chan *storagepacker.Bucket, len(existing)) + results := make([]chan []*identity.Entity, len(existing)) errs := make([]chan error, len(existing)) for j := range existing { - results[j] = make(chan *storagepacker.Bucket, 1) + results[j] = make(chan []*identity.Entity, 1) errs[j] = make(chan error, 1) } @@ -424,8 +426,18 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { continue } + items := make([]*identity.Entity, len(bucket.Items)) + for j, item := range bucket.Items { + entity, err := i.parseEntityFromBucketItem(ctx, item) + if err != nil { + errs[idx] <- err + continue + } + items[j] = entity + } + // Write results out to the result channel - results[idx] <- bucket + results[idx] <- items // quit early case <-quit: @@ -453,6 +465,8 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { close(broker) }() + localAliasBuckets := make(map[string]*storagepacker.Bucket) + // Restore each key by pulling from the result chan LOOP: for j := range existing { @@ -462,77 +476,89 @@ LOOP: close(quit) break LOOP - case bucket := <-results[j]: + case entities := <-results[j]: // If there is no entry, nothing to restore - if bucket == nil { + if entities == nil { continue } + load := func(entities []*identity.Entity) error { + tx := i.db.Txn(true) + defer tx.Abort() + upsertedItems := 0 + for _, entity := range entities { + if entity == nil { + continue + } - for _, item := range bucket.Items { - entity, err := i.parseEntityFromBucketItem(ctx, item) - if err != nil { - return err - } - if entity == nil { - continue - } - - ns, err := i.namespacer.NamespaceByID(ctx, entity.NamespaceID) - if err != nil { - return err - } - if ns == nil { - // Remove dangling entities - if !(i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) || i.localNode.HAState() == consts.PerfStandby) { - // Entity's namespace doesn't exist anymore but the - // entity from the namespace still exists. - i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID) - err = i.entityPacker.DeleteItem(ctx, entity.ID) - if err != nil { - return err + ns, err := i.namespacer.NamespaceByID(ctx, entity.NamespaceID) + if err != nil { + return err + } + if ns == nil { + // Remove dangling entities + if !(i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) || i.localNode.HAState() == consts.PerfStandby) { + // Entity's namespace doesn't exist anymore but the + // entity from the namespace still exists. + i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID) + err = i.entityPacker.DeleteItem(ctx, entity.ID) + if err != nil { + return err + } } + continue } - continue - } - nsCtx := namespace.ContextWithNamespace(ctx, ns) + nsCtx := namespace.ContextWithNamespace(ctx, ns) - // Ensure that there are no entities with duplicate names - entityByName, err := i.MemDBEntityByName(nsCtx, entity.Name, false) - if err != nil { - return nil - } - if err := i.conflictResolver.ResolveEntities(ctx, entityByName, entity); err != nil && !i.disableLowerCasedNames { - return err - } + // Ensure that there are no entities with duplicate names + entityByName, err := i.MemDBEntityByName(nsCtx, entity.Name, false) + if err != nil { + return nil + } + if err := i.conflictResolver.ResolveEntities(ctx, entityByName, entity); err != nil && !i.disableLowerCasedNames { + return err + } - mountAccessors := getAccessorsOnDuplicateAliases(entity.Aliases) + mountAccessors := getAccessorsOnDuplicateAliases(entity.Aliases) - if len(mountAccessors) > 0 { - i.logger.Warn("Entity has multiple aliases on the same mount(s)", "entity_id", entity.ID, "mount_accessors", mountAccessors) - } + if len(mountAccessors) > 0 { + i.logger.Warn("Entity has multiple aliases on the same mount(s)", "entity_id", entity.ID, "mount_accessors", mountAccessors) + } - for _, accessor := range mountAccessors { - if _, ok := duplicatedAccessors[accessor]; !ok { - duplicatedAccessors[accessor] = struct{}{} + for _, accessor := range mountAccessors { + if _, ok := duplicatedAccessors[accessor]; !ok { + duplicatedAccessors[accessor] = struct{}{} + } } - } - localAliases, err := i.parseLocalAliases(entity.ID) - if err != nil { - return fmt.Errorf("failed to load local aliases from storage: %v", err) - } - if localAliases != nil { - for _, alias := range localAliases.Aliases { - entity.UpsertAlias(alias) + err = i.loadLocalAliasesForEntity(ctx, entity, localAliasBuckets) + if err != nil { + return fmt.Errorf("failed to load local aliases from storage: %v", err) } - } - // Only update MemDB and don't hit the storage again - err = i.upsertEntity(nsCtx, entity, nil, false) - if err != nil { - return fmt.Errorf("failed to update entity in MemDB: %w", err) + toBeUpserted := 1 + len(entity.Aliases) + if upsertedItems+toBeUpserted > entityLoadingTxMaxSize { + tx.Commit() + upsertedItems = 0 + tx = i.db.Txn(true) + defer tx.Abort() + } + // Only update MemDB and don't hit the storage again + err = i.upsertEntityInTxn(nsCtx, tx, entity, nil, false) + if err != nil { + return fmt.Errorf("failed to update entity in MemDB: %w", err) + } + upsertedItems += toBeUpserted + } + if upsertedItems > 0 { + tx.Commit() } + return nil } + err := load(entities) + if err != nil { + return err + } + } } @@ -557,6 +583,40 @@ LOOP: return nil } +// loadLocalAliasesForEntity upserts local aliases into the entity by retrieving +// the local aliases from the cache (if present) or storage +func (i *IdentityStore) loadLocalAliasesForEntity(ctx context.Context, entity *identity.Entity, localAliasCache map[string]*storagepacker.Bucket) error { + bucketKey := i.localAliasPacker.BucketKey(entity.ID) + if len(bucketKey) == 0 { + return fmt.Errorf("no bucket key for ID %s", entity.ID) + } + bucket, ok := localAliasCache[bucketKey] + if !ok { + var err error + bucket, err = i.localAliasPacker.GetBucket(ctx, bucketKey) + if err != nil { + return fmt.Errorf("failed to load local alias bucket from storage: %v", err) + } + localAliasCache[bucketKey] = bucket + } + if bucket == nil { + return nil + } + for _, item := range bucket.Items { + if item.ID == entity.ID { + var localAliases identity.LocalAliases + err := ptypes.UnmarshalAny(item.Message, &localAliases) + if err != nil { + return fmt.Errorf("failed to unmarshal local alias: %v", err) + } + for _, alias := range localAliases.Aliases { + entity.UpsertAlias(alias) + } + } + } + return nil +} + // getAccessorsOnDuplicateAliases returns a list of accessors by checking aliases in // the passed in list which belong to the same accessor(s) func getAccessorsOnDuplicateAliases(aliases []*identity.Alias) []string {