-
Notifications
You must be signed in to change notification settings - Fork 87
/
Copy pathmempool.go
136 lines (119 loc) · 3.74 KB
/
mempool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package common
import (
"encoding/json"
"sync"
"time"
"github.com/zcash/lightwalletd/walletrpc"
)
// txid is a transaction ID in the string form listed by the zcashd
// `getrawmempool` RPC; it is the key type of the g_txidSeen set.
type txid string
// Shared mempool cache for the current block interval.
var (
	// g_lock guards every other variable declared in this group.
	g_lock sync.Mutex

	// g_txidSeen is the set of mempool txids already encountered during the
	// current block interval. `getrawmempool` reports the whole mempool on
	// every call, so this set is what lets a refresh skip txids that were
	// handled by an earlier refresh.
	g_txidSeen = map[txid]struct{}{}

	// g_txList holds the transactions collected during the current block
	// interval, in arrival order and (thanks to g_txidSeen) without
	// duplicates. A client thread remembers how far into this slice it has
	// already sent; everything before that index has gone to its client.
	g_txList []*walletrpc.RawTransaction

	// g_lastTime is the time of the most recent mempool refresh; it is used
	// to limit how often zcashd is queried.
	g_lastTime time.Time

	// g_lastBlockChainInfo is the most recent getblockchaininfo reply. Its
	// best block hash (the tip) is compared against fresh replies to detect
	// the arrival of a new block.
	g_lastBlockChainInfo = &ZcashdRpcReplyGetblockchaininfo{}
)
// GetMempool passes the transactions cached for the current block interval to
// sendToClient, refreshing the shared cache from zcashd no more often than
// every two seconds, and keeps doing so until the tip block hash changes.
//
// It returns nil once a new tip has been observed (by this call or by a
// concurrent one), or the first error reported by GetBlockChainInfo,
// refreshMempoolTxns or sendToClient.
func GetMempool(sendToClient func(*walletrpc.RawTransaction) error) error {
	g_lock.Lock()
	// Tip hash at entry; as soon as the shared tip differs from it we return.
	entryHash := g_lastBlockChainInfo.BestBlockHash
	// Number of g_txList entries already handed to sendToClient.
	sent := 0
	for {
		// Rate limit: only talk to zcashd when more than two seconds have
		// passed since the last refresh.
		now := Time.Now()
		if g_lastTime.Add(2 * time.Second).Before(now) {
			tip, err := GetBlockChainInfo()
			if err != nil {
				g_lock.Unlock()
				return err
			}
			if g_lastBlockChainInfo.BestBlockHash != tip.BestBlockHash {
				// A new block arrived and this caller noticed it first:
				// reset the cached state and finish.
				g_lastBlockChainInfo = tip
				g_txidSeen = map[txid]struct{}{}
				g_txList = []*walletrpc.RawTransaction{}
				g_lastTime = time.Time{}
				g_lock.Unlock()
				return nil
			}
			if err = refreshMempoolTxns(); err != nil {
				g_lock.Unlock()
				return err
			}
			g_lastTime = now
		}

		// Take the not-yet-sent tail of the list, then release the mutex
		// before sending: sendToClient may block on flow control.
		pending := g_txList[sent:]
		sent += len(pending)
		g_lock.Unlock()
		for i := range pending {
			if err := sendToClient(pending[i]); err != nil {
				return err
			}
		}

		Time.Sleep(200 * time.Millisecond)
		g_lock.Lock()
		if g_lastBlockChainInfo.BestBlockHash != entryHash {
			// Another caller recorded a new tip while the mutex was released.
			g_lock.Unlock()
			return nil
		}
	}
}
// refreshMempoolTxns asks zcashd for the current mempool txids
// (`getrawmempool`), fetches every transaction not yet seen during the current
// block interval (`getrawtransaction`), and appends those that are still
// unmined to g_txList. It does not send anything itself; GetMempool reads the
// new entries from g_txList.
//
// It reads and writes g_txidSeen and g_txList, so the caller must hold g_lock.
//
// An error is returned when the `getrawmempool` request fails, when its reply
// cannot be decoded, or when a fetched transaction cannot be marshalled or
// parsed. A failing `getrawtransaction` request is not an error; that txid is
// simply skipped.
func refreshMempoolTxns() error {
	// The parameter list stays an empty, non-nil slice, as in the original
	// call (a nil slice would JSON-encode as null rather than []).
	mempoolReply, rpcErr := RawRequest("getrawmempool", []json.RawMessage{})
	if rpcErr != nil {
		return rpcErr
	}
	var mempoolList []string
	if err := json.Unmarshal(mempoolReply, &mempoolList); err != nil {
		return err
	}

	// Fetch each mempool transaction we have not handled yet and append it
	// to g_txList.
	for _, txidstr := range mempoolList {
		id := txid(txidstr)
		if _, seen := g_txidSeen[id]; seen {
			// Already handled during this block interval.
			continue
		}
		// Mark the txid as seen before fetching it, so it is not requested
		// again during this block interval even if the fetch below fails.
		g_txidSeen[id] = struct{}{}

		txidJSON, err := json.Marshal(txidstr)
		if err != nil {
			return err
		}
		// NOTE(review): the second parameter "1" is presumably the RPC's
		// verbose flag; confirm against the zcashd documentation.
		txReply, fetchErr := RawRequest("getrawtransaction",
			[]json.RawMessage{txidJSON, json.RawMessage("1")})
		if fetchErr != nil {
			// Not an error; mempool transactions can disappear.
			continue
		}
		rawtx, err := ParseRawTransaction(txReply)
		if err != nil {
			return err
		}
		// Skip any transaction that has been mined since the list of txids
		// was retrieved.
		if rawtx.Height != 0 {
			continue
		}
		g_txList = append(g_txList, rawtx)
	}
	return nil
}