Skip to content

Commit

Permalink
ethereum,ethclient,eth/filters: marshal ethereum.FilterQuery struct t…
Browse files Browse the repository at this point in the history
…o JSON correctly, treat -1 as "latest" and -2 as "pending" (use rules from the rpc package), unmarshal it from JSON applying defaults for missing optional fields, reuse this marshalling implementation in ethclient and eth/filters
  • Loading branch information
zenovich committed Nov 11, 2021
1 parent 476fb56 commit 6bddd1a
Show file tree
Hide file tree
Showing 7 changed files with 642 additions and 503 deletions.
139 changes: 6 additions & 133 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@ package filters

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand All @@ -40,7 +36,7 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
crit FilterCriteria
crit ethereum.FilterQuery
logs []*types.Log
s *Subscription // associated subscription in event system
}
Expand Down Expand Up @@ -241,7 +237,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
}

// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
func (api *PublicFilterAPI) Logs(ctx context.Context, crit ethereum.FilterQuery) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -252,7 +248,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
matchedLogs = make(chan []*types.Log)
)

logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
if err != nil {
return nil, err
}
Expand All @@ -278,10 +274,6 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return rpcSub, nil
}

// FilterCriteria represents a request to create a new filter.
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria ethereum.FilterQuery

// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
Expand All @@ -295,9 +287,9 @@ type FilterCriteria ethereum.FilterQuery
// In case "fromBlock" > "toBlock" an error is returned.
//
// https://eth.wiki/json-rpc/API#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
func (api *PublicFilterAPI) NewFilter(crit ethereum.FilterQuery) (rpc.ID, error) {
logs := make(chan []*types.Log)
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
logsSub, err := api.events.SubscribeLogs(crit, logs)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -330,7 +322,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://eth.wiki/json-rpc/API#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) {
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
Expand Down Expand Up @@ -462,122 +454,3 @@ func returnLogs(logs []*types.Log) []*types.Log {
}
return logs
}

// UnmarshalJSON sets *args fields with given data.
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
type input struct {
BlockHash *common.Hash `json:"blockHash"`
FromBlock *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
Topics []interface{} `json:"topics"`
}

var raw input
if err := json.Unmarshal(data, &raw); err != nil {
return err
}

if raw.BlockHash != nil {
if raw.FromBlock != nil || raw.ToBlock != nil {
// BlockHash is mutually exclusive with FromBlock/ToBlock criteria
return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
}
args.BlockHash = raw.BlockHash
} else {
if raw.FromBlock != nil {
args.FromBlock = big.NewInt(raw.FromBlock.Int64())
}

if raw.ToBlock != nil {
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}
}

args.Addresses = []common.Address{}

if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
switch rawAddr := raw.Addresses.(type) {
case []interface{}:
for i, addr := range rawAddr {
if strAddr, ok := addr.(string); ok {
addr, err := decodeAddress(strAddr)
if err != nil {
return fmt.Errorf("invalid address at index %d: %v", i, err)
}
args.Addresses = append(args.Addresses, addr)
} else {
return fmt.Errorf("non-string address at index %d", i)
}
}
case string:
addr, err := decodeAddress(rawAddr)
if err != nil {
return fmt.Errorf("invalid address: %v", err)
}
args.Addresses = []common.Address{addr}
default:
return errors.New("invalid addresses in query")
}
}

// topics is an array consisting of strings and/or arrays of strings.
// JSON null values are converted to common.Hash{} and ignored by the filter manager.
if len(raw.Topics) > 0 {
args.Topics = make([][]common.Hash, len(raw.Topics))
for i, t := range raw.Topics {
switch topic := t.(type) {
case nil:
// ignore topic when matching logs

case string:
// match specific topic
top, err := decodeTopic(topic)
if err != nil {
return err
}
args.Topics[i] = []common.Hash{top}

case []interface{}:
// or case e.g. [null, "topic0", "topic1"]
for _, rawTopic := range topic {
if rawTopic == nil {
// null component, match all
args.Topics[i] = nil
break
}
if topic, ok := rawTopic.(string); ok {
parsed, err := decodeTopic(topic)
if err != nil {
return err
}
args.Topics[i] = append(args.Topics[i], parsed)
} else {
return fmt.Errorf("invalid topic(s)")
}
}
default:
return fmt.Errorf("invalid topic(s)")
}
}
}

return nil
}

func decodeAddress(s string) (common.Address, error) {
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.AddressLength {
err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
}
return common.BytesToAddress(b), err
}

func decodeTopic(s string) (common.Hash, error) {
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.HashLength {
err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
}
return common.BytesToHash(b), err
}
185 changes: 0 additions & 185 deletions eth/filters/api_test.go

This file was deleted.

Loading

0 comments on commit 6bddd1a

Please sign in to comment.