Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "WaitForReady" method #31

Merged
merged 2 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"

Expand All @@ -16,6 +15,7 @@ import (
"github.com/smartcontractkit/wsrpc/internal/message"
"github.com/smartcontractkit/wsrpc/internal/transport"
"github.com/smartcontractkit/wsrpc/internal/wsrpcsync"
"github.com/smartcontractkit/wsrpc/logger"
)

var (
Expand Down Expand Up @@ -55,6 +55,8 @@ type ClientConn struct {

// The RPC service definition
service *serviceInfo

logger logger.Logger
}

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
Expand Down Expand Up @@ -121,6 +123,28 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
}
}

// WaitForReady waits until the state becomes Ready
// It returns true when that happens
// It returns false if the context is cancelled, or the conn is shut down
func (cc *ClientConn) WaitForReady(ctx context.Context) bool {
ch := cc.csMgr.getNotifyChan()
switch cc.csMgr.getState() {
case connectivity.Ready:
return true
case connectivity.Shutdown:
return false
case connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure:
break
}
cc.logger.Debugf("Waiting for connection to be ready, current state: %s", cc.csMgr.getState())
select {
case <-ctx.Done():
return false
case <-ch:
return cc.WaitForReady(ctx)
}
}

// GetState gets the current connectivity state.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
Expand Down Expand Up @@ -201,7 +225,7 @@ func (cc *ClientConn) handleRead(done <-chan struct{}) {
case *message.Message_Response:
go cc.handleMessageResponse(ex.Response)
default:
log.Println("Invalid message type")
cc.logger.Errorf("Invalid message type: %T", ex)
}
case <-done:
return
Expand Down Expand Up @@ -233,7 +257,7 @@ func (cc *ClientConn) handleMessageRequest(r *message.Request) {
}

if err := cc.conn.transport.Write(replyMsg); err != nil {
log.Printf("error writing to transport: %s", err)
cc.logger.Errorf("error writing to transport: %s", err)
}
}
}
Expand Down Expand Up @@ -443,6 +467,7 @@ func (ac *addrConn) resetTransport() {

newTr, reconnect, err := ac.createTransport(addr, copts)
if err != nil {
ac.dopts.logger.Errorf("failed to connect to server at %s, got: %v", addr, err)
// After connection failure, the addrConn enters TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand All @@ -454,7 +479,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Unlock()

// Reconnection backoff time
log.Println("[wsrpc] attempting reconnection in", backoffFor)
ac.dopts.logger.Infof("attempting reconnection in %s", backoffFor)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to use %s here? backoffFor is a time.Duration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it will call .String() and print a human readable duration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timer := time.NewTimer(backoffFor)

select {
Expand Down Expand Up @@ -484,13 +509,13 @@ func (ac *addrConn) resetTransport() {

ac.mu.Unlock()

log.Println("[wsrpc] Connected to", ac.addr)
ac.dopts.logger.Debugf("Connected to %s", ac.addr)

// Block until the created transport is down. When this happens, we
// attempt to reconnect by starting again from the top
<-reconnect.Done()

log.Println("[wsrpc] Reconnecting to server...")
ac.dopts.logger.Info("Reconnecting to server...")
}
}

Expand All @@ -514,7 +539,7 @@ func (ac *addrConn) createTransport(addr string, copts transport.ConnectOptions)
reconnect.Fire()
}

tr, err := transport.NewClientTransport(ac.cc.ctx, addr, copts, onClose)
tr, err := transport.NewClientTransport(ac.cc.ctx, ac.dopts.logger, addr, copts, onClose)

return tr, reconnect, err
}
Expand Down
17 changes: 13 additions & 4 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"github.com/smartcontractkit/wsrpc/credentials"
"github.com/smartcontractkit/wsrpc/internal/backoff"
"github.com/smartcontractkit/wsrpc/internal/transport"
"github.com/smartcontractkit/wsrpc/logger"
)

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
copts transport.ConnectOptions
bs backoff.Strategy
block bool
copts transport.ConnectOptions
bs backoff.Strategy
block bool
logger logger.Logger
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -74,8 +76,15 @@ func WithWriteTimeout(d time.Duration) DialOption {
})
}

func WithLogger(lggr logger.Logger) DialOption {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually might be preferable to just make it required.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to not make backwards incompatible changes here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current change is backwards-compatible.

return newFuncDialOption(func(o *dialOptions) {
o.logger = lggr
})
}

func defaultDialOptions() dialOptions {
return dialOptions{
copts: transport.ConnectOptions{},
copts: transport.ConnectOptions{},
logger: logger.DefaultLogger,
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.26.0
)
61 changes: 55 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,28 +1,77 @@
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 3 additions & 2 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gorilla/websocket"

"github.com/smartcontractkit/wsrpc/credentials"
"github.com/smartcontractkit/wsrpc/logger"
)

const (
Expand Down Expand Up @@ -44,8 +45,8 @@ type ClientTransport interface {

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(ctx context.Context, addr string, opts ConnectOptions, onClose func()) (ClientTransport, error) {
return newWebsocketClient(ctx, addr, opts, onClose)
func NewClientTransport(ctx context.Context, lggr logger.Logger, addr string, opts ConnectOptions, onClose func()) (ClientTransport, error) {
return newWebsocketClient(ctx, lggr, addr, opts, onClose)
}

// state of transport.
Expand Down
11 changes: 7 additions & 4 deletions internal/transport/websocket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package transport
import (
"context"
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/websocket"
"github.com/smartcontractkit/wsrpc/logger"
)

// WebsocketClient implements the ClientTransport interface with websockets.
Expand All @@ -31,11 +31,13 @@ type WebsocketClient struct {
done chan struct{}
// A signal channel called when the transport is closed
interrupt chan struct{}

log logger.Logger
}

// newWebsocketClient establishes the transport with the required ConnectOptions
// and returns it to the caller.
func newWebsocketClient(ctx context.Context, addr string, opts ConnectOptions, onClose func()) (_ *WebsocketClient, err error) {
func newWebsocketClient(ctx context.Context, log logger.Logger, addr string, opts ConnectOptions, onClose func()) (_ *WebsocketClient, err error) {
writeTimeout := defaultWriteTimeout
if opts.WriteTimeout != 0 {
writeTimeout = opts.WriteTimeout
Expand All @@ -61,6 +63,7 @@ func newWebsocketClient(ctx context.Context, addr string, opts ConnectOptions, o
read: make(chan []byte), // Should this be buffered?
done: make(chan struct{}),
interrupt: make(chan struct{}),
log: log,
}

// Start go routines to establish the read/write channels
Expand Down Expand Up @@ -115,7 +118,7 @@ func (c *WebsocketClient) readPump() {
for {
_, msg, err := c.conn.ReadMessage()
if err != nil {
log.Println("[wsrpc] Read error: ", err)
c.log.Errorw("[wsrpc] Read error", "err", err)

return
}
Expand Down Expand Up @@ -146,7 +149,7 @@ func (c *WebsocketClient) writePump() {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Printf("[wsrpc] write error: %v\n", err)
c.log.Errorf("Write error: %v", err)

c.conn.Close()

Expand Down
Loading