Skip to content

Commit

Permalink
restore gRPC GetMempoolTx
Browse files Browse the repository at this point in the history
This was removed as part of PR #358 (commit "remove gRPC GetMempoolTx")
but should not have been, so it's being restored.
  • Loading branch information
Larry Ruane authored and LarryRuane committed Dec 9, 2021
1 parent c7b1be9 commit 83bb199
Show file tree
Hide file tree
Showing 5 changed files with 511 additions and 159 deletions.
55 changes: 55 additions & 0 deletions frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,58 @@ func TestNewZRPCFromConf(t *testing.T) {
t.Fatal("NewZRPCFromClient unexpected success")
}
}

func TestMempoolFilter(t *testing.T) {
txidlist := []string{
"2e819d0bab5c819dc7d5f92d1bfb4127ce321daf847f6602",
"29e594c312eee49bc2c9ad37367ba58f857c4a7387ec9715",
"d4d090e60bf9141c6573f0598b84cc1f9817543e55a4d84d",
"d4714779c6dd32a72077bd79d4a70cb2153b552d7addec15",
"9839c1d4deca000656caff57c1f720f4fbd114b52239edde",
"ce5a28854a509ab309faa433542e73414fef6e903a3d52f5",
}
exclude := []string{
"98aa", // common prefix (98) but no match
"19", // no match
"29", // one match (should not appear)
"d4", // 2 matches (both should appear in result)
"ce5a28854a509ab309faa433542e73414fef6e903a3d52f5", // exact match
"ce5a28854a509ab309faa433542e73414fef6e903a3d52f500", // extra stuff ignored
}
expected := []string{
"2e819d0bab5c819dc7d5f92d1bfb4127ce321daf847f6602",
"9839c1d4deca000656caff57c1f720f4fbd114b52239edde",
"d4714779c6dd32a72077bd79d4a70cb2153b552d7addec15",
"d4d090e60bf9141c6573f0598b84cc1f9817543e55a4d84d",
}
actual := MempoolFilter(txidlist, exclude)
if len(actual) != len(expected) {
t.Fatal("mempool: wrong number of filter results")
}
for i := 0; i < len(actual); i++ {
if actual[i] != expected[i] {
t.Fatal(fmt.Sprintf("mempool: expected: %s actual: %s",
expected[i], actual[i]))
}
}
// If the exclude list is empty, return the entire mempool.
actual = MempoolFilter(txidlist, []string{})
expected = []string{
"29e594c312eee49bc2c9ad37367ba58f857c4a7387ec9715",
"2e819d0bab5c819dc7d5f92d1bfb4127ce321daf847f6602",
"9839c1d4deca000656caff57c1f720f4fbd114b52239edde",
"ce5a28854a509ab309faa433542e73414fef6e903a3d52f5",
"d4714779c6dd32a72077bd79d4a70cb2153b552d7addec15",
"d4d090e60bf9141c6573f0598b84cc1f9817543e55a4d84d",
}
if len(actual) != len(expected) {
t.Fatal("mempool: wrong number of filter results")
}
for i := 0; i < len(actual); i++ {
if actual[i] != expected[i] {
t.Fatal(fmt.Sprintf("mempool: expected: %s actual: %s",
expected[i], actual[i]))
}
}

}
131 changes: 131 additions & 0 deletions frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"errors"
"io"
"regexp"
"sort"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -391,6 +392,134 @@ func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.C
return err
}

// Key is 32-byte txid (as a 64-character string), data is pointer to compact tx.
var mempoolMap *map[string]*walletrpc.CompactTx
var mempoolList []string

// Last time we pulled a copy of the mempool from zcashd.
var lastMempool time.Time

func (s *lwdStreamer) GetMempoolTx(exclude *walletrpc.Exclude, resp walletrpc.CompactTxStreamer_GetMempoolTxServer) error {
if time.Now().Sub(lastMempool).Seconds() >= 2 {
lastMempool = time.Now()
// Refresh our copy of the mempool.
params := make([]json.RawMessage, 0)
result, rpcErr := common.RawRequest("getrawmempool", params)
if rpcErr != nil {
return rpcErr
}
err := json.Unmarshal(result, &mempoolList)
if err != nil {
return err
}
newmempoolMap := make(map[string]*walletrpc.CompactTx)
if mempoolMap == nil {
mempoolMap = &newmempoolMap
}
for _, txidstr := range mempoolList {
if ctx, ok := (*mempoolMap)[txidstr]; ok {
// This ctx has already been fetched, copy pointer to it.
newmempoolMap[txidstr] = ctx
continue
}
txidJSON, err := json.Marshal(txidstr)
if err != nil {
return err
}
// The "0" is because we only need the raw hex, which is returned as
// just a hex string, and not even a json string (with quotes).
params := []json.RawMessage{txidJSON, json.RawMessage("0")}
result, rpcErr := common.RawRequest("getrawtransaction", params)
if rpcErr != nil {
// Not an error; mempool transactions can disappear
continue
}
// strip the quotes
var txStr string
err = json.Unmarshal(result, &txStr)
if err != nil {
return err
}

// conver to binary
txBytes, err := hex.DecodeString(txStr)
if err != nil {
return err
}
tx := parser.NewTransaction()
txdata, err := tx.ParseFromSlice(txBytes)
if len(txdata) > 0 {
return errors.New("extra data deserializing transaction")
}
newmempoolMap[txidstr] = &walletrpc.CompactTx{}
if tx.HasShieldedElements() {
newmempoolMap[txidstr] = tx.ToCompact( /* height */ 0)
}
}
mempoolMap = &newmempoolMap
}
excludeHex := make([]string, len(exclude.Txid))
for i := 0; i < len(exclude.Txid); i++ {
excludeHex[i] = hex.EncodeToString(parser.Reverse(exclude.Txid[i]))
}
for _, txid := range MempoolFilter(mempoolList, excludeHex) {
tx := (*mempoolMap)[txid]
if len(tx.Hash) > 0 {
err := resp.Send(tx)
if err != nil {
return err
}
}
}
return nil
}

// Return the subset of items that aren't excluded, but
// if more than one item matches an exclude entry, return
// all those items.
func MempoolFilter(items, exclude []string) []string {
sort.Slice(items, func(i, j int) bool {
return items[i] < items[j]
})
sort.Slice(exclude, func(i, j int) bool {
return exclude[i] < exclude[j]
})
// Determine how many items match each exclude item.
nmatches := make([]int, len(exclude))
// is the exclude string less than the item string?
lessthan := func(e, i string) bool {
l := len(e)
if l > len(i) {
l = len(i)
}
return e < i[0:l]
}
ei := 0
for _, item := range items {
for ei < len(exclude) && lessthan(exclude[ei], item) {
ei++
}
match := ei < len(exclude) && strings.HasPrefix(item, exclude[ei])
if match {
nmatches[ei]++
}
}

// Add each item that isn't uniquely excluded to the results.
tosend := make([]string, 0)
ei = 0
for _, item := range items {
for ei < len(exclude) && lessthan(exclude[ei], item) {
ei++
}
match := ei < len(exclude) && strings.HasPrefix(item, exclude[ei])
if !match || nmatches[ei] > 1 {
tosend = append(tosend, item)
}
}
return tosend
}

func getAddressUtxos(arg *walletrpc.GetAddressUtxosArg, f func(*walletrpc.GetAddressUtxosReply) error) error {
for _, a := range arg.Addresses {
if err := checkTaddress(a); err != nil {
Expand Down Expand Up @@ -501,6 +630,8 @@ func (s *DarksideStreamer) Reset(ctx context.Context, ms *walletrpc.DarksideMeta
if err != nil {
return nil, err
}
mempoolMap = nil
mempoolList = nil
return &walletrpc.Empty{}, nil
}

Expand Down
Loading

0 comments on commit 83bb199

Please sign in to comment.