Skip to content

Commit

Permalink
Merge pull request #5577 from oasisprotocol/kostko/fix/rhp-wait-ready
Browse files Browse the repository at this point in the history
go/runtime/host: Wait for readiness instead of failing immediately
  • Loading branch information
kostko authored Feb 28, 2024
2 parents da555b1 + 09374a7 commit 8a75487
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
1 change: 1 addition & 0 deletions .changelog/5577.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/runtime/host: Wait for readiness instead of failing immediately
40 changes: 24 additions & 16 deletions go/runtime/host/protocol/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (
const (
moduleName = "rhp/internal"

// connWriteTimeout is the connection write timeout.
connWriteTimeout = 5 * time.Second
// connReadyTimeout is the timeout while waiting for the connection to be ready while attempting
// to handle a new request from the runtime.
connReadyTimeout = 5 * time.Second
)

var (
Expand Down Expand Up @@ -212,6 +216,7 @@ type connection struct { // nolint: maligned

info *RuntimeInfoResponse

readyCh chan struct{}
outCh chan *Message
closeCh chan struct{}
quitWg sync.WaitGroup
Expand All @@ -226,6 +231,19 @@ func (c *connection) getState() state {
return s
}

// waitReady waits for the connection to become ready for at most connReadyTimeout.
func (c *connection) waitReady(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, connReadyTimeout)
defer cancel()

select {
case <-c.readyCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (c *connection) setStateLocked(s state) {
// Validate state transition.
dests := validStateTransitions[c.state]
Expand Down Expand Up @@ -417,22 +435,7 @@ func (c *connection) handleMessage(ctx context.Context, message *Message) {
switch message.MessageType {
case MessageRequest:
// Incoming request.
var allowed bool
state := c.getState()
switch {
case state == stateReady:
// All requests allowed.
allowed = true
default:
// No requests allowed.
allowed = false
}
if !allowed {
// Reject incoming requests if not in correct state.
c.logger.Warn("rejecting incoming request before being ready",
"state", state,
"request", fmt.Sprintf("%+v", message.Body),
)
if err := c.waitReady(ctx); err != nil {
_ = c.sendMessage(ctx, newResponseMessage(message, errorToBody(ErrNotReady)))
return
}
Expand Down Expand Up @@ -540,6 +543,8 @@ func (c *connection) InitGuest(conn net.Conn) error {
c.setStateLocked(stateReady)
c.Unlock()

close(c.readyCh)

return nil
}

Expand Down Expand Up @@ -587,6 +592,8 @@ func (c *connection) InitHost(ctx context.Context, conn net.Conn, hi *HostInfo)
c.info = info
c.Unlock()

close(c.readyCh)

return &rtVersion, nil
}

Expand All @@ -601,6 +608,7 @@ func NewConnection(logger *logging.Logger, runtimeID common.Namespace, handler H
handler: handler,
state: stateUninitialized,
pendingRequests: make(map[uint64]chan<- *Body),
readyCh: make(chan struct{}),
outCh: make(chan *Message),
closeCh: make(chan struct{}),
logger: logger,
Expand Down

0 comments on commit 8a75487

Please sign in to comment.