Skip to content

Commit

Permalink
Fix failed to load collection with more than 128 partitions (#26763)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <yah2er0ne@outlook.com>
  • Loading branch information
yah01 authored Sep 1, 2023
1 parent 64cf5ea commit 941a383
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 36 deletions.
26 changes: 11 additions & 15 deletions internal/metastore/kv/querycoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,26 @@ func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitio
if err != nil {
return err
}
kvs := make(map[string]string)
for _, partition := range partitions {
key := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
value, err := proto.Marshal(partition)
if err != nil {
return err
}
kvs[key] = string(value)
err = s.cli.Save(k, string(v))
if err != nil {
return err
}
kvs[k] = string(v)
return s.cli.MultiSave(kvs)
return s.SavePartition(partitions...)
}

func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
kvs := make(map[string]string)
for _, partition := range info {
key := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
value, err := proto.Marshal(partition)
k := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
v, err := proto.Marshal(partition)
if err != nil {
return err
}
err = s.cli.Save(k, string(v))
if err != nil {
return err
}
kvs[key] = string(value)
}
return s.cli.MultiSave(kvs)
return nil
}

func (s Catalog) SaveReplica(replica *querypb.Replica) error {
Expand Down
10 changes: 6 additions & 4 deletions internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (job *LoadCollectionJob) Execute() error {
log.Info("replica created", zap.Int64("replicaID", replica.GetID()),
zap.Int64s("nodes", replica.GetNodes()), zap.String("resourceGroup", replica.GetResourceGroup()))
}
job.undo.NewReplicaCreated = true
job.undo.IsReplicaCreated = true
}

// 3. loadPartitions on QueryNodes
Expand Down Expand Up @@ -189,6 +189,7 @@ func (job *LoadCollectionJob) Execute() error {
},
CreatedAt: time.Now(),
}
job.undo.IsNewCollection = true
err = job.meta.CollectionManager.PutCollection(collection, partitions...)
if err != nil {
msg := "failed to store collection and partitions"
Expand All @@ -204,7 +205,7 @@ func (job *LoadCollectionJob) Execute() error {
msg := "failed to update next target"
log.Warn(msg, zap.Error(err))
}
job.undo.TargetUpdated = true
job.undo.IsTargetUpdated = true

return nil
}
Expand Down Expand Up @@ -330,7 +331,7 @@ func (job *LoadPartitionJob) Execute() error {
log.Info("replica created", zap.Int64("replicaID", replica.GetID()),
zap.Int64s("nodes", replica.GetNodes()), zap.String("resourceGroup", replica.GetResourceGroup()))
}
job.undo.NewReplicaCreated = true
job.undo.IsReplicaCreated = true
}

// 3. loadPartitions on QueryNodes
Expand All @@ -353,6 +354,7 @@ func (job *LoadPartitionJob) Execute() error {
}
})
if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
job.undo.IsNewCollection = true
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
Expand Down Expand Up @@ -385,7 +387,7 @@ func (job *LoadPartitionJob) Execute() error {
msg := "failed to update next target"
log.Warn(msg, zap.Error(err))
}
job.undo.TargetUpdated = true
job.undo.IsTargetUpdated = true

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/job/job_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (job *ReleasePartitionJob) Execute() error {
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
} else {
err := job.meta.CollectionManager.RemovePartition(toRelease...)
err := job.meta.CollectionManager.RemovePartition(req.GetCollectionID(), toRelease...)
if err != nil {
msg := "failed to release partitions from store"
log.Warn(msg, zap.Error(err))
Expand Down
33 changes: 27 additions & 6 deletions internal/querycoordv2/job/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"
)

type UndoList struct {
TargetUpdated bool // indicates if target updated during loading
NewReplicaCreated bool // indicates if created new replicas during loading
IsTargetUpdated bool // indicates if target updated during loading
IsReplicaCreated bool // indicates if created new replicas during loading
IsNewCollection bool // indicates if created new collection during loading

CollectionID int64
LackPartitions []int64
Expand All @@ -50,11 +53,29 @@ func NewUndoList(ctx context.Context, meta *meta.Meta,
}

func (u *UndoList) RollBack() {
if u.NewReplicaCreated {
u.meta.ReplicaManager.RemoveCollection(u.CollectionID)
log := log.Ctx(u.ctx).With(
zap.Int64("collectionID", u.CollectionID),
zap.Int64s("partitionIDs", u.LackPartitions),
)

log.Warn("rollback failed loading request...",
zap.Bool("isNewCollection", u.IsNewCollection),
zap.Bool("isReplicaCreated", u.IsReplicaCreated),
zap.Bool("isTargetUpdated", u.IsTargetUpdated),
)

var err error
if u.IsNewCollection || u.IsReplicaCreated {
err = u.meta.CollectionManager.RemoveCollection(u.CollectionID)
} else {
err = u.meta.CollectionManager.RemovePartition(u.CollectionID, u.LackPartitions...)
}
if u.TargetUpdated {
if !u.meta.CollectionManager.Exist(u.CollectionID) {
if err != nil {
log.Warn("failed to rollback collection from meta", zap.Error(err))
}

if u.IsTargetUpdated {
if u.IsNewCollection {
u.targetMgr.RemoveCollection(u.CollectionID)
u.targetObserver.ReleaseCollection(u.CollectionID)
} else {
Expand Down
13 changes: 6 additions & 7 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,24 +543,23 @@ func (m *CollectionManager) RemoveCollection(collectionID UniqueID) error {
return nil
}

func (m *CollectionManager) RemovePartition(ids ...UniqueID) error {
if len(ids) == 0 {
func (m *CollectionManager) RemovePartition(collectionID UniqueID, partitionIDs ...UniqueID) error {
if len(partitionIDs) == 0 {
return nil
}

m.rwmutex.Lock()
defer m.rwmutex.Unlock()

return m.removePartition(ids...)
return m.removePartition(collectionID, partitionIDs...)
}

func (m *CollectionManager) removePartition(ids ...UniqueID) error {
partition := m.partitions[ids[0]]
err := m.catalog.ReleasePartition(partition.CollectionID, ids...)
func (m *CollectionManager) removePartition(collectionID UniqueID, partitionIDs ...UniqueID) error {
err := m.catalog.ReleasePartition(collectionID, partitionIDs...)
if err != nil {
return err
}
for _, id := range ids {
for _, id := range partitionIDs {
delete(m.partitions, id)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/meta/collection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (suite *CollectionManagerSuite) TestRemove() {
err := mgr.RemoveCollection(collectionID)
suite.NoError(err)
} else {
err := mgr.RemovePartition(suite.partitions[collectionID]...)
err := mgr.RemovePartition(collectionID, suite.partitions[collectionID]...)
suite.NoError(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/observers/collection_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (ob *CollectionObserver) observeTimeout() {
zap.Int64("collectionID", collection),
zap.Int64("partitionID", partition.GetPartitionID()),
zap.Duration("loadTime", time.Since(partition.CreatedAt)))
ob.meta.CollectionManager.RemovePartition(partition.GetPartitionID())
ob.meta.CollectionManager.RemovePartition(collection, partition.GetPartitionID())
ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID())
}
// all partition timeout, remove collection
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (suite *ServiceSuite) TestShowPartitions() {
} else {
partitionID := partitions[0]
parBak := suite.meta.CollectionManager.GetPartition(partitionID)
err = suite.meta.CollectionManager.RemovePartition(partitionID)
err = suite.meta.CollectionManager.RemovePartition(collection, partitionID)
suite.NoError(err)
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
resp, err = server.ShowPartitions(ctx, req)
Expand Down

0 comments on commit 941a383

Please sign in to comment.