Skip to content

Commit

Permalink
Add new Ethereum contract config
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed May 16, 2022
1 parent 3cf5f35 commit 56d1fe6
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 137 deletions.
11 changes: 9 additions & 2 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ nav_order: 3
|batchTimeout|How long Ethconnect should wait for new events to arrive and fill a batch, before sending the batch to FireFly core. Only applies when automatically creating a new event stream.|[`time.Duration`](https://pkg.go.dev/time#Duration)|`500`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin smart contract. Default=0. Only affects initial creation of the event stream|Address `string`|`0`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin smart contract. Default=0. Only affects initial creation of the event stream (deprecated - use fireflyContract[].fromBlock)|Address `string`|`0`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|instance|The Ethereum address of the FireFly BatchPin smart contract that has been deployed to the blockchain|Address `string`|`<nil>`
|instance|The Ethereum address of the FireFly BatchPin smart contract that has been deployed to the blockchain (deprecated - use fireflyContract[].address)|Address `string`|`<nil>`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|prefixLong|The prefix that will be used for Ethconnect specific HTTP headers when FireFly makes requests to Ethconnect|`string`|`firefly`
|prefixShort|The prefix that will be used for Ethconnect specific query parameters when FireFly makes requests to Ethconnect|`string`|`fly`
Expand Down Expand Up @@ -243,6 +243,13 @@ nav_order: 3
|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

## blockchain.ethereum.fireflyContract[]

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|address|The Ethereum address of the FireFly BatchPin smart contract that has been deployed to the blockchain|Address `string`|`<nil>`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin smart contract. Default=0. Only affects initial creation of the event stream|Address `string`|`<nil>`

## blockchain.fabric.fabconnect

|Key|Description|Type|Default Value|
Expand Down
25 changes: 18 additions & 7 deletions internal/blockchain/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ const (
)

const (
// EthconnectConfigKey is a sub-key in the config to contain all the ethconnect specific config,
// EthconnectConfigKey is a sub-key in the config to contain all the ethconnect specific config
EthconnectConfigKey = "ethconnect"
// EthconnectConfigInstancePath is the ethereum address of the contract
EthconnectConfigInstancePath = "instance"
// EthconnectConfigTopic is the websocket listen topic that the node should register on, which is important if there are multiple
// nodes using a single ethconnect
EthconnectConfigTopic = "topic"
Expand All @@ -51,8 +49,17 @@ const (
EthconnectPrefixShort = "prefixShort"
// EthconnectPrefixLong is used in HTTP headers in requests to ethconnect
EthconnectPrefixLong = "prefixLong"
// EthconnectConfigFromBlock is the configuration of the first block to listen to when creating the listener for the FireFly contract
EthconnectConfigFromBlock = "fromBlock"
// EthconnectConfigInstanceDeprecated is the ethereum address of the FireFly contract
EthconnectConfigInstanceDeprecated = "instance"
// EthconnectConfigFromBlockDeprecated is the configuration of the first block to listen to when creating the listener for the FireFly contract
EthconnectConfigFromBlockDeprecated = "fromBlock"

// FireFlyContractConfigKey is a sub-key in the config to contain the info on the deployed FireFly contract
FireFlyContractConfigKey = "fireflyContract"
// FireFlyContractAddress is the ethereum address of the FireFly contract
FireFlyContractAddress = "address"
// FireFlyContractFromBlock is the configuration of the first block to listen to when creating the listener
FireFlyContractFromBlock = "fromBlock"

// AddressResolverConfigKey is a sub-key in the config to contain an address resolver config.
AddressResolverConfigKey = "addressResolver"
Expand All @@ -78,13 +85,17 @@ const (
func (e *Ethereum) InitConfig(config config.Section) {
ethconnectConf := config.SubSection(EthconnectConfigKey)
wsclient.InitConfig(ethconnectConf)
ethconnectConf.AddKnownKey(EthconnectConfigInstancePath)
ethconnectConf.AddKnownKey(EthconnectConfigTopic)
ethconnectConf.AddKnownKey(EthconnectConfigBatchSize, defaultBatchSize)
ethconnectConf.AddKnownKey(EthconnectConfigBatchTimeout, defaultBatchTimeout)
ethconnectConf.AddKnownKey(EthconnectPrefixShort, defaultPrefixShort)
ethconnectConf.AddKnownKey(EthconnectPrefixLong, defaultPrefixLong)
ethconnectConf.AddKnownKey(EthconnectConfigFromBlock, defaultFromBlock)
ethconnectConf.AddKnownKey(EthconnectConfigInstanceDeprecated)
ethconnectConf.AddKnownKey(EthconnectConfigFromBlockDeprecated, defaultFromBlock)

e.contractConf = config.SubArray(FireFlyContractConfigKey)
e.contractConf.AddKnownKey(FireFlyContractAddress)
e.contractConf.AddKnownKey(FireFlyContractFromBlock)

fftmConf := config.SubSection(FFTMConfigKey)
ffresty.InitConfig(fftmConf)
Expand Down
79 changes: 49 additions & 30 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,25 @@ const (
)

type Ethereum struct {
ctx context.Context
topic string
instancePath string
prefixShort string
prefixLong string
capabilities *blockchain.Capabilities
callbacks blockchain.Callbacks
client *resty.Client
fftmClient *resty.Client
streams *streamManager
initInfo struct {
ctx context.Context
topic string
contractAddress string
prefixShort string
prefixLong string
capabilities *blockchain.Capabilities
callbacks blockchain.Callbacks
client *resty.Client
fftmClient *resty.Client
streams *streamManager
initInfo struct {
stream *eventStream
sub *subscription
}
wsconn wsclient.WSClient
closed chan struct{}
addressResolver *addressResolver
metrics metrics.Manager
contractConf config.ArraySection
}

type eventStreamWebsocket struct {
Expand Down Expand Up @@ -156,7 +157,8 @@ func (e *Ethereum) VerifierType() core.VerifierType {
return core.VerifierTypeEthAddress
}

func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks blockchain.Callbacks, metrics metrics.Manager) (err error) {
func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks blockchain.Callbacks, metrics metrics.Manager, contractIndex int) (err error) {
e.InitConfig(config)
ethconnectConf := config.SubSection(EthconnectConfigKey)
addressResolverConf := config.SubSection(AddressResolverConfigKey)
fftmConf := config.SubSection(FFTMConfigKey)
Expand All @@ -172,7 +174,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks bl
}

if ethconnectConf.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.ethconnect")
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.ethereum.ethconnect")
}

e.client = ffresty.New(e.ctx, ethconnectConf)
Expand All @@ -185,30 +187,51 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks bl
GlobalSequencer: true,
}

e.instancePath = ethconnectConf.GetString(EthconnectConfigInstancePath)
if e.instancePath == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "instance", "blockchain.ethconnect")
e.streams = &streamManager{client: e.client}
if e.contractConf.ArraySize() > contractIndex {
// New config (array of contracts)
e.contractAddress = e.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractAddress)
if e.contractAddress == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "address", "blockchain.fireflyContract")
}
e.streams.fireFlySubscriptionFromBlock = e.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractFromBlock)
} else {
// Old config (attributes under "ethconnect")
e.contractAddress = ethconnectConf.GetString(EthconnectConfigInstanceDeprecated)
if e.contractAddress != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
EthconnectConfigKey, EthconnectConfigInstanceDeprecated,
FireFlyContractConfigKey, FireFlyContractAddress)
} else {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "instance", "blockchain.ethereum.ethconnect")
}
e.streams.fireFlySubscriptionFromBlock = ethconnectConf.GetString(EthconnectConfigFromBlockDeprecated)
if e.streams.fireFlySubscriptionFromBlock != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
EthconnectConfigKey, EthconnectConfigFromBlockDeprecated,
FireFlyContractConfigKey, FireFlyContractFromBlock)
}
}

// Backwards compatibility from when instance path was not a contract address
if strings.HasPrefix(strings.ToLower(e.instancePath), "/contracts/") {
address, err := e.getContractAddress(ctx, e.instancePath)
if strings.HasPrefix(strings.ToLower(e.contractAddress), "/contracts/") {
address, err := e.getContractAddress(ctx, e.contractAddress)
if err != nil {
return err
}
e.instancePath = address
} else if strings.HasPrefix(e.instancePath, "/instances/") {
e.instancePath = strings.Replace(e.instancePath, "/instances/", "", 1)
e.contractAddress = address
} else if strings.HasPrefix(e.contractAddress, "/instances/") {
e.contractAddress = strings.Replace(e.contractAddress, "/instances/", "", 1)
}

// Ethconnect needs the "0x" prefix in some cases
if !strings.HasPrefix(e.instancePath, "0x") {
e.instancePath = fmt.Sprintf("0x%s", e.instancePath)
if !strings.HasPrefix(e.contractAddress, "0x") {
e.contractAddress = fmt.Sprintf("0x%s", e.contractAddress)
}

e.topic = ethconnectConf.GetString(EthconnectConfigTopic)
if e.topic == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.ethconnect")
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.ethereum.ethconnect")
}

e.prefixShort = ethconnectConf.GetString(EthconnectPrefixShort)
Expand All @@ -225,17 +248,13 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks bl
return err
}

e.streams = &streamManager{
client: e.client,
fireFlySubscriptionFromBlock: ethconnectConf.GetString(EthconnectConfigFromBlock),
}
batchSize := ethconnectConf.GetUint(EthconnectConfigBatchSize)
batchTimeout := uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())
if e.initInfo.stream, err = e.streams.ensureEventStream(e.ctx, e.topic, batchSize, batchTimeout); err != nil {
return err
}
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.initInfo.stream.ID, e.topic)
if e.initInfo.sub, err = e.streams.ensureFireFlySubscription(e.ctx, e.instancePath, e.initInfo.stream.ID, batchPinEventABI); err != nil {
if e.initInfo.sub, err = e.streams.ensureFireFlySubscription(e.ctx, e.contractAddress, e.initInfo.stream.ID, batchPinEventABI); err != nil {
return err
}

Expand Down Expand Up @@ -602,7 +621,7 @@ func (e *Ethereum) SubmitBatchPin(ctx context.Context, operationID *fftypes.UUID
batch.BatchPayloadRef,
ethHashes,
}
return e.invokeContractMethod(ctx, e.instancePath, signingKey, batchPinMethodABI, operationID.String(), input)
return e.invokeContractMethod(ctx, e.contractAddress, signingKey, batchPinMethodABI, operationID.String(), input)
}

func (e *Ethereum) InvokeContract(ctx context.Context, operationID *fftypes.UUID, signingKey string, location *fftypes.JSONAny, method *core.FFIMethod, input map[string]interface{}) error {
Expand Down
Loading

0 comments on commit 56d1fe6

Please sign in to comment.