Skip to content

Commit

Permalink
Fabric support, test coverage, and misc fixes for contract migration
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed May 18, 2022
1 parent ba4b7d9 commit e101f69
Show file tree
Hide file tree
Showing 19 changed files with 788 additions and 123 deletions.
18 changes: 16 additions & 2 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ nav_order: 3
|---|-----------|----|-------------|
|batchSize|The number of events Fabconnect should batch together for delivery to FireFly core. Only applies when automatically creating a new event stream.|`int`|`50`
|batchTimeout|The maximum amount of time to wait for a batch to complete|[`time.Duration`](https://pkg.go.dev/time#Duration)|`500`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions (deprecated - use fireflyContract[].chaincode)|`string`|`<nil>`
|channel|The Fabric channel that FireFly will use for BatchPin transactions|`string`|`<nil>`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
Expand Down Expand Up @@ -303,6 +303,13 @@ nav_order: 3
|readBufferSize|The size in bytes of the read buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
|writeBufferSize|The size in bytes of the write buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`

## blockchain.fabric.fireflyContract[]

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin chaincode. Default=0. Only affects initial creation of the event stream|Address `string`|`<nil>`

## blockchainevent.cache

|Key|Description|Type|Default Value|
Expand Down Expand Up @@ -831,7 +838,7 @@ nav_order: 3
|---|-----------|----|-------------|
|batchSize|The number of events Fabconnect should batch together for delivery to FireFly core. Only applies when automatically creating a new event stream.|`int`|`50`
|batchTimeout|The maximum amount of time to wait for a batch to complete|[`time.Duration`](https://pkg.go.dev/time#Duration)|`500`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions (deprecated - use fireflyContract[].chaincode)|`string`|`<nil>`
|channel|The Fabric channel that FireFly will use for BatchPin transactions|`string`|`<nil>`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
Expand Down Expand Up @@ -878,6 +885,13 @@ nav_order: 3
|readBufferSize|The size in bytes of the read buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
|writeBufferSize|The size in bytes of the write buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`

## plugins.blockchain[].fabric.fireflyContract[]

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin chaincode. Default=0. Only affects initial creation of the event stream|Address `string`|`<nil>`

## plugins.database[]

|Key|Description|Type|Default Value|
Expand Down
8 changes: 7 additions & 1 deletion docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22437,7 +22437,13 @@ paths:
"202":
content:
application/json:
schema: {}
schema:
properties:
contractIndex:
description: The index of the configured FireFly smart contract
currently being used
type: integer
type: object
description: Success
default:
description: ""
Expand Down
4 changes: 2 additions & 2 deletions internal/apiserver/route_post_network_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ var postNetworkMigrate = &oapispec.Route{
FilterFactory: nil,
Description: coremsgs.APIEndpointsPostNetworkMigrate,
JSONInputValue: func() interface{} { return &core.NamespaceMigration{} },
JSONOutputValue: func() interface{} { return &core.EmptyInput{} },
JSONOutputValue: func() interface{} { return &core.NamespaceMigration{} },
JSONOutputCodes: []int{http.StatusAccepted},
JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) {
err = getOr(r.Ctx).MigrateNetwork(r.Ctx, r.Input.(*core.NamespaceMigration).ContractIndex)
return &core.EmptyInput{}, err
return r.Input, err
},
}
43 changes: 43 additions & 0 deletions internal/apiserver/route_post_network_migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"bytes"
"encoding/json"
"net/http/httptest"
"testing"

"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestPostNetworkMigrate(t *testing.T) {
o, r := newTestAPIServer()
input := core.NamespaceMigration{ContractIndex: 1}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
req := httptest.NewRequest("POST", "/api/v1/network/migrate", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

o.On("MigrateNetwork", mock.Anything, 1).Return(nil)
r.ServeHTTP(res, req)

assert.Equal(t, 202, res.Result().StatusCode)
}
2 changes: 1 addition & 1 deletion internal/blockchain/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *Ethereum) InitConfig(config config.Section) {

e.contractConf = config.SubArray(FireFlyContractConfigKey)
e.contractConf.AddKnownKey(FireFlyContractAddress)
e.contractConf.AddKnownKey(FireFlyContractFromBlock)
e.contractConf.AddKnownKey(FireFlyContractFromBlock, "oldest")

fftmConf := config.SubSection(FFTMConfigKey)
ffresty.InitConfig(fftmConf)
Expand Down
12 changes: 5 additions & 7 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,8 @@ func (e *Ethereum) buildStreamManager(ctx context.Context, contractIndex int) (s

func (e *Ethereum) MigrateContract(ctx context.Context, contractIndex int) (err error) {
e.wsconn.Close()
if e.streams, err = e.buildStreamManager(ctx, contractIndex); err != nil {
return err
}
return e.Start()
e.streams, err = e.buildStreamManager(ctx, contractIndex)
return err
}

func (e *Ethereum) Start() error {
Expand Down Expand Up @@ -343,15 +341,15 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
log.L(ctx).Errorf("BatchPin event is not valid - bad from address (%s): %+v", err, msgJSON)
return nil // move on
}
signer := &core.VerifierRef{
verifier := &core.VerifierRef{
Type: core.VerifierTypeEthAddress,
Value: authorAddress,
}

// Check if this is actually an operator action
if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) {
action := nsOrAction[len(blockchain.FireFlyActionPrefix):]
return e.callbacks.BlockchainOperatorAction(action, sPayloadRef, signer)
return e.callbacks.BlockchainOperatorAction(action, sPayloadRef, verifier)
}

hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x"))
Expand Down Expand Up @@ -404,7 +402,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
}

// If there's an error dispatching the event, we must return the error and shutdown
return e.callbacks.BatchPinComplete(batch, signer)
return e.callbacks.BatchPinComplete(batch, verifier)
}

func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
Expand Down
145 changes: 145 additions & 0 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,25 @@ func TestInitNewConfigError(t *testing.T) {
assert.Regexp(t, "FF10138", err)
}

func TestInitNewConfigBadIndex(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
resetConf(e)

mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)
defer httpmock.DeactivateAndReset()

resetConf(e)
utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")
utConfig.AddKnownKey(FireFlyContractConfigKey+".0."+FireFlyContractAddress, "")

err := e.Init(e.ctx, utConfig, &blockchainmocks.Callbacks{}, &metricsmocks.Manager{}, 1)
assert.Regexp(t, "FF10392", err)
}

func TestStreamQueryError(t *testing.T) {

e, cancel := newTestEthereum()
Expand Down Expand Up @@ -2559,3 +2578,129 @@ func TestGenerateEventSignatureInvalid(t *testing.T) {
signature := e.GenerateEventSignature(context.Background(), event)
assert.Equal(t, "", signature)
}

func TestSubmitOperatorAction(t *testing.T) {
e, _ := newTestEthereum()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()
httpmock.RegisterResponder("POST", `http://localhost:12345/`,
func(req *http.Request) (*http.Response, error) {
var body map[string]interface{}
json.NewDecoder(req.Body).Decode(&body)
params := body["params"].([]interface{})
headers := body["headers"].(map[string]interface{})
assert.Equal(t, "SendTransaction", headers["type"])
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000000", params[1])
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000000", params[2])
assert.Equal(t, "1", params[3])
return httpmock.NewJsonResponderOrPanic(200, "")(req)
})

err := e.SubmitOperatorAction(context.Background(), fftypes.NewUUID(), "0x123", blockchain.OperatorActionMigrate, "1")
assert.NoError(t, err)
}

func TestMigrateContract(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()

wsm := e.wsconn.(*wsmocks.WSClient)
wsm.On("Close").Return()

mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
func(req *http.Request) (*http.Response, error) {
var body map[string]interface{}
json.NewDecoder(req.Body).Decode(&body)
assert.Equal(t, "es12345", body["stream"])
return httpmock.NewJsonResponderOrPanic(200, subscription{ID: "sub12345"})(req)
})

resetConf(e)
utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")
utConfig.AddKnownKey(FireFlyContractConfigKey+".0."+FireFlyContractAddress, "test")

e.client = ffresty.New(e.ctx, e.ethconnectConf)
err := e.MigrateContract(context.Background(), 0)
assert.NoError(t, err)
}

func TestMigrateContractBadIndex(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()

wsm := e.wsconn.(*wsmocks.WSClient)
wsm.On("Close").Return()

mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)
defer httpmock.DeactivateAndReset()

resetConf(e)
utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")
utConfig.AddKnownKey(FireFlyContractConfigKey+".0."+FireFlyContractAddress, "")

err := e.MigrateContract(context.Background(), 1)
assert.Regexp(t, "FF10392", err)
}

func TestHandleOperatorAction(t *testing.T) {
data := fftypes.JSONAnyPtr(`
[
{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
"blockNumber": "38011",
"transactionIndex": "0x0",
"transactionHash": "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628",
"data": {
"author": "0X91D2B4381A4CD5C7C0F27565A7D4B829844C8635",
"namespace": "firefly:migrate",
"uuids": "0x0000000000000000000000000000000000000000000000000000000000000000",
"batchHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"payloadRef": "1",
"contexts": []
},
"subId": "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
"signature": "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])",
"logIndex": "50",
"timestamp": "1620576488"
}
]`)

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
}
e.initInfo.sub = &subscription{
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

expectedSigningKeyRef := &core.VerifierRef{
Type: core.VerifierTypeEthAddress,
Value: "0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635",
}

em.On("BlockchainOperatorAction", "migrate", "1", expectedSigningKeyRef).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)

em.AssertExpectations(t)

}
3 changes: 1 addition & 2 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import (
)

type streamManager struct {
client *resty.Client

client *resty.Client
fireFlySubscriptionFromBlock string
}

Expand Down
35 changes: 23 additions & 12 deletions internal/blockchain/fabric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ const (

// FabconnectConfigDefaultChannel is the default Fabric channel to use if no "ledger" is specified in requests
FabconnectConfigDefaultChannel = "channel"
// FabconnectConfigChaincode is the Fabric Firefly chaincode deployed to the Firefly channels
FabconnectConfigChaincode = "chaincode"
// FabconnectConfigSigner is the signer identity used to subscribe to FireFly chaincode events
FabconnectConfigSigner = "signer"
// FabconnectConfigTopic is the websocket listen topic that the node should register on, which is important if there are multiple
Expand All @@ -49,17 +47,30 @@ const (
FabconnectPrefixShort = "prefixShort"
// FabconnectPrefixLong is used in HTTP headers in requests to ethconnect
FabconnectPrefixLong = "prefixLong"
// FabconnectConfigChaincodeDeprecated is the Fabric Firefly chaincode deployed to the Firefly channels
FabconnectConfigChaincodeDeprecated = "chaincode"

// FireFlyContractConfigKey is a sub-key in the config to contain the info on the deployed FireFly contract
FireFlyContractConfigKey = "fireflyContract"
// FireFlyContractChaincode is the Fabric Firefly chaincode deployed to the Firefly channels
FireFlyContractChaincode = "chaincode"
// FireFlyContractFromBlock is the configuration of the first block to listen to when creating the listener
FireFlyContractFromBlock = "fromBlock"
)

func (f *Fabric) InitConfig(config config.Section) {
fabconnectConf := config.SubSection(FabconnectConfigKey)
wsclient.InitConfig(fabconnectConf)
fabconnectConf.AddKnownKey(FabconnectConfigDefaultChannel)
fabconnectConf.AddKnownKey(FabconnectConfigChaincode)
fabconnectConf.AddKnownKey(FabconnectConfigSigner)
fabconnectConf.AddKnownKey(FabconnectConfigTopic)
fabconnectConf.AddKnownKey(FabconnectConfigBatchSize, defaultBatchSize)
fabconnectConf.AddKnownKey(FabconnectConfigBatchTimeout, defaultBatchTimeout)
fabconnectConf.AddKnownKey(FabconnectPrefixShort, defaultPrefixShort)
fabconnectConf.AddKnownKey(FabconnectPrefixLong, defaultPrefixLong)
f.fabconnectConf = config.SubSection(FabconnectConfigKey)
wsclient.InitConfig(f.fabconnectConf)
f.fabconnectConf.AddKnownKey(FabconnectConfigDefaultChannel)
f.fabconnectConf.AddKnownKey(FabconnectConfigChaincodeDeprecated)
f.fabconnectConf.AddKnownKey(FabconnectConfigSigner)
f.fabconnectConf.AddKnownKey(FabconnectConfigTopic)
f.fabconnectConf.AddKnownKey(FabconnectConfigBatchSize, defaultBatchSize)
f.fabconnectConf.AddKnownKey(FabconnectConfigBatchTimeout, defaultBatchTimeout)
f.fabconnectConf.AddKnownKey(FabconnectPrefixShort, defaultPrefixShort)
f.fabconnectConf.AddKnownKey(FabconnectPrefixLong, defaultPrefixLong)

f.contractConf = config.SubArray(FireFlyContractConfigKey)
f.contractConf.AddKnownKey(FireFlyContractChaincode)
f.contractConf.AddKnownKey(FireFlyContractFromBlock, "oldest")
}
Loading

0 comments on commit e101f69

Please sign in to comment.