Skip to content

Commit

Permalink
Add new publisher for Google Pub Sub.
Browse files Browse the repository at this point in the history
  • Loading branch information
emidander committed Mar 6, 2024
1 parent 32a1984 commit 4f96b14
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 13 deletions.
10 changes: 9 additions & 1 deletion cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type eventPublisher interface {
}

// factoryPublisher represents a factory function for creating a eventPublisher.
func factoryPublisher(cfg *config.PublisherCfg, logger *slog.Logger) (eventPublisher, error) {
func factoryPublisher(ctx context.Context, cfg *config.PublisherCfg, logger *slog.Logger) (eventPublisher, error) {
switch cfg.Type {
case config.PublisherTypeKafka:
producer, err := publisher.NewProducer(cfg)
Expand Down Expand Up @@ -94,6 +94,14 @@ func factoryPublisher(cfg *config.PublisherCfg, logger *slog.Logger) (eventPubli
}

return pub, nil
case config.PublisherTypeGooglePubSub:
pubSubConn, err := publisher.NewPubSubConnection(ctx, logger, cfg.PubSubProjectID)
if err != nil {
return nil, fmt.Errorf("could not create pubsub connection: %w", err)
}

pubSubPublisher := publisher.NewGooglePubSubPublisher(pubSubConn)
return pubSubPublisher, nil
default:
return nil, fmt.Errorf("unknown publisher type: %s", cfg.Type)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
return fmt.Errorf("pgx connection: %w", err)
}

pub, err := factoryPublisher(cfg.Publisher, logger)
pub, err := factoryPublisher(c.Context, cfg.Publisher, logger)
if err != nil {
return fmt.Errorf("factory publisher: %w", err)
}
Expand Down
24 changes: 13 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
type PublisherType string

const (
PublisherTypeNats PublisherType = "nats"
PublisherTypeKafka PublisherType = "kafka"
PublisherTypeRabbitMQ PublisherType = "rabbitmq"
PublisherTypeNats PublisherType = "nats"
PublisherTypeKafka PublisherType = "kafka"
PublisherTypeRabbitMQ PublisherType = "rabbitmq"
PublisherTypeGooglePubSub PublisherType = "googlepubsub"
)

// Config for wal-listener.
Expand All @@ -39,14 +40,15 @@ type ListenerCfg struct {

// PublisherCfg represent configuration for any types publisher.
type PublisherCfg struct {
Type PublisherType `valid:"required"`
Address string `valid:"required"`
Topic string `valid:"required"`
TopicPrefix string
EnableTLS bool `json:"enable_tls"`
ClientCert string `json:"client_cert"`
ClientKey string `json:"client_key"`
CACert string `json:"ca_cert"`
Type PublisherType `valid:"required"`
Address string
Topic string `valid:"required"`
TopicPrefix string
EnableTLS bool `json:"enable_tls"`
ClientCert string `json:"client_cert"`
ClientKey string `json:"client_key"`
CACert string `json:"ca_cert"`
PubSubProjectID string
}

// DatabaseCfg path of the PostgreSQL DB config.
Expand Down
30 changes: 30 additions & 0 deletions publisher/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package publisher

import (
"context"
"github.com/goccy/go-json"
)

type GooglePubSubPublisher struct {
pubSubConnection *PubSubConnection
}

func NewGooglePubSubPublisher(pubSubConnection *PubSubConnection) *GooglePubSubPublisher {
return &GooglePubSubPublisher{
pubSubConnection,
}
}

// Publish send events, implements eventPublisher.
func (p *GooglePubSubPublisher) Publish(ctx context.Context, topic string, event Event) error {
body, err := json.Marshal(event)
if err != nil {
return err
}

return p.pubSubConnection.Publish(ctx, topic, body)
}

func (p *GooglePubSubPublisher) Close() error {
return p.pubSubConnection.Close()
}
79 changes: 79 additions & 0 deletions publisher/pubsubconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package publisher

import (
"context"
"fmt"
"log/slog"
"sync"

"cloud.google.com/go/pubsub"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type PubSubConnection struct {
logger *slog.Logger
client *pubsub.Client
projectID string
topics map[string]*pubsub.Topic
mu sync.RWMutex
}

func NewPubSubConnection(ctx context.Context, logger *slog.Logger, pubSubProjectId string) (*PubSubConnection, error) {
if pubSubProjectId == "" {
return nil, fmt.Errorf("project id is required for pub sub connection")
}

c, err := pubsub.NewClient(ctx, pubSubProjectId)
if err != nil {
return nil, err
}
return &PubSubConnection{
logger: logger,
client: c,
projectID: pubSubProjectId,
topics: make(map[string]*pubsub.Topic),
mu: sync.RWMutex{},
}, nil
}

func (c *PubSubConnection) getTopic(topic string) *pubsub.Topic {
c.mu.Lock()
defer c.mu.Unlock()
if top, ok := c.topics[topic]; ok {
return top
}

t := c.client.TopicInProject(topic, c.projectID)
t.PublishSettings.NumGoroutines = 1
t.PublishSettings.CountThreshold = 1
c.topics[topic] = t
return t
}

func (c *PubSubConnection) Publish(ctx context.Context, topic string, data []byte) error {
t := c.getTopic(topic)
defer t.Flush()

var res *pubsub.PublishResult
var err error
res = t.Publish(ctx, &pubsub.Message{
Data: data,
})

_, err = res.Get(ctx)
if err != nil {
c.logger.Error("Failed to publish message", "err", err)
if status.Code(err) == codes.NotFound {
return fmt.Errorf("topic not found %w", err)
} else {
return err
}
}

return nil
}

func (c *PubSubConnection) Close() error {
return c.client.Close()
}

0 comments on commit 4f96b14

Please sign in to comment.