Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check the ready status of component before it becomes reachable. #761

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/bankofanthos/common/ledger_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (reader *LedgerReader) pollTransactions(startingID int64) int64 {
latestID := startingID
transactionList, err := reader.dbRepo.FindLatest(startingID)
if err != nil {
reader.logger.Error("Error polling transactions", err)
reader.logger.Error("Error polling transactions", "err", err)
return latestID
}
reader.logger.Info("Polling new transactions")
Expand Down
22 changes: 11 additions & 11 deletions examples/bankofanthos/frontend/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ func (s *server) homeHandler(w http.ResponseWriter, r *http.Request) {

balance, err := s.balanceReader.Get().GetBalance(r.Context(), accountID)
if err != nil {
logger.Error("Couldn't fetch balance", err)
logger.Error("Couldn't fetch balance", "err", err)
}
txnHistory, err := s.transactionHistory.Get().GetTransactions(r.Context(), accountID)
if err != nil {
logger.Error("Couldn't fetch transaction history", err)
logger.Error("Couldn't fetch transaction history", "err", err)
}
contacts, err := s.contacts.Get().GetContacts(r.Context(), username)
if err != nil {
logger.Error("Couldn't fetch contacts", err)
logger.Error("Couldn't fetch contacts", "err", err)
}
labeledHistory := populateContactLabels(accountID, txnHistory, contacts)

Expand All @@ -149,7 +149,7 @@ func (s *server) homeHandler(w http.ResponseWriter, r *http.Request) {
"Message": r.URL.Query().Get("msg"),
"BankName": s.config.bankName,
}); err != nil {
logger.Error("couldn't generate home page", err)
logger.Error("couldn't generate home page", "err", err)
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func (s *server) paymentHandler(w http.ResponseWriter, r *http.Request) {
token, err := r.Cookie(tokenCookieName)
if err != nil || !verifyToken(token.Value, s.config.publicKey) {
msg := "Error submitting payment: user is not authenticated"
logger.Error(msg, err)
logger.Error(msg, "err", err)
http.Error(w, msg, http.StatusUnauthorized)
return
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *server) depositHandler(w http.ResponseWriter, r *http.Request) {
token, err := r.Cookie(tokenCookieName)
if err != nil || !verifyToken(token.Value, s.config.publicKey) {
msg := "Error submitting deposit: user is not authenticated"
logger.Error(msg, err)
logger.Error(msg, "err", err)
http.Error(w, msg, http.StatusUnauthorized)
return
}
Expand Down Expand Up @@ -415,7 +415,7 @@ func (s *server) loginGetHandler(w http.ResponseWriter, r *http.Request) {
"RedirectURI": redirectURI,
"AppName": appName,
}); err != nil {
logger.Error("couldn't generate login page", err)
logger.Error("couldn't generate login page", "err", err)
}
}

Expand All @@ -426,7 +426,7 @@ func (s *server) loginPostHandler(w http.ResponseWriter, r *http.Request) {
logger := s.Logger(r.Context())
err := s.loginPostHelper(w, r)
if err != nil {
logger.Error("/login POST failed", err, "user", r.FormValue("username"))
logger.Error("/login POST failed", "err", err, "user", r.FormValue("username"))
http.Redirect(w, r, "/login?msg=Login+Failed", http.StatusFound)
}
}
Expand Down Expand Up @@ -565,7 +565,7 @@ func (s *server) consentGetHandler(w http.ResponseWriter, r *http.Request) {
"RedirectURI": redirectURI,
"AppName": appName,
}); err != nil {
logger.Error("couldn't generate consent.html", err)
logger.Error("couldn't generate consent.html", "err", err)
}
return
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (s *server) signupGetHandler(w http.ResponseWriter, r *http.Request) {
"PodZone": s.config.podZone,
"BankName": s.config.bankName,
}); err != nil {
logger.Error("couldn't generate consent.html", err)
logger.Error("couldn't generate consent.html", "err", err)
}
}

Expand All @@ -637,7 +637,7 @@ func (s *server) signupPostHandler(w http.ResponseWriter, r *http.Request) {
}
err := s.userService.Get().CreateUser(r.Context(), creq)
if err != nil {
logger.Debug("Error creating new user", "err", err)
logger.Error("Error creating new user", "err", err)
url := fmt.Sprintf("/login?msg=Account creation failed: %s", err)
http.Redirect(w, r, url, http.StatusSeeOther)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (i *impl) processTransactionForAcct(accountID string, transaction model.Tra
i.Logger(ctx).Debug("Processing transaction", "accountID", accountID, "transaction", transaction)
got, err := i.txnCache.c.Get(accountID)
if err != nil {
i.Logger(ctx).Error("processTransactionForAcct failed", err)
i.Logger(ctx).Error("processTransactionForAcct failed", "err", err)
return
}
txns := got.([]model.Transaction)
Expand Down
1 change: 1 addition & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
sort
strings
sync
sync/atomic
syscall
time
github.com/ServiceWeaver/weaver/metrics
Expand Down
11 changes: 2 additions & 9 deletions internal/net/call/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,12 @@ type HandlerMap struct {
}

// NewHandlerMap returns a handler map to which the server handlers can
// be added. A "ready" handler is automatically registered in the new
// returned map.
// be added.
func NewHandlerMap() *HandlerMap {
hm := &HandlerMap{
return &HandlerMap{
handlers: map[MethodKey]Handler{},
names: map[MethodKey]string{},
}
// Add a dummy "ready" handler. Clients will repeatedly call this
// RPC until it responds successfully, ensuring the server is ready.
hm.Set("", "ready", func(context.Context, []byte) ([]byte, error) {
return nil, nil
})
return hm
}

// Set registers a handler for the specified method of component.
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (p *Proxy) director(r *http.Request) {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.backends) == 0 {
p.logger.Error("director", errors.New("no backends"), "url", r.URL)
p.logger.Error("director", "err", errors.New("no backends"), "url", r.URL)
return
}
r.URL.Scheme = "http" // TODO(mwhittaker): Support HTTPS.
Expand Down
44 changes: 30 additions & 14 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"

"github.com/ServiceWeaver/weaver/internal/config"
"github.com/ServiceWeaver/weaver/internal/control"
Expand All @@ -45,8 +46,9 @@ import (
"golang.org/x/sync/errgroup"
)

// readyMethodKey holds the key for a method used to check if a backend is ready.
var readyMethodKey = call.MakeMethodKey("", "ready")
// readyMethodName holds the name of the special component method used by the
// clients to check if a component is ready.
const readyMethodName = "ready"

// RemoteWeaveletOptions configure a RemoteWeavelet.
type RemoteWeaveletOptions struct {
Expand Down Expand Up @@ -113,16 +115,10 @@ type component struct {

implInit sync.Once // used to initialize impl, severStub
implErr error // non-nil if impl creation fails
implReady atomic.Bool // true only after impl creation succeeds
impl any // instance of component implementation
serverStub codegen.Server // handles remote calls from other processes

// TODO(mwhittaker): We have one client for every component. Every client
// independently maintains network connections to every weavelet hosting
// the component. Thus, there may be many redundant network connections to
// the same weavelet. Given n weavelets hosting m components, there's at
// worst n^2m connections rather than a more optimal n^2 (a single
// connection between every pair of weavelets). We should rewrite things to
// avoid the redundancy.
resolver *routingResolver // client resolver
balancer *routingBalancer // client balancer

Expand Down Expand Up @@ -419,6 +415,7 @@ func (w *RemoteWeavelet) GetImpl(t reflect.Type) (any, error) {
return
} else {
w.syslogger.Debug("Constructed", "component", name)
c.implReady.Store(true)
}

logger := w.logger(c.reg.Name)
Expand Down Expand Up @@ -513,7 +510,7 @@ func (w *RemoteWeavelet) makeStub(fullName string, reg *codegen.Registration, re
return nil, err
}
if wait {
if err := waitUntilReady(w.ctx, conn); err != nil {
if err := waitUntilReady(w.ctx, conn, fullName); err != nil {
w.syslogger.Error("Failed to wait for remote", "component", name, "err", err)
return nil, err
}
Expand Down Expand Up @@ -647,7 +644,16 @@ func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.Upda

// GetHealth implements controller.GetHealth.
func (w *RemoteWeavelet) GetHealth(ctx context.Context, req *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}, nil
// Get the health status for all components. For now, we consider a component
// healthy iff it has been successfully initialized. In the future, we will
// maintain a real-time health for each component.
reply := &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}
for cname, c := range w.componentsByName {
if c.implReady.Load() {
reply.HealthyComponents = append(reply.HealthyComponents, cname)
}
}
return reply, nil
}

// GetMetrics implements controller.GetMetrics.
Expand Down Expand Up @@ -718,6 +724,16 @@ func (w *RemoteWeavelet) addHandlers(handlers *call.HandlerMap, c *component) {
}
handlers.Set(c.reg.Name, mname, handler)
}

// Add the special "component is ready" method handler, which is used by
// the clients to wait for the component to be ready before receiving traffic
// (see waitUntilReady).
handlers.Set(c.reg.Name, readyMethodName, func(context.Context, []byte) ([]byte, error) {
if c.implReady.Load() {
return nil, nil
}
return nil, call.Unreachable
})
}

// repeatedly repeatedly executes f until it succeeds or until ctx is cancelled.
Expand Down Expand Up @@ -924,10 +940,10 @@ func (s *server) handlers(components []string) (*call.HandlerMap, error) {
}

// waitUntilReady blocks until a successful call to the "ready" method is made
// on the provided client.
func waitUntilReady(ctx context.Context, client call.Connection) error {
// on the provided component.
func waitUntilReady(ctx context.Context, client call.Connection, fullComponentName string) error {
for r := retry.Begin(); r.Continue(ctx); {
_, err := client.Call(ctx, readyMethodKey, nil, call.CallOptions{})
_, err := client.Call(ctx, call.MakeMethodKey(fullComponentName, readyMethodName), nil, call.CallOptions{})
if err == nil || !errors.Is(err, call.Unreachable) {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/bin/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
// the value of version.DeployerVersion. If the string is not a
// constant---if we try to use fmt.Sprintf, for example---it will not be
// embedded in a Service Weaver binary.
versionData = "⟦wEaVeRvErSiOn:deployer=v0.23.0⟧"
versionData = "⟦wEaVeRvErSiOn:deployer=v0.24.0⟧"
}

// rodata returns the read-only data section of the provided binary.
Expand Down
6 changes: 3 additions & 3 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,12 @@ func (e *Envelope) WeaveletAddress() string {
}

// GetHealth returns the health status of the weavelet.
func (e *Envelope) GetHealth() protos.HealthStatus {
func (e *Envelope) GetHealth() *protos.GetHealthReply {
reply, err := e.controller.GetHealth(context.TODO(), &protos.GetHealthRequest{})
if err != nil {
return protos.HealthStatus_UNKNOWN
return &protos.GetHealthReply{Status: protos.HealthStatus_UNKNOWN}
}
return reply.Status
return reply
}

// GetProfile gets a profile from the weavelet.
Expand Down
Loading
Loading