From ad9abadcfafb3aca4e7b3c27c852f3d220d9677b Mon Sep 17 00:00:00 2001 From: shiqizng <80276844+shiqizng@users.noreply.github.com> Date: Fri, 10 Mar 2023 16:30:34 -0500 Subject: [PATCH] filter plugin: include all transaction in group (#5) --- .../processors/filterprocessor/config.go | 2 + .../filterprocessor/fields/filter.go | 22 +- .../filterprocessor/filter_processor.go | 1 + .../filter_processor_bench_test.go | 40 +- .../filterprocessor/filter_processor_test.go | 432 ++++++++++++++++++ .../processors/filterprocessor/sample.yaml | 5 +- .../processors/filterprocessor/sample_test.go | 48 -- 7 files changed, 489 insertions(+), 61 deletions(-) delete mode 100644 conduit/plugins/processors/filterprocessor/sample_test.go diff --git a/conduit/plugins/processors/filterprocessor/config.go b/conduit/plugins/processors/filterprocessor/config.go index 4d45eb91..54b92c9b 100644 --- a/conduit/plugins/processors/filterprocessor/config.go +++ b/conduit/plugins/processors/filterprocessor/config.go @@ -37,6 +37,8 @@ type SubConfig struct { type Config struct { // search-inner configures the filter processor to recursively search inner transactions for expressions. SearchInner bool `yaml:"search-inner"` + // omit-group-transactions configures the filter processor to return the matched transaction without its grouped transactions. + OmitGroupTransactions bool `yaml:"omit-group-transactions"` /* filters are a list of SubConfig objects with an operation acting as the string key in the map filters: diff --git a/conduit/plugins/processors/filterprocessor/fields/filter.go b/conduit/plugins/processors/filterprocessor/fields/filter.go index 7fea35de..e44ae8c2 100644 --- a/conduit/plugins/processors/filterprocessor/fields/filter.go +++ b/conduit/plugins/processors/filterprocessor/fields/filter.go @@ -26,6 +26,7 @@ func ValidFieldOperation(input string) bool { type Filter struct { Op Operation Searchers []*Searcher + OmitGroup bool } func (f Filter) matches(txn *sdk.SignedTxnWithAD) (bool, error) { @@ -55,13 +56,28 @@ func (f Filter) matches(txn *sdk.SignedTxnWithAD) (bool, error) { // SearchAndFilter searches through the block data and applies the operation to the results func (f Filter) SearchAndFilter(payset []sdk.SignedTxnInBlock) ([]sdk.SignedTxnInBlock, error) { var result []sdk.SignedTxnInBlock - for _, txn := range payset { - match, err := f.matches(&txn.SignedTxnWithAD) + firstGroupIdx := 0 + for i := 0; i < len(payset); i++ { + if payset[firstGroupIdx].Txn.Group != payset[i].Txn.Group { + firstGroupIdx = i + } + match, err := f.matches(&payset[i].SignedTxnWithAD) if err != nil { return nil, err } if match { - result = append(result, txn) + // if txn.Group is set and omit group is false + if payset[i].Txn.Group != (sdk.Digest{}) && !f.OmitGroup { + j := firstGroupIdx + // append all txns with same group ID + for ; j < len(payset) && payset[j].Txn.Group == payset[firstGroupIdx].Txn.Group; j++ { + result = append(result, payset[j]) + } + // skip txns that are already added, set i to the index of the last added txn + i = j - 1 + } else { + result = append(result, payset[i]) + } } } diff --git a/conduit/plugins/processors/filterprocessor/filter_processor.go b/conduit/plugins/processors/filterprocessor/filter_processor.go index 2d536fb5..a0925bb7 100644 --- a/conduit/plugins/processors/filterprocessor/filter_processor.go +++ b/conduit/plugins/processors/filterprocessor/filter_processor.go @@ -106,6 +106,7 @@ func (a *FilterProcessor) Init(ctx context.Context, _ data.InitProvider, cfg plu ff := fields.Filter{ Op: fields.Operation(key), Searchers: searcherList, + OmitGroup: a.cfg.OmitGroupTransactions, } a.FieldFilters = append(a.FieldFilters, ff) diff --git a/conduit/plugins/processors/filterprocessor/filter_processor_bench_test.go b/conduit/plugins/processors/filterprocessor/filter_processor_bench_test.go index d609e660..8ef73d13 100644 --- a/conduit/plugins/processors/filterprocessor/filter_processor_bench_test.go +++ b/conduit/plugins/processors/filterprocessor/filter_processor_bench_test.go @@ -24,6 +24,22 @@ func blockData(addr sdk.Address, numInner int) (block data.BlockData, searchTag SignedTxnWithAD: sdk.SignedTxnWithAD{ SignedTxn: sdk.SignedTxn{ AuthAddr: addr, + Txn: sdk.Transaction{ + Header: sdk.Header{ + Group: sdk.Digest{1}, + }, + }, + }, + }, + }, + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + Txn: sdk.Transaction{ + Header: sdk.Header{ + Group: sdk.Digest{1}, + }, + }, }, }, }, @@ -52,21 +68,27 @@ func BenchmarkProcess(b *testing.B) { addr[0] = 0x01 var table = []struct { - input int + input int + outputlen int + omitGroupTxns bool }{ - {input: 0}, - {input: 10}, - {input: 100}, + {input: 0, outputlen: 1, omitGroupTxns: true}, + {input: 10, outputlen: 1, omitGroupTxns: true}, + {input: 100, outputlen: 1, omitGroupTxns: true}, + {input: 0, outputlen: 2, omitGroupTxns: false}, + {input: 10, outputlen: 2, omitGroupTxns: false}, + {input: 100, outputlen: 2, omitGroupTxns: false}, } for _, v := range table { - b.Run(fmt.Sprintf("inner_txn_count_%d", v.input), func(b *testing.B) { + b.Run(fmt.Sprintf("inner_txn_count_%d_omitGrouptxns_%t", v.input, v.omitGroupTxns), func(b *testing.B) { bd, tag := blockData(addr, v.input) - cfgStr := fmt.Sprintf(`filters: + cfgStr := fmt.Sprintf(`search-inner: true +omit-group-transactions: %t +filters: - all: - tag: %s - search-inner: true expression-type: equal - expression: "%s"`, tag, addr.String()) + expression: "%s"`, v.omitGroupTxns, tag, addr.String()) fp := &FilterProcessor{} err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfgStr), logrus.New()) @@ -76,7 +98,7 @@ func BenchmarkProcess(b *testing.B) { { out, err := fp.Process(bd) require.NoError(b, err) - require.Len(b, out.Payset, 1) + require.Len(b, out.Payset, v.outputlen) } // Ignore the setup cost above. diff --git a/conduit/plugins/processors/filterprocessor/filter_processor_test.go b/conduit/plugins/processors/filterprocessor/filter_processor_test.go index a9f994e4..15b7eb56 100644 --- a/conduit/plugins/processors/filterprocessor/filter_processor_test.go +++ b/conduit/plugins/processors/filterprocessor/filter_processor_test.go @@ -1058,3 +1058,435 @@ filters: require.NoError(t, err) assert.Equal(t, output.Payset, []sdk.SignedTxnInBlock{bd.Payset[1]}) } + +func createPaysetGroupedTxns() []sdk.SignedTxnInBlock { + sampleAddr1 := sdk.Address{1} + sampleAddr2 := sdk.Address{2} + return []sdk.SignedTxnInBlock{ + // a txn in group 1 with an inner txn + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + PaymentTxnFields: sdk.PaymentTxnFields{ + Receiver: sampleAddr1, + Amount: 123, + }, + Header: sdk.Header{ + Sender: sampleAddr2, + Group: sdk.Digest{1}, + }, + }, + }, + ApplyData: sdk.ApplyData{ + EvalDelta: sdk.EvalDelta{ + InnerTxns: []sdk.SignedTxnWithAD{ + { + SignedTxn: sdk.SignedTxn{ + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr1, + GenesisID: "testnet", + }, + }, + }, + }, + }, + }, + }, + }, + }, + // 2nd txn in group 1 + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr2, + Group: sdk.Digest{1}, + }, + AssetConfigTxnFields: sdk.AssetConfigTxnFields{ + ConfigAsset: 0, + AssetParams: sdk.AssetParams{ + Total: 10, + UnitName: "assetA", + AssetName: "assetA", + }, + }, + }, + }, + }, + }, + // a simple txn in group 2 + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + PaymentTxnFields: sdk.PaymentTxnFields{ + Receiver: sampleAddr1, + Amount: 99, + }, + Header: sdk.Header{ + Sender: sampleAddr2, + Group: sdk.Digest{2}, + }, + }, + }, + }, + }, + // 2nd txn in group 2, nested inner txns + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr2, + Group: sdk.Digest{2}, + }, + ApplicationFields: sdk.ApplicationFields{ + ApplicationCallTxnFields: sdk.ApplicationCallTxnFields{ + ApplicationID: 1, + }, + }, + }, + }, + ApplyData: sdk.ApplyData{ + EvalDelta: sdk.EvalDelta{ + InnerTxns: []sdk.SignedTxnWithAD{ + { + SignedTxn: sdk.SignedTxn{ + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr1, + }, + }, + }, + ApplyData: sdk.ApplyData{ + EvalDelta: sdk.EvalDelta{ + InnerTxns: []sdk.SignedTxnWithAD{ + { + SignedTxn: sdk.SignedTxn{ + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr1, + LastValid: 10, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + // 3rd txn in group 2 + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr2, + Group: sdk.Digest{2}, + GenesisID: "group2", + }, + }, + }, + }, + }, + // a txn with inner txn + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + PaymentTxnFields: sdk.PaymentTxnFields{ + Receiver: sampleAddr1, + Amount: 1, + }, + Header: sdk.Header{ + Sender: sampleAddr1, + Note: []byte("I don't have a group id."), + }, + }, + }, + ApplyData: sdk.ApplyData{ + EvalDelta: sdk.EvalDelta{ + InnerTxns: []sdk.SignedTxnWithAD{ + { + SignedTxn: sdk.SignedTxn{ + Txn: sdk.Transaction{ + Header: sdk.Header{ + Sender: sampleAddr1, + }, + }, + }, + }, + }, + }, + }, + }, + }, + // a simple txn + { + SignedTxnWithAD: sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + AuthAddr: sampleAddr1, + Txn: sdk.Transaction{ + PaymentTxnFields: sdk.PaymentTxnFields{ + Receiver: sampleAddr2, + Amount: 120, + }, + Header: sdk.Header{ + Sender: sampleAddr1, + }, + }, + }, + }, + }, + } +} + +func TestFilterProcessor_OmitGroupedTxnsDefault(t *testing.T) { + sampleAddr := sdk.Address{2} + bd := data.BlockData{} + bd.Payset = append(bd.Payset, + createPaysetGroupedTxns()..., + ) + { + // matched txns and txns in the same group are returned + cfg := `--- +filters: + - any: + - tag: txn.amt + expression-type: greater-than + expression: 100 +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + + output, err := fp.Process(bd) + require.NoError(t, err) + // txns in the same group should be returned + require.Equal(t, 3, len(output.Payset)) + // txns in group 1 + assert.Equal(t, bd.Payset[0], output.Payset[0]) + assert.Equal(t, bd.Payset[1], output.Payset[1]) + // a payment txn + assert.Equal(t, bd.Payset[6], output.Payset[2]) + } + + { + // multiple matched txns and their grouped txns + cfg := `--- +filters: + - any: + - tag: txn.snd + expression-type: equal + expression: "` + sampleAddr.String() + `" +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + // both txn groups should be returned + require.Equal(t, 5, len(output.Payset)) + // group 1 txns + assert.Equal(t, bd.Payset[0], output.Payset[0]) + assert.Equal(t, bd.Payset[1], output.Payset[1]) + // group 2 txns + assert.Equal(t, bd.Payset[2], output.Payset[2]) + assert.Equal(t, bd.Payset[3], output.Payset[3]) + assert.Equal(t, bd.Payset[4], output.Payset[4]) + } + + { + // match inner txn and return its top level txn and grouped txns + cfg := `--- +search-inner: true +filters: + - any: + - tag: txn.gen + expression-type: equal + expression: "testnet" +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 2, len(output.Payset)) + // group 1 txns + assert.Equal(t, bd.Payset[0], output.Payset[0]) + assert.Equal(t, bd.Payset[1], output.Payset[1]) + } + + { + // match inner txn of an inner txn and return its top level txn and grouped txns + cfg := `--- +search-inner: true +filters: + - any: + - tag: txn.lv + expression-type: equal + expression: 10 +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 3, len(output.Payset)) + // group 2 txns + assert.Equal(t, bd.Payset[2], output.Payset[0]) + assert.Equal(t, bd.Payset[3], output.Payset[1]) + assert.Equal(t, bd.Payset[4], output.Payset[2]) + } + + { + // match last txn in the group, return grouped txns + cfg := `--- +search-inner: true +filters: + - any: + - tag: txn.gen + expression-type: equal + expression: "group2" +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 3, len(output.Payset)) + // group 2 txns + assert.Equal(t, bd.Payset[2], output.Payset[0]) + assert.Equal(t, bd.Payset[3], output.Payset[1]) + assert.Equal(t, bd.Payset[4], output.Payset[2]) + } +} + +func TestFilterProcessor_OmitGroupedTxnsTrue(t *testing.T) { + sampleAddr := sdk.Address{2} + + bd := data.BlockData{} + bd.Payset = append(bd.Payset, + createPaysetGroupedTxns()..., + ) + { + // matched txns are returned, exclude grouped txns + cfg := `--- +omit-group-transactions: true +filters: + - any: + - tag: txn.amt + expression-type: greater-than + expression: 100 +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 2, len(output.Payset)) + // txn with groupID 1 + assert.Equal(t, bd.Payset[0], output.Payset[0]) + // a payment txn + assert.Equal(t, bd.Payset[6], output.Payset[1]) + } + + { + // return all matched txns + cfg := `--- +omit-group-transactions: true +filters: + - any: + - tag: txn.snd + expression-type: equal + expression: "` + sampleAddr.String() + `" +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 5, len(output.Payset)) + assert.Equal(t, bd.Payset[0], output.Payset[0]) + assert.Equal(t, bd.Payset[1], output.Payset[1]) + assert.Equal(t, bd.Payset[2], output.Payset[2]) + assert.Equal(t, bd.Payset[3], output.Payset[3]) + assert.Equal(t, bd.Payset[4], output.Payset[4]) + + } + + { + // match inner txn, exclude grouped txns + cfg := `--- +search-inner: true +omit-group-transactions: true +filters: + - any: + - tag: txn.gen + expression-type: equal + expression: "testnet" +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 1, len(output.Payset)) + assert.Equal(t, bd.Payset[0], output.Payset[0]) + } + + { + // match inner txn of an inner txn, exclude grouped txns + cfg := `--- +search-inner: true +omit-group-transactions: true +filters: + - any: + - tag: txn.lv + expression-type: equal + expression: 10 +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 1, len(output.Payset)) + assert.Equal(t, bd.Payset[3], output.Payset[0]) + } + + { + // match last txn in the group, exclude group txns + cfg := `--- +search-inner: true +omit-group-transactions: true +filters: + - any: + - tag: txn.gen + expression-type: equal + expression: "group2" +` + fp := FilterProcessor{} + err := fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(cfg), logrus.New()) + require.NoError(t, err) + output, err := fp.Process(bd) + require.NoError(t, err) + require.Equal(t, 1, len(output.Payset)) + assert.Equal(t, bd.Payset[4], output.Payset[0]) + } +} diff --git a/conduit/plugins/processors/filterprocessor/sample.yaml b/conduit/plugins/processors/filterprocessor/sample.yaml index e8733ca7..638cab5a 100644 --- a/conduit/plugins/processors/filterprocessor/sample.yaml +++ b/conduit/plugins/processors/filterprocessor/sample.yaml @@ -1,7 +1,10 @@ name: filter_processor config: + # Search inner configures whether the expression will search for matches in inner transactions. + search-inner: true + # Omit group transactions turns off group transaction matches, so only a single transaction from the group may be matched. + omit-group-transactions: true # Filters is a list of boolean expressions that can search the payset transactions. - # See README for more info. filters: - any: - tag: txn.rcv diff --git a/conduit/plugins/processors/filterprocessor/sample_test.go b/conduit/plugins/processors/filterprocessor/sample_test.go deleted file mode 100644 index 15d9213d..00000000 --- a/conduit/plugins/processors/filterprocessor/sample_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package filterprocessor - -import ( - "context" - "gopkg.in/yaml.v3" - "os" - "reflect" - "testing" - - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - - "github.com/algorand/conduit/conduit" - "github.com/algorand/conduit/conduit/plugins" - "github.com/algorand/conduit/conduit/plugins/processors" -) - -// TestFilterProcessorSampleConfigInit validates that all fields in the sample config are valid for a filter processor -func TestFilterProcessorSampleConfigInit(t *testing.T) { - - fpBuilder, err := processors.ProcessorBuilderByName("filter_processor") - assert.NoError(t, err) - - sampleConfigStr, err := os.ReadFile("sample.yaml") - assert.NoError(t, err) - - fp := fpBuilder.New() - err = fp.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(string(sampleConfigStr)), logrus.New()) - assert.NoError(t, err) -} - -// TestFilterProcessorSampleConfigNotEmpty tests that all fields in the sample config are filled in -func TestFilterProcessorSampleConfigNotEmpty(t *testing.T) { - - sampleConfigStr, err := os.ReadFile("sample.yaml") - assert.NoError(t, err) - pCfg := Config{} - - err = yaml.Unmarshal(sampleConfigStr, &pCfg) - assert.NoError(t, err) - - v := reflect.ValueOf(pCfg) - - for i := 0; i < v.NumField(); i++ { - assert.True(t, v.Field(i).Interface() != nil) - } - -}