Skip to content

Commit

Permalink
refactor(hooks): use external pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Apr 23, 2020
1 parent 1c39fe6 commit 32234c6
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 278 deletions.
17 changes: 13 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ module github.com/ipfs/go-graphsync
go 1.12

require (
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6
github.com/filecoin-project/go-fil-markets v0.0.0-20200408062434-d92f329a6428
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
Expand All @@ -16,6 +17,7 @@ require (
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.2
github.com/ipfs/go-merkledag v0.3.1
Expand All @@ -26,8 +28,15 @@ require (
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-libp2p v0.6.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-record v0.1.1 // indirect
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multihash v0.0.13
github.com/stretchr/testify v1.4.0
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/stretchr/testify v1.5.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105 // indirect
go.uber.org/multierr v1.4.0 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361 // indirect
)
82 changes: 15 additions & 67 deletions go.sum

Large diffs are not rendered by default.

51 changes: 20 additions & 31 deletions requestmanager/hooks/requesthooks.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,40 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime/traversal"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type requestHook struct {
key uint64
hook graphsync.OnOutgoingRequestHook
}

// OutgoingRequestHooks is a set of incoming request hooks that can be processed
type OutgoingRequestHooks struct {
nextKey uint64
hooksLk sync.RWMutex
hooks []requestHook
pubSub *pubsub.PubSub
}

type internalRequestHookEvent struct {
p peer.ID
request graphsync.RequestData
hookActions *requestHookActions
}

func requestHooksDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestHookEvent)
hook := subscriberFn.(graphsync.OnOutgoingRequestHook)
hook(ie.p, ie.request, ie.hookActions)
return nil
}

// NewRequestHooks returns a new list of incoming request hooks
func NewRequestHooks() *OutgoingRequestHooks {
return &OutgoingRequestHooks{}
return &OutgoingRequestHooks{
pubSub: pubsub.New(requestHooksDispatcher),
}
}

// Register registers an extension to process outgoing requests
func (orh *OutgoingRequestHooks) Register(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
orh.hooksLk.Lock()
rh := requestHook{orh.nextKey, hook}
orh.nextKey++
orh.hooks = append(orh.hooks, rh)
orh.hooksLk.Unlock()
return func() {
orh.hooksLk.Lock()
defer orh.hooksLk.Unlock()
for i, matchHook := range orh.hooks {
if rh.key == matchHook.key {
orh.hooks = append(orh.hooks[:i], orh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(orh.pubSub.Subscribe(hook))
}

// RequestResult is the outcome of running requesthooks
Expand All @@ -52,12 +45,8 @@ type RequestResult struct {

// ProcessRequestHooks runs request hooks against an outgoing request
func (orh *OutgoingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsync.RequestData) RequestResult {
orh.hooksLk.RLock()
defer orh.hooksLk.RUnlock()
rha := &requestHookActions{}
for _, requestHook := range orh.hooks {
requestHook.hook(p, request, rha)
}
_ = orh.pubSub.Publish(internalRequestHookEvent{p, request, rha})
return rha.result()
}

Expand Down
56 changes: 18 additions & 38 deletions requestmanager/hooks/responsehooks.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,38 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
)

type responseHook struct {
key uint64
hook graphsync.OnIncomingResponseHook
}

// IncomingResponseHooks is a set of incoming response hooks that can be processed
type IncomingResponseHooks struct {
nextKey uint64
hooksLk sync.RWMutex
hooks []responseHook
pubSub *pubsub.PubSub
}

type internalResponseHookEvent struct {
p peer.ID
response graphsync.ResponseData
rha *responseHookActions
}

func responseHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalResponseHookEvent)
hook := subscriberFn.(graphsync.OnIncomingResponseHook)
hook(ie.p, ie.response, ie.rha)
return ie.rha.err
}

// NewResponseHooks returns a new list of incoming request hooks
func NewResponseHooks() *IncomingResponseHooks {
return &IncomingResponseHooks{}
return &IncomingResponseHooks{pubSub: pubsub.New(responseHookDispatcher)}
}

// Register registers an extension to process incoming responses
func (irh *IncomingResponseHooks) Register(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
irh.hooksLk.Lock()
rh := responseHook{irh.nextKey, hook}
irh.nextKey++
irh.hooks = append(irh.hooks, rh)
irh.hooksLk.Unlock()
return func() {
irh.hooksLk.Lock()
defer irh.hooksLk.Unlock()
for i, matchHook := range irh.hooks {
if rh.key == matchHook.key {
irh.hooks = append(irh.hooks[:i], irh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook))
}

// ResponseResult is the outcome of running response hooks
Expand All @@ -52,15 +43,8 @@ type ResponseResult struct {

// ProcessResponseHooks runs response hooks against an incoming response
func (irh *IncomingResponseHooks) ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) ResponseResult {
irh.hooksLk.Lock()
defer irh.hooksLk.Unlock()
rha := &responseHookActions{}
for _, responseHooks := range irh.hooks {
responseHooks.hook(p, response, rha)
if rha.hasError() {
break
}
}
_ = irh.pubSub.Publish(internalResponseHookEvent{p, response, rha})
return rha.result()
}

Expand All @@ -76,10 +60,6 @@ func (rha *responseHookActions) result() ResponseResult {
}
}

func (rha *responseHookActions) hasError() bool {
return rha.err != nil
}

func (rha *responseHookActions) TerminateWithError(err error) {
rha.err = err
}
Expand Down
56 changes: 19 additions & 37 deletions responsemanager/hooks/blockhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,42 @@ package hooks

import (
"errors"
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

// ErrPaused indicates a request should stop processing, but only cause it's paused
var ErrPaused = errors.New("request has been paused")

type blockHook struct {
key uint64
hook graphsync.OnOutgoingBlockHook
}

// OutgoingBlockHooks is a set of outgoing block hooks that can be processed
type OutgoingBlockHooks struct {
hooksLk sync.RWMutex
nextKey uint64
hooks []blockHook
pubSub *pubsub.PubSub
}

type internalBlockHookEvent struct {
p peer.ID
request graphsync.RequestData
block graphsync.BlockData
bha *blockHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockHookEvent)
hook := subscriberFn.(graphsync.OnOutgoingBlockHook)
hook(ie.p, ie.request, ie.block, ie.bha)
return ie.bha.err
}

// NewBlockHooks returns a new list of outgoing block hooks
func NewBlockHooks() *OutgoingBlockHooks {
return &OutgoingBlockHooks{}
return &OutgoingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
}

// Register registers an hook to process outgoing blocks in a response
func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
obh.hooksLk.Lock()
bh := blockHook{obh.nextKey, hook}
obh.nextKey++
obh.hooks = append(obh.hooks, bh)
obh.hooksLk.Unlock()
return func() {
obh.hooksLk.Lock()
defer obh.hooksLk.Unlock()
for i, matchHook := range obh.hooks {
if bh.key == matchHook.key {
obh.hooks = append(obh.hooks[:i], obh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(obh.pubSub.Subscribe(hook))
}

// BlockResult is the result of processing block hooks
Expand All @@ -55,15 +48,8 @@ type BlockResult struct {

// ProcessBlockHooks runs block hooks against a request and block data
func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult {
obh.hooksLk.RLock()
defer obh.hooksLk.RUnlock()
bha := &blockHookActions{}
for _, bh := range obh.hooks {
bh.hook(p, request, blockData, bha)
if bha.hasError() {
break
}
}
obh.pubSub.Publish(internalBlockHookEvent{p, request, blockData, bha})
return bha.result()
}

Expand All @@ -72,10 +58,6 @@ type blockHookActions struct {
extensions []graphsync.ExtensionData
}

func (bha *blockHookActions) hasError() bool {
return bha.err != nil
}

func (bha *blockHookActions) result() BlockResult {
return BlockResult{bha.err, bha.extensions}
}
Expand Down
49 changes: 18 additions & 31 deletions responsemanager/hooks/completedlisteners.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,40 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type completedListener struct {
key uint64
listener graphsync.OnResponseCompletedListener
}

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
listenersLk sync.RWMutex
nextKey uint64
listeners []completedListener
pubSub *pubsub.PubSub
}

type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
}

func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
return nil
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{}
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
crl.listenersLk.Lock()
cl := completedListener{crl.nextKey, listener}
crl.nextKey++
crl.listeners = append(crl.listeners, cl)
crl.listenersLk.Unlock()
return func() {
crl.listenersLk.Lock()
defer crl.listenersLk.Unlock()
for i, matchListener := range crl.listeners {
if cl.key == matchListener.key {
crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
crl.listenersLk.RLock()
defer crl.listenersLk.RUnlock()
for _, listener := range crl.listeners {
listener.listener(p, request, status)
}
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
}
Loading

0 comments on commit 32234c6

Please sign in to comment.