diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index ca102b5637..28df1d5c5e 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -129,7 +129,7 @@ var _ event.Subscription = (*sub)(nil) // publishers to get blocked. CancelFunc is guaranteed to return after last send // to the channel func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) { - var settings subSettings + settings := subSettings(subSettingsDefault) for _, opt := range opts { if err := opt(&settings); err != nil { return nil, err @@ -184,6 +184,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt // emit(EventT{}) func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e event.Emitter, err error) { var settings emitterSettings + for _, opt := range opts { if err := opt(&settings); err != nil { return nil, err diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index deddc6b558..0f0ff6feea 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -25,6 +25,17 @@ func (EventA) String() string { return "Oh, Hello" } +func TestDefaultSubIsBuffered(t *testing.T) { + bus := NewBus() + s, err := bus.Subscribe(new(EventA)) + if err != nil { + t.Fatal(err) + } + if cap(s.(*sub).ch) == 0 { + t.Fatalf("without any options subscribe should be buffered. was %d", cap(s.(*sub).ch)) + } +} + func TestEmit(t *testing.T) { bus := NewBus() sub, err := bus.Subscribe(new(EventA)) diff --git a/p2p/host/eventbus/opts.go b/p2p/host/eventbus/opts.go index 50c673685b..921923500e 100644 --- a/p2p/host/eventbus/opts.go +++ b/p2p/host/eventbus/opts.go @@ -4,6 +4,10 @@ type subSettings struct { buffer int } +var subSettingsDefault = subSettings{ + buffer: 16, +} + func BufSize(n int) func(interface{}) error { return func(s interface{}) error { s.(*subSettings).buffer = n