Skip to content

Commit

Permalink
chore: rename pubsub processor struct
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Yuichi Okimoto <yuichijpn@gmail.com>
  • Loading branch information
cre8ivejp committed Jan 7, 2025
1 parent a6fe791 commit 0669b83
Show file tree
Hide file tree
Showing 12 changed files with 33 additions and 33 deletions.
14 changes: 7 additions & 7 deletions pkg/subscriber/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
}
defer batchClient.Close()

processors, err := s.registerProcessorMap(
pubSubProcessors, err := s.registerPubSubProcessorMap(
ctx,
environmentClient,
mysqlClient,
Expand All @@ -337,7 +337,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
return err
}

multiPubSub, err := s.startMultiPubSub(ctx, processors, logger)
multiPubSub, err := s.startMultiPubSub(ctx, pubSubProcessors, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (s *server) createMySQLClient(

func (s *server) startMultiPubSub(
ctx context.Context,
processors *processor.Processors,
processors *processor.PubSubProcessors,
logger *zap.Logger,
) (*subscriber.MultiSubscriber, error) {
multiSubscriber := subscriber.NewMultiSubscriber(
Expand Down Expand Up @@ -422,7 +422,7 @@ func (s *server) startMultiPubSub(
// we should skip the error, just log it here
continue
}
multiSubscriber.AddSubscriber(subscriber.NewSubscriber(
multiSubscriber.AddSubscriber(subscriber.NewPubSubSubscriber(
name, config, p,
subscriber.WithLogger(logger),
))
Expand Down Expand Up @@ -461,7 +461,7 @@ func (s *server) startMultiPubSub(
return multiSubscriber, nil
}

func (s *server) registerProcessorMap(
func (s *server) registerPubSubProcessorMap(
ctx context.Context,
environmentClient environmentclient.Client,
mysqlClient mysql.Client,
Expand All @@ -474,8 +474,8 @@ func (s *server) registerProcessorMap(
sender notificationsender.Sender,
registerer metrics.Registerer,
logger *zap.Logger,
) (*processor.Processors, error) {
processors := processor.NewProcessors(registerer)
) (*processor.PubSubProcessors, error) {
processors := processor.NewPubSubProcessors(registerer)
writer.RegisterMetrics(registerer)

processorsConfigBytes, err := os.ReadFile(*s.processorsConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/auditlog_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewAuditLogPersister(
config interface{},
mysqlClient mysql.Client,
logger *zap.Logger,
) (subscriber.Processor, error) {
) (subscriber.PubSubProcessor, error) {
auditLogPersisterJsonConfig, ok := config.(map[string]interface{})
if !ok {
logger.Error("AuditLogPersister: invalid config")
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/domain_event_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewDomainEventInformer(
environmentClient environmentclient.Client,
sender sender.Sender,
logger *zap.Logger,
) subscriber.Processor {
) subscriber.PubSubProcessor {
return &domainEventInformer{
environmentClient: environmentClient,
sender: sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewEvaluationCountEventPersister(
mysqlClient mysql.Client,
evaluationCountCacher cache.MultiGetDeleteCountCache,
logger *zap.Logger,
) (subscriber.Processor, error) {
) (subscriber.PubSubProcessor, error) {
evaluationCountEventPersisterJsonConfig, ok := config.(map[string]interface{})
if !ok {
logger.Error("EvaluationCountEventPersister: invalid config")
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/events_dwh_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewEventsDWHPersister(
ftClient featureclient.Client,
persisterName string,
logger *zap.Logger,
) (subscriber.Processor, error) {
) (subscriber.PubSubProcessor, error) {
jsonConfig, ok := config.(map[string]interface{})
if !ok {
logger.Error("eventsDWHPersister: invalid config")
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/events_ops_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewEventsOPSPersister(
ftClient featureclient.Client,
persisterName string,
logger *zap.Logger,
) (subscriber.Processor, error) {
) (subscriber.PubSubProcessor, error) {
jsonConfig, ok := config.(map[string]interface{})
if !ok {
logger.Error("eventsOPSPersister: invalid config")
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/metrics_event_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type metricsEventPersister struct {
func NewMetricsEventPersister(
registerer metrics.Registerer,
logger *zap.Logger,
) subscriber.Processor {
) subscriber.PubSubProcessor {
return &metricsEventPersister{
storage: storage.NewStorage(logger, registerer),
logger: logger,
Expand Down
14 changes: 7 additions & 7 deletions pkg/subscriber/processor/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,22 @@ var (
unsupportedProcessorErr = errors.New("subscriber: unsupported processor")
)

type Processors struct {
processorMap map[string]subscriber.Processor
type PubSubProcessors struct {
processorMap map[string]subscriber.PubSubProcessor
}

func NewProcessors(r metrics.Registerer) *Processors {
func NewPubSubProcessors(r metrics.Registerer) *PubSubProcessors {
registerMetrics(r)
return &Processors{
processorMap: make(map[string]subscriber.Processor),
return &PubSubProcessors{
processorMap: make(map[string]subscriber.PubSubProcessor),
}
}

func (p *Processors) RegisterProcessor(name string, processor subscriber.Processor) {
func (p *PubSubProcessors) RegisterProcessor(name string, processor subscriber.PubSubProcessor) {
p.processorMap[name] = processor
}

func (p *Processors) GetProcessorByName(name string) (subscriber.Processor, error) {
func (p *PubSubProcessors) GetProcessorByName(name string) (subscriber.PubSubProcessor, error) {
if p, ok := p.processorMap[name]; ok {
return p, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/push_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewPushSender(
batchClient btclient.Client,
mysqlClient mysql.Client,
logger *zap.Logger,
) subscriber.Processor {
) subscriber.PubSubProcessor {
return &pushSender{
featureClient: featureClient,
batchClient: batchClient,
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/segment_user_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewSegmentUserPersister(
batchClient btclient.Client,
mysqlClient mysql.Client,
logger *zap.Logger,
) (subscriber.Processor, error) {
) (subscriber.PubSubProcessor, error) {
segmentPersisterJsonConfig, ok := config.(map[string]interface{})
if !ok {
logger.Error("SegmentUserPersister: invalid config")
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/processor/user_event_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewUserEventPersister(
config interface{},
mysqlClient mysql.Client,
logger *zap.Logger,
) (subscriber.Processor, error) {
) (subscriber.PubSubProcessor, error) {
userEventPerisiterJsonConfig, ok := config.(map[string]interface{})
if !ok {
logger.Error("UserEventPersister: invalid config")
Expand Down
20 changes: 10 additions & 10 deletions pkg/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ type Subscriber interface {
Stop()
}

type Processor interface {
type PubSubProcessor interface {
Process(ctx context.Context, msgChan <-chan *puller.Message) error
}

type OnDemandProcessor interface {
Processor
PubSubProcessor
Switch(ctx context.Context) (bool, error)
}

Expand All @@ -73,26 +73,26 @@ type Configuration struct {
WorkerNum int `json:"workerNum"`
}

type subscriber struct {
type pubSubSubscriber struct {
name string
configuration Configuration
processor Processor
processor PubSubProcessor
cancel context.CancelFunc
opts options
logger *zap.Logger
}

func NewSubscriber(
func NewPubSubSubscriber(
name string,
configuration Configuration,
processor Processor,
processor PubSubProcessor,
opts ...Option,
) Subscriber {
options := defaultOptions
for _, o := range opts {
o(&options)
}
return &subscriber{
return &pubSubSubscriber{
name: name,
configuration: configuration,
processor: processor,
Expand All @@ -101,7 +101,7 @@ func NewSubscriber(
}
}

func (s subscriber) Run(ctx context.Context) {
func (s pubSubSubscriber) Run(ctx context.Context) {
s.logger.Debug("subscriber starting",
zap.String("name", s.name),
zap.String("project", s.configuration.Project),
Expand Down Expand Up @@ -134,13 +134,13 @@ func (s subscriber) Run(ctx context.Context) {
zap.String("name", s.name))
}

func (s subscriber) Stop() {
func (s pubSubSubscriber) Stop() {
if s.cancel != nil {
s.cancel()
}
}

func (s subscriber) createPuller(
func (s pubSubSubscriber) createPuller(
ctx context.Context,
) puller.RateLimitedPuller {
pubsubClient, err := pubsub.NewClient(
Expand Down

0 comments on commit 0669b83

Please sign in to comment.