Skip to content

Commit

Permalink
Add new Fabric contract config
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed May 20, 2022
1 parent 2ddb05d commit 26fdac4
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 92 deletions.
18 changes: 16 additions & 2 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ nav_order: 3
|---|-----------|----|-------------|
|batchSize|The number of events Fabconnect should batch together for delivery to FireFly core. Only applies when automatically creating a new event stream|`int`|`50`
|batchTimeout|The maximum amount of time to wait for a batch to complete|[`time.Duration`](https://pkg.go.dev/time#Duration)|`500`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions (deprecated - use fireflyContract[].chaincode)|`string`|`<nil>`
|channel|The Fabric channel that FireFly will use for BatchPin transactions|`string`|`<nil>`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
Expand Down Expand Up @@ -302,6 +302,13 @@ nav_order: 3
|readBufferSize|The size in bytes of the read buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
|writeBufferSize|The size in bytes of the write buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`

## blockchain.fabric.fireflyContract[]

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin chaincode. Default=0. Only affects initial creation of the event stream|Address `string`|`<nil>`

## blockchainevent.cache

|Key|Description|Type|Default Value|
Expand Down Expand Up @@ -830,7 +837,7 @@ nav_order: 3
|---|-----------|----|-------------|
|batchSize|The number of events Fabconnect should batch together for delivery to FireFly core. Only applies when automatically creating a new event stream|`int`|`50`
|batchTimeout|The maximum amount of time to wait for a batch to complete|[`time.Duration`](https://pkg.go.dev/time#Duration)|`500`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions (deprecated - use fireflyContract[].chaincode)|`string`|`<nil>`
|channel|The Fabric channel that FireFly will use for BatchPin transactions|`string`|`<nil>`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
Expand Down Expand Up @@ -877,6 +884,13 @@ nav_order: 3
|readBufferSize|The size in bytes of the read buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`
|writeBufferSize|The size in bytes of the write buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb`

## plugins.blockchain[].fabric.fireflyContract[]

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|chaincode|The name of the Fabric chaincode that FireFly will use for BatchPin transactions|`string`|`<nil>`
|fromBlock|The first event this FireFly instance should listen to from the BatchPin chaincode. Default=0. Only affects initial creation of the event stream|Address `string`|`<nil>`

## plugins.database[]

|Key|Description|Type|Default Value|
Expand Down
35 changes: 23 additions & 12 deletions internal/blockchain/fabric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ const (

// FabconnectConfigDefaultChannel is the default Fabric channel to use if no "ledger" is specified in requests
FabconnectConfigDefaultChannel = "channel"
// FabconnectConfigChaincode is the Fabric Firefly chaincode deployed to the Firefly channels
FabconnectConfigChaincode = "chaincode"
// FabconnectConfigSigner is the signer identity used to subscribe to FireFly chaincode events
FabconnectConfigSigner = "signer"
// FabconnectConfigTopic is the websocket listen topic that the node should register on, which is important if there are multiple
Expand All @@ -49,17 +47,30 @@ const (
FabconnectPrefixShort = "prefixShort"
// FabconnectPrefixLong is used in HTTP headers in requests to ethconnect
FabconnectPrefixLong = "prefixLong"
// FabconnectConfigChaincodeDeprecated is the Fabric Firefly chaincode deployed to the Firefly channels
FabconnectConfigChaincodeDeprecated = "chaincode"

// FireFlyContractConfigKey is a sub-key in the config to contain the info on the deployed FireFly contract
FireFlyContractConfigKey = "fireflyContract"
// FireFlyContractChaincode is the Fabric Firefly chaincode deployed to the Firefly channels
FireFlyContractChaincode = "chaincode"
// FireFlyContractFromBlock is the configuration of the first block to listen to when creating the listener
FireFlyContractFromBlock = "fromBlock"
)

func (f *Fabric) InitConfig(config config.Section) {
fabconnectConf := config.SubSection(FabconnectConfigKey)
wsclient.InitConfig(fabconnectConf)
fabconnectConf.AddKnownKey(FabconnectConfigDefaultChannel)
fabconnectConf.AddKnownKey(FabconnectConfigChaincode)
fabconnectConf.AddKnownKey(FabconnectConfigSigner)
fabconnectConf.AddKnownKey(FabconnectConfigTopic)
fabconnectConf.AddKnownKey(FabconnectConfigBatchSize, defaultBatchSize)
fabconnectConf.AddKnownKey(FabconnectConfigBatchTimeout, defaultBatchTimeout)
fabconnectConf.AddKnownKey(FabconnectPrefixShort, defaultPrefixShort)
fabconnectConf.AddKnownKey(FabconnectPrefixLong, defaultPrefixLong)
f.fabconnectConf = config.SubSection(FabconnectConfigKey)
wsclient.InitConfig(f.fabconnectConf)
f.fabconnectConf.AddKnownKey(FabconnectConfigDefaultChannel)
f.fabconnectConf.AddKnownKey(FabconnectConfigChaincodeDeprecated)
f.fabconnectConf.AddKnownKey(FabconnectConfigSigner)
f.fabconnectConf.AddKnownKey(FabconnectConfigTopic)
f.fabconnectConf.AddKnownKey(FabconnectConfigBatchSize, defaultBatchSize)
f.fabconnectConf.AddKnownKey(FabconnectConfigBatchTimeout, defaultBatchTimeout)
f.fabconnectConf.AddKnownKey(FabconnectPrefixShort, defaultPrefixShort)
f.fabconnectConf.AddKnownKey(FabconnectPrefixLong, defaultPrefixLong)

f.contractConf = config.SubArray(FireFlyContractConfigKey)
f.contractConf.AddKnownKey(FireFlyContractChaincode)
f.contractConf.AddKnownKey(FireFlyContractFromBlock, "oldest")
}
7 changes: 4 additions & 3 deletions internal/blockchain/fabric/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

type streamManager struct {
client *resty.Client
signer string
client *resty.Client
signer string
fireFlySubscriptionFromBlock string
}

type eventStream struct {
Expand Down Expand Up @@ -164,7 +165,7 @@ func (s *streamManager) ensureSubscription(ctx context.Context, location *Locati
}

if sub == nil {
if sub, err = s.createSubscription(ctx, location, stream, subName, event, string(core.SubOptsFirstEventOldest)); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, subName, event, s.fireFlySubscriptionFromBlock); err != nil {
return nil, err
}
}
Expand Down
76 changes: 50 additions & 26 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ type Fabric struct {
stream *eventStream
sub *subscription
}
idCache map[string]*fabIdentity
wsconn wsclient.WSClient
closed chan struct{}
metrics metrics.Manager
idCache map[string]*fabIdentity
wsconn wsclient.WSClient
closed chan struct{}
metrics metrics.Manager
fabconnectConf config.Section
contractConf config.ArraySection
}

type eventStreamWebsocket struct {
Expand Down Expand Up @@ -161,51 +163,70 @@ func (f *Fabric) VerifierType() core.VerifierType {
}

func (f *Fabric) Init(ctx context.Context, config config.Section, callbacks blockchain.Callbacks, metrics metrics.Manager) (err error) {
fabconnectConf := config.SubSection(FabconnectConfigKey)
f.InitConfig(config)

f.ctx = log.WithLogField(ctx, "proto", "fabric")
f.callbacks = callbacks
f.idCache = make(map[string]*fabIdentity)
f.metrics = metrics
f.capabilities = &blockchain.Capabilities{}

if fabconnectConf.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.fabconnect")
}
f.defaultChannel = fabconnectConf.GetString(FabconnectConfigDefaultChannel)
f.chaincode = fabconnectConf.GetString(FabconnectConfigChaincode)
if f.chaincode == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "chaincode", "blockchain.fabconnect")
if f.fabconnectConf.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "blockchain.fabric.fabconnect")
}
f.defaultChannel = f.fabconnectConf.GetString(FabconnectConfigDefaultChannel)
// the org identity is guaranteed to be configured by the core
f.signer = fabconnectConf.GetString(FabconnectConfigSigner)
f.topic = fabconnectConf.GetString(FabconnectConfigTopic)
f.signer = f.fabconnectConf.GetString(FabconnectConfigSigner)
f.topic = f.fabconnectConf.GetString(FabconnectConfigTopic)
if f.topic == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.fabconnect")
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", "blockchain.fabric.fabconnect")
}

f.prefixShort = fabconnectConf.GetString(FabconnectPrefixShort)
f.prefixLong = fabconnectConf.GetString(FabconnectPrefixLong)
f.prefixShort = f.fabconnectConf.GetString(FabconnectPrefixShort)
f.prefixLong = f.fabconnectConf.GetString(FabconnectPrefixLong)

f.client = ffresty.New(f.ctx, fabconnectConf)
f.client = ffresty.New(f.ctx, f.fabconnectConf)

wsConfig := wsclient.GenerateConfig(fabconnectConf)
return nil
}

func (f *Fabric) initStreams(contractIndex int) (err error) {
ctx := f.ctx
wsConfig := wsclient.GenerateConfig(f.fabconnectConf)
if wsConfig.WSKeyPath == "" {
wsConfig.WSKeyPath = "/ws"
}

f.wsconn, err = wsclient.New(ctx, wsConfig, nil, f.afterConnect)
if err != nil {
return err
}

f.streams = &streamManager{
client: f.client,
signer: f.signer,
f.streams = &streamManager{client: f.client, signer: f.signer}
if f.contractConf.ArraySize() > 0 || contractIndex > 0 {
// New config (array of objects under "fireflyContract")
if contractIndex >= f.contractConf.ArraySize() {
return i18n.NewError(ctx, coremsgs.MsgInvalidFireFlyContractIndex, fmt.Sprintf("blockchain.fabric.fireflyContract[%d]", contractIndex))
}
f.chaincode = f.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractChaincode)
if f.chaincode == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "address", "blockchain.fabric.fireflyContract")
}
f.streams.fireFlySubscriptionFromBlock = f.contractConf.ArrayEntry(contractIndex).GetString(FireFlyContractFromBlock)
} else {
// Old config (attributes under "ethconnect")
f.chaincode = f.fabconnectConf.GetString(FabconnectConfigChaincodeDeprecated)
if f.chaincode != "" {
log.L(ctx).Warnf("The %s.%s config key has been deprecated. Please use %s.%s instead",
FabconnectConfigKey, FabconnectConfigChaincodeDeprecated,
FireFlyContractConfigKey, FireFlyContractChaincode)
f.streams.fireFlySubscriptionFromBlock = string(core.SubOptsFirstEventOldest)
} else {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "chaincode", "blockchain.fabric.fabconnect")
}
}
batchSize := fabconnectConf.GetUint(FabconnectConfigBatchSize)
batchTimeout := uint(fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds())

batchSize := f.fabconnectConf.GetUint(FabconnectConfigBatchSize)
batchTimeout := uint(f.fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds())
if f.initInfo.stream, err = f.streams.ensureEventStream(f.ctx, f.topic, batchSize, batchTimeout); err != nil {
return err
}
Expand All @@ -224,7 +245,10 @@ func (f *Fabric) Init(ctx context.Context, config config.Section, callbacks bloc
return nil
}

func (f *Fabric) Start(contractIndex int) error {
func (f *Fabric) Start(contractIndex int) (err error) {
if err = f.initStreams(contractIndex); err != nil {
return err
}
return f.wsconn.Connect()
}

Expand Down
Loading

0 comments on commit 26fdac4

Please sign in to comment.