Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to provide pubsub configuration for topic #382

Merged
merged 1 commit into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type EventBus struct {
client *pubsub.Client
clientOpts []option.ClientOption
topic *pubsub.Topic
topicConfig *pubsub.TopicConfig
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan error
Expand Down Expand Up @@ -84,8 +85,14 @@ func NewEventBus(projectID, appID string, options ...Option) (*EventBus, error)
if ok, err := b.topic.Exists(b.cctx); err != nil {
return nil, err
} else if !ok {
if b.topic, err = b.client.CreateTopic(b.cctx, name); err != nil {
return nil, err
if b.topicConfig != nil {
if b.topic, err = b.client.CreateTopicWithConfig(b.cctx, name, b.topicConfig); err != nil {
return nil, err
}
} else {
if b.topic, err = b.client.CreateTopic(b.cctx, name); err != nil {
return nil, err
}
}
}

Expand Down Expand Up @@ -115,6 +122,16 @@ func WithPubSubOptions(opts ...option.ClientOption) Option {
}
}

// WithTopicOptions adds the options to the pubsub.TopicConfig.
// This allows control over the Topic creation, including message retention.
func WithTopicOptions(topicConfig *pubsub.TopicConfig) Option {
return func(b *EventBus) error {
b.topicConfig = topicConfig

return nil
}
}

// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandlerType() eh.EventHandlerType {
return "eventbus"
Expand Down
49 changes: 49 additions & 0 deletions eventbus/gcp/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"cloud.google.com/go/pubsub"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/eventbus"
)
Expand Down Expand Up @@ -59,6 +60,26 @@ func TestEventBusIntegration(t *testing.T) {
eventbus.AcceptanceTest(t, bus1, bus2, time.Second)
}

func TestEventBusWithConfigIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

bus1, appID, err := newTestEventBusWithTopicConfig("")
if err != nil {
t.Fatal("there should be no error:", err)
}

bus2, _, err := newTestEventBusWithTopicConfig(appID)
if err != nil {
t.Fatal("there should be no error:", err)
}

t.Logf("using topic: %s_events", appID)

eventbus.AcceptanceTest(t, bus1, bus2, time.Second)
}

func TestEventBusLoadtest(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
Expand Down Expand Up @@ -108,3 +129,31 @@ func newTestEventBus(appID string) (eh.EventBus, string, error) {

return bus, appID, nil
}

func newTestEventBusWithTopicConfig(appID string) (eh.EventBus, string, error) {
// Connect to localhost if not running inside docker
if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8793")
}

// Get a random app ID.
if appID == "" {
bts := make([]byte, 8)
if _, err := rand.Read(bts); err != nil {
return nil, "", fmt.Errorf("could not randomize app ID: %w", err)
}

appID = "app-" + hex.EncodeToString(bts)
}

// Create an empty config. The emulator doesn't support configurable message retention.
topicConfig := &pubsub.TopicConfig{
// RetentionDuration: 7 * 24 * time.Hour,
}
bus, err := NewEventBus("project_id", appID, WithTopicOptions(topicConfig))
if err != nil {
return nil, "", fmt.Errorf("could not create event bus: %w", err)
}

return bus, appID, nil
}