Skip to content

Commit

Permalink
Support migration of the FireFly contract from one location to another
Browse files Browse the repository at this point in the history
Send a specially formatted "BatchPin" transaction to signal that all members should
migrate to a specific version of the contract from their config.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed May 17, 2022
1 parent 56d1fe6 commit e999cad
Show file tree
Hide file tree
Showing 26 changed files with 408 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE namespaces DROP COLUMN contract_index;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE namespaces ADD COLUMN contract_index INTEGER DEFAULT 0;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE namespaces DROP COLUMN contract_index;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE namespaces ADD COLUMN contract_index INTEGER DEFAULT 0;
48 changes: 48 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8136,6 +8136,10 @@ paths:
schema:
items:
properties:
contractIndex:
description: The index of the configured FireFly smart contract
currently being used
type: integer
created:
description: The time the namespace was created
format: date-time
Expand Down Expand Up @@ -8206,6 +8210,10 @@ paths:
application/json:
schema:
properties:
contractIndex:
description: The index of the configured FireFly smart contract
currently being used
type: integer
created:
description: The time the namespace was created
format: date-time
Expand Down Expand Up @@ -8241,6 +8249,10 @@ paths:
application/json:
schema:
properties:
contractIndex:
description: The index of the configured FireFly smart contract
currently being used
type: integer
created:
description: The time the namespace was created
format: date-time
Expand Down Expand Up @@ -8300,6 +8312,10 @@ paths:
application/json:
schema:
properties:
contractIndex:
description: The index of the configured FireFly smart contract
currently being used
type: integer
created:
description: The time the namespace was created
format: date-time
Expand Down Expand Up @@ -22395,6 +22411,38 @@ paths:
description: ""
tags:
- Global
/network/migrate:
post:
description: Instruct the network to migrate its set of rules
operationId: postNetworkMigrate
parameters:
- description: Server-side request timeout (millseconds, or set a custom suffix
like 10s)
in: header
name: Request-Timeout
schema:
default: 120s
type: string
requestBody:
content:
application/json:
schema:
properties:
contractIndex:
description: The index of the configured FireFly smart contract
currently being used
type: integer
type: object
responses:
"202":
content:
application/json:
schema: {}
description: Success
default:
description: ""
tags:
- Global
/network/nodes:
get:
description: Gets a list of nodes in the network
Expand Down
42 changes: 42 additions & 0 deletions internal/apiserver/route_post_network_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net/http"

"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/oapispec"
"github.com/hyperledger/firefly/pkg/core"
)

var postNetworkMigrate = &oapispec.Route{
Name: "postNetworkMigrate",
Path: "network/migrate",
Method: http.MethodPost,
PathParams: nil,
QueryParams: nil,
FilterFactory: nil,
Description: coremsgs.APIEndpointsPostNetworkMigrate,
JSONInputValue: func() interface{} { return &core.NamespaceMigration{} },
JSONOutputValue: func() interface{} { return &core.EmptyInput{} },
JSONOutputCodes: []int{http.StatusAccepted},
JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) {
err = getOr(r.Ctx).MigrateNetwork(r.Ctx, r.Input.(*core.NamespaceMigration).ContractIndex)
return &core.EmptyInput{}, err
},
}
1 change: 1 addition & 0 deletions internal/apiserver/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var routes = append(
getStatusBatchManager,
getStatusPins,
getStatusWebSockets,
postNetworkMigrate,
postNewNamespace,
postNewOrganization,
postNewOrganizationSelf,
Expand Down
18 changes: 9 additions & 9 deletions internal/blockchain/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ const (
)

func (e *Ethereum) InitConfig(config config.Section) {
ethconnectConf := config.SubSection(EthconnectConfigKey)
wsclient.InitConfig(ethconnectConf)
ethconnectConf.AddKnownKey(EthconnectConfigTopic)
ethconnectConf.AddKnownKey(EthconnectConfigBatchSize, defaultBatchSize)
ethconnectConf.AddKnownKey(EthconnectConfigBatchTimeout, defaultBatchTimeout)
ethconnectConf.AddKnownKey(EthconnectPrefixShort, defaultPrefixShort)
ethconnectConf.AddKnownKey(EthconnectPrefixLong, defaultPrefixLong)
ethconnectConf.AddKnownKey(EthconnectConfigInstanceDeprecated)
ethconnectConf.AddKnownKey(EthconnectConfigFromBlockDeprecated, defaultFromBlock)
e.ethconnectConf = config.SubSection(EthconnectConfigKey)
wsclient.InitConfig(e.ethconnectConf)
e.ethconnectConf.AddKnownKey(EthconnectConfigTopic)
e.ethconnectConf.AddKnownKey(EthconnectConfigBatchSize, defaultBatchSize)
e.ethconnectConf.AddKnownKey(EthconnectConfigBatchTimeout, defaultBatchTimeout)
e.ethconnectConf.AddKnownKey(EthconnectPrefixShort, defaultPrefixShort)
e.ethconnectConf.AddKnownKey(EthconnectPrefixLong, defaultPrefixLong)
e.ethconnectConf.AddKnownKey(EthconnectConfigInstanceDeprecated)
e.ethconnectConf.AddKnownKey(EthconnectConfigFromBlockDeprecated, defaultFromBlock)

e.contractConf = config.SubArray(FireFlyContractConfigKey)
e.contractConf.AddKnownKey(FireFlyContractAddress)
Expand Down
140 changes: 86 additions & 54 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Ethereum struct {
closed chan struct{}
addressResolver *addressResolver
metrics metrics.Manager
ethconnectConf config.Section
contractConf config.ArraySection
}

Expand Down Expand Up @@ -159,7 +160,6 @@ func (e *Ethereum) VerifierType() core.VerifierType {

func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks blockchain.Callbacks, metrics metrics.Manager, contractIndex int) (err error) {
e.InitConfig(config)
ethconnectConf := config.SubSection(EthconnectConfigKey)
addressResolverConf := config.SubSection(AddressResolverConfigKey)
fftmConf := config.SubSection(FFTMConfigKey)

Expand All @@ -173,12 +173,11 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks bl
}
}

if ethconnectConf.GetString(ffresty.HTTPConfigURL) == "" {
if e.ethconnectConf.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.ethereum.ethconnect")
}

e.client = ffresty.New(e.ctx, ethconnectConf)

e.client = ffresty.New(e.ctx, e.ethconnectConf)
if fftmConf.GetString(ffresty.HTTPConfigURL) != "" {
e.fftmClient = ffresty.New(e.ctx, fftmConf)
}
Expand All @@ -187,30 +186,15 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks bl
GlobalSequencer: true,
}

e.streams = &streamManager{client: e.client}
if e.contractConf.ArraySize() > contractIndex {
// New config (array of contracts)
e.contractAddress = e.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractAddress)
if e.contractAddress == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "address", "blockchain.fireflyContract")
}
e.streams.fireFlySubscriptionFromBlock = e.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractFromBlock)
} else {
// Old config (attributes under "ethconnect")
e.contractAddress = ethconnectConf.GetString(EthconnectConfigInstanceDeprecated)
if e.contractAddress != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
EthconnectConfigKey, EthconnectConfigInstanceDeprecated,
FireFlyContractConfigKey, FireFlyContractAddress)
} else {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "instance", "blockchain.ethereum.ethconnect")
}
e.streams.fireFlySubscriptionFromBlock = ethconnectConf.GetString(EthconnectConfigFromBlockDeprecated)
if e.streams.fireFlySubscriptionFromBlock != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
EthconnectConfigKey, EthconnectConfigFromBlockDeprecated,
FireFlyContractConfigKey, FireFlyContractFromBlock)
}
e.topic = e.ethconnectConf.GetString(EthconnectConfigTopic)
if e.topic == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.ethereum.ethconnect")
}
e.prefixShort = e.ethconnectConf.GetString(EthconnectPrefixShort)
e.prefixLong = e.ethconnectConf.GetString(EthconnectPrefixLong)

if e.streams, err = e.buildStreamManager(ctx, contractIndex); err != nil {
return err
}

// Backwards compatibility from when instance path was not a contract address
Expand All @@ -229,39 +213,69 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, callbacks bl
e.contractAddress = fmt.Sprintf("0x%s", e.contractAddress)
}

e.topic = ethconnectConf.GetString(EthconnectConfigTopic)
if e.topic == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.ethereum.ethconnect")
}

e.prefixShort = ethconnectConf.GetString(EthconnectPrefixShort)
e.prefixLong = ethconnectConf.GetString(EthconnectPrefixLong)
e.closed = make(chan struct{})
go e.eventLoop()

wsConfig := wsclient.GenerateConfig(ethconnectConf)
return nil
}

func (e *Ethereum) buildStreamManager(ctx context.Context, contractIndex int) (streams *streamManager, err error) {
wsConfig := wsclient.GenerateConfig(e.ethconnectConf)
if wsConfig.WSKeyPath == "" {
wsConfig.WSKeyPath = "/ws"
}
if e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect); err != nil {
return nil, err
}

e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect)
if err != nil {
return err
streams = &streamManager{client: e.client}
if e.contractConf.ArraySize() > 0 || contractIndex > 0 {
// New config (array of objects under "fireflyContract")
if contractIndex >= e.contractConf.ArraySize() {
return nil, i18n.NewError(ctx, coremsgs.MsgInvalidFireFlyContractIndex, fmt.Sprintf("blockchain.ethereum.fireflyContract[%d]", contractIndex))
}
e.contractAddress = e.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractAddress)
if e.contractAddress == "" {
return nil, i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "address", "blockchain.ethereum.fireflyContract")
}
streams.fireFlySubscriptionFromBlock = e.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractFromBlock)
} else {
// Old config (attributes under "ethconnect")
e.contractAddress = e.ethconnectConf.GetString(EthconnectConfigInstanceDeprecated)
if e.contractAddress != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
EthconnectConfigKey, EthconnectConfigInstanceDeprecated,
FireFlyContractConfigKey, FireFlyContractAddress)
} else {
return nil, i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "instance", "blockchain.ethereum.ethconnect")
}
streams.fireFlySubscriptionFromBlock = e.ethconnectConf.GetString(EthconnectConfigFromBlockDeprecated)
if streams.fireFlySubscriptionFromBlock != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
EthconnectConfigKey, EthconnectConfigFromBlockDeprecated,
FireFlyContractConfigKey, FireFlyContractFromBlock)
}
}

batchSize := ethconnectConf.GetUint(EthconnectConfigBatchSize)
batchTimeout := uint(ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())
if e.initInfo.stream, err = e.streams.ensureEventStream(e.ctx, e.topic, batchSize, batchTimeout); err != nil {
return err
batchSize := e.ethconnectConf.GetUint(EthconnectConfigBatchSize)
batchTimeout := uint(e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())
if e.initInfo.stream, err = streams.ensureEventStream(e.ctx, e.topic, batchSize, batchTimeout); err != nil {
return nil, err
}
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.initInfo.stream.ID, e.topic)
if e.initInfo.sub, err = e.streams.ensureFireFlySubscription(e.ctx, e.contractAddress, e.initInfo.stream.ID, batchPinEventABI); err != nil {
return err
if e.initInfo.sub, err = streams.ensureFireFlySubscription(e.ctx, e.contractAddress, e.initInfo.stream.ID, batchPinEventABI); err != nil {
return nil, err
}

e.closed = make(chan struct{})
go e.eventLoop()
return streams, nil
}

return nil
func (e *Ethereum) MigrateContract(ctx context.Context, contractIndex int) (err error) {
e.wsconn.Close()
if e.streams, err = e.buildStreamManager(ctx, contractIndex); err != nil {
return err
}
return e.Start()
}

func (e *Ethereum) Start() error {
Expand Down Expand Up @@ -303,7 +317,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
logIndex := msgJSON.GetInt64("logIndex")
dataJSON := msgJSON.GetObject("data")
authorAddress := dataJSON.GetString("author")
ns := dataJSON.GetString("namespace")
nsOrAction := dataJSON.GetString("namespace")
sUUIDs := dataJSON.GetString("uuids")
sBatchHash := dataJSON.GetString("batchHash")
sPayloadRef := dataJSON.GetString("payloadRef")
Expand All @@ -329,6 +343,16 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
log.L(ctx).Errorf("BatchPin event is not valid - bad from address (%s): %+v", err, msgJSON)
return nil // move on
}
signer := &core.VerifierRef{
Type: core.VerifierTypeEthAddress,
Value: authorAddress,
}

// Check if this is actually an operator action
if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) {
action := nsOrAction[len(blockchain.FireFlyActionPrefix):]
return e.callbacks.BlockchainOperatorAction(action, sPayloadRef, signer)
}

hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x"))
if err != nil || len(hexUUIDs) != 32 {
Expand Down Expand Up @@ -360,7 +384,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON

delete(msgJSON, "data")
batch := &blockchain.BatchPin{
Namespace: ns,
Namespace: nsOrAction,
TransactionID: &txnID,
BatchID: &batchID,
BatchHash: &batchHash,
Expand All @@ -380,10 +404,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
}

// If there's an error dispatching the event, we must return the error and shutdown
return e.callbacks.BatchPinComplete(batch, &core.VerifierRef{
Type: core.VerifierTypeEthAddress,
Value: authorAddress,
})
return e.callbacks.BatchPinComplete(batch, signer)
}

func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
Expand Down Expand Up @@ -624,6 +645,17 @@ func (e *Ethereum) SubmitBatchPin(ctx context.Context, operationID *fftypes.UUID
return e.invokeContractMethod(ctx, e.contractAddress, signingKey, batchPinMethodABI, operationID.String(), input)
}

func (e *Ethereum) SubmitOperatorAction(ctx context.Context, operationID *fftypes.UUID, signingKey, action, payload string) error {
input := []interface{}{
blockchain.FireFlyActionPrefix + action,
ethHexFormatB32(nil),
ethHexFormatB32(nil),
payload,
[]string{},
}
return e.invokeContractMethod(ctx, e.contractAddress, signingKey, batchPinMethodABI, operationID.String(), input)
}

func (e *Ethereum) InvokeContract(ctx context.Context, operationID *fftypes.UUID, signingKey string, location *fftypes.JSONAny, method *core.FFIMethod, input map[string]interface{}) error {
ethereumLocation, err := parseContractLocation(ctx, location)
if err != nil {
Expand Down
Loading

0 comments on commit e999cad

Please sign in to comment.