diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 0bacd445b..82958dc90 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -97,12 +97,6 @@ type Config struct { ConnectorPlugins map[string]sdk.Connector - dev struct { - cpuprofile string - memprofile string - blockprofile string - } - SchemaRegistry struct { Type string @@ -110,6 +104,12 @@ type Config struct { ConnectionString string } } + + dev struct { + cpuprofile string + memprofile string + blockprofile string + } } func DefaultConfig() Config { diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 1323df186..05360c1db 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -26,6 +26,7 @@ import ( "runtime" "runtime/pprof" "strings" + "sync" "time" "github.com/conduitio/conduit-commons/database" @@ -101,8 +102,7 @@ type Runtime struct { connectorPersister *connector.Persister procSchemaService *procutils.SchemaService - logger log.CtxLogger - gRPCStatsHandler *promgrpc.StatsHandler + logger log.CtxLogger } // NewRuntime sets up a Runtime instance and primes it for start. @@ -141,7 +141,7 @@ func NewRuntime(cfg Config) (*Runtime, error) { } } - configurePrometheus() + configureMetrics() measure.ConduitInfo.WithValues(Version(true)).Inc() // Start the connector persister @@ -157,8 +157,7 @@ func NewRuntime(cfg Config) (*Runtime, error) { connectorPersister: connectorPersister, - gRPCStatsHandler: newGRPCStatsHandler(), - logger: logger, + logger: logger, } err := createServices(r) @@ -248,13 +247,6 @@ func createSchemaRegistry(config Config, logger log.CtxLogger, db database.DB) ( return schemaRegistry, nil } -func newGRPCStatsHandler() *promgrpc.StatsHandler { - h := promgrpc.ServerStatsHandler() - promclient.MustRegister(h) - - return h -} - func newLogger(level string, format string) log.CtxLogger { // TODO make logger hooks configurable l, _ := zerolog.ParseLevel(level) @@ -262,10 +254,24 @@ func newLogger(level string, format string) log.CtxLogger { return log.InitLogger(l, f) } -func configurePrometheus() { - registry := prometheus.NewRegistry(nil) - promclient.MustRegister(registry) - metrics.Register(registry) +var ( + metricsConfigureOnce sync.Once + metricsGrpcStatsHandler *promgrpc.StatsHandler +) + +// configureMetrics +func configureMetrics() *promgrpc.StatsHandler { + metricsConfigureOnce.Do(func() { + // conduit metrics + reg := prometheus.NewRegistry(nil) + metrics.Register(reg) + promclient.MustRegister(reg) + + // grpc metrics + metricsGrpcStatsHandler = promgrpc.ServerStatsHandler() + promclient.MustRegister(metricsGrpcStatsHandler) + }) + return metricsGrpcStatsHandler } // Run initializes all of Conduit's underlying services and starts the GRPC and @@ -429,7 +435,7 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err grpcutil.RequestIDUnaryServerInterceptor(r.logger), grpcutil.LoggerUnaryServerInterceptor(r.logger), ), - grpc.StatsHandler(r.gRPCStatsHandler), + grpc.StatsHandler(metricsGrpcStatsHandler), ) pipelineAPIv1 := api.NewPipelineAPIv1(r.Orchestrator.Pipelines) @@ -481,7 +487,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad grpcutil.RequestIDUnaryServerInterceptor(r.logger), grpcutil.LoggerUnaryServerInterceptor(r.logger), ), - grpc.StatsHandler(r.gRPCStatsHandler), + grpc.StatsHandler(metricsGrpcStatsHandler), ) schemaServiceAPI := pconnutils.NewSchemaServiceServer(r.connSchemaService) diff --git a/pkg/conduit/runtime_test.go b/pkg/conduit/runtime_test.go index ccc18f0ff..28226c369 100644 --- a/pkg/conduit/runtime_test.go +++ b/pkg/conduit/runtime_test.go @@ -78,6 +78,10 @@ func TestRuntime(t *testing.T) { t.Logf("expected error '%v', got '%v'", context.Canceled, err) } is.True(strings.Contains(logs.String(), "grpc API started")) + + // creating a second runtime should succeed + _, err = conduit.NewRuntime(cfg) + is.NoErr(err) } // safeBuffer wraps bytes.Buffer and makes it safe for concurrent use.