Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not block startup to check chain head #101

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion config.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConcurrentRequests|Maximum of concurrent requests to be submitted to the blockchain|`int`|`50`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false`
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
Expand Down Expand Up @@ -138,7 +139,36 @@

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|port|An HTTP port on which to enable the go debugger|`int`|`-1`
|address|Listener address|`int`|`127.0.0.1`
|enabled|Whether the debug HTTP endpoint is enabled|`boolean`|`true`
|port|An HTTP port on which to enable the go debugger|`int`|`0`
|publicURL|Externally available URL for the HTTP endpoint|`string`|`<nil>`
|readTimeout|HTTP server read timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`
|shutdownTimeout|HTTP server shutdown timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`
|writeTimeout|HTTP server write timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`

## debug.auth

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|type|The auth plugin to use for server side authentication of requests|`string`|`<nil>`

## debug.auth.basic

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|passwordfile|The path to a .htpasswd file to use for authenticating requests. Passwords should be hashed with bcrypt.|`string`|`<nil>`

## debug.tls

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|caFile|The path to the CA file for TLS on this API|`string`|`<nil>`
|certFile|The path to the certificate file for TLS on this API|`string`|`<nil>`
|clientAuth|Enables or disables client auth for TLS on this API|`string`|`<nil>`
|enabled|Enables or disables TLS on this API|`boolean`|`false`
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## eventstreams

Expand Down Expand Up @@ -297,6 +327,7 @@
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|method|Deprecated: Please use 'transactions.handler.simple.gasOracle.method' instead|`string`|`GET`
|mode|Deprecated: Please use 'transactions.handler.simple.gasOracle.mode' instead|'connector', 'restapi', 'fixed', or 'disabled'|`connector`
Expand Down Expand Up @@ -385,6 +416,7 @@
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|method|The HTTP Method to use when invoking the Gas Oracle REST API|`string`|`GET`
|mode|The gas oracle mode|'connector', 'restapi', 'fixed', or 'disabled'|`connector`
Expand Down Expand Up @@ -445,6 +477,7 @@
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false`
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ go 1.19

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hyperledger/firefly-common v1.2.18
github.com/hyperledger/firefly-signer v1.1.9
github.com/hyperledger/firefly-transaction-manager v1.3.3
github.com/hyperledger/firefly-common v1.3.0
github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb
github.com/hyperledger/firefly-transaction-manager v1.3.4
github.com/sirupsen/logrus v1.9.2
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.3.1 h1:4jgBlKK6tLKFvO8u5pmYjG91cqytmDCDvGh7ECVFfFs=
github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hyperledger/firefly-common v1.2.18 h1:oMxmhVsVhitoEeZXJPVIM10RxwK0Z33GeR+VPXxULms=
github.com/hyperledger/firefly-common v1.2.18/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM=
github.com/hyperledger/firefly-signer v1.1.9 h1:Tx1iPTOLTpdFOLtkKFUWpjNsL/+oswnAaU4CTLMDw4Q=
github.com/hyperledger/firefly-signer v1.1.9/go.mod h1:vNbbROziwqkOmO0b+9ky3devjcFg0JIkR2M1KG7seTQ=
github.com/hyperledger/firefly-transaction-manager v1.3.3 h1:jOBlFljFgz/pn8g1DlSGMkuJyGmf/Scsu3SDCqIJlZ8=
github.com/hyperledger/firefly-transaction-manager v1.3.3/go.mod h1:Bbp4hDoOFbu463pTfyFY/MPDWyTq89uAFk4OcJS7UXY=
github.com/hyperledger/firefly-common v1.3.0 h1:eLFUJuPU8E5iZXYGHlXghQuN+opWG/qp7zvMKavKEPU=
github.com/hyperledger/firefly-common v1.3.0/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM=
github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb h1:0tlOFV8x9NBYvHUfJp5dwS9cZdwqRXuipGLOreh67Gw=
github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb/go.mod h1:0w6HjamOI21i9oGYDzW5p1A2ijLenM+liciGRzLhV5w=
github.com/hyperledger/firefly-transaction-manager v1.3.4 h1:L3KNuyVdOpw+wgS44gUBs+5dh3vxL921h1rlKGZFz6s=
github.com/hyperledger/firefly-transaction-manager v1.3.4/go.mod h1:Bbp4hDoOFbu463pTfyFY/MPDWyTq89uAFk4OcJS7UXY=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
Expand Down
8 changes: 5 additions & 3 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -404,7 +404,7 @@ func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
bl.consumers[*c.id] = c
}

func (bl *blockListener) getHighestBlock(ctx context.Context) int64 {
func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
bl.mux.Lock()
bl.checkStartedLocked()
highestBlock := bl.highestBlock
Expand All @@ -414,13 +414,15 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) int64 {
select {
case <-bl.initialBlockHeightObtained:
case <-ctx.Done():
// Inform caller we timed out, or were closed
return -1, false
}
}
bl.mux.Lock()
highestBlock = bl.highestBlock
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock
return highestBlock, true
}

func (bl *blockListener) waitClosed() {
Expand Down
8 changes: 6 additions & 2 deletions internal/ethereum/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) {
*hbh = *ethtypes.NewHexInteger64(12345)
})

assert.Equal(t, int64(12345), bl.getHighestBlock(bl.ctx))
h, ok := bl.getHighestBlock(bl.ctx)
assert.Equal(t, int64(12345), h)
assert.True(t, ok)
done() // Stop immediately in this case, while we're in the polling interval

<-bl.listenLoopDone
Expand All @@ -59,7 +61,9 @@ func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").
Return(&rpcbackend.RPCError{Message: "pop"}).Maybe()

assert.Equal(t, int64(-1), bl.getHighestBlock(bl.ctx))
h, ok := bl.getHighestBlock(bl.ctx)
assert.False(t, ok)
assert.Equal(t, int64(-1), h)

<-bl.listenLoopDone

Expand Down
26 changes: 8 additions & 18 deletions internal/ethereum/event_actions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -44,30 +44,20 @@ func (c *ethConnector) EventStreamStart(ctx context.Context, req *ffcapi.EventSt
streamLoopDone: make(chan struct{}),
}

chainHead := c.blockListener.getHighestBlock(ctx)
for _, lReq := range req.InitialListeners {
l, err := es.addEventListener(ctx, lReq)
// We add all the initial event listeners, checking for errors, before kicking off the streamLoop().
for _, il := range req.InitialListeners {
// Add, but do NOT start, the listener.
// We do pre-startup processing on this special set in streamLoop() after we've established
// the chain head, then start them all after that.
_, err := es.addEventListener(ctx, il)
if err != nil {
return nil, "", err
}
// During initial start we move the "head" block forwards to be the highest of all the initial streams
if l.hwmBlock > es.headBlock {
if l.hwmBlock > chainHead {
es.headBlock = chainHead
} else {
es.headBlock = l.hwmBlock
}
}
}

// From this point we consider ourselves started
// From this point we consider ourselves as in started state, but we might not actually be ready
c.eventStreams[*req.ID] = es

// Start all the listeners
for _, l := range es.listeners {
es.startEventListener(l)
}

// Start the listener head routine, which reads events for all listeners that are not in catchup mode
go es.streamLoop()

Expand Down
4 changes: 2 additions & 2 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (cp *listenerCheckpoint) LessThan(b ffcapi.EventListenerCheckpoint) bool {
func (l *listener) getInitialBlock(ctx context.Context, fromBlockInstruction string) (int64, error) {
if fromBlockInstruction == ffcapi.FromBlockLatest || fromBlockInstruction == "" {
// Get the latest block number of the chain
chainHead := l.c.blockListener.getHighestBlock(ctx)
if chainHead < 0 {
chainHead, ok := l.c.blockListener.getHighestBlock(ctx)
if !ok {
return -1, i18n.NewError(ctx, msgs.MsgTimedOutQueryingChainHead)
}
return chainHead, nil
Expand Down
37 changes: 34 additions & 3 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func (es *eventStream) leadGroupCatchup() bool {
return true
}

chainHeadBlock := es.c.blockListener.getHighestBlock(es.ctx)
chainHeadBlock, ok := es.c.blockListener.getHighestBlock(es.ctx)
if !ok {
log.L(es.ctx).Debugf("Stream catchup exiting (closed checking block height)")
return true
}

// Build the aggregated listener list (doesn't matter if it's changed, as we build the list each time)
_ = es.buildReuseLeadGroupListener(&lastUpdate, &ag)
Expand Down Expand Up @@ -311,7 +315,8 @@ func (es *eventStream) leadGroupSteadyState() bool {

// High water mark is a point safely behind the head of the chain in this case,
// where re-orgs are not expected.
hwmBlock := es.c.blockListener.getHighestBlock(es.ctx) - es.c.checkpointBlockGap
bh, _ := es.c.blockListener.getHighestBlock(es.ctx) /* note we know we're initialized here and will not block */
hwmBlock := bh - es.c.checkpointBlockGap
if hwmBlock < 0 {
hwmBlock = 0
}
Expand All @@ -333,7 +338,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
}

// Check we're not outside of the steady state window, and need to fall back to catchup mode
chainHeadBlock := es.c.blockListener.getHighestBlock(es.ctx)
chainHeadBlock, _ := es.c.blockListener.getHighestBlock(es.ctx) /* note we know we're initialized here and will not block */
blockGapEstimate := (chainHeadBlock - fromBlock)
if blockGapEstimate > es.c.catchupThreshold {
log.L(es.ctx).Warnf("Block gap estimate reached %d (above threshold of %d) - reverting to catchup mode", blockGapEstimate, es.c.catchupThreshold)
Expand Down Expand Up @@ -405,9 +410,35 @@ func (es *eventStream) leadGroupSteadyState() bool {
}
}

func (es *eventStream) preStartProcessing() {
ctx := es.ctx
chainHead, ok := es.c.blockListener.getHighestBlock(ctx)
if !ok {
log.L(ctx).Warnf("Event stream closed before establishing block height")
return
}
for _, l := range es.listeners {
// During initial start we move the "head" block forwards to be the highest of all the initial streams
if l.hwmBlock > es.headBlock {
if l.hwmBlock > chainHead {
es.headBlock = chainHead
} else {
es.headBlock = l.hwmBlock
}
}
}

// Now we've done that, we can start all the listeners
for _, l := range es.listeners {
es.startEventListener(l)
}
}

func (es *eventStream) streamLoop() {
defer close(es.streamLoopDone)

es.preStartProcessing()

for {
// When we first start, we might find our leading pack of listeners are all way behind
// the head of the chain. So we run a catchup mode loop to ensure we don't ask the blockchain
Expand Down
3 changes: 3 additions & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done fu
es.c.eventFilterPollingInterval = 1 * time.Millisecond
es.c.retry.MaximumDelay = 1 * time.Microsecond
assert.NotNil(t, es)

es.preStartProcessing()

return es, events, mRPC, func() {
done()
_, _, err := c.EventStreamStopped(ctx, &ffcapi.EventStreamStoppedRequest{
Expand Down