Skip to content

Commit

Permalink
feat_: extract storenode cycle to go-waku api
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Sep 20, 2024
1 parent 09ee23a commit f4aa80e
Show file tree
Hide file tree
Showing 39 changed files with 1,496 additions and 1,361 deletions.
62 changes: 53 additions & 9 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -272,10 +274,6 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}

func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
return nil, 0, errors.New("not implemented")
}

func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {}

func (w *GethWakuWrapper) ClearEnvelopesCache() {
Expand Down Expand Up @@ -312,13 +310,59 @@ func (w *wakuFilterWrapper) ID() string {
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
}

func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not implemented")
func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeNotWorking() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
return errors.New("not available in WakuV1")
}

func (w *GethWakuWrapper) IsStorenodeAvailable(peerID peer.ID) bool {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) {
return 0, errors.New("not available in WakuV1")
func (w *GethWakuWrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
panic("not available in WakuV1")
}
109 changes: 69 additions & 40 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"

"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"

Expand Down Expand Up @@ -173,39 +174,6 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat
}, id), nil
}

func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
options := []store.RequestOption{
store.WithPaging(false, uint64(r.Limit)),
}

var cursor []byte
if r.StoreCursor != nil {
cursor = r.StoreCursor
}

contentTopics := []string{}
for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic).ContentTopic())
}

query := store.FilterCriteria{
TimeStart: proto.Int64(int64(r.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(r.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(w.waku.GetPubsubTopic(r.PubsubTopic), contentTopics...),
}

pbCursor, envelopesCount, err := w.waku.Query(ctx, peerID, query, cursor, options, processEnvelopes)
if err != nil {
return nil, 0, err
}

if pbCursor != nil {
return pbCursor, envelopesCount, nil
}

return nil, envelopesCount, nil
}

func (w *gethWakuV2Wrapper) StartDiscV5() error {
return w.waku.StartDiscV5()
}
Expand Down Expand Up @@ -286,7 +254,7 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
var cTopics []string
for _, ct := range contentTopics {
cTopics = append(cTopics, wakucommon.TopicType(ct).ContentTopic())
cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic())
}
pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic)
w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics)
Expand Down Expand Up @@ -335,14 +303,75 @@ func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
w.waku.ConfirmMessageDelivered(hashes)
}

func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
w.waku.SetStorePeerID(peerID)
}

func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
return w.waku.PingPeer(ctx, peerID)
func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeAvailableOneshotEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeNotWorking() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout)
}

func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
w.waku.StorenodeCycle.SetStorenodeConfigProvider(c)
}

func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
pubsubTopic := w.waku.GetPubsubTopic(batch.PubsubTopic)
contentTopics := []string{}
for _, topic := range batch.Topics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic.Bytes()).ContentTopic())
}

criteria := store.FilterCriteria{
TimeStart: proto.Int64(int64(batch.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(batch.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
}

return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes)
}

func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool {
return w.waku.StorenodeCycle.IsStorenodeAvailable(peerID)
}

func (w *gethWakuV2Wrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
return w.waku.StorenodeCycle.PerformStorenodeTask(fn, opts...)
}

func (w *gethWakuV2Wrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
w.waku.StorenodeCycle.Lock()
defer w.waku.StorenodeCycle.Unlock()

w.waku.StorenodeCycle.DisconnectActiveStorenode(backoff)
if shouldCycle {
w.waku.StorenodeCycle.Cycle(ctx)
}
}
54 changes: 0 additions & 54 deletions eth-node/types/mailserver.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,5 @@
package types

import (
"time"
)

const (
// MaxLimitInMessagesRequest represents the maximum number of messages
// that can be requested from the mailserver
MaxLimitInMessagesRequest = 1000
)

// MessagesRequest contains details of a request of historic messages.
type MessagesRequest struct {
// ID of the request. The current implementation requires ID to be 32-byte array,
// however, it's not enforced for future implementation.
ID []byte `json:"id"`
// From is a lower bound of time range.
From uint32 `json:"from"`
// To is a upper bound of time range.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request.
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests.
Cursor []byte `json:"cursor"`
// StoreCursor is used as starting point for WAKUV2 paginatedRequests
StoreCursor StoreRequestCursor `json:"storeCursor"`
// Bloom is a filter to match requested messages.
Bloom []byte `json:"bloom"`
// PubsubTopic is the gossipsub topic on which the message was broadcasted
PubsubTopic string `json:"pubsubTopic"`
// ContentTopics is a list of topics. A returned message should
// belong to one of the topics from the list.
ContentTopics [][]byte `json:"contentTopics"`
}

type StoreRequestCursor []byte

// SetDefaults sets the From and To defaults
func (r *MessagesRequest) SetDefaults(now time.Time) {
// set From and To defaults
if r.To == 0 {
r.To = uint32(now.UTC().Unix())
}

if r.From == 0 {
oneDay := uint32(86400) // -24 hours
if r.To < oneDay {
r.From = 0
} else {
r.From = r.To - oneDay
}
}
}

// MailServerResponse is the response payload sent by the mailserver.
type MailServerResponse struct {
LastEnvelopeHash Hash
Expand Down
4 changes: 4 additions & 0 deletions eth-node/types/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (t TopicType) String() string {
return EncodeHex(t[:])
}

func (t TopicType) Bytes() []byte {
return TopicTypeToByteArray(t)
}

// MarshalText returns the hex representation of t.
func (t TopicType) MarshalText() ([]byte, error) {
return HexBytes(t[:]).MarshalText()
Expand Down
57 changes: 49 additions & 8 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/pborman/uuid"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -176,9 +178,6 @@ type Waku interface {
Unsubscribe(ctx context.Context, id string) error
UnsubscribeMany(ids []string) error

// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(ctx context.Context, peerID peer.ID, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error)

// ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool

Expand All @@ -194,12 +193,54 @@ type Waku interface {
// ConfirmMessageDelivered updates a message has been delivered in waku
ConfirmMessageDelivered(hash []common.Hash)

// SetStorePeerID updates the peer id of store node
SetStorePeerID(peerID peer.ID)

// PeerID returns node's PeerID
PeerID() peer.ID

// PingPeer returns the reply time
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
// GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.ID

// OnStorenodeAvailableOneShot returns a channel that will be triggered only once when a storenode becomes available
OnStorenodeAvailableOneShot() <-chan struct{}

// OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed
OnStorenodeChanged() <-chan peer.ID

// OnStorenodeNotWorking is triggered when the last active storenode fails to return results consistently
OnStorenodeNotWorking() <-chan struct{}

// OnStorenodeAvailable is triggered when there is a new active storenode selected
OnStorenodeAvailable() <-chan peer.ID

// WaitForAvailableStoreNode will wait for a storenode to be available until `timeout` happens
WaitForAvailableStoreNode(timeout time.Duration) bool

// SetStorenodeConfigProvider will set the configuration provider for the storenode cycle
SetStorenodeConfigProvider(c history.StorenodeConfigProvider)

// ProcessMailserverBatch will receive a criteria and storenode and execute a query
ProcessMailserverBatch(
ctx context.Context,
batch MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error

// IsStorenodeAvailable is used to determine whether a storenode is available or not
IsStorenodeAvailable(peerID peer.ID) bool

PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error

// DisconnectActiveStorenode will trigger a disconnection of the active storenode, and potentially execute a cycling so a new storenode is promoted
DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool)
}

type MailserverBatch struct {
From uint32
To uint32
Cursor string
PubsubTopic string
Topics []TopicType
ChatIDs []string
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
github.com/waku-org/go-waku v0.8.1-0.20240920190728-3988fc6ec319
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ=
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-waku v0.8.1-0.20240920190728-3988fc6ec319 h1:JmaIUFCta0sCqt4WaK5F3yUnFQ8nKGD0ie0cC2IGhmc=
github.com/waku-org/go-waku v0.8.1-0.20240920190728-3988fc6ec319/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
Loading

0 comments on commit f4aa80e

Please sign in to comment.