Skip to content

Commit

Permalink
Check the ready status of component before it becomes reachable. (#761)
Browse files Browse the repository at this point in the history
This fixes a situation where the component takes a while to
initialize: previously, we were declaring the component as
activated/healthy even during the pending init, resulting in
subsequent method calls failing.

Thanks to rgrandl@ for debugging the issue.

Other changes:
  * Don't add a component to the routing info until it has been
    initialized. This fixes the health-checking situation where
    one component replica becomes healthy, but we subsequently
    send method calls to other replicas which fail.
  * Fix go vet warnings in a couple of places.
  • Loading branch information
spetrovic77 authored May 9, 2024
1 parent 3d36c90 commit eafae6b
Show file tree
Hide file tree
Showing 13 changed files with 399 additions and 377 deletions.
2 changes: 1 addition & 1 deletion examples/bankofanthos/common/ledger_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (reader *LedgerReader) pollTransactions(startingID int64) int64 {
latestID := startingID
transactionList, err := reader.dbRepo.FindLatest(startingID)
if err != nil {
reader.logger.Error("Error polling transactions", err)
reader.logger.Error("Error polling transactions", "err", err)
return latestID
}
reader.logger.Info("Polling new transactions")
Expand Down
22 changes: 11 additions & 11 deletions examples/bankofanthos/frontend/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ func (s *server) homeHandler(w http.ResponseWriter, r *http.Request) {

balance, err := s.balanceReader.Get().GetBalance(r.Context(), accountID)
if err != nil {
logger.Error("Couldn't fetch balance", err)
logger.Error("Couldn't fetch balance", "err", err)
}
txnHistory, err := s.transactionHistory.Get().GetTransactions(r.Context(), accountID)
if err != nil {
logger.Error("Couldn't fetch transaction history", err)
logger.Error("Couldn't fetch transaction history", "err", err)
}
contacts, err := s.contacts.Get().GetContacts(r.Context(), username)
if err != nil {
logger.Error("Couldn't fetch contacts", err)
logger.Error("Couldn't fetch contacts", "err", err)
}
labeledHistory := populateContactLabels(accountID, txnHistory, contacts)

Expand All @@ -149,7 +149,7 @@ func (s *server) homeHandler(w http.ResponseWriter, r *http.Request) {
"Message": r.URL.Query().Get("msg"),
"BankName": s.config.bankName,
}); err != nil {
logger.Error("couldn't generate home page", err)
logger.Error("couldn't generate home page", "err", err)
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func (s *server) paymentHandler(w http.ResponseWriter, r *http.Request) {
token, err := r.Cookie(tokenCookieName)
if err != nil || !verifyToken(token.Value, s.config.publicKey) {
msg := "Error submitting payment: user is not authenticated"
logger.Error(msg, err)
logger.Error(msg, "err", err)
http.Error(w, msg, http.StatusUnauthorized)
return
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *server) depositHandler(w http.ResponseWriter, r *http.Request) {
token, err := r.Cookie(tokenCookieName)
if err != nil || !verifyToken(token.Value, s.config.publicKey) {
msg := "Error submitting deposit: user is not authenticated"
logger.Error(msg, err)
logger.Error(msg, "err", err)
http.Error(w, msg, http.StatusUnauthorized)
return
}
Expand Down Expand Up @@ -415,7 +415,7 @@ func (s *server) loginGetHandler(w http.ResponseWriter, r *http.Request) {
"RedirectURI": redirectURI,
"AppName": appName,
}); err != nil {
logger.Error("couldn't generate login page", err)
logger.Error("couldn't generate login page", "err", err)
}
}

Expand All @@ -426,7 +426,7 @@ func (s *server) loginPostHandler(w http.ResponseWriter, r *http.Request) {
logger := s.Logger(r.Context())
err := s.loginPostHelper(w, r)
if err != nil {
logger.Error("/login POST failed", err, "user", r.FormValue("username"))
logger.Error("/login POST failed", "err", err, "user", r.FormValue("username"))
http.Redirect(w, r, "/login?msg=Login+Failed", http.StatusFound)
}
}
Expand Down Expand Up @@ -565,7 +565,7 @@ func (s *server) consentGetHandler(w http.ResponseWriter, r *http.Request) {
"RedirectURI": redirectURI,
"AppName": appName,
}); err != nil {
logger.Error("couldn't generate consent.html", err)
logger.Error("couldn't generate consent.html", "err", err)
}
return
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (s *server) signupGetHandler(w http.ResponseWriter, r *http.Request) {
"PodZone": s.config.podZone,
"BankName": s.config.bankName,
}); err != nil {
logger.Error("couldn't generate consent.html", err)
logger.Error("couldn't generate consent.html", "err", err)
}
}

Expand All @@ -637,7 +637,7 @@ func (s *server) signupPostHandler(w http.ResponseWriter, r *http.Request) {
}
err := s.userService.Get().CreateUser(r.Context(), creq)
if err != nil {
logger.Debug("Error creating new user", "err", err)
logger.Error("Error creating new user", "err", err)
url := fmt.Sprintf("/login?msg=Account creation failed: %s", err)
http.Redirect(w, r, url, http.StatusSeeOther)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (i *impl) processTransactionForAcct(accountID string, transaction model.Tra
i.Logger(ctx).Debug("Processing transaction", "accountID", accountID, "transaction", transaction)
got, err := i.txnCache.c.Get(accountID)
if err != nil {
i.Logger(ctx).Error("processTransactionForAcct failed", err)
i.Logger(ctx).Error("processTransactionForAcct failed", "err", err)
return
}
txns := got.([]model.Transaction)
Expand Down
1 change: 1 addition & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
sort
strings
sync
sync/atomic
syscall
time
github.com/ServiceWeaver/weaver/metrics
Expand Down
11 changes: 2 additions & 9 deletions internal/net/call/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,12 @@ type HandlerMap struct {
}

// NewHandlerMap returns a handler map to which the server handlers can
// be added. A "ready" handler is automatically registered in the new
// returned map.
// be added.
func NewHandlerMap() *HandlerMap {
hm := &HandlerMap{
return &HandlerMap{
handlers: map[MethodKey]Handler{},
names: map[MethodKey]string{},
}
// Add a dummy "ready" handler. Clients will repeatedly call this
// RPC until it responds successfully, ensuring the server is ready.
hm.Set("", "ready", func(context.Context, []byte) ([]byte, error) {
return nil, nil
})
return hm
}

// Set registers a handler for the specified method of component.
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (p *Proxy) director(r *http.Request) {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.backends) == 0 {
p.logger.Error("director", errors.New("no backends"), "url", r.URL)
p.logger.Error("director", "err", errors.New("no backends"), "url", r.URL)
return
}
r.URL.Scheme = "http" // TODO(mwhittaker): Support HTTPS.
Expand Down
44 changes: 30 additions & 14 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"

"github.com/ServiceWeaver/weaver/internal/config"
"github.com/ServiceWeaver/weaver/internal/control"
Expand All @@ -45,8 +46,9 @@ import (
"golang.org/x/sync/errgroup"
)

// readyMethodKey holds the key for a method used to check if a backend is ready.
var readyMethodKey = call.MakeMethodKey("", "ready")
// readyMethodName holds the name of the special component method used by the
// clients to check if a component is ready.
const readyMethodName = "ready"

// RemoteWeaveletOptions configure a RemoteWeavelet.
type RemoteWeaveletOptions struct {
Expand Down Expand Up @@ -113,16 +115,10 @@ type component struct {

implInit sync.Once // used to initialize impl, serverStub
implErr error // non-nil if impl creation fails
implReady atomic.Bool // true only after impl creation succeeds
impl any // instance of component implementation
serverStub codegen.Server // handles remote calls from other processes

// TODO(mwhittaker): We have one client for every component. Every client
// independently maintains network connections to every weavelet hosting
// the component. Thus, there may be many redundant network connections to
// the same weavelet. Given n weavelets hosting m components, there's at
// worst n^2m connections rather than a more optimal n^2 (a single
// connection between every pair of weavelets). We should rewrite things to
// avoid the redundancy.
resolver *routingResolver // client resolver
balancer *routingBalancer // client balancer

Expand Down Expand Up @@ -419,6 +415,7 @@ func (w *RemoteWeavelet) GetImpl(t reflect.Type) (any, error) {
return
} else {
w.syslogger.Debug("Constructed", "component", name)
c.implReady.Store(true)
}

logger := w.logger(c.reg.Name)
Expand Down Expand Up @@ -513,7 +510,7 @@ func (w *RemoteWeavelet) makeStub(fullName string, reg *codegen.Registration, re
return nil, err
}
if wait {
if err := waitUntilReady(w.ctx, conn); err != nil {
if err := waitUntilReady(w.ctx, conn, fullName); err != nil {
w.syslogger.Error("Failed to wait for remote", "component", name, "err", err)
return nil, err
}
Expand Down Expand Up @@ -647,7 +644,16 @@ func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.Upda

// GetHealth implements controller.GetHealth.
func (w *RemoteWeavelet) GetHealth(ctx context.Context, req *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}, nil
// Get the health status for all components. For now, we consider a component
// healthy iff it has been successfully initialized. In the future, we will
// maintain a real-time health for each component.
reply := &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}
for cname, c := range w.componentsByName {
if c.implReady.Load() {
reply.HealthyComponents = append(reply.HealthyComponents, cname)
}
}
return reply, nil
}

// GetMetrics implements controller.GetMetrics.
Expand Down Expand Up @@ -718,6 +724,16 @@ func (w *RemoteWeavelet) addHandlers(handlers *call.HandlerMap, c *component) {
}
handlers.Set(c.reg.Name, mname, handler)
}

// Add the special "component is ready" method handler, which is used by
// the clients to wait for the component to be ready before receiving traffic
// (see waitUntilReady).
handlers.Set(c.reg.Name, readyMethodName, func(context.Context, []byte) ([]byte, error) {
if c.implReady.Load() {
return nil, nil
}
return nil, call.Unreachable
})
}

// repeatedly repeatedly executes f until it succeeds or until ctx is cancelled.
Expand Down Expand Up @@ -924,10 +940,10 @@ func (s *server) handlers(components []string) (*call.HandlerMap, error) {
}

// waitUntilReady blocks until a successful call to the "ready" method is made
// on the provided client.
func waitUntilReady(ctx context.Context, client call.Connection) error {
// on the provided component.
func waitUntilReady(ctx context.Context, client call.Connection, fullComponentName string) error {
for r := retry.Begin(); r.Continue(ctx); {
_, err := client.Call(ctx, readyMethodKey, nil, call.CallOptions{})
_, err := client.Call(ctx, call.MakeMethodKey(fullComponentName, readyMethodName), nil, call.CallOptions{})
if err == nil || !errors.Is(err, call.Unreachable) {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/bin/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
// the value of version.DeployerVersion. If the string is not a
// constant---if we try to use fmt.Sprintf, for example---it will not be
// embedded in a Service Weaver binary.
versionData = "⟦wEaVeRvErSiOn:deployer=v0.23.0⟧"
versionData = "⟦wEaVeRvErSiOn:deployer=v0.24.0⟧"
}

// rodata returns the read-only data section of the provided binary.
Expand Down
6 changes: 3 additions & 3 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,12 @@ func (e *Envelope) WeaveletAddress() string {
}

// GetHealth returns the health status of the weavelet.
func (e *Envelope) GetHealth() protos.HealthStatus {
func (e *Envelope) GetHealth() *protos.GetHealthReply {
reply, err := e.controller.GetHealth(context.TODO(), &protos.GetHealthRequest{})
if err != nil {
return protos.HealthStatus_UNKNOWN
return &protos.GetHealthReply{Status: protos.HealthStatus_UNKNOWN}
}
return reply.Status
return reply
}

// GetProfile gets a profile from the weavelet.
Expand Down
Loading

0 comments on commit eafae6b

Please sign in to comment.