Skip to content

Commit

Permalink
Merge pull request #31 from smartcontractkit/wait_for_ready
Browse files Browse the repository at this point in the history
Add "WaitForReady" method
  • Loading branch information
samsondav authored Mar 7, 2023
2 parents 7a25841 + 3f5d73d commit d7d5168
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 24 deletions.
39 changes: 32 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"

Expand All @@ -16,6 +15,7 @@ import (
"github.com/smartcontractkit/wsrpc/internal/message"
"github.com/smartcontractkit/wsrpc/internal/transport"
"github.com/smartcontractkit/wsrpc/internal/wsrpcsync"
"github.com/smartcontractkit/wsrpc/logger"
)

var (
Expand Down Expand Up @@ -55,6 +55,8 @@ type ClientConn struct {

// The RPC service definition
service *serviceInfo

logger logger.Logger
}

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
Expand Down Expand Up @@ -121,6 +123,28 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
}
}

// WaitForReady waits until the state becomes Ready
// It returns true when that happens
// It returns false if the context is cancelled, or the conn is shut down
func (cc *ClientConn) WaitForReady(ctx context.Context) bool {
ch := cc.csMgr.getNotifyChan()
switch cc.csMgr.getState() {
case connectivity.Ready:
return true
case connectivity.Shutdown:
return false
case connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure:
break
}
cc.logger.Debugf("Waiting for connection to be ready, current state: %s", cc.csMgr.getState())
select {
case <-ctx.Done():
return false
case <-ch:
return cc.WaitForReady(ctx)
}
}

// GetState gets the current connectivity state.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
Expand Down Expand Up @@ -201,7 +225,7 @@ func (cc *ClientConn) handleRead(done <-chan struct{}) {
case *message.Message_Response:
go cc.handleMessageResponse(ex.Response)
default:
log.Println("Invalid message type")
cc.logger.Errorf("Invalid message type: %T", ex)
}
case <-done:
return
Expand Down Expand Up @@ -233,7 +257,7 @@ func (cc *ClientConn) handleMessageRequest(r *message.Request) {
}

if err := cc.conn.transport.Write(replyMsg); err != nil {
log.Printf("error writing to transport: %s", err)
cc.logger.Errorf("error writing to transport: %s", err)
}
}
}
Expand Down Expand Up @@ -443,6 +467,7 @@ func (ac *addrConn) resetTransport() {

newTr, reconnect, err := ac.createTransport(addr, copts)
if err != nil {
ac.dopts.logger.Errorf("failed to connect to server at %s, got: %v", addr, err)
// After connection failure, the addrConn enters TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand All @@ -454,7 +479,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Unlock()

// Reconnection backoff time
log.Println("[wsrpc] attempting reconnection in", backoffFor)
ac.dopts.logger.Infof("attempting reconnection in %s", backoffFor)
timer := time.NewTimer(backoffFor)

select {
Expand Down Expand Up @@ -484,13 +509,13 @@ func (ac *addrConn) resetTransport() {

ac.mu.Unlock()

log.Println("[wsrpc] Connected to", ac.addr)
ac.dopts.logger.Debugf("Connected to %s", ac.addr)

// Block until the created transport is down. When this happens, we
// attempt to reconnect by starting again from the top
<-reconnect.Done()

log.Println("[wsrpc] Reconnecting to server...")
ac.dopts.logger.Info("Reconnecting to server...")
}
}

Expand All @@ -514,7 +539,7 @@ func (ac *addrConn) createTransport(addr string, copts transport.ConnectOptions)
reconnect.Fire()
}

tr, err := transport.NewClientTransport(ac.cc.ctx, addr, copts, onClose)
tr, err := transport.NewClientTransport(ac.cc.ctx, ac.dopts.logger, addr, copts, onClose)

return tr, reconnect, err
}
Expand Down
17 changes: 13 additions & 4 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"github.com/smartcontractkit/wsrpc/credentials"
"github.com/smartcontractkit/wsrpc/internal/backoff"
"github.com/smartcontractkit/wsrpc/internal/transport"
"github.com/smartcontractkit/wsrpc/logger"
)

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
copts transport.ConnectOptions
bs backoff.Strategy
block bool
copts transport.ConnectOptions
bs backoff.Strategy
block bool
logger logger.Logger
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -74,8 +76,15 @@ func WithWriteTimeout(d time.Duration) DialOption {
})
}

func WithLogger(lggr logger.Logger) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.logger = lggr
})
}

func defaultDialOptions() dialOptions {
return dialOptions{
copts: transport.ConnectOptions{},
copts: transport.ConnectOptions{},
logger: logger.DefaultLogger,
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.26.0
)
61 changes: 55 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,28 +1,77 @@
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 3 additions & 2 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gorilla/websocket"

"github.com/smartcontractkit/wsrpc/credentials"
"github.com/smartcontractkit/wsrpc/logger"
)

const (
Expand Down Expand Up @@ -44,8 +45,8 @@ type ClientTransport interface {

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(ctx context.Context, addr string, opts ConnectOptions, onClose func()) (ClientTransport, error) {
return newWebsocketClient(ctx, addr, opts, onClose)
func NewClientTransport(ctx context.Context, lggr logger.Logger, addr string, opts ConnectOptions, onClose func()) (ClientTransport, error) {
return newWebsocketClient(ctx, lggr, addr, opts, onClose)
}

// state of transport.
Expand Down
11 changes: 7 additions & 4 deletions internal/transport/websocket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package transport
import (
"context"
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/websocket"
"github.com/smartcontractkit/wsrpc/logger"
)

// WebsocketClient implements the ClientTransport interface with websockets.
Expand All @@ -31,11 +31,13 @@ type WebsocketClient struct {
done chan struct{}
// A signal channel called when the transport is closed
interrupt chan struct{}

log logger.Logger
}

// newWebsocketClient establishes the transport with the required ConnectOptions
// and returns it to the caller.
func newWebsocketClient(ctx context.Context, addr string, opts ConnectOptions, onClose func()) (_ *WebsocketClient, err error) {
func newWebsocketClient(ctx context.Context, log logger.Logger, addr string, opts ConnectOptions, onClose func()) (_ *WebsocketClient, err error) {
writeTimeout := defaultWriteTimeout
if opts.WriteTimeout != 0 {
writeTimeout = opts.WriteTimeout
Expand All @@ -61,6 +63,7 @@ func newWebsocketClient(ctx context.Context, addr string, opts ConnectOptions, o
read: make(chan []byte), // Should this be buffered?
done: make(chan struct{}),
interrupt: make(chan struct{}),
log: log,
}

// Start go routines to establish the read/write channels
Expand Down Expand Up @@ -115,7 +118,7 @@ func (c *WebsocketClient) readPump() {
for {
_, msg, err := c.conn.ReadMessage()
if err != nil {
log.Println("[wsrpc] Read error: ", err)
c.log.Errorw("[wsrpc] Read error", "err", err)

return
}
Expand Down Expand Up @@ -146,7 +149,7 @@ func (c *WebsocketClient) writePump() {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Printf("[wsrpc] write error: %v\n", err)
c.log.Errorf("Write error: %v", err)

c.conn.Close()

Expand Down
Loading

0 comments on commit d7d5168

Please sign in to comment.