Skip to content

Commit

Permalink
Merge pull request #5404 from oasisprotocol/peternose/bugfix/fix-bakc…
Browse files Browse the repository at this point in the history
…up-workers

go/worker/compute/executor: Minor bug fixes
  • Loading branch information
peternose authored Oct 18, 2023
2 parents 22bbb10 + 26ffcfb commit f63c447
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 271 deletions.
1 change: 1 addition & 0 deletions .changelog/5404.bugfix.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Use local time for batch scheduling
1 change: 1 addition & 0 deletions .changelog/5404.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Start processing once all txs are fetched
1 change: 1 addition & 0 deletions .changelog/5404.bugfix.3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Schedule only if higher ranks didn't propose
1 change: 1 addition & 0 deletions .changelog/5404.bugfix.4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Estimate pool rank from observed commitments
2 changes: 1 addition & 1 deletion docs/oasis-node/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ oasis_worker_node_status_frozen | Gauge | Is oasis node frozen (binary). | | [w
oasis_worker_node_status_runtime_faults | Gauge | Number of runtime faults. | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
oasis_worker_node_status_runtime_suspended | Gauge | Runtime node suspension status (binary). | runtime | [worker/registration](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/registration/worker.go)
oasis_worker_processed_block_count | Counter | Number of processed roothash blocks. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_processed_event_count | Counter | Number of processed roothash events. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_processed_event_count | Counter | Number of processed roothash events. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_storage_commit_latency | Summary | Latency of storage commit calls (state + outputs) (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_storage_full_round | Gauge | The last round that was fully synced and finalized. | runtime | [worker/storage/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/storage/committee/metrics.go)
oasis_worker_storage_pending_round | Gauge | The last round that is in-flight for syncing. | runtime | [worker/storage/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/storage/committee/metrics.go)
Expand Down
3 changes: 2 additions & 1 deletion go/consensus/cometbft/apps/roothash/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
case commitment.ErrDiscrepancyDetected:
ctx.Logger().Warn("executor discrepancy detected",
"round", round,
"priority", rtState.CommitmentPool.HighestRank,
"rank", rtState.CommitmentPool.HighestRank,
logging.LogEvent, roothash.LogEventExecutionDiscrepancyDetected,
)

Expand Down Expand Up @@ -291,6 +291,7 @@ func (app *rootHashApplication) finalizeBlock(ctx *tmapi.Context, rtState *rooth

// Emit event.
ctx.Logger().Debug("new runtime block",
"runtime_id", rtState.Runtime.ID,
"height", ctx.BlockHeight()+1, // Current height is ctx.BlockHeight() + 1
"round", blk.Header.Round,
"type", blk.Header.HeaderType,
Expand Down
18 changes: 13 additions & 5 deletions go/roothash/api/commitment/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,15 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
// Discrepancy detection accepts commitments arriving in any order, e.g., a backup worker
// can submit a commitment even before there is a discrepancy.
logger.Debug("node is not in the committee",
"round", ec.Header.Header.Round,
"node_id", ec.NodeID,
)
return ErrNotInCommittee
case p.Discrepancy && !c.IsBackupWorker(ec.NodeID):
// Discrepancy resolution accepts commitments only from backup workers to prevent workers
// from improving their liveness statistics.
logger.Debug("node is not a backup worker",
"round", ec.Header.Header.Round,
"node_id", ec.NodeID,
)
return ErrBadExecutorCommitment
Expand All @@ -246,7 +248,9 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
// Reject commitments with invalid schedulers.
logger.Debug("executor commitment's scheduler is not in the committee",
"round", ec.Header.Header.Round,
"node_id", ec.NodeID,
"scheduler_id", ec.Header.SchedulerID,
"rank", rank,
)
return ErrBadExecutorCommitment
}
Expand All @@ -255,18 +259,22 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
switch {
case rank > p.HighestRank:
// Reject commitments with higher ranking.
logger.Debug("executor commitment's scheduler has too high ranking",
logger.Debug("executor commitment's scheduler has worse ranking",
"round", ec.Header.Header.Round,
"commitment_rank", rank,
"pool_rank", p.HighestRank,
"node_id", ec.NodeID,
"scheduler_id", ec.Header.SchedulerID,
"rank", rank,
"highest_rank", p.HighestRank,
)
return ErrBadExecutorCommitment
case rank != p.HighestRank && p.Discrepancy:
// Prevent placing commitments with different rank during discrepancy resolution.
logger.Debug("executor commitment's scheduler rank does not match",
"round", ec.Header.Header.Round,
"commitment_rank", rank,
"pool_rank", p.HighestRank,
"node_id", ec.NodeID,
"scheduler_id", ec.Header.SchedulerID,
"rank", rank,
"highest_rank", p.HighestRank,
)
return ErrBadExecutorCommitment
case rank < p.HighestRank:
Expand Down
2 changes: 1 addition & 1 deletion go/roothash/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const (
recvTimeout = 5 * time.Second
recvTimeout = 10 * time.Second
nrRuntimes = 3
)

Expand Down
5 changes: 0 additions & 5 deletions go/worker/client/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ func (n *Node) HandleNewBlockLocked(*runtime.BlockInfo) {
// Nothing to do here.
}

// HandleNewEventLocked is guarded by CrossNode.
func (n *Node) HandleNewEventLocked(*roothash.Event) {
// Nothing to do here.
}

// HandleRuntimeHostEventLocked is guarded by CrossNode.
func (n *Node) HandleRuntimeHostEventLocked(*host.Event) {
// Nothing to do here.
Expand Down
36 changes: 0 additions & 36 deletions go/worker/common/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ var (
},
[]string{"runtime"},
)
processedEventCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "oasis_worker_processed_event_count",
Help: "Number of processed roothash events.",
},
[]string{"runtime"},
)
failedRoundCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "oasis_worker_failed_round_count",
Expand Down Expand Up @@ -116,7 +109,6 @@ var (

nodeCollectors = []prometheus.Collector{
processedBlockCount,
processedEventCount,
failedRoundCount,
epochTransitionCount,
epochNumber,
Expand Down Expand Up @@ -145,8 +137,6 @@ type NodeHooks interface {
// Guarded by CrossNode.
HandleNewBlockLocked(*runtime.BlockInfo)
// Guarded by CrossNode.
HandleNewEventLocked(*roothash.Event)
// Guarded by CrossNode.
HandleRuntimeHostEventLocked(*host.Event)

// Initialized returns a channel that will be closed when the worker is initialized and ready
Expand Down Expand Up @@ -616,15 +606,6 @@ func (n *Node) handleNewBlockLocked(blk *block.Block, height int64) {
}
}

// Guarded by n.CrossNode.
func (n *Node) handleNewEventLocked(ev *roothash.Event) {
processedEventCount.With(n.getMetricLabels()).Inc()

for _, hooks := range n.hooks {
hooks.HandleNewEventLocked(ev)
}
}

// Guarded by n.CrossNode.
func (n *Node) handleRuntimeHostEventLocked(ev *host.Event) {
n.logger.Debug("got runtime event", "ev", ev)
Expand Down Expand Up @@ -712,16 +693,6 @@ func (n *Node) worker() {
}
defer blocksSub.Close()

// Start watching roothash events.
events, eventsSub, err := n.Consensus.RootHash().WatchEvents(n.ctx, n.Runtime.ID())
if err != nil {
n.logger.Error("failed to subscribe to roothash events",
"err", err,
)
return
}
defer eventsSub.Close()

// Provision the hosted runtime.
hrt, hrtNotifier, err := n.ProvisionHostedRuntime(n.ctx)
if err != nil {
Expand Down Expand Up @@ -798,13 +769,6 @@ func (n *Node) worker() {
defer n.CrossNode.Unlock()
n.handleNewBlockLocked(blk.Block, blk.Height)
}()
case ev := <-events:
// Received an event.
func() {
n.CrossNode.Lock()
defer n.CrossNode.Unlock()
n.handleNewEventLocked(ev)
}()
case ev := <-hrtEventCh:
// Received a hosted runtime event.
func() {
Expand Down
25 changes: 2 additions & 23 deletions go/worker/compute/executor/committee/discrepancy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ type discrepancyEvent struct {
authoritative bool
}

func (n *Node) NotifyDiscrepancy(info *discrepancyEvent) {
// Drop discrepancies if the worker falls behind.
select {
case <-n.discrepancyCh:
default:
}

// Non-blocking send.
n.discrepancyCh <- info
}

func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) {
n.logger.Warn("execution discrepancy detected",
"rank", ev.rank,
Expand Down Expand Up @@ -55,17 +44,7 @@ func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) {
n.discrepancy = ev
}

func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitment.ExecutorCommitment) {
// Don't do anything if we are not a backup worker or we are an executor worker.
id := n.commonNode.Identity.NodeSigner.Public()
if !n.committee.IsBackupWorker(id) || n.committee.IsWorker(id) {
return
}

n.logger.Debug("observed executor commitment",
"commitment", ec,
)

func (n *Node) predictDiscrepancy(ctx context.Context, ec *commitment.ExecutorCommitment) {
// TODO: Handle equivocation detection.

// Don't do anything if the discrepancy has already been detected.
Expand Down Expand Up @@ -101,7 +80,7 @@ func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitm

n.logger.Warn("observed commitments indicate discrepancy")

n.NotifyDiscrepancy(&discrepancyEvent{
n.handleDiscrepancy(ctx, &discrepancyEvent{
rank: n.commitPool.HighestRank,
height: uint64(n.blockInfo.ConsensusBlock.Height),
authoritative: false,
Expand Down
33 changes: 0 additions & 33 deletions go/worker/compute/executor/committee/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"

"github.com/oasisprotocol/oasis-core/go/common/crash"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
runtime "github.com/oasisprotocol/oasis-core/go/runtime/api"
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
)
Expand Down Expand Up @@ -46,34 +44,3 @@ func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) {
// Non-blocking send.
n.blockInfoCh <- bi
}

// HandleNewEventLocked implements NodeHooks.
// Guarded by n.commonNode.CrossNode.
func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
switch {
case ev.ExecutionDiscrepancyDetected != nil:
n.NotifyDiscrepancy(&discrepancyEvent{
rank: ev.ExecutionDiscrepancyDetected.Rank,
height: uint64(ev.Height),
authoritative: true,
})
case ev.ExecutorCommitted != nil:
n.NotifySchedulerCommitment(&ev.ExecutorCommitted.Commit)
}
}

func (n *Node) NotifySchedulerCommitment(ec *commitment.ExecutorCommitment) {
// Filter scheduler commitments.
if ec.NodeID != ec.Header.SchedulerID {
return
}

// Drop commitments if the worker falls behind. The pool's rank can only improve.
select {
case <-n.schedulerCommitmentCh:
default:
}

// Non-blocking send.
n.schedulerCommitmentCh <- ec
}
8 changes: 8 additions & 0 deletions go/worker/compute/executor/committee/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (
)

var (
processedEventCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "oasis_worker_processed_event_count",
Help: "Number of processed roothash events.",
},
[]string{"runtime"},
)
discrepancyDetectedCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "oasis_worker_execution_discrepancy_detected_count",
Expand Down Expand Up @@ -52,6 +59,7 @@ var (
[]string{"runtime"},
)
nodeCollectors = []prometheus.Collector{
processedEventCount,
discrepancyDetectedCount,
abortedBatchCount,
storageCommitLatency,
Expand Down
Loading

0 comments on commit f63c447

Please sign in to comment.