From b112fc8db2212ddbcb641d85f309db103bff75d6 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Thu, 28 Sep 2023 14:15:02 -0400 Subject: [PATCH] Do not block startup to check chain head Signed-off-by: Peter Broadhurst --- config.md | 35 ++++++++++++++++++++++- go.mod | 6 ++-- go.sum | 12 ++++---- internal/ethereum/blocklistener.go | 8 ++++-- internal/ethereum/blocklistener_test.go | 8 ++++-- internal/ethereum/event_actions.go | 26 ++++++----------- internal/ethereum/event_listener.go | 4 +-- internal/ethereum/event_stream.go | 37 +++++++++++++++++++++++-- internal/ethereum/event_stream_test.go | 3 ++ 9 files changed, 101 insertions(+), 38 deletions(-) diff --git a/config.md b/config.md index ba4f845..d850557 100644 --- a/config.md +++ b/config.md @@ -69,6 +69,7 @@ |headers|Adds custom headers to HTTP requests|`map[string]string`|`` |idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms` |maxConcurrentRequests|Maximum of concurrent requests to be submitted to the blockchain|`int`|`50` +|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0` |maxIdleConns|The max number of idle connections to hold pooled|`int`|`100` |passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false` |requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` @@ -138,7 +139,36 @@ |Key|Description|Type|Default Value| |---|-----------|----|-------------| -|port|An HTTP port on which to enable the go debugger|`int`|`-1` +|address|Listener address|`int`|`127.0.0.1` +|enabled|Whether the debug HTTP endpoint is enabled|`boolean`|`true` +|port|An HTTP port on which to enable the go debugger|`int`|`0` +|publicURL|Externally available URL for the HTTP endpoint|`string`|`` +|readTimeout|HTTP server read timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s` +|shutdownTimeout|HTTP server shutdown timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s` +|writeTimeout|HTTP server write timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s` + +## debug.auth + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|type|The auth plugin to use for server side authentication of requests|`string`|`` + +## debug.auth.basic + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|passwordfile|The path to a .htpasswd file to use for authenticating requests. Passwords should be hashed with bcrypt.|`string`|`` + +## debug.tls + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|caFile|The path to the CA file for TLS on this API|`string`|`` +|certFile|The path to the certificate file for TLS on this API|`string`|`` +|clientAuth|Enables or disables client auth for TLS on this API|`string`|`` +|enabled|Enables or disables TLS on this API|`boolean`|`false` +|keyFile|The path to the private key file for TLS on this API|`string`|`` +|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`` ## eventstreams @@ -297,6 +327,7 @@ |expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s` |headers|Adds custom headers to HTTP requests|`map[string]string`|`` |idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms` +|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0` |maxIdleConns|The max number of idle connections to hold pooled|`int`|`100` |method|Deprecated: Please use 'transactions.handler.simple.gasOracle.method' instead|`string`|`GET` |mode|Deprecated: Please use 'transactions.handler.simple.gasOracle.mode' instead|'connector', 'restapi', 'fixed', or 'disabled'|`connector` @@ -385,6 +416,7 @@ |expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s` |headers|Adds custom headers to HTTP requests|`map[string]string`|`` |idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms` +|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0` |maxIdleConns|The max number of idle connections to hold pooled|`int`|`100` |method|The HTTP Method to use when invoking the Gas Oracle REST API|`string`|`GET` |mode|The gas oracle mode|'connector', 'restapi', 'fixed', or 'disabled'|`connector` @@ -445,6 +477,7 @@ |expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s` |headers|Adds custom headers to HTTP requests|`map[string]string`|`` |idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms` +|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0` |maxIdleConns|The max number of idle connections to hold pooled|`int`|`100` |passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false` |requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` diff --git a/go.mod b/go.mod index c86155d..397aeed 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,9 @@ go 1.19 require ( github.com/hashicorp/golang-lru v0.5.4 - github.com/hyperledger/firefly-common v1.2.18 - github.com/hyperledger/firefly-signer v1.1.9 - github.com/hyperledger/firefly-transaction-manager v1.3.3 + github.com/hyperledger/firefly-common v1.3.0 + github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb + github.com/hyperledger/firefly-transaction-manager v1.3.4 github.com/sirupsen/logrus v1.9.2 github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.1 diff --git a/go.sum b/go.sum index bec27f0..e46f69f 100644 --- a/go.sum +++ b/go.sum @@ -216,12 +216,12 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huandu/xstrings v1.3.1 h1:4jgBlKK6tLKFvO8u5pmYjG91cqytmDCDvGh7ECVFfFs= github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= -github.com/hyperledger/firefly-common v1.2.18 h1:oMxmhVsVhitoEeZXJPVIM10RxwK0Z33GeR+VPXxULms= -github.com/hyperledger/firefly-common v1.2.18/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM= -github.com/hyperledger/firefly-signer v1.1.9 h1:Tx1iPTOLTpdFOLtkKFUWpjNsL/+oswnAaU4CTLMDw4Q= -github.com/hyperledger/firefly-signer v1.1.9/go.mod h1:vNbbROziwqkOmO0b+9ky3devjcFg0JIkR2M1KG7seTQ= -github.com/hyperledger/firefly-transaction-manager v1.3.3 h1:jOBlFljFgz/pn8g1DlSGMkuJyGmf/Scsu3SDCqIJlZ8= -github.com/hyperledger/firefly-transaction-manager v1.3.3/go.mod h1:Bbp4hDoOFbu463pTfyFY/MPDWyTq89uAFk4OcJS7UXY= +github.com/hyperledger/firefly-common v1.3.0 h1:eLFUJuPU8E5iZXYGHlXghQuN+opWG/qp7zvMKavKEPU= +github.com/hyperledger/firefly-common v1.3.0/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM= +github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb h1:0tlOFV8x9NBYvHUfJp5dwS9cZdwqRXuipGLOreh67Gw= +github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb/go.mod h1:0w6HjamOI21i9oGYDzW5p1A2ijLenM+liciGRzLhV5w= +github.com/hyperledger/firefly-transaction-manager v1.3.4 h1:L3KNuyVdOpw+wgS44gUBs+5dh3vxL921h1rlKGZFz6s= +github.com/hyperledger/firefly-transaction-manager v1.3.4/go.mod h1:Bbp4hDoOFbu463pTfyFY/MPDWyTq89uAFk4OcJS7UXY= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= diff --git a/internal/ethereum/blocklistener.go b/internal/ethereum/blocklistener.go index 20b4302..9dafdad 100644 --- a/internal/ethereum/blocklistener.go +++ b/internal/ethereum/blocklistener.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -404,7 +404,7 @@ func (bl *blockListener) addConsumer(c *blockUpdateConsumer) { bl.consumers[*c.id] = c } -func (bl *blockListener) getHighestBlock(ctx context.Context) int64 { +func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) { bl.mux.Lock() bl.checkStartedLocked() highestBlock := bl.highestBlock @@ -414,13 +414,15 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) int64 { select { case <-bl.initialBlockHeightObtained: case <-ctx.Done(): + // Inform caller we timed out, or were closed + return -1, false } } bl.mux.Lock() highestBlock = bl.highestBlock bl.mux.Unlock() log.L(ctx).Debugf("ChainHead=%d", highestBlock) - return highestBlock + return highestBlock, true } func (bl *blockListener) waitClosed() { diff --git a/internal/ethereum/blocklistener_test.go b/internal/ethereum/blocklistener_test.go index 843a27c..08663ac 100644 --- a/internal/ethereum/blocklistener_test.go +++ b/internal/ethereum/blocklistener_test.go @@ -41,7 +41,9 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { *hbh = *ethtypes.NewHexInteger64(12345) }) - assert.Equal(t, int64(12345), bl.getHighestBlock(bl.ctx)) + h, ok := bl.getHighestBlock(bl.ctx) + assert.Equal(t, int64(12345), h) + assert.True(t, ok) done() // Stop immediately in this case, while we're in the polling interval <-bl.listenLoopDone @@ -59,7 +61,9 @@ func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). Return(&rpcbackend.RPCError{Message: "pop"}).Maybe() - assert.Equal(t, int64(-1), bl.getHighestBlock(bl.ctx)) + h, ok := bl.getHighestBlock(bl.ctx) + assert.False(t, ok) + assert.Equal(t, int64(-1), h) <-bl.listenLoopDone diff --git a/internal/ethereum/event_actions.go b/internal/ethereum/event_actions.go index 188c495..3a7df50 100644 --- a/internal/ethereum/event_actions.go +++ b/internal/ethereum/event_actions.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -44,30 +44,20 @@ func (c *ethConnector) EventStreamStart(ctx context.Context, req *ffcapi.EventSt streamLoopDone: make(chan struct{}), } - chainHead := c.blockListener.getHighestBlock(ctx) - for _, lReq := range req.InitialListeners { - l, err := es.addEventListener(ctx, lReq) + // We add all the initial event listeners, checking for errors, before kicking off the streamLoop(). + for _, il := range req.InitialListeners { + // Add, but do NOT start, the listener. + // We do pre-startup processing on this special set in streamLoop() after we've established + // the chain head, then start them all after that. + _, err := es.addEventListener(ctx, il) if err != nil { return nil, "", err } - // During initial start we move the "head" block forwards to be the highest of all the initial streams - if l.hwmBlock > es.headBlock { - if l.hwmBlock > chainHead { - es.headBlock = chainHead - } else { - es.headBlock = l.hwmBlock - } - } } - // From this point we consider ourselves started + // From this point we consider ourselves as in started state, but we might not actually be ready c.eventStreams[*req.ID] = es - // Start all the listeners - for _, l := range es.listeners { - es.startEventListener(l) - } - // Start the listener head routine, which reads events for all listeners that are not in catchup mode go es.streamLoop() diff --git a/internal/ethereum/event_listener.go b/internal/ethereum/event_listener.go index 980bc47..d340ae7 100644 --- a/internal/ethereum/event_listener.go +++ b/internal/ethereum/event_listener.go @@ -97,8 +97,8 @@ func (cp *listenerCheckpoint) LessThan(b ffcapi.EventListenerCheckpoint) bool { func (l *listener) getInitialBlock(ctx context.Context, fromBlockInstruction string) (int64, error) { if fromBlockInstruction == ffcapi.FromBlockLatest || fromBlockInstruction == "" { // Get the latest block number of the chain - chainHead := l.c.blockListener.getHighestBlock(ctx) - if chainHead < 0 { + chainHead, ok := l.c.blockListener.getHighestBlock(ctx) + if !ok { return -1, i18n.NewError(ctx, msgs.MsgTimedOutQueryingChainHead) } return chainHead, nil diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 518264f..1c7b0ae 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -228,7 +228,11 @@ func (es *eventStream) leadGroupCatchup() bool { return true } - chainHeadBlock := es.c.blockListener.getHighestBlock(es.ctx) + chainHeadBlock, ok := es.c.blockListener.getHighestBlock(es.ctx) + if !ok { + log.L(es.ctx).Debugf("Stream catchup exiting (closed checking block height)") + return true + } // Build the aggregated listener list (doesn't matter if it's changed, as we build the list each time) _ = es.buildReuseLeadGroupListener(&lastUpdate, &ag) @@ -311,7 +315,8 @@ func (es *eventStream) leadGroupSteadyState() bool { // High water mark is a point safely behind the head of the chain in this case, // where re-orgs are not expected. - hwmBlock := es.c.blockListener.getHighestBlock(es.ctx) - es.c.checkpointBlockGap + bh, _ := es.c.blockListener.getHighestBlock(es.ctx) /* note we know we're initialized here and will not block */ + hwmBlock := bh - es.c.checkpointBlockGap if hwmBlock < 0 { hwmBlock = 0 } @@ -333,7 +338,7 @@ func (es *eventStream) leadGroupSteadyState() bool { } // Check we're not outside of the steady state window, and need to fall back to catchup mode - chainHeadBlock := es.c.blockListener.getHighestBlock(es.ctx) + chainHeadBlock, _ := es.c.blockListener.getHighestBlock(es.ctx) /* note we know we're initialized here and will not block */ blockGapEstimate := (chainHeadBlock - fromBlock) if blockGapEstimate > es.c.catchupThreshold { log.L(es.ctx).Warnf("Block gap estimate reached %d (above threshold of %d) - reverting to catchup mode", blockGapEstimate, es.c.catchupThreshold) @@ -405,9 +410,35 @@ func (es *eventStream) leadGroupSteadyState() bool { } } +func (es *eventStream) preStartProcessing() { + ctx := es.ctx + chainHead, ok := es.c.blockListener.getHighestBlock(ctx) + if !ok { + log.L(ctx).Warnf("Event stream closed before establishing block height") + return + } + for _, l := range es.listeners { + // During initial start we move the "head" block forwards to be the highest of all the initial streams + if l.hwmBlock > es.headBlock { + if l.hwmBlock > chainHead { + es.headBlock = chainHead + } else { + es.headBlock = l.hwmBlock + } + } + } + + // Now we've done that, we can start all the listeners + for _, l := range es.listeners { + es.startEventListener(l) + } +} + func (es *eventStream) streamLoop() { defer close(es.streamLoopDone) + es.preStartProcessing() + for { // When we first start, we might find our leading pack of listeners are all way behind // the head of the chain. So we run a catchup mode loop to ensure we don't ask the blockchain diff --git a/internal/ethereum/event_stream_test.go b/internal/ethereum/event_stream_test.go index 13ff6a4..53cb2a5 100644 --- a/internal/ethereum/event_stream_test.go +++ b/internal/ethereum/event_stream_test.go @@ -52,6 +52,9 @@ func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done fu es.c.eventFilterPollingInterval = 1 * time.Millisecond es.c.retry.MaximumDelay = 1 * time.Microsecond assert.NotNil(t, es) + + es.preStartProcessing() + return es, events, mRPC, func() { done() _, _, err := c.EventStreamStopped(ctx, &ffcapi.EventStreamStoppedRequest{