From 941a38301976e9ff4fa4bce5d10f27e83242b2d2 Mon Sep 17 00:00:00 2001 From: yah01 Date: Sat, 2 Sep 2023 00:09:01 +0800 Subject: [PATCH] Fix failed to load collection with more than 128 partitions (#26763) Signed-off-by: yah01 --- .../metastore/kv/querycoord/kv_catalog.go | 26 +++++++-------- internal/querycoordv2/job/job_load.go | 10 +++--- internal/querycoordv2/job/job_release.go | 2 +- internal/querycoordv2/job/undo.go | 33 +++++++++++++++---- .../querycoordv2/meta/collection_manager.go | 13 ++++---- .../meta/collection_manager_test.go | 2 +- .../observers/collection_observer.go | 2 +- internal/querycoordv2/services_test.go | 2 +- 8 files changed, 54 insertions(+), 36 deletions(-) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index 1e28a95eda46d..75753ec237969 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -41,30 +41,26 @@ func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitio if err != nil { return err } - kvs := make(map[string]string) - for _, partition := range partitions { - key := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID()) - value, err := proto.Marshal(partition) - if err != nil { - return err - } - kvs[key] = string(value) + err = s.cli.Save(k, string(v)) + if err != nil { + return err } - kvs[k] = string(v) - return s.cli.MultiSave(kvs) + return s.SavePartition(partitions...) } func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error { - kvs := make(map[string]string) for _, partition := range info { - key := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID()) - value, err := proto.Marshal(partition) + k := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID()) + v, err := proto.Marshal(partition) + if err != nil { + return err + } + err = s.cli.Save(k, string(v)) if err != nil { return err } - kvs[key] = string(value) } - return s.cli.MultiSave(kvs) + return nil } func (s Catalog) SaveReplica(replica *querypb.Replica) error { diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 73faa4e60ecc3..a00507b8668b8 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -157,7 +157,7 @@ func (job *LoadCollectionJob) Execute() error { log.Info("replica created", zap.Int64("replicaID", replica.GetID()), zap.Int64s("nodes", replica.GetNodes()), zap.String("resourceGroup", replica.GetResourceGroup())) } - job.undo.NewReplicaCreated = true + job.undo.IsReplicaCreated = true } // 3. loadPartitions on QueryNodes @@ -189,6 +189,7 @@ func (job *LoadCollectionJob) Execute() error { }, CreatedAt: time.Now(), } + job.undo.IsNewCollection = true err = job.meta.CollectionManager.PutCollection(collection, partitions...) if err != nil { msg := "failed to store collection and partitions" @@ -204,7 +205,7 @@ func (job *LoadCollectionJob) Execute() error { msg := "failed to update next target" log.Warn(msg, zap.Error(err)) } - job.undo.TargetUpdated = true + job.undo.IsTargetUpdated = true return nil } @@ -330,7 +331,7 @@ func (job *LoadPartitionJob) Execute() error { log.Info("replica created", zap.Int64("replicaID", replica.GetID()), zap.Int64s("nodes", replica.GetNodes()), zap.String("resourceGroup", replica.GetResourceGroup())) } - job.undo.NewReplicaCreated = true + job.undo.IsReplicaCreated = true } // 3. loadPartitions on QueryNodes @@ -353,6 +354,7 @@ func (job *LoadPartitionJob) Execute() error { } }) if !job.meta.CollectionManager.Exist(req.GetCollectionID()) { + job.undo.IsNewCollection = true collection := &meta.Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: req.GetCollectionID(), @@ -385,7 +387,7 @@ func (job *LoadPartitionJob) Execute() error { msg := "failed to update next target" log.Warn(msg, zap.Error(err)) } - job.undo.TargetUpdated = true + job.undo.IsTargetUpdated = true return nil } diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 7d4b21cfa188f..57ad526d94071 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -183,7 +183,7 @@ func (job *ReleasePartitionJob) Execute() error { metrics.QueryCoordNumCollections.WithLabelValues().Dec() waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) } else { - err := job.meta.CollectionManager.RemovePartition(toRelease...) + err := job.meta.CollectionManager.RemovePartition(req.GetCollectionID(), toRelease...) if err != nil { msg := "failed to release partitions from store" log.Warn(msg, zap.Error(err)) diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index c4235736111c4..5ea53e62ad890 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -22,11 +22,14 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/pkg/log" + "go.uber.org/zap" ) type UndoList struct { - TargetUpdated bool // indicates if target updated during loading - NewReplicaCreated bool // indicates if created new replicas during loading + IsTargetUpdated bool // indicates if target updated during loading + IsReplicaCreated bool // indicates if created new replicas during loading + IsNewCollection bool // indicates if created new collection during loading CollectionID int64 LackPartitions []int64 @@ -50,11 +53,29 @@ func NewUndoList(ctx context.Context, meta *meta.Meta, } func (u *UndoList) RollBack() { - if u.NewReplicaCreated { - u.meta.ReplicaManager.RemoveCollection(u.CollectionID) + log := log.Ctx(u.ctx).With( + zap.Int64("collectionID", u.CollectionID), + zap.Int64s("partitionIDs", u.LackPartitions), + ) + + log.Warn("rollback failed loading request...", + zap.Bool("isNewCollection", u.IsNewCollection), + zap.Bool("isReplicaCreated", u.IsReplicaCreated), + zap.Bool("isTargetUpdated", u.IsTargetUpdated), + ) + + var err error + if u.IsNewCollection || u.IsReplicaCreated { + err = u.meta.CollectionManager.RemoveCollection(u.CollectionID) + } else { + err = u.meta.CollectionManager.RemovePartition(u.CollectionID, u.LackPartitions...) } - if u.TargetUpdated { - if !u.meta.CollectionManager.Exist(u.CollectionID) { + if err != nil { + log.Warn("failed to rollback collection from meta", zap.Error(err)) + } + + if u.IsTargetUpdated { + if u.IsNewCollection { u.targetMgr.RemoveCollection(u.CollectionID) u.targetObserver.ReleaseCollection(u.CollectionID) } else { diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 132f2fdc8816d..c994d0df5494c 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -543,24 +543,23 @@ func (m *CollectionManager) RemoveCollection(collectionID UniqueID) error { return nil } -func (m *CollectionManager) RemovePartition(ids ...UniqueID) error { - if len(ids) == 0 { +func (m *CollectionManager) RemovePartition(collectionID UniqueID, partitionIDs ...UniqueID) error { + if len(partitionIDs) == 0 { return nil } m.rwmutex.Lock() defer m.rwmutex.Unlock() - return m.removePartition(ids...) + return m.removePartition(collectionID, partitionIDs...) } -func (m *CollectionManager) removePartition(ids ...UniqueID) error { - partition := m.partitions[ids[0]] - err := m.catalog.ReleasePartition(partition.CollectionID, ids...) +func (m *CollectionManager) removePartition(collectionID UniqueID, partitionIDs ...UniqueID) error { + err := m.catalog.ReleasePartition(collectionID, partitionIDs...) if err != nil { return err } - for _, id := range ids { + for _, id := range partitionIDs { delete(m.partitions, id) } diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index f879c8135c1e4..13430eb1155fd 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -259,7 +259,7 @@ func (suite *CollectionManagerSuite) TestRemove() { err := mgr.RemoveCollection(collectionID) suite.NoError(err) } else { - err := mgr.RemovePartition(suite.partitions[collectionID]...) + err := mgr.RemovePartition(collectionID, suite.partitions[collectionID]...) suite.NoError(err) } } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 63e252f4b35d7..0fa9a8b1a51af 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -132,7 +132,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Int64("collectionID", collection), zap.Int64("partitionID", partition.GetPartitionID()), zap.Duration("loadTime", time.Since(partition.CreatedAt))) - ob.meta.CollectionManager.RemovePartition(partition.GetPartitionID()) + ob.meta.CollectionManager.RemovePartition(collection, partition.GetPartitionID()) ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) } // all partition timeout, remove collection diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 7fa1aab5ab18b..a42b7f45d4c86 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -291,7 +291,7 @@ func (suite *ServiceSuite) TestShowPartitions() { } else { partitionID := partitions[0] parBak := suite.meta.CollectionManager.GetPartition(partitionID) - err = suite.meta.CollectionManager.RemovePartition(partitionID) + err = suite.meta.CollectionManager.RemovePartition(collection, partitionID) suite.NoError(err) meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10)) resp, err = server.ShowPartitions(ctx, req)