Skip to content

Commit

Permalink
ignore pending tx
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Nov 1, 2024
1 parent e909eaf commit 7e3a2b7
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ type RPCStream struct {
logger log.Logger
txDecoder sdk.TxDecoder

headerStream *Stream[RPCHeader]
// headerStream/logStream are backed by cometbft event subscription
headerStream *Stream[RPCHeader]
logStream *Stream[*ethtypes.Log]

// pendingTxStream is backed by check-tx ante handler
pendingTxStream *Stream[common.Hash]
logStream *Stream[*ethtypes.Log]

wg sync.WaitGroup
validatorAccount validatorAccountFunc
Expand All @@ -76,17 +79,17 @@ func NewRPCStreams(
logger: logger,
txDecoder: txDecoder,
validatorAccount: validatorAccount,
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
}
}

func (s *RPCStream) init() {
func (s *RPCStream) initSubscriptions() {
if s.headerStream != nil {
// already initialized
return
}

s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity)
s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity)
s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity)

ctx := context.Background()
Expand Down Expand Up @@ -121,17 +124,16 @@ func (s *RPCStream) Close() error {
}

func (s *RPCStream) HeaderStream() *Stream[RPCHeader] {
s.init()
s.initSubscriptions()
return s.headerStream
}

func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
s.init()
return s.pendingTxStream
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
s.init()
s.initSubscriptions()
return s.logStream
}

Expand Down

0 comments on commit 7e3a2b7

Please sign in to comment.