fix: wdpost: disabled post worker handling #10394

Merged: 5 commits, Mar 6, 2023
itests/kit/ensemble.go: 2 changes (2 additions, 0 deletions)

@@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
 		MinerNode:      minerNode,
 		RemoteListener: rl,
 		options:        options,
+
+		Stop: func(ctx context.Context) error { return nil },
 	}

 	n.inactive.workers = append(n.inactive.workers, worker)
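Why the no-op default matters: TestWorker.Stop is a function field, and calling a nil func in Go panics, so every worker needs some Stop value even if it is never started over RPC (workerRpc in itests/kit/rpc.go below replaces it with a real one). A minimal sketch of the pattern, using a stand-in struct rather than the real kit.TestWorker:

package main

import "context"

// TestWorkerish stands in for kit.TestWorker; only the Stop field matters here.
type TestWorkerish struct {
	Stop func(ctx context.Context) error
}

func main() {
	w := &TestWorkerish{
		// The default added in ensemble.go: without it, w.Stop(ctx) would
		// panic with a nil pointer dereference for non-RPC workers.
		Stop: func(ctx context.Context) error { return nil },
	}
	_ = w.Stop(context.Background()) // always safe to call
}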
itests/kit/rpc.go: 6 changes (6 additions, 0 deletions)

@@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
 	require.NoError(t, err)
 	t.Cleanup(stop)

+	m.Stop = func(ctx context.Context) error {
+		srv.Close()
+		srv.CloseClientConnections()
+		return nil
+	}
+
 	m.ListenAddr, m.Worker = maddr, cl
 	return m
 }
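This Stop closure is the hook the new TestWindowPostWorkerDisconnected test uses to sever a worker abruptly. A runnable sketch of the same trick; it assumes srv is a *httptest.Server (Close and CloseClientConnections are both real methods of that type; the concrete server type in lotus's test kit is an assumption here):

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))

	// Mirrors the TestWorker.Stop closure above: tear down the listener and
	// force-drop in-flight connections so clients see a hard RPC failure
	// rather than a graceful shutdown.
	stop := func(ctx context.Context) error {
		srv.Close()
		srv.CloseClientConnections()
		return nil
	}

	resp, err := http.Get(srv.URL)
	fmt.Println("before stop, err:", err) // before stop, err: <nil>
	if err == nil {
		resp.Body.Close()
	}

	_ = stop(context.Background())

	_, err = http.Get(srv.URL)
	fmt.Println("after stop, got error:", err != nil) // after stop, got error: true
}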
itests/worker_test.go: 53 changes (41 additions, 12 deletions)

@@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {

 	sectors := 2 * 48 * 2

-	client, miner, _, ens := kit.EnsembleWorker(t,
+	client, miner, _, _ := kit.EnsembleWorker(t,
 		kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
 		kit.LatestActorsAt(-1),
 		kit.ThroughRPC(),
@@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
 	require.NoError(t, err)

-	bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0]
-
 	di = di.NextNotElapsed()

-	t.Log("Running one proving period")
-	waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2
-	client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
-
-	t.Log("Waiting for post message")
-	bm.Stop()
-
 	tryDl := func(dl uint64) {
 		p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
 		require.NoError(t, err)
@@ -398,10 +389,48 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	tryDl(0)
 	tryDl(40)
 	tryDl(di.Index + 4)
+}

-	lastPending, err := client.MpoolPending(ctx, types.EmptyTSK)
+func TestWindowPostWorkerDisconnected(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	_ = logging.SetLogLevel("storageminer", "INFO")
+
+	sectors := 2 * 48 * 2
+
+	_, miner, badWorker, ens := kit.EnsembleWorker(t,
+		kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
+		kit.LatestActorsAt(-1),
+		kit.ThroughRPC(),
+		kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
+
+	var goodWorker kit.TestWorker
+	ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start()
+
+	// wait for all workers
+	require.Eventually(t, func() bool {
+		w, err := miner.WorkerStats(ctx)
+		require.NoError(t, err)
+		return len(w) == 3 // 2 post + 1 miner-builtin
+	}, 10*time.Second, 100*time.Millisecond)
+
+	tryDl := func(dl uint64) {
+		p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
+		require.NoError(t, err)
+		require.Len(t, p, 1)
+		require.Equal(t, dl, p[0].Deadline)
+	}
+	tryDl(0) // this will run on the not-yet-bad badWorker
+
+	err := badWorker.Stop(ctx)
 	require.NoError(t, err)
-	require.Len(t, lastPending, 0)
+
+	tryDl(10) // will fail on the badWorker, then should retry on the goodWorker
+
+	time.Sleep(15 * time.Second)
+
+	tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled
 }

 func TestSchedulerRemoveRequest(t *testing.T) {
storage/sealer/manager_post.go: 2 changes (1 addition, 1 deletion)

@@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
 			skipped = append(skipped, sk...)

 			if err != nil {
-				retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
+				retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
 			}
 			flk.Unlock()
 		}
storage/sealer/sched_post.go: 38 changes (31 additions, 7 deletions)

@@ -2,12 +2,15 @@ package sealer

 import (
 	"context"
+	"errors"
 	"math/rand"
 	"sync"
 	"time"

+	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"

+	"github.com/filecoin-project/go-jsonrpc"
 	"github.com/filecoin-project/go-state-types/abi"

 	"github.com/filecoin-project/lotus/storage/paths"
@@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
 		}
 	}()

-	selected := candidates[0]
-	worker := ps.workers[selected.id]
+	var rpcErrs error

-	return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
-		ps.lk.Unlock()
-		defer ps.lk.Lock()
+	for i, selected := range candidates {
+		worker := ps.workers[selected.id]

-		return work(ctx, worker.workerRpc)
-	})
+		err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
+			ps.lk.Unlock()
+			defer ps.lk.Lock()
+
+			return work(ctx, worker.workerRpc)
+		})
+		if err == nil {
+			return nil
+		}
+
+		// if the error is RPCConnectionError, try another worker, if not, return the error
+		if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
+			return err
+		}
+
+		log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
+		rpcErrs = multierror.Append(rpcErrs, err)
+	}
+
+	return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
 }

 type candidateWorker struct {
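The core of the fix is above: Schedule no longer commits to candidates[0] but walks the candidate list, failing over only on transport-level errors. A reduced, runnable sketch of that control flow; ConnError stands in for *jsonrpc.RPCConnectionError and errors.Join for the go-multierror accumulation, so the types here are illustrative, not lotus's:

package main

import (
	"errors"
	"fmt"
)

// ConnError marks a transport failure worth retrying on another worker.
type ConnError struct{ worker string }

func (e *ConnError) Error() string { return "rpc connection error: " + e.worker }

// schedule tries candidates in order: task failures return immediately,
// connection failures are collected and the next candidate is tried.
func schedule(candidates []string, work func(worker string) error) error {
	var rpcErrs error
	for _, w := range candidates {
		err := work(w)
		if err == nil {
			return nil
		}
		if !errors.As(err, new(*ConnError)) {
			return err // real task error: don't mask it by retrying elsewhere
		}
		rpcErrs = errors.Join(rpcErrs, err)
	}
	return fmt.Errorf("got RPC errors from all workers: %w", rpcErrs)
}

func main() {
	err := schedule([]string{"badWorker", "goodWorker"}, func(w string) error {
		if w == "badWorker" {
			return &ConnError{worker: w} // simulate the severed RPC connection
		}
		return nil
	})
	fmt.Println("scheduled ok:", err == nil) // scheduled ok: true
}

Note the order of the checks: a nil error returns first, and only the connection-error type falls through to the retry path, so a genuine PoSt computation error from a healthy worker still surfaces immediately.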
@@ -124,6 +143,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
 	for wid, wr := range ps.workers {
 		needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)

+		if !wr.Enabled {
+			log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid)
+			continue
+		}
+
 		if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
 			continue
 		}
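The readyWorkers change closes the gap between a worker dropping and the scheduler noticing: once a failed heartbeat marks a worker disabled, it is filtered out before any resource accounting. A reduced sketch of that gate (stand-in types, not the real scheduler structures):

package main

import "fmt"

// workerHandle stands in for the scheduler's per-worker state; Enabled is
// flipped to false when a heartbeat fails, per the test above.
type workerHandle struct {
	Enabled bool
}

func readyWorkers(workers map[string]*workerHandle) []string {
	var candidates []string
	for wid, wr := range workers {
		if !wr.Enabled {
			// mirrors: "sched: not scheduling on PoSt-worker %s, worker disabled"
			continue
		}
		candidates = append(candidates, wid)
	}
	return candidates
}

func main() {
	ws := map[string]*workerHandle{
		"goodWorker": {Enabled: true},
		"badWorker":  {Enabled: false},
	}
	fmt.Println(readyWorkers(ws)) // [goodWorker]
}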