diff --git a/eventbus/gcp/eventbus.go b/eventbus/gcp/eventbus.go index 17a0b8b9..7923b309 100644 --- a/eventbus/gcp/eventbus.go +++ b/eventbus/gcp/eventbus.go @@ -36,6 +36,7 @@ type EventBus struct { client *pubsub.Client clientOpts []option.ClientOption topic *pubsub.Topic + topicConfig *pubsub.TopicConfig registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex errCh chan error @@ -84,8 +85,14 @@ func NewEventBus(projectID, appID string, options ...Option) (*EventBus, error) if ok, err := b.topic.Exists(b.cctx); err != nil { return nil, err } else if !ok { - if b.topic, err = b.client.CreateTopic(b.cctx, name); err != nil { - return nil, err + if b.topicConfig != nil { + if b.topic, err = b.client.CreateTopicWithConfig(b.cctx, name, b.topicConfig); err != nil { + return nil, err + } + } else { + if b.topic, err = b.client.CreateTopic(b.cctx, name); err != nil { + return nil, err + } } } @@ -115,6 +122,16 @@ func WithPubSubOptions(opts ...option.ClientOption) Option { } } +// WithTopicOptions adds the options to the pubsub.TopicConfig. +// This allows control over the Topic creation, including message retention. +func WithTopicOptions(topicConfig *pubsub.TopicConfig) Option { + return func(b *EventBus) error { + b.topicConfig = topicConfig + + return nil + } +} + // HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface. func (b *EventBus) HandlerType() eh.EventHandlerType { return "eventbus" diff --git a/eventbus/gcp/eventbus_test.go b/eventbus/gcp/eventbus_test.go index c7a78c6a..84ef32e9 100644 --- a/eventbus/gcp/eventbus_test.go +++ b/eventbus/gcp/eventbus_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "cloud.google.com/go/pubsub" eh "github.com/looplab/eventhorizon" "github.com/looplab/eventhorizon/eventbus" ) @@ -59,6 +60,26 @@ func TestEventBusIntegration(t *testing.T) { eventbus.AcceptanceTest(t, bus1, bus2, time.Second) } +func TestEventBusWithConfigIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + bus1, appID, err := newTestEventBusWithTopicConfig("") + if err != nil { + t.Fatal("there should be no error:", err) + } + + bus2, _, err := newTestEventBusWithTopicConfig(appID) + if err != nil { + t.Fatal("there should be no error:", err) + } + + t.Logf("using topic: %s_events", appID) + + eventbus.AcceptanceTest(t, bus1, bus2, time.Second) +} + func TestEventBusLoadtest(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") @@ -108,3 +129,31 @@ func newTestEventBus(appID string) (eh.EventBus, string, error) { return bus, appID, nil } + +func newTestEventBusWithTopicConfig(appID string) (eh.EventBus, string, error) { + // Connect to localhost if not running inside docker + if os.Getenv("PUBSUB_EMULATOR_HOST") == "" { + os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8793") + } + + // Get a random app ID. + if appID == "" { + bts := make([]byte, 8) + if _, err := rand.Read(bts); err != nil { + return nil, "", fmt.Errorf("could not randomize app ID: %w", err) + } + + appID = "app-" + hex.EncodeToString(bts) + } + + // Create an empty config. The emulator doesn't support configurable message retention. + topicConfig := &pubsub.TopicConfig{ + // RetentionDuration: 7 * 24 * time.Hour, + } + bus, err := NewEventBus("project_id", appID, WithTopicOptions(topicConfig)) + if err != nil { + return nil, "", fmt.Errorf("could not create event bus: %w", err) + } + + return bus, appID, nil +}