Skip to content

Commit

Permalink
Register prometheus metrics only once (#1842)
Browse files Browse the repository at this point in the history
* create separate prometheus registry

* init metrics only once

---------

Co-authored-by: Lyubo Kamenov <lyubo@meroxa.io>
  • Loading branch information
lovromazgon and lyuboxa authored Sep 10, 2024
1 parent 468f998 commit fe22032
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
12 changes: 6 additions & 6 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,19 @@ type Config struct {

ConnectorPlugins map[string]sdk.Connector

dev struct {
cpuprofile string
memprofile string
blockprofile string
}

SchemaRegistry struct {
Type string

Confluent struct {
ConnectionString string
}
}

dev struct {
cpuprofile string
memprofile string
blockprofile string
}
}

func DefaultConfig() Config {
Expand Down
42 changes: 24 additions & 18 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"

"github.com/conduitio/conduit-commons/database"
Expand Down Expand Up @@ -101,8 +102,7 @@ type Runtime struct {
connectorPersister *connector.Persister
procSchemaService *procutils.SchemaService

logger log.CtxLogger
gRPCStatsHandler *promgrpc.StatsHandler
logger log.CtxLogger
}

// NewRuntime sets up a Runtime instance and primes it for start.
Expand Down Expand Up @@ -141,7 +141,7 @@ func NewRuntime(cfg Config) (*Runtime, error) {
}
}

configurePrometheus()
configureMetrics()
measure.ConduitInfo.WithValues(Version(true)).Inc()

// Start the connector persister
Expand All @@ -157,8 +157,7 @@ func NewRuntime(cfg Config) (*Runtime, error) {

connectorPersister: connectorPersister,

gRPCStatsHandler: newGRPCStatsHandler(),
logger: logger,
logger: logger,
}

err := createServices(r)
Expand Down Expand Up @@ -248,24 +247,31 @@ func createSchemaRegistry(config Config, logger log.CtxLogger, db database.DB) (
return schemaRegistry, nil
}

func newGRPCStatsHandler() *promgrpc.StatsHandler {
h := promgrpc.ServerStatsHandler()
promclient.MustRegister(h)

return h
}

func newLogger(level string, format string) log.CtxLogger {
// TODO make logger hooks configurable
l, _ := zerolog.ParseLevel(level)
f, _ := log.ParseFormat(format)
return log.InitLogger(l, f)
}

func configurePrometheus() {
registry := prometheus.NewRegistry(nil)
promclient.MustRegister(registry)
metrics.Register(registry)
var (
metricsConfigureOnce sync.Once
metricsGrpcStatsHandler *promgrpc.StatsHandler
)

// configureMetrics
func configureMetrics() *promgrpc.StatsHandler {
metricsConfigureOnce.Do(func() {
// conduit metrics
reg := prometheus.NewRegistry(nil)
metrics.Register(reg)
promclient.MustRegister(reg)

// grpc metrics
metricsGrpcStatsHandler = promgrpc.ServerStatsHandler()
promclient.MustRegister(metricsGrpcStatsHandler)
})
return metricsGrpcStatsHandler
}

// Run initializes all of Conduit's underlying services and starts the GRPC and
Expand Down Expand Up @@ -429,7 +435,7 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err
grpcutil.RequestIDUnaryServerInterceptor(r.logger),
grpcutil.LoggerUnaryServerInterceptor(r.logger),
),
grpc.StatsHandler(r.gRPCStatsHandler),
grpc.StatsHandler(metricsGrpcStatsHandler),
)

pipelineAPIv1 := api.NewPipelineAPIv1(r.Orchestrator.Pipelines)
Expand Down Expand Up @@ -481,7 +487,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad
grpcutil.RequestIDUnaryServerInterceptor(r.logger),
grpcutil.LoggerUnaryServerInterceptor(r.logger),
),
grpc.StatsHandler(r.gRPCStatsHandler),
grpc.StatsHandler(metricsGrpcStatsHandler),
)

schemaServiceAPI := pconnutils.NewSchemaServiceServer(r.connSchemaService)
Expand Down
4 changes: 4 additions & 0 deletions pkg/conduit/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func TestRuntime(t *testing.T) {
t.Logf("expected error '%v', got '%v'", context.Canceled, err)
}
is.True(strings.Contains(logs.String(), "grpc API started"))

// creating a second runtime should succeed
_, err = conduit.NewRuntime(cfg)
is.NoErr(err)
}

// safeBuffer wraps bytes.Buffer and makes it safe for concurrent use.
Expand Down

0 comments on commit fe22032

Please sign in to comment.