Skip to content

Commit

Permalink
feat: implement upload persistent cache task (#3620)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Oct 30, 2024
1 parent ab11fbf commit 4f8fb8f
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 40 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.23.0

require (
d7y.io/api/v2 v2.0.167
d7y.io/api/v2 v2.0.168
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.167 h1:rhioXzV8AMn4trMbv2RPZ6jeMIuwAbTDRpwl7bwRrfQ=
d7y.io/api/v2 v2.0.167/go.mod h1:3dzPWMxjqcdnkk1WFmPQEC+OJn/5NHFStANSRybQZUY=
d7y.io/api/v2 v2.0.168 h1:4ewK+RN9b608Cng2eIdt9G98thcLAkVh0EfHoi5mDa8=
d7y.io/api/v2 v2.0.168/go.mod h1:yT5MhUI0My91HWiq8ThPzQu8FNydTpRUix5LgpDE8bw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
30 changes: 20 additions & 10 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// Peer has been created but did not start running.
PeerStatePending = "Pending"

// Peer is uploading resources for p2p cluster.
PeerStateUploading = "Uploading"

// Peer successfully registered and perpared to download.
PeerStateReceived = "Received"

Expand All @@ -44,17 +47,20 @@ const (
)

const (
// Peer is uploding.
PeerEventUpload = "Uploda"

// Peer is registered and perpared to download.
PeerEventRegister = "Register"

// Peer is downloading.
PeerEventDownload = "Download"

// Peer downloaded successfully.
PeerEventDownloadSucceeded = "DownloadSucceeded"
// Peer downloaded or uploaded successfully.
PeerEventSucceeded = "Succeeded"

// Peer downloaded failed.
PeerEventDownloadFailed = "DownloadFailed"
// Peer downloaded or uploaded failed.
PeerEventFailed = "Failed"
)

// Peer contains content for persistent cache peer.
Expand Down Expand Up @@ -112,22 +118,26 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
p.FSM = fsm.NewFSM(
PeerStatePending,
fsm.Events{
{Name: PeerEventRegister, Src: []string{PeerStatePending}, Dst: PeerStateReceived},
{Name: PeerEventDownload, Src: []string{PeerStateReceived}, Dst: PeerStateRunning},
{Name: PeerEventDownloadSucceeded, Src: []string{PeerStateRunning}, Dst: PeerStateSucceeded},
{Name: PeerEventDownloadFailed, Src: []string{PeerStateRunning}, Dst: PeerStateFailed},
fsm.EventDesc{Name: PeerEventUpload, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateUploading},
fsm.EventDesc{Name: PeerEventRegister, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceived},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceived}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventSucceeded, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateSucceeded},
fsm.EventDesc{Name: PeerEventFailed, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateFailed},
},
fsm.Callbacks{
PeerEventUpload: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegister: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownload: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownloadSucceeded: func(ctx context.Context, e *fsm.Event) {
PeerEventSucceeded: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownloadFailed: func(ctx context.Context, e *fsm.Event) {
PeerEventFailed: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
},
Expand Down
5 changes: 3 additions & 2 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), peer.Task.TTL).Result(); err != nil {
ttl := peer.Task.TTL - time.Since(peer.Task.CreatedAt)
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set peer ttl failed: %v", err)
return err
}
Expand All @@ -199,7 +200,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.Task.TTL).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand Down
14 changes: 7 additions & 7 deletions scheduler/resource/persistentcache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ const (
TaskEventUpload = "Upload"

// Task uploaded successfully.
TaskEventUploadSucceeded = "UploadSucceeded"
TaskEventSucceeded = "Succeeded"

// Task uploaded failed.
TaskEventUploadFailed = "UploadFailed"
TaskEventFailed = "Failed"
)

// Task contains content for persistent cache task.
Expand Down Expand Up @@ -122,18 +122,18 @@ func NewTask(id, tag, application, state string, persistentReplicaCount uint64,
t.FSM = fsm.NewFSM(
TaskStatePending,
fsm.Events{
{Name: TaskEventUpload, Src: []string{TaskStatePending, TaskStateFailed}, Dst: TaskStateUploading},
{Name: TaskEventUploadSucceeded, Src: []string{TaskStateUploading}, Dst: TaskStateSucceeded},
{Name: TaskEventUploadFailed, Src: []string{TaskStateUploading}, Dst: TaskStateFailed},
fsm.EventDesc{Name: TaskEventUpload, Src: []string{TaskStatePending, TaskStateFailed}, Dst: TaskStateUploading},
fsm.EventDesc{Name: TaskEventSucceeded, Src: []string{TaskStateUploading}, Dst: TaskStateSucceeded},
fsm.EventDesc{Name: TaskEventFailed, Src: []string{TaskStateUploading}, Dst: TaskStateFailed},
},
fsm.Callbacks{
TaskEventUpload: func(ctx context.Context, e *fsm.Event) {
t.Log.Infof("task state is %s", e.FSM.Current())
},
TaskEventUploadSucceeded: func(ctx context.Context, e *fsm.Event) {
TaskEventSucceeded: func(ctx context.Context, e *fsm.Event) {
t.Log.Infof("task state is %s", e.FSM.Current())
},
TaskEventUploadFailed: func(ctx context.Context, e *fsm.Event) {
TaskEventFailed: func(ctx context.Context, e *fsm.Event) {
t.Log.Infof("task state is %s", e.FSM.Current())
},
},
Expand Down
3 changes: 2 additions & 1 deletion scheduler/resource/persistentcache/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL).Result(); err != nil {
ttl := task.TTL - time.Since(task.CreatedAt)
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), ttl).Result(); err != nil {
task.Log.Errorf("set task ttl failed: %v", err)
return err
}
Expand Down
18 changes: 9 additions & 9 deletions scheduler/resource/standard/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,23 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
p.FSM = fsm.NewFSM(
PeerStatePending,
fsm.Events{
{Name: PeerEventRegisterEmpty, Src: []string{PeerStatePending}, Dst: PeerStateReceivedEmpty},
{Name: PeerEventRegisterTiny, Src: []string{PeerStatePending}, Dst: PeerStateReceivedTiny},
{Name: PeerEventRegisterSmall, Src: []string{PeerStatePending}, Dst: PeerStateReceivedSmall},
{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending}, Dst: PeerStateReceivedNormal},
{Name: PeerEventDownload, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning},
{Name: PeerEventDownloadBackToSource, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning}, Dst: PeerStateBackToSource},
{Name: PeerEventDownloadSucceeded, Src: []string{
fsm.EventDesc{Name: PeerEventRegisterEmpty, Src: []string{PeerStatePending}, Dst: PeerStateReceivedEmpty},
fsm.EventDesc{Name: PeerEventRegisterTiny, Src: []string{PeerStatePending}, Dst: PeerStateReceivedTiny},
fsm.EventDesc{Name: PeerEventRegisterSmall, Src: []string{PeerStatePending}, Dst: PeerStateReceivedSmall},
fsm.EventDesc{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending}, Dst: PeerStateReceivedNormal},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventDownloadBackToSource, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning}, Dst: PeerStateBackToSource},
fsm.EventDesc{Name: PeerEventDownloadSucceeded, Src: []string{
// Since ReportPeerResult and ReportPieceResult are called in no order,
// the result may be reported after the register is successful.
PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal,
PeerStateRunning, PeerStateBackToSource,
}, Dst: PeerStateSucceeded},
{Name: PeerEventDownloadFailed, Src: []string{
fsm.EventDesc{Name: PeerEventDownloadFailed, Src: []string{
PeerStatePending, PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal,
PeerStateRunning, PeerStateBackToSource, PeerStateSucceeded,
}, Dst: PeerStateFailed},
{Name: PeerEventLeave, Src: []string{
fsm.EventDesc{Name: PeerEventLeave, Src: []string{
PeerStatePending, PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal,
PeerStateRunning, PeerStateBackToSource, PeerStateFailed, PeerStateSucceeded,
}, Dst: PeerStateLeave},
Expand Down
8 changes: 4 additions & 4 deletions scheduler/resource/standard/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ func NewTask(id, url, tag, application string, typ commonv2.TaskType, filteredQu
t.FSM = fsm.NewFSM(
TaskStatePending,
fsm.Events{
{Name: TaskEventDownload, Src: []string{TaskStatePending, TaskStateSucceeded, TaskStateFailed, TaskStateLeave}, Dst: TaskStateRunning},
{Name: TaskEventDownloadSucceeded, Src: []string{TaskStateLeave, TaskStateRunning, TaskStateFailed}, Dst: TaskStateSucceeded},
{Name: TaskEventDownloadFailed, Src: []string{TaskStateRunning}, Dst: TaskStateFailed},
{Name: TaskEventLeave, Src: []string{TaskStatePending, TaskStateRunning, TaskStateSucceeded, TaskStateFailed}, Dst: TaskStateLeave},
fsm.EventDesc{Name: TaskEventDownload, Src: []string{TaskStatePending, TaskStateSucceeded, TaskStateFailed, TaskStateLeave}, Dst: TaskStateRunning},
fsm.EventDesc{Name: TaskEventDownloadSucceeded, Src: []string{TaskStateLeave, TaskStateRunning, TaskStateFailed}, Dst: TaskStateSucceeded},
fsm.EventDesc{Name: TaskEventDownloadFailed, Src: []string{TaskStateRunning}, Dst: TaskStateFailed},
fsm.EventDesc{Name: TaskEventLeave, Src: []string{TaskStatePending, TaskStateRunning, TaskStateSucceeded, TaskStateFailed}, Dst: TaskStateLeave},
},
fsm.Callbacks{
TaskEventDownload: func(ctx context.Context, e *fsm.Event) {
Expand Down
141 changes: 137 additions & 4 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"time"

"github.com/bits-and-blooms/bitset"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -1504,21 +1505,153 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del
return nil
}

// TODO Implement the following methods.
// UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started.
func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest) error {
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId())
if !loaded {
log.Error("host not found")
return status.Errorf(codes.NotFound, "host %s not found", req.GetHostId())
}

// Handle task with task started request, new task and store it.
task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if loaded && !task.FSM.Can(persistentcache.TaskEventUpload) {
log.Errorf("persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
return status.Errorf(codes.FailedPrecondition, "persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
}

digest, err := digest.Parse(req.GetDigest())
if err != nil {
log.Errorf("parse digest %s error %s", req.GetDigest(), err)
return status.Errorf(codes.InvalidArgument, err.Error())
}

task = persistentcache.NewTask(req.GetTaskId(), req.GetTag(), req.GetApplication(), persistentcache.TaskStatePending, req.GetPersistentReplicaCount(),
0, int32(req.GetPieceLength()), int64(req.GetContentLength()), int32(req.GetPieceCount()), digest, req.GetTtl().AsDuration(), time.Now(), time.Now(), log)

if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
}

if err := v.persistentCacheResource.TaskManager().Store(ctx, task); err != nil {
log.Errorf("store persistent cache task %s error %s", task.ID, err)
return status.Errorf(codes.Internal, err.Error())
}

// Handle peer with task started request, new peer and store it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if loaded {
log.Error("persistent cache peer already exists")
return status.Errorf(codes.AlreadyExists, "persistent cache peer %s already exists", peer.ID)
}

peer = persistentcache.NewPeer(req.GetPeerId(), persistentcache.PeerStatePending, true, bitset.New(uint(req.GetPieceCount())), nil, task, host, 0, time.Now(), time.Now(), log)

if err := peer.FSM.Event(ctx, persistentcache.PeerEventUpload); err != nil {
log.Errorf("peer fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
}

if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
return status.Errorf(codes.Internal, err.Error())
}

return nil
}

// TODO Implement the following methods.
// UploadPersistentCacheTaskFinished uploads the metadata of the persistent cache task finished.
func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFinishedRequest) (*commonv2.PersistentCacheTask, error) {
return nil, nil
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
// Handle peer with task finished request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
log.Error("persistent cache peer not found")
return nil, status.Errorf(codes.NotFound, "persistent cache peer %s not found", req.GetPeerId())
}

peer.FinishedPieces.SetAll()
if err := peer.FSM.Event(ctx, persistentcache.PeerEventSucceeded); err != nil {
log.Errorf("peer fsm event failed: %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
}
peer.Cost = time.Since(peer.CreatedAt)
peer.UpdatedAt = time.Now()

if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
return nil, status.Errorf(codes.Internal, err.Error())
}

// Handle task with peer finished request, load task and update it.
peer.Task.ReplicaCount++
if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
}
peer.Task.UpdatedAt = time.Now()

if err := v.persistentCacheResource.TaskManager().Store(ctx, peer.Task); err != nil {
log.Errorf("store persistent cache task %s error %s", peer.Task.ID, err)
return nil, status.Errorf(codes.Internal, err.Error())
}

// TODO(gaius) Implement copy multiple replicas to the other peers.
// Select the remote peer to copy the replica and trigger the download task with asynchronous.
if peer.Task.ReplicaCount < peer.Task.PersistentReplicaCount {
}

return &commonv2.PersistentCacheTask{
Id: peer.Task.ID,
PersistentReplicaCount: peer.Task.PersistentReplicaCount,
ReplicaCount: peer.Task.ReplicaCount,
Digest: peer.Task.Digest.String(),
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
PieceLength: uint64(peer.Task.PieceLength),
ContentLength: uint64(peer.Task.ContentLength),
PieceCount: uint32(peer.Task.TotalPieceCount),
State: peer.Task.FSM.Current(),
CreatedAt: timestamppb.New(peer.Task.CreatedAt),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
}, nil
}

// TODO Implement the following methods.
// UploadPersistentCacheTaskFailed uploads the metadata of the persistent cache task failed.
func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFailedRequest) error {
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
// Handle peer with task failed request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
log.Error("persistent cache peer not found")
return status.Errorf(codes.NotFound, "persistent cache peer %s not found", req.GetPeerId())
}

if err := peer.FSM.Event(ctx, persistentcache.PeerEventFailed); err != nil {
log.Errorf("peer fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
}
peer.UpdatedAt = time.Now()

if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
return status.Errorf(codes.Internal, err.Error())
}

// Handle task with peer failed request, load task and update it.
if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return status.Errorf(codes.Internal, err.Error())
}
peer.Task.UpdatedAt = time.Now()

if err := v.persistentCacheResource.TaskManager().Store(ctx, peer.Task); err != nil {
log.Errorf("store persistent cache task %s error %s", peer.Task.ID, err)
return status.Errorf(codes.Internal, err.Error())
}

return nil
}

Expand Down

0 comments on commit 4f8fb8f

Please sign in to comment.