Skip to content

Commit

Permalink
refactor(hooks): use external pubsub (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Apr 28, 2020
1 parent 6e4ddab commit 97a8cf7
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 212 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
Expand Down Expand Up @@ -33,7 +34,7 @@ require (
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105 // indirect
go.uber.org/multierr v1.4.0 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyF
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -539,6 +541,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
Expand Down
51 changes: 20 additions & 31 deletions requestmanager/hooks/requesthooks.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,40 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime/traversal"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type requestHook struct {
key uint64
hook graphsync.OnOutgoingRequestHook
}

// OutgoingRequestHooks is a set of incoming request hooks that can be processed
type OutgoingRequestHooks struct {
nextKey uint64
hooksLk sync.RWMutex
hooks []requestHook
pubSub *pubsub.PubSub
}

type internalRequestHookEvent struct {
p peer.ID
request graphsync.RequestData
hookActions *requestHookActions
}

func requestHooksDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestHookEvent)
hook := subscriberFn.(graphsync.OnOutgoingRequestHook)
hook(ie.p, ie.request, ie.hookActions)
return nil
}

// NewRequestHooks returns a new list of incoming request hooks
func NewRequestHooks() *OutgoingRequestHooks {
return &OutgoingRequestHooks{}
return &OutgoingRequestHooks{
pubSub: pubsub.New(requestHooksDispatcher),
}
}

// Register registers an extension to process outgoing requests
func (orh *OutgoingRequestHooks) Register(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
orh.hooksLk.Lock()
rh := requestHook{orh.nextKey, hook}
orh.nextKey++
orh.hooks = append(orh.hooks, rh)
orh.hooksLk.Unlock()
return func() {
orh.hooksLk.Lock()
defer orh.hooksLk.Unlock()
for i, matchHook := range orh.hooks {
if rh.key == matchHook.key {
orh.hooks = append(orh.hooks[:i], orh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(orh.pubSub.Subscribe(hook))
}

// RequestResult is the outcome of running requesthooks
Expand All @@ -52,12 +45,8 @@ type RequestResult struct {

// ProcessRequestHooks runs request hooks against an outgoing request
func (orh *OutgoingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsync.RequestData) RequestResult {
orh.hooksLk.RLock()
defer orh.hooksLk.RUnlock()
rha := &requestHookActions{}
for _, requestHook := range orh.hooks {
requestHook.hook(p, request, rha)
}
_ = orh.pubSub.Publish(internalRequestHookEvent{p, request, rha})
return rha.result()
}

Expand Down
56 changes: 18 additions & 38 deletions requestmanager/hooks/responsehooks.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,38 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
)

type responseHook struct {
key uint64
hook graphsync.OnIncomingResponseHook
}

// IncomingResponseHooks is a set of incoming response hooks that can be processed
type IncomingResponseHooks struct {
nextKey uint64
hooksLk sync.RWMutex
hooks []responseHook
pubSub *pubsub.PubSub
}

type internalResponseHookEvent struct {
p peer.ID
response graphsync.ResponseData
rha *responseHookActions
}

func responseHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalResponseHookEvent)
hook := subscriberFn.(graphsync.OnIncomingResponseHook)
hook(ie.p, ie.response, ie.rha)
return ie.rha.err
}

// NewResponseHooks returns a new list of incoming request hooks
func NewResponseHooks() *IncomingResponseHooks {
return &IncomingResponseHooks{}
return &IncomingResponseHooks{pubSub: pubsub.New(responseHookDispatcher)}
}

// Register registers an extension to process incoming responses
func (irh *IncomingResponseHooks) Register(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
irh.hooksLk.Lock()
rh := responseHook{irh.nextKey, hook}
irh.nextKey++
irh.hooks = append(irh.hooks, rh)
irh.hooksLk.Unlock()
return func() {
irh.hooksLk.Lock()
defer irh.hooksLk.Unlock()
for i, matchHook := range irh.hooks {
if rh.key == matchHook.key {
irh.hooks = append(irh.hooks[:i], irh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook))
}

// ResponseResult is the outcome of running response hooks
Expand All @@ -52,15 +43,8 @@ type ResponseResult struct {

// ProcessResponseHooks runs response hooks against an incoming response
func (irh *IncomingResponseHooks) ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) ResponseResult {
irh.hooksLk.Lock()
defer irh.hooksLk.Unlock()
rha := &responseHookActions{}
for _, responseHooks := range irh.hooks {
responseHooks.hook(p, response, rha)
if rha.hasError() {
break
}
}
_ = irh.pubSub.Publish(internalResponseHookEvent{p, response, rha})
return rha.result()
}

Expand All @@ -76,10 +60,6 @@ func (rha *responseHookActions) result() ResponseResult {
}
}

func (rha *responseHookActions) hasError() bool {
return rha.err != nil
}

func (rha *responseHookActions) TerminateWithError(err error) {
rha.err = err
}
Expand Down
56 changes: 19 additions & 37 deletions responsemanager/hooks/blockhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,42 @@ package hooks

import (
"errors"
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

// ErrPaused indicates a request should stop processing, but only cause it's paused
var ErrPaused = errors.New("request has been paused")

type blockHook struct {
key uint64
hook graphsync.OnOutgoingBlockHook
}

// OutgoingBlockHooks is a set of outgoing block hooks that can be processed
type OutgoingBlockHooks struct {
hooksLk sync.RWMutex
nextKey uint64
hooks []blockHook
pubSub *pubsub.PubSub
}

type internalBlockHookEvent struct {
p peer.ID
request graphsync.RequestData
block graphsync.BlockData
bha *blockHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockHookEvent)
hook := subscriberFn.(graphsync.OnOutgoingBlockHook)
hook(ie.p, ie.request, ie.block, ie.bha)
return ie.bha.err
}

// NewBlockHooks returns a new list of outgoing block hooks
func NewBlockHooks() *OutgoingBlockHooks {
return &OutgoingBlockHooks{}
return &OutgoingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
}

// Register registers an hook to process outgoing blocks in a response
func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
obh.hooksLk.Lock()
bh := blockHook{obh.nextKey, hook}
obh.nextKey++
obh.hooks = append(obh.hooks, bh)
obh.hooksLk.Unlock()
return func() {
obh.hooksLk.Lock()
defer obh.hooksLk.Unlock()
for i, matchHook := range obh.hooks {
if bh.key == matchHook.key {
obh.hooks = append(obh.hooks[:i], obh.hooks[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(obh.pubSub.Subscribe(hook))
}

// BlockResult is the result of processing block hooks
Expand All @@ -55,15 +48,8 @@ type BlockResult struct {

// ProcessBlockHooks runs block hooks against a request and block data
func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult {
obh.hooksLk.RLock()
defer obh.hooksLk.RUnlock()
bha := &blockHookActions{}
for _, bh := range obh.hooks {
bh.hook(p, request, blockData, bha)
if bha.hasError() {
break
}
}
_ = obh.pubSub.Publish(internalBlockHookEvent{p, request, blockData, bha})
return bha.result()
}

Expand All @@ -72,10 +58,6 @@ type blockHookActions struct {
extensions []graphsync.ExtensionData
}

func (bha *blockHookActions) hasError() bool {
return bha.err != nil
}

func (bha *blockHookActions) result() BlockResult {
return BlockResult{bha.err, bha.extensions}
}
Expand Down
49 changes: 18 additions & 31 deletions responsemanager/hooks/completedlisteners.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,40 @@
package hooks

import (
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type completedListener struct {
key uint64
listener graphsync.OnResponseCompletedListener
}

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
listenersLk sync.RWMutex
nextKey uint64
listeners []completedListener
pubSub *pubsub.PubSub
}

type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
}

func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
return nil
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{}
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
crl.listenersLk.Lock()
cl := completedListener{crl.nextKey, listener}
crl.nextKey++
crl.listeners = append(crl.listeners, cl)
crl.listenersLk.Unlock()
return func() {
crl.listenersLk.Lock()
defer crl.listenersLk.Unlock()
for i, matchListener := range crl.listeners {
if cl.key == matchListener.key {
crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...)
return
}
}
}
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
crl.listenersLk.RLock()
defer crl.listenersLk.RUnlock()
for _, listener := range crl.listeners {
listener.listener(p, request, status)
}
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
}
Loading

0 comments on commit 97a8cf7

Please sign in to comment.