-
-
Notifications
You must be signed in to change notification settings - Fork 604
/
Copy pathconsuming.go
58 lines (49 loc) · 2.12 KB
/
consuming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package consuming
import (
"context"
"fmt"
"github.com/centrifugal/centrifugo/internal/apiproto"
"github.com/centrifugal/centrifugo/internal/configtypes"
"github.com/centrifugal/centrifugo/internal/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
)
// ConsumerConfig is an alias for configtypes.Consumer, the configuration of a
// single consumer as accepted by New.
type ConsumerConfig = configtypes.Consumer
// Dispatcher is the dependency that New hands to every consumer it builds.
// NOTE(review): presumably consumers use it to forward consumed messages to the
// server API; confirm against the consumer implementations.
type Dispatcher interface {
// Dispatch takes a method name and a raw data payload.
// NOTE(review): presumably executes the API command identified by method; confirm.
Dispatch(ctx context.Context, method string, data []byte) error
// Publish takes an apiproto.PublishRequest and reports failure via the returned error.
Publish(ctx context.Context, req *apiproto.PublishRequest) error
// Broadcast takes an apiproto.BroadcastRequest and reports failure via the returned error.
Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error
}
// New builds the consumer services described by configs.
//
// Entries with Enabled == false are skipped with an informational log line.
// Every enabled entry is turned into a PostgreSQL or Kafka consumer according
// to its Type. An unknown type or a constructor failure aborts immediately and
// returns a nil slice together with the error (constructor errors are wrapped
// with %w). nodeID is forwarded to Kafka consumers only; dispatcher and the
// shared metrics object are handed to every consumer.
//
// The returned slice is nil when no consumer is enabled.
func New(nodeID string, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error) {
	metrics := newCommonMetrics(prometheus.DefaultRegisterer)

	var services []service.Service
	for _, config := range configs {
		// Original note: important to keep this check inside specific type for
		// proper config validation.
		if !config.Enabled {
			log.Info().Str("consumer_name", config.Name).Str("consumer_type", config.Type).Msg("consumer is not enabled, skip")
			continue
		}

		switch config.Type {
		case configtypes.ConsumerTypePostgres:
			consumer, err := NewPostgresConsumer(config.Name, dispatcher, config.Postgres, metrics)
			if err != nil {
				return nil, fmt.Errorf("error initializing PostgreSQL consumer (%s): %w", config.Name, err)
			}
			services = append(services, consumer)
		case configtypes.ConsumerTypeKafka:
			consumer, err := NewKafkaConsumer(config.Name, nodeID, dispatcher, config.Kafka, metrics)
			if err != nil {
				return nil, fmt.Errorf("error initializing Kafka consumer (%s): %w", config.Name, err)
			}
			services = append(services, consumer)
		default:
			return nil, fmt.Errorf("unknown consumer type: %s", config.Type)
		}

		// Logged exactly once per consumer. The per-branch copies of this
		// message were redundant: this line carries the same consumer_name
		// plus the consumer_type.
		log.Info().Str("consumer_name", config.Name).Str("consumer_type", config.Type).Msg("running consumer")
	}

	// Touch both per-consumer metric series with Add(0) for every configured
	// name, disabled consumers included, so that (presumably, for Prometheus
	// vector metrics) the series exist before the first event is processed.
	for _, config := range configs {
		metrics.processedTotal.WithLabelValues(config.Name).Add(0)
		metrics.errorsTotal.WithLabelValues(config.Name).Add(0)
	}
	return services, nil
}