Skip to content

Commit

Permalink
rpcclient: Add a lifetime to requests.
Browse files Browse the repository at this point in the history
Store the initial context in result types and stop waiting on context
done with ErrRequestCanceled by adding a context argument to
receiveFuture.
  • Loading branch information
JoeGruffins committed May 16, 2020
1 parent 3edcbbd commit 0d483f5
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 444 deletions.
232 changes: 116 additions & 116 deletions rpcclient/chain.go

Large diffs are not rendered by default.

248 changes: 124 additions & 124 deletions rpcclient/extensions.go

Large diffs are not rendered by default.

27 changes: 22 additions & 5 deletions rpcclient/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ var (
// client having already connected to the RPC server.
ErrClientAlreadyConnected = errors.New("websocket client has already " +
"connected")

// ErrRequestCanceled is an error to describe the condition where
// a request was canceled by the caller by terminating the passed
// context.
ErrRequestCanceled = errors.New("request was canceled by the caller")
)

const (
Expand All @@ -93,6 +98,12 @@ const (
pingInterval = time.Second * 10
)

// cmdRes holds command results.
type cmdRes struct {
ctx context.Context
c chan *response
}

// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
Expand Down Expand Up @@ -840,10 +851,16 @@ func newFutureError(err error) chan *response {
// receiveFuture receives from the passed futureResult channel to extract a
// reply or any errors. The examined errors include an error in the
// futureResult and the error in the reply from the server. This will block
// until the result is available on the passed channel.
func receiveFuture(f chan *response) ([]byte, error) {
// Wait for a response on the returned channel.
r := <-f
// until the result is available on the passed channel or the passed context
// is done.
func receiveFuture(ctx context.Context, f chan *response) ([]byte, error) {
// Wait for a response on the returned channel or context done.
r := new(response)
select {
case r = <-f:
case <-ctx.Done():
r.err = ErrRequestCanceled
}
return r.result, r.err
}

Expand Down Expand Up @@ -947,7 +964,7 @@ func (c *Client) sendCmd(ctx context.Context, cmd interface{}) chan *response {
func (c *Client) sendCmdAndWait(ctx context.Context, cmd interface{}) (interface{}, error) {
// Marshal the command to JSON-RPC, send it to the connected server, and
// wait for a response on the returned channel.
return receiveFuture(c.sendCmd(ctx, cmd))
return receiveFuture(ctx, c.sendCmd(ctx, cmd))
}

// Disconnected returns whether or not the server is disconnected. If a
Expand Down
110 changes: 55 additions & 55 deletions rpcclient/mining.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (

// FutureGenerateResult is a future promise to deliver the result of a
// GenerateAsync RPC invocation (or an applicable error).
type FutureGenerateResult chan *response
type FutureGenerateResult cmdRes

// Receive waits for the response promised by the future and returns a list of
// block hashes generated by the call.
func (r FutureGenerateResult) Receive() ([]*chainhash.Hash, error) {
res, err := receiveFuture(r)
func (r *FutureGenerateResult) Receive() ([]*chainhash.Hash, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -53,9 +53,9 @@ func (r FutureGenerateResult) Receive() ([]*chainhash.Hash, error) {
// the returned instance.
//
// See Generate for the blocking version and more details.
func (c *Client) GenerateAsync(ctx context.Context, numBlocks uint32) FutureGenerateResult {
func (c *Client) GenerateAsync(ctx context.Context, numBlocks uint32) *FutureGenerateResult {
cmd := chainjson.NewGenerateCmd(numBlocks)
return c.sendCmd(ctx, cmd)
return &FutureGenerateResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// Generate generates numBlocks blocks and returns their hashes.
Expand All @@ -65,12 +65,12 @@ func (c *Client) Generate(ctx context.Context, numBlocks uint32) ([]*chainhash.H

// FutureGetGenerateResult is a future promise to deliver the result of a
// GetGenerateAsync RPC invocation (or an applicable error).
type FutureGetGenerateResult chan *response
type FutureGetGenerateResult cmdRes

// Receive waits for the response promised by the future and returns true if the
// server is set to mine, otherwise false.
func (r FutureGetGenerateResult) Receive() (bool, error) {
res, err := receiveFuture(r)
func (r *FutureGetGenerateResult) Receive() (bool, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return false, err
}
Expand All @@ -90,9 +90,9 @@ func (r FutureGetGenerateResult) Receive() (bool, error) {
// the returned instance.
//
// See GetGenerate for the blocking version and more details.
func (c *Client) GetGenerateAsync(ctx context.Context) FutureGetGenerateResult {
func (c *Client) GetGenerateAsync(ctx context.Context) *FutureGetGenerateResult {
cmd := chainjson.NewGetGenerateCmd()
return c.sendCmd(ctx, cmd)
return &FutureGetGenerateResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetGenerate returns true if the server is set to mine, otherwise false.
Expand All @@ -102,12 +102,12 @@ func (c *Client) GetGenerate(ctx context.Context) (bool, error) {

// FutureSetGenerateResult is a future promise to deliver the result of a
// SetGenerateAsync RPC invocation (or an applicable error).
type FutureSetGenerateResult chan *response
type FutureSetGenerateResult cmdRes

// Receive waits for the response promised by the future and returns an error if
// any occurred when setting the server to generate coins (mine) or not.
func (r FutureSetGenerateResult) Receive() error {
_, err := receiveFuture(r)
func (r *FutureSetGenerateResult) Receive() error {
_, err := receiveFuture(r.ctx, r.c)
return err
}

Expand All @@ -116,9 +116,9 @@ func (r FutureSetGenerateResult) Receive() error {
// returned instance.
//
// See SetGenerate for the blocking version and more details.
func (c *Client) SetGenerateAsync(ctx context.Context, enable bool, numCPUs int) FutureSetGenerateResult {
func (c *Client) SetGenerateAsync(ctx context.Context, enable bool, numCPUs int) *FutureSetGenerateResult {
cmd := chainjson.NewSetGenerateCmd(enable, &numCPUs)
return c.sendCmd(ctx, cmd)
return &FutureSetGenerateResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// SetGenerate sets the server to generate coins (mine) or not.
Expand All @@ -128,13 +128,13 @@ func (c *Client) SetGenerate(ctx context.Context, enable bool, numCPUs int) erro

// FutureGetHashesPerSecResult is a future promise to deliver the result of a
// GetHashesPerSecAsync RPC invocation (or an applicable error).
type FutureGetHashesPerSecResult chan *response
type FutureGetHashesPerSecResult cmdRes

// Receive waits for the response promised by the future and returns a recent
// hashes per second performance measurement while generating coins (mining).
// Zero is returned if the server is not mining.
func (r FutureGetHashesPerSecResult) Receive() (int64, error) {
res, err := receiveFuture(r)
func (r *FutureGetHashesPerSecResult) Receive() (int64, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return -1, err
}
Expand All @@ -154,9 +154,9 @@ func (r FutureGetHashesPerSecResult) Receive() (int64, error) {
// the returned instance.
//
// See GetHashesPerSec for the blocking version and more details.
func (c *Client) GetHashesPerSecAsync(ctx context.Context) FutureGetHashesPerSecResult {
func (c *Client) GetHashesPerSecAsync(ctx context.Context) *FutureGetHashesPerSecResult {
cmd := chainjson.NewGetHashesPerSecCmd()
return c.sendCmd(ctx, cmd)
return &FutureGetHashesPerSecResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetHashesPerSec returns a recent hashes per second performance measurement
Expand All @@ -168,12 +168,12 @@ func (c *Client) GetHashesPerSec(ctx context.Context) (int64, error) {

// FutureGetMiningInfoResult is a future promise to deliver the result of a
// GetMiningInfoAsync RPC invocation (or an applicable error).
type FutureGetMiningInfoResult chan *response
type FutureGetMiningInfoResult cmdRes

// Receive waits for the response promised by the future and returns the mining
// information.
func (r FutureGetMiningInfoResult) Receive() (*chainjson.GetMiningInfoResult, error) {
res, err := receiveFuture(r)
func (r *FutureGetMiningInfoResult) Receive() (*chainjson.GetMiningInfoResult, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return nil, err
}
Expand All @@ -193,9 +193,9 @@ func (r FutureGetMiningInfoResult) Receive() (*chainjson.GetMiningInfoResult, er
// the returned instance.
//
// See GetMiningInfo for the blocking version and more details.
func (c *Client) GetMiningInfoAsync(ctx context.Context) FutureGetMiningInfoResult {
func (c *Client) GetMiningInfoAsync(ctx context.Context) *FutureGetMiningInfoResult {
cmd := chainjson.NewGetMiningInfoCmd()
return c.sendCmd(ctx, cmd)
return &FutureGetMiningInfoResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetMiningInfo returns mining information.
Expand All @@ -205,13 +205,13 @@ func (c *Client) GetMiningInfo(ctx context.Context) (*chainjson.GetMiningInfoRes

// FutureGetNetworkHashPS is a future promise to deliver the result of a
// GetNetworkHashPSAsync RPC invocation (or an applicable error).
type FutureGetNetworkHashPS chan *response
type FutureGetNetworkHashPS cmdRes

// Receive waits for the response promised by the future and returns the
// estimated network hashes per second for the block heights provided by the
// parameters.
func (r FutureGetNetworkHashPS) Receive() (int64, error) {
res, err := receiveFuture(r)
func (r *FutureGetNetworkHashPS) Receive() (int64, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return -1, err
}
Expand All @@ -231,9 +231,9 @@ func (r FutureGetNetworkHashPS) Receive() (int64, error) {
// the returned instance.
//
// See GetNetworkHashPS for the blocking version and more details.
func (c *Client) GetNetworkHashPSAsync(ctx context.Context) FutureGetNetworkHashPS {
func (c *Client) GetNetworkHashPSAsync(ctx context.Context) *FutureGetNetworkHashPS {
cmd := chainjson.NewGetNetworkHashPSCmd(nil, nil)
return c.sendCmd(ctx, cmd)
return &FutureGetNetworkHashPS{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetNetworkHashPS returns the estimated network hashes per second using the
Expand All @@ -250,9 +250,9 @@ func (c *Client) GetNetworkHashPS(ctx context.Context) (int64, error) {
// the returned instance.
//
// See GetNetworkHashPS2 for the blocking version and more details.
func (c *Client) GetNetworkHashPS2Async(ctx context.Context, blocks int) FutureGetNetworkHashPS {
func (c *Client) GetNetworkHashPS2Async(ctx context.Context, blocks int) *FutureGetNetworkHashPS {
cmd := chainjson.NewGetNetworkHashPSCmd(&blocks, nil)
return c.sendCmd(ctx, cmd)
return &FutureGetNetworkHashPS{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetNetworkHashPS2 returns the estimated network hashes per second for the
Expand All @@ -271,9 +271,9 @@ func (c *Client) GetNetworkHashPS2(ctx context.Context, blocks int) (int64, erro
// the returned instance.
//
// See GetNetworkHashPS3 for the blocking version and more details.
func (c *Client) GetNetworkHashPS3Async(ctx context.Context, blocks, height int) FutureGetNetworkHashPS {
func (c *Client) GetNetworkHashPS3Async(ctx context.Context, blocks, height int) *FutureGetNetworkHashPS {
cmd := chainjson.NewGetNetworkHashPSCmd(&blocks, &height)
return c.sendCmd(ctx, cmd)
return &FutureGetNetworkHashPS{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetNetworkHashPS3 returns the estimated network hashes per second for the
Expand All @@ -288,12 +288,12 @@ func (c *Client) GetNetworkHashPS3(ctx context.Context, blocks, height int) (int

// FutureGetWork is a future promise to deliver the result of a
// GetWorkAsync RPC invocation (or an applicable error).
type FutureGetWork chan *response
type FutureGetWork cmdRes

// Receive waits for the response promised by the future and returns the hash
// data to work on.
func (r FutureGetWork) Receive() (*chainjson.GetWorkResult, error) {
res, err := receiveFuture(r)
func (r *FutureGetWork) Receive() (*chainjson.GetWorkResult, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return nil, err
}
Expand All @@ -313,9 +313,9 @@ func (r FutureGetWork) Receive() (*chainjson.GetWorkResult, error) {
// returned instance.
//
// See GetWork for the blocking version and more details.
func (c *Client) GetWorkAsync(ctx context.Context) FutureGetWork {
func (c *Client) GetWorkAsync(ctx context.Context) *FutureGetWork {
cmd := chainjson.NewGetWorkCmd(nil)
return c.sendCmd(ctx, cmd)
return &FutureGetWork{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetWork returns hash data to work on.
Expand All @@ -327,12 +327,12 @@ func (c *Client) GetWork(ctx context.Context) (*chainjson.GetWorkResult, error)

// FutureGetWorkSubmit is a future promise to deliver the result of a
// GetWorkSubmitAsync RPC invocation (or an applicable error).
type FutureGetWorkSubmit chan *response
type FutureGetWorkSubmit cmdRes

// Receive waits for the response promised by the future and returns whether
// or not the submitted block header was accepted.
func (r FutureGetWorkSubmit) Receive() (bool, error) {
res, err := receiveFuture(r)
func (r *FutureGetWorkSubmit) Receive() (bool, error) {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return false, err
}
Expand All @@ -352,9 +352,9 @@ func (r FutureGetWorkSubmit) Receive() (bool, error) {
// returned instance.
//
// See GetWorkSubmit for the blocking version and more details.
func (c *Client) GetWorkSubmitAsync(ctx context.Context, data string) FutureGetWorkSubmit {
func (c *Client) GetWorkSubmitAsync(ctx context.Context, data string) *FutureGetWorkSubmit {
cmd := chainjson.NewGetWorkCmd(&data)
return c.sendCmd(ctx, cmd)
return &FutureGetWorkSubmit{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// GetWorkSubmit submits a block header which is a solution to previously
Expand All @@ -367,12 +367,12 @@ func (c *Client) GetWorkSubmit(ctx context.Context, data string) (bool, error) {

// FutureSubmitBlockResult is a future promise to deliver the result of a
// SubmitBlockAsync RPC invocation (or an applicable error).
type FutureSubmitBlockResult chan *response
type FutureSubmitBlockResult cmdRes

// Receive waits for the response promised by the future and returns an error if
// any occurred when submitting the block.
func (r FutureSubmitBlockResult) Receive() error {
res, err := receiveFuture(r)
func (r *FutureSubmitBlockResult) Receive() error {
res, err := receiveFuture(r.ctx, r.c)
if err != nil {
return err
}
Expand All @@ -396,19 +396,19 @@ func (r FutureSubmitBlockResult) Receive() error {
// returned instance.
//
// See SubmitBlock for the blocking version and more details.
func (c *Client) SubmitBlockAsync(ctx context.Context, block *dcrutil.Block, options *chainjson.SubmitBlockOptions) FutureSubmitBlockResult {
func (c *Client) SubmitBlockAsync(ctx context.Context, block *dcrutil.Block, options *chainjson.SubmitBlockOptions) *FutureSubmitBlockResult {
blockHex := ""
if block != nil {
blockBytes, err := block.Bytes()
if err != nil {
return newFutureError(err)
return &FutureSubmitBlockResult{ctx: ctx, c: newFutureError(err)}
}

blockHex = hex.EncodeToString(blockBytes)
}

cmd := chainjson.NewSubmitBlockCmd(blockHex, options)
return c.sendCmd(ctx, cmd)
return &FutureSubmitBlockResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// SubmitBlock attempts to submit a new block into the Decred network.
Expand All @@ -418,11 +418,11 @@ func (c *Client) SubmitBlock(ctx context.Context, block *dcrutil.Block, options

// FutureRegenTemplateResult is a future promise to deliver the result of a
// RegenTemplate RPC invocation (or an applicable error).
type FutureRegenTemplateResult chan *response
type FutureRegenTemplateResult cmdRes

// Receive waits for the response and returns an error if any has occurred.
func (r FutureRegenTemplateResult) Receive() error {
_, err := receiveFuture(r)
func (r *FutureRegenTemplateResult) Receive() error {
_, err := receiveFuture(r.ctx, r.c)
return err
}

Expand All @@ -431,9 +431,9 @@ func (r FutureRegenTemplateResult) Receive() error {
// the returned instance.
//
// See RegenTemplate for the blocking version and more details.
func (c *Client) RegenTemplateAsync(ctx context.Context) FutureRegenTemplateResult {
func (c *Client) RegenTemplateAsync(ctx context.Context) *FutureRegenTemplateResult {
cmd := chainjson.NewRegenTemplateCmd()
return c.sendCmd(ctx, cmd)
return &FutureRegenTemplateResult{ctx: ctx, c: c.sendCmd(ctx, cmd)}
}

// RegenTemplate asks the node to regenerate its current block template. Note
Expand Down
Loading

0 comments on commit 0d483f5

Please sign in to comment.