From 6f15b047f16603fc89c485b513f2b53b142c5d87 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 30 Sep 2021 17:32:50 +0800 Subject: [PATCH] feat: add grpc metric and refactor grpc server (#686) * feat: add grpc metric and refactor grpc server Signed-off-by: Gaius --- cdn/cdn.go | 102 +++++++--- cdn/config/config.go | 7 + cdn/metrics/metrics.go | 72 +++++++ cdn/rpcserver/rpcserver.go | 39 ++-- cdn/rpcserver/rpcserver_test.go | 8 +- cdn/supervisor/cdn/manager.go | 6 +- client/daemon/daemon.go | 2 +- client/daemon/peer/peertask_manager_test.go | 4 +- client/daemon/rpcserver/rpcserver.go | 18 +- client/daemon/rpcserver/rpcserver_test.go | 6 +- deploy/helm-charts | 2 +- internal/constants/constants.go | 8 + manager/config/config.go | 9 +- manager/config/config_test.go | 6 +- manager/config/testdata/manager.yaml | 5 +- manager/manager.go | 54 ++--- .../{metric/metric.go => metrics/metrics.go} | 2 +- pkg/rpc/cdnsystem/cdnsystem.proto | 4 +- pkg/rpc/cdnsystem/server/server.go | 60 +++--- pkg/rpc/dfdaemon/dfdaemon.proto | 8 +- pkg/rpc/dfdaemon/server/server.go | 23 ++- pkg/rpc/examples/cdnsystem/client/main.go | 78 ------- pkg/rpc/examples/cdnsystem/server/main.go | 71 ------- pkg/rpc/manager/server/server.go | 82 ++++++-- pkg/rpc/scheduler/server/server.go | 127 ++++++------ pkg/rpc/server.go | 192 ++---------------- pkg/rpc/server_listen.go | 28 ++- scheduler/config/config.go | 5 + scheduler/config/config_test.go | 5 +- scheduler/config/testdata/scheduler.yaml | 5 + scheduler/metrics/metrics.go | 101 +++++++++ scheduler/rpcserver/rpcserver.go | 25 ++- scheduler/scheduler.go | 131 +++++++++--- 33 files changed, 684 insertions(+), 611 deletions(-) create mode 100644 cdn/metrics/metrics.go rename manager/{metric/metric.go => metrics/metrics.go} (98%) delete mode 100644 pkg/rpc/examples/cdnsystem/client/main.go delete mode 100644 pkg/rpc/examples/cdnsystem/server/main.go create mode 100644 scheduler/metrics/metrics.go diff --git a/cdn/cdn.go b/cdn/cdn.go index 
0fbaa11926d..5b76490d19d 100644 --- a/cdn/cdn.go +++ b/cdn/cdn.go @@ -20,9 +20,12 @@ package cdn import ( "context" "fmt" + "net/http" "runtime" + "time" "d7y.io/dragonfly/v2/cdn/config" + "d7y.io/dragonfly/v2/cdn/metrics" "d7y.io/dragonfly/v2/cdn/plugins" "d7y.io/dragonfly/v2/cdn/rpcserver" "d7y.io/dragonfly/v2/cdn/supervisor/cdn" @@ -32,7 +35,6 @@ import ( "d7y.io/dragonfly/v2/cdn/supervisor/task" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" - "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server" "d7y.io/dragonfly/v2/pkg/rpc/manager" managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" "d7y.io/dragonfly/v2/pkg/util/net/iputils" @@ -41,9 +43,21 @@ import ( "google.golang.org/grpc" ) +const ( + gracefulStopTimeout = 10 * time.Second +) + type Server struct { - config *config.Config - seedServer server.SeederServer + // Server configuration + config *config.Config + + // GRPC server + grpcServer *grpc.Server + + // Metrics server + metricsServer *http.Server + + // Manager client managerClient managerclient.Client } @@ -86,11 +100,20 @@ func New(cfg *config.Config) (*Server, error) { storageMgr.Initialize(taskMgr) // Initialize storage manager - cdnSeedServer, err := rpcserver.NewCdnSeedServer(cfg, taskMgr) + var opts []grpc.ServerOption + if s.config.Options.Telemetry.Jaeger != "" { + opts = append(opts, grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor())) + } + grpcServer, err := rpcserver.New(cfg, taskMgr, opts...) 
if err != nil { return nil, errors.Wrap(err, "create seedServer") } - s.seedServer = cdnSeedServer + s.grpcServer = grpcServer + + // Initialize prometheus + if cfg.Metrics != nil { + s.metricsServer = metrics.New(cfg.Metrics, grpcServer) + } // Initialize manager client if cfg.Manager.Addr != "" { @@ -118,21 +141,27 @@ func New(cfg *config.Config) (*Server, error) { return s, nil } -func (s *Server) Serve() (err error) { - defer func() { - if rec := recover(); rec != nil { - err = errors.New(fmt.Sprintf("%v", rec)) - } - }() - +func (s *Server) Serve() error { // Start GC ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = gc.StartGC(ctx) - if err != nil { + if err := gc.StartGC(ctx); err != nil { return err } + // Started metrics server + if s.metricsServer != nil { + go func() { + logger.Infof("started metrics server at %s", s.metricsServer.Addr) + if err := s.metricsServer.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + return + } + logger.Fatalf("metrics server closed unexpect: %+v", err) + } + }() + } + // Serve Keepalive if s.managerClient != nil { go func() { @@ -145,24 +174,51 @@ func (s *Server) Serve() (err error) { }() } - // Serve GRPC - var opts []grpc.ServerOption - if s.config.Options.Telemetry.Jaeger != "" { - opts = append(opts, grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor())) - } - err = rpc.StartTCPServer(s.config.ListenPort, s.config.ListenPort, s.seedServer, opts...) 
+ // Generate GRPC listener + lis, _, err := rpc.ListenWithPortRange(iputils.HostIP, s.config.ListenPort, s.config.ListenPort) if err != nil { - return errors.Wrap(err, "start tcp server") + logger.Fatalf("net listener failed to start: %+v", err) + } + defer lis.Close() + + // Started GRPC server + logger.Infof("started grpc server at %s://%s", lis.Addr().Network(), lis.Addr().String()) + if err := s.grpcServer.Serve(lis); err != nil { + logger.Errorf("stoped grpc server: %+v", err) + return err } + return nil } func (s *Server) Stop() { + // Stop manager client if s.managerClient != nil { s.managerClient.Close() logger.Info("manager client closed") } - rpc.StopServer() - logger.Info("grpc server closed under request") + // Stop metrics server + if s.metricsServer != nil { + if err := s.metricsServer.Shutdown(context.Background()); err != nil { + logger.Errorf("metrics server failed to stop: %+v", err) + } + logger.Info("metrics server closed under request") + } + + // Stop GRPC server + stopped := make(chan struct{}) + go func() { + s.grpcServer.GracefulStop() + logger.Info("grpc server closed under request") + close(stopped) + }() + + t := time.NewTimer(gracefulStopTimeout) + select { + case <-t.C: + s.grpcServer.Stop() + case <-stopped: + t.Stop() + } } diff --git a/cdn/config/config.go b/cdn/config/config.go index 55f9b0910ef..2e34b062772 100644 --- a/cdn/config/config.go +++ b/cdn/config/config.go @@ -189,6 +189,9 @@ type BaseProperties struct { // Host configuration Host HostConfig `yaml:"host" mapstructure:"host"` + + // Metrics configuration + Metrics *RestConfig `yaml:"metrics" mapstructure:"metrics"` } type ManagerConfig struct { @@ -214,3 +217,7 @@ type HostConfig struct { // IDC for scheduler IDC string `mapstructure:"idc" yaml:"idc"` } + +type RestConfig struct { + Addr string `yaml:"addr" mapstructure:"addr"` +} diff --git a/cdn/metrics/metrics.go b/cdn/metrics/metrics.go new file mode 100644 index 00000000000..aa7513dd754 --- /dev/null +++ 
b/cdn/metrics/metrics.go @@ -0,0 +1,72 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "net/http" + + "d7y.io/dragonfly/v2/cdn/config" + "d7y.io/dragonfly/v2/internal/constants" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc" +) + +// Variables declared for metrics. 
+var ( + DownloadCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "download_total", + Help: "Counter of the number of the downloading.", + }) + + DownloadFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "download_failure_total", + Help: "Counter of the number of failed of the downloading.", + }) + + DownloadTraffic = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "download_traffic", + Help: "Counter of the number of download traffic.", + }) + + ConcurrentDownloadGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "concurrent_download_total", + Help: "Gauger of the number of concurrent of the downloading.", + }) +) + +func New(cfg *config.RestConfig, grpcServer *grpc.Server) *http.Server { + grpc_prometheus.Register(grpcServer) + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + return &http.Server{ + Addr: cfg.Addr, + Handler: mux, + } +} diff --git a/cdn/rpcserver/rpcserver.go b/cdn/rpcserver/rpcserver.go index 173d4955517..c918afe889f 100644 --- a/cdn/rpcserver/rpcserver.go +++ b/cdn/rpcserver/rpcserver.go @@ -32,6 +32,7 @@ import ( "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" + cdnserver "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server" "d7y.io/dragonfly/v2/pkg/util/digestutils" "d7y.io/dragonfly/v2/pkg/util/net/iputils" "d7y.io/dragonfly/v2/pkg/util/net/urlutils" @@ -39,38 +40,26 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" ) -var tracer trace.Tracer +var tracer = otel.Tracer("cdn-server") -func init() { - tracer = otel.Tracer("cdn-server") -} 
- -type options struct { - tracer trace.Tracer -} - -type Option func(*options) - -func WithTracer(tracer trace.Tracer) Option { - return func(o *options) { - o.tracer = tracer - } -} - -// CdnSeedServer is used to implement cdnsystem.SeederServer. -type CdnSeedServer struct { +type server struct { + *grpc.Server taskMgr supervisor.SeedTaskMgr cfg *config.Config } -// NewCdnSeedServer returns a new Manager Object. -func NewCdnSeedServer(cfg *config.Config, taskMgr supervisor.SeedTaskMgr) (*CdnSeedServer, error) { - return &CdnSeedServer{ +// New returns a new Manager Object. +func New(cfg *config.Config, taskMgr supervisor.SeedTaskMgr, opts ...grpc.ServerOption) (*grpc.Server, error) { + svr := &server{ taskMgr: taskMgr, cfg: cfg, - }, nil + } + + svr.Server = cdnserver.New(svr, opts...) + return svr.Server, nil } func constructRegisterRequest(req *cdnsystem.SeedRequest) (*types.TaskRegisterRequest, error) { @@ -114,7 +103,7 @@ func checkSeedRequestParams(req *cdnsystem.SeedRequest) error { return nil } -func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) { +func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanObtainSeeds, trace.WithSpanKind(trace.SpanKindServer)) defer span.End() @@ -185,7 +174,7 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe return nil } -func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest) (piecePacket *base.PiecePacket, err error) { +func (css *server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest) (piecePacket *base.PiecePacket, err error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanGetPieceTasks, trace.WithSpanKind(trace.SpanKindServer)) defer span.End() diff --git a/cdn/rpcserver/rpcserver_test.go 
b/cdn/rpcserver/rpcserver_test.go index 692fd76f12a..ba7fed61fab 100644 --- a/cdn/rpcserver/rpcserver_test.go +++ b/cdn/rpcserver/rpcserver_test.go @@ -48,7 +48,7 @@ func TestCdnSeedServer_GetPieceTasks(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - css := &CdnSeedServer{ + css := &server{ taskMgr: tt.fields.taskMgr, cfg: tt.fields.cfg, } @@ -84,7 +84,7 @@ func TestCdnSeedServer_ObtainSeeds(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - css := &CdnSeedServer{ + css := &server{ taskMgr: tt.fields.taskMgr, cfg: tt.fields.cfg, } @@ -103,14 +103,14 @@ func TestNewCdnSeedServer(t *testing.T) { tests := []struct { name string args args - want *CdnSeedServer + want *server wantErr bool }{ // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := NewCdnSeedServer(tt.args.cfg, tt.args.taskMgr) + got, err := New(tt.args.cfg, tt.args.taskMgr) if (err != nil) != tt.wantErr { t.Errorf("NewCdnSeedServer() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/cdn/supervisor/cdn/manager.go b/cdn/supervisor/cdn/manager.go index ee405f72f4c..12f16cafe1f 100644 --- a/cdn/supervisor/cdn/manager.go +++ b/cdn/supervisor/cdn/manager.go @@ -136,7 +136,7 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa // download fail if err != nil { downloadSpan.RecordError(err) - server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), 0, 0) + server.StatSeedFinish(task.TaskID, task.URL, false, err, start, time.Now(), 0, 0) seedTask.UpdateStatus(types.TaskInfoCdnStatusSourceError) return seedTask, err } @@ -146,13 +146,13 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa // forth: write to storage downloadMetadata, err := cm.writer.startWriter(ctx, reader, task, detectResult) if err != nil { - server.StatSeedFinish(task.TaskID, task.URL, false, err, 
start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, + server.StatSeedFinish(task.TaskID, task.URL, false, err, start, time.Now(), downloadMetadata.backSourceLength, downloadMetadata.realSourceFileLength) task.Log().Errorf("failed to write for task: %v", err) seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed) return seedTask, err } - server.StatSeedFinish(task.TaskID, task.URL, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength, + server.StatSeedFinish(task.TaskID, task.URL, true, nil, start, time.Now(), downloadMetadata.backSourceLength, downloadMetadata.realSourceFileLength) sourceDigest := reader.Digest() // fifth: handle CDN result diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 67f8a9f3f63..c8c2e6b21f3 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -156,7 +156,7 @@ func New(opt *config.DaemonOption) (Daemon, error) { } peerServerOption = append(peerServerOption, grpc.Creds(tlsCredentials)) } - rpcManager, err := rpcserver.NewServer(host, peerTaskManager, storageManager, downloadServerOption, peerServerOption) + rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, downloadServerOption, peerServerOption) if err != nil { return nil, err } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 4f81d6d1e4b..a49627ca206 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -53,8 +53,6 @@ import ( "d7y.io/dragonfly/v2/pkg/source/httpprotocol" ) -var _ daemonserver.DaemonServer = mock_daemon.NewMockDaemonServer(nil) - type componentsOption struct { taskID string contentLength int64 @@ -101,7 +99,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio Type: "tcp", Addr: fmt.Sprintf("0.0.0.0:%d", port), }) - go rpc.NewServer(daemon).Serve(ln) + go daemonserver.New(daemon).Serve(ln) time.Sleep(100 * 
time.Millisecond) // 2. setup a scheduler diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index ffbe2dd18bc..072af2477ea 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -33,7 +33,6 @@ import ( "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" dfdaemongrpc "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" dfdaemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" @@ -53,24 +52,21 @@ type server struct { peerTaskManager peer.TaskManager storageManager storage.Manager - downloadServer rpc.Server - peerServer rpc.Server + downloadServer *grpc.Server + peerServer *grpc.Server uploadAddr string } -var _ dfdaemonserver.DaemonServer = (*server)(nil) -var _ Server = (*server)(nil) - -func NewServer(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storageManager storage.Manager, downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) { - mgr := &server{ +func New(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storageManager storage.Manager, downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) { + svr := &server{ KeepAlive: clientutil.NewKeepAlive("rpc server"), peerHost: peerHost, peerTaskManager: peerTaskManager, storageManager: storageManager, } - mgr.downloadServer = rpc.NewServer(mgr, downloadOpts...) - mgr.peerServer = rpc.NewServer(mgr, peerOpts...) - return mgr, nil + svr.downloadServer = dfdaemonserver.New(svr, downloadOpts...) + svr.peerServer = dfdaemonserver.New(svr, peerOpts...) 
+ return svr, nil } func (m *server) ServeDownload(listener net.Listener) error { diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index 931c9c6a597..ab33ec6060f 100644 --- a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -34,11 +34,11 @@ import ( mock_peer "d7y.io/dragonfly/v2/client/daemon/test/mock/peer" mock_storage "d7y.io/dragonfly/v2/client/daemon/test/mock/storage" "d7y.io/dragonfly/v2/pkg/basic/dfnet" - "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" dfdaemongrpc "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" dfclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" _ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" + dfdaemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" ) @@ -78,7 +78,7 @@ func TestDownloadManager_ServeDownload(t *testing.T) { peerHost: &scheduler.PeerHost{}, peerTaskManager: mockPeerTaskManager, } - m.downloadServer = rpc.NewServer(m) + m.downloadServer = dfdaemonserver.New(m) port, err := freeport.GetFreePort() if err != nil { t.Fatal(err) @@ -161,7 +161,7 @@ func TestDownloadManager_ServePeer(t *testing.T) { peerHost: &scheduler.PeerHost{}, storageManager: mockStorageManger, } - m.peerServer = rpc.NewServer(m) + m.peerServer = dfdaemonserver.New(m) port, err := freeport.GetFreePort() if err != nil { t.Fatal(err) diff --git a/deploy/helm-charts b/deploy/helm-charts index acc7c633ba5..65abd2d1b04 160000 --- a/deploy/helm-charts +++ b/deploy/helm-charts @@ -1 +1 @@ -Subproject commit acc7c633ba587f974d9e06ae58d12127df022030 +Subproject commit 65abd2d1b046a20a47b6ae0e3c86f00160dafade diff --git a/internal/constants/constants.go b/internal/constants/constants.go index cb63bb3ab37..fd368f85b0a 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -22,3 +22,11 @@ const ( CDNPattern = "cdn" P2PPattern = "p2p" ) + +const ( + MetricsNamespace = "dragonfly" + 
ManagerMetricsName = "manager" + SchedulerMetricsName = "scheduler" + DfdaemonMetricsName = "dfdaemon" + CDNMetricsName = "cdn" +) diff --git a/manager/config/config.go b/manager/config/config.go index e4ba185c481..45d6e7b9301 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -28,6 +28,7 @@ type Config struct { Server *ServerConfig `yaml:"server" mapstructure:"server"` Database *DatabaseConfig `yaml:"database" mapstructure:"database"` Cache *CacheConfig `yaml:"cache" mapstructure:"cache"` + Metrics *RestConfig `yaml:"metrics" mapstructure:"metrics"` } type ServerConfig struct { @@ -35,7 +36,6 @@ type ServerConfig struct { PublicPath string `yaml:"publicPath" mapstructure:"publicPath"` GRPC *TCPListenConfig `yaml:"grpc" mapstructure:"grpc"` REST *RestConfig `yaml:"rest" mapstructure:"rest"` - Metric *RestConfig `yaml:"metric" mapstructure:"metric"` } type DatabaseConfig struct { @@ -106,9 +106,6 @@ func New() *Config { REST: &RestConfig{ Addr: ":8080", }, - Metric: &RestConfig{ - Addr: ":8000", - }, }, Database: &DatabaseConfig{ Redis: &RedisConfig{ @@ -184,10 +181,6 @@ func (cfg *Config) Validate() error { if cfg.Server.REST == nil { return errors.New("empty rest server config is not specified") } - - if cfg.Server.Metric == nil { - return errors.New("empty metric server config is not specified") - } } return nil diff --git a/manager/config/config_test.go b/manager/config/config_test.go index 0fff025bfb6..b7994a1ed5e 100644 --- a/manager/config/config_test.go +++ b/manager/config/config_test.go @@ -41,9 +41,6 @@ func TestManagerConfig_Load(t *testing.T) { REST: &RestConfig{ Addr: ":8080", }, - Metric: &RestConfig{ - Addr: ":8000", - }, }, Database: &DatabaseConfig{ Mysql: &MysqlConfig{ @@ -71,6 +68,9 @@ func TestManagerConfig_Load(t *testing.T) { TTL: 1000, }, }, + Metrics: &RestConfig{ + Addr: ":8000", + }, } managerConfigYAML := &Config{} diff --git a/manager/config/testdata/manager.yaml b/manager/config/testdata/manager.yaml index 
5bb98939ce6..3b35c8db7f4 100644 --- a/manager/config/testdata/manager.yaml +++ b/manager/config/testdata/manager.yaml @@ -7,8 +7,6 @@ server: end: 65003 rest: addr: :8080 - metric: - addr: :8000 database: mysql: @@ -31,3 +29,6 @@ cache: local: size: 10000 ttl: 1000 + +metrics: + addr: :8000 diff --git a/manager/manager.go b/manager/manager.go index b902dc8b56a..a67bf8a7db1 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -26,7 +26,7 @@ import ( "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/database" "d7y.io/dragonfly/v2/manager/job" - "d7y.io/dragonfly/v2/manager/metric" + "d7y.io/dragonfly/v2/manager/metrics" "d7y.io/dragonfly/v2/manager/permission/rbac" "d7y.io/dragonfly/v2/manager/router" "d7y.io/dragonfly/v2/manager/searcher" @@ -51,11 +51,13 @@ type Server struct { // REST server restServer *http.Server - // Metric server - metricServer *http.Server + // Metrics server + metricsServer *http.Server } func New(cfg *config.Config) (*Server, error) { + s := &Server{config: cfg} + // Initialize database db, err := database.New(cfg) if err != nil { @@ -89,7 +91,7 @@ func New(cfg *config.Config) (*Server, error) { if err != nil { return nil, err } - restServer := &http.Server{ + s.restServer = &http.Server{ Addr: cfg.Server.REST.Addr, Handler: router, } @@ -107,16 +109,14 @@ func New(cfg *config.Config) (*Server, error) { opts = append(opts, grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor())) } grpcServer := grpc_manager_server.New(grpcService, opts...) 
+ s.grpcServer = grpcServer // Initialize prometheus - metricServer := metric.New(cfg.Server.Metric, grpcServer) - - return &Server{ - config: cfg, - grpcServer: grpcServer, - restServer: restServer, - metricServer: metricServer, - }, nil + if cfg.Metrics != nil { + s.metricsServer = metrics.New(cfg.Metrics, grpcServer) + } + + return s, nil } func (s *Server) Serve() error { @@ -131,16 +131,18 @@ func (s *Server) Serve() error { } }() - // Started metric server - go func() { - logger.Infof("started metric server at %s", s.metricServer.Addr) - if err := s.metricServer.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - return + // Started metrics server + if s.metricsServer != nil { + go func() { + logger.Infof("started metrics server at %s", s.metricsServer.Addr) + if err := s.metricsServer.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + return + } + logger.Fatalf("metrics server closed unexpect: %+v", err) } - logger.Fatalf("metric server closed unexpect: %+v", err) - } - }() + }() + } // Generate GRPC listener lis, _, err := rpc.ListenWithPortRange(s.config.Server.GRPC.Listen, s.config.Server.GRPC.PortRange.Start, s.config.Server.GRPC.PortRange.End) @@ -166,11 +168,13 @@ func (s *Server) Stop() { } logger.Info("rest server closed under request") - // Stop metric server - if err := s.metricServer.Shutdown(context.Background()); err != nil { - logger.Errorf("metric server failed to stop: %+v", err) + // Stop metrics server + if s.metricsServer != nil { + if err := s.metricsServer.Shutdown(context.Background()); err != nil { + logger.Errorf("metrics server failed to stop: %+v", err) + } + logger.Info("metrics server closed under request") } - logger.Info("metric server closed under request") // Stop GRPC server stopped := make(chan struct{}) diff --git a/manager/metric/metric.go b/manager/metrics/metrics.go similarity index 98% rename from manager/metric/metric.go rename to manager/metrics/metrics.go index 
1944c276c4c..c42eb95a6c7 100644 --- a/manager/metric/metric.go +++ b/manager/metrics/metrics.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package metric +package metrics import ( "net/http" diff --git a/pkg/rpc/cdnsystem/cdnsystem.proto b/pkg/rpc/cdnsystem/cdnsystem.proto index 7c1255b7cac..5a22a8f506d 100644 --- a/pkg/rpc/cdnsystem/cdnsystem.proto +++ b/pkg/rpc/cdnsystem/cdnsystem.proto @@ -47,8 +47,8 @@ message PieceSeed{ // CDN System RPC Service service Seeder{ - // generate seeds and return to scheduler + // Generate seeds and return to scheduler rpc ObtainSeeds(SeedRequest)returns(stream PieceSeed); - // get piece tasks from cdn + // Get piece tasks from cdn rpc GetPieceTasks(base.PieceTaskRequest)returns(base.PiecePacket); } diff --git a/pkg/rpc/cdnsystem/server/server.go b/pkg/rpc/cdnsystem/server/server.go index ff01f88f9b4..c8d86454b08 100644 --- a/pkg/rpc/cdnsystem/server/server.go +++ b/pkg/rpc/cdnsystem/server/server.go @@ -19,7 +19,9 @@ package server import ( "context" "sync" + "time" + "d7y.io/dragonfly/v2/cdn/metrics" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" @@ -32,16 +34,11 @@ import ( "google.golang.org/grpc/peer" ) -func init() { - // set register with server implementation. 
- rpc.SetRegister(func(s *grpc.Server, impl interface{}) { - cdnsystem.RegisterSeederServer(s, &proxy{server: impl.(SeederServer)}) - }) -} - -// SeederServer see cdnsystem.SeederServer +// SeederServer refer to cdnsystem.SeederServer type SeederServer interface { + // Generate seeds and return to scheduler ObtainSeeds(context.Context, *cdnsystem.SeedRequest, chan<- *cdnsystem.PieceSeed) error + // Get piece tasks from cdn GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error) } @@ -50,7 +47,17 @@ type proxy struct { cdnsystem.UnimplementedSeederServer } +func New(seederServer SeederServer, opts ...grpc.ServerOption) *grpc.Server { + grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...) + cdnsystem.RegisterSeederServer(grpcServer, &proxy{server: seederServer}) + return grpcServer +} + func (p *proxy) ObtainSeeds(sr *cdnsystem.SeedRequest, stream cdnsystem.Seeder_ObtainSeedsServer) (err error) { + metrics.DownloadCount.Inc() + metrics.ConcurrentDownloadGauge.Inc() + defer metrics.ConcurrentDownloadGauge.Dec() + ctx, cancel := context.WithCancel(stream.Context()) defer cancel() @@ -79,6 +86,9 @@ func (p *proxy) ObtainSeeds(sr *cdnsystem.SeedRequest, stream cdnsystem.Seeder_O err = nil } + if err != nil { + metrics.DownloadFailureCount.Inc() + } return } @@ -122,23 +132,25 @@ func call(ctx context.Context, psc chan *cdnsystem.PieceSeed, p *proxy, sr *cdns } func StatSeedStart(taskID, url string) { - logger.StatSeedLogger.Info("trigger seed making", - zap.String("taskID", taskID), - zap.String("url", url), - zap.String("seederIp", iputils.HostIP), - zap.String("seederName", iputils.HostName)) + logger.StatSeedLogger.Info("Start Seed", + zap.String("TaskID", taskID), + zap.String("URL", url), + zap.String("SeederIp", iputils.HostIP), + zap.String("SeederHostName", iputils.HostName)) } -func StatSeedFinish(taskID, url string, success bool, err error, beginTime, endTime int, traffic, contentLength int64) { - 
logger.StatSeedLogger.Info("seed making finish", - zap.Bool("success", success), - zap.String("taskID", taskID), - zap.String("url", url), - zap.String("seederIp", iputils.HostIP), - zap.String("seederName", iputils.HostName), - zap.Int("beginTime", beginTime), - zap.Int("endTime", endTime), - zap.Int64("traffic", traffic), - zap.Int64("contentLength", contentLength), +func StatSeedFinish(taskID, url string, success bool, err error, startAt, finishAt time.Time, traffic, contentLength int64) { + metrics.DownloadTraffic.Add(float64(traffic)) + + logger.StatSeedLogger.Info("Finish Seed", + zap.Bool("Success", success), + zap.String("TaskID", taskID), + zap.String("URL", url), + zap.String("SeederIp", iputils.HostIP), + zap.String("SeederHostName", iputils.HostName), + zap.Time("StartAt", startAt), + zap.Time("FinishAt", finishAt), + zap.Int64("Traffic", traffic), + zap.Int64("ContentLength", contentLength), zap.Error(err)) } diff --git a/pkg/rpc/dfdaemon/dfdaemon.proto b/pkg/rpc/dfdaemon/dfdaemon.proto index 74c1de56fa4..7deaa83ac4f 100644 --- a/pkg/rpc/dfdaemon/dfdaemon.proto +++ b/pkg/rpc/dfdaemon/dfdaemon.proto @@ -56,10 +56,10 @@ message DownResult{ // Daemon Client RPC Service service Daemon{ - // trigger client to download file + // Trigger client to download file rpc Download(DownRequest) returns(stream DownResult); - // get piece tasks from other peers + // Get piece tasks from other peers rpc GetPieceTasks(base.PieceTaskRequest)returns(base.PiecePacket); - // check daemon health + // Check daemon health rpc CheckHealth(google.protobuf.Empty)returns(google.protobuf.Empty); -} \ No newline at end of file +} diff --git a/pkg/rpc/dfdaemon/server/server.go b/pkg/rpc/dfdaemon/server/server.go index 3b7a0e5f926..e05041b19ec 100644 --- a/pkg/rpc/dfdaemon/server/server.go +++ b/pkg/rpc/dfdaemon/server/server.go @@ -31,11 +31,14 @@ import ( "google.golang.org/grpc/peer" ) -func init() { - // set register with server implementation. 
- rpc.SetRegister(func(s *grpc.Server, impl interface{}) { - dfdaemon.RegisterDaemonServer(s, &proxy{server: impl.(DaemonServer)}) - }) +// DaemonServer refer to dfdaemon.DaemonServer +type DaemonServer interface { + // Trigger client to download file + Download(context.Context, *dfdaemon.DownRequest, chan<- *dfdaemon.DownResult) error + // Get piece tasks from other peers + GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error) + // Check daemon health + CheckHealth(context.Context) error } type proxy struct { @@ -43,11 +46,10 @@ type proxy struct { dfdaemon.UnimplementedDaemonServer } -// see dfdaemon.DaemonServer -type DaemonServer interface { - Download(context.Context, *dfdaemon.DownRequest, chan<- *dfdaemon.DownResult) error - GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error) - CheckHealth(context.Context) error +func New(daemonServer DaemonServer, opts ...grpc.ServerOption) *grpc.Server { + grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...) + dfdaemon.RegisterDaemonServer(grpcServer, &proxy{server: daemonServer}) + return grpcServer } func (p *proxy) Download(req *dfdaemon.DownRequest, stream dfdaemon.Daemon_DownloadServer) (err error) { @@ -87,7 +89,6 @@ func (p *proxy) GetPieceTasks(ctx context.Context, ptr *base.PieceTaskRequest) ( } func (p *proxy) CheckHealth(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { - _ = req return new(empty.Empty), p.server.CheckHealth(ctx) } diff --git a/pkg/rpc/examples/cdnsystem/client/main.go b/pkg/rpc/examples/cdnsystem/client/main.go deleted file mode 100644 index c21ee64e4ca..00000000000 --- a/pkg/rpc/examples/cdnsystem/client/main.go +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "sync" - - "d7y.io/dragonfly/v2/internal/dflog/logcore" - "d7y.io/dragonfly/v2/pkg/basic/dfnet" - "d7y.io/dragonfly/v2/pkg/rpc/base" - _ "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" - - "context" - "fmt" - - "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" - "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" -) - -func main() { - logcore.InitCdnSystem(true) - c, err := client.GetClientByAddr([]dfnet.NetAddr{ - { - Type: dfnet.TCP, - Addr: "127.0.0.1:8003", - }, - }) - if err != nil { - panic(err) - } - var wg sync.WaitGroup - for i := 0; i < 1; i++ { - wg.Add(1) - go func() { - defer wg.Done() - psc, _ := c.ObtainSeeds(context.TODO(), &cdnsystem.SeedRequest{ - TaskId: "test2", - Url: "https://desktop.docker.com/mac/stable/amd64/Docker.dmg", - UrlMeta: &base.UrlMeta{ - Filter: "", - }, - }) - - //psc, _ := c.ObtainSeeds(context.TODO(), &cdnsystem.SeedRequest{ - // TaskId: idgen.TaskID("https://desktop.docker.com/mac/stable/amd64/Docker.dmg?a=a&b=b&c=c", "a&b", &base.UrlMeta{ - // Digest: "md5", - // Range: "50-1000", - // }, "dragonfly"), - // Url: "https://desktop.docker.com/mac/stable/amd64/Docker.dmg?a=a&b=b&c=c", - // Filter: "a&b", - //}) - for { - piece, err := psc.Recv() - if err != nil { - fmt.Println(err) - break - } - fmt.Println(piece) - } - }() - } - wg.Wait() - fmt.Println("client finish") -} diff --git a/pkg/rpc/examples/cdnsystem/server/main.go b/pkg/rpc/examples/cdnsystem/server/main.go deleted file mode 100644 index ae2c5a2762f..00000000000 --- a/pkg/rpc/examples/cdnsystem/server/main.go +++ /dev/null @@ 
-1,71 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "fmt" - "time" - - "d7y.io/dragonfly/v2/internal/dflog/logcore" - "d7y.io/dragonfly/v2/pkg/rpc" - "d7y.io/dragonfly/v2/pkg/rpc/base" - "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" - "d7y.io/dragonfly/v2/pkg/safe" -) - -type helloSeeder struct { -} - -func (hs *helloSeeder) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) { - safe.Call(func() { - fmt.Printf("req: %v\n", req) - var i = 5 - for { - select { - case <-ctx.Done(): - return - default: - if i < 0 { - psc <- &cdnsystem.PieceSeed{ - Done: true, - ContentLength: 100, - } - return - } - psc <- &cdnsystem.PieceSeed{} - time.Sleep(1 * time.Second) - i-- - } - } - }) - - return -} - -func (hs *helloSeeder) GetPieceTasks(context.Context, *base.PieceTaskRequest) (*base.PiecePacket, error) { - return nil, nil -} - -func main() { - logcore.InitCdnSystem(false) - err := rpc.StartTCPServer(12345, 12345, &helloSeeder{}) - - if err != nil { - fmt.Printf("finish error: %v\n", err) - } -} diff --git a/pkg/rpc/manager/server/server.go b/pkg/rpc/manager/server/server.go index b0072fe8a83..44ea2e89964 100644 --- a/pkg/rpc/manager/server/server.go +++ b/pkg/rpc/manager/server/server.go @@ -17,6 +17,8 @@ package server import ( + "context" + logger "d7y.io/dragonfly/v2/internal/dflog" 
"d7y.io/dragonfly/v2/pkg/rpc/manager" @@ -28,24 +30,68 @@ import ( "google.golang.org/grpc" ) -func New(managerServer manager.ManagerServer, opts ...grpc.ServerOption) *grpc.Server { - defaultOptions := []grpc.ServerOption{ - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - grpc_validator.StreamServerInterceptor(), - grpc_recovery.StreamServerInterceptor(), - grpc_prometheus.StreamServerInterceptor, - grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), - )), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - grpc_validator.UnaryServerInterceptor(), - grpc_recovery.UnaryServerInterceptor(), - grpc_prometheus.UnaryServerInterceptor, - grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), - )), - } - - grpcServer := grpc.NewServer(append(defaultOptions, opts...)...) - manager.RegisterManagerServer(grpcServer, managerServer) +var defaultServerOptions = []grpc.ServerOption{ + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + grpc_validator.StreamServerInterceptor(), + grpc_recovery.StreamServerInterceptor(), + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), + )), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + grpc_validator.UnaryServerInterceptor(), + grpc_recovery.UnaryServerInterceptor(), + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), + )), +} +// ManagerServer is the server API for Manager service. 
// ManagerServer is the server API for Manager service.
type ManagerServer interface {
	// Get CDN and CDN cluster configuration
	GetCDN(context.Context, *manager.GetCDNRequest) (*manager.CDN, error)
	// Update CDN configuration
	UpdateCDN(context.Context, *manager.UpdateCDNRequest) (*manager.CDN, error)
	// Get Scheduler and Scheduler cluster configuration
	GetScheduler(context.Context, *manager.GetSchedulerRequest) (*manager.Scheduler, error)
	// Update scheduler configuration
	UpdateScheduler(context.Context, *manager.UpdateSchedulerRequest) (*manager.Scheduler, error)
	// List active schedulers configuration
	ListSchedulers(context.Context, *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error)
	// KeepAlive with manager
	KeepAlive(manager.Manager_KeepAliveServer) error
}

// proxy adapts a ManagerServer implementation to the generated
// manager gRPC service interface.
type proxy struct {
	server ManagerServer
	manager.UnimplementedManagerServer
}

// New builds a *grpc.Server with the package default options plus opts
// and registers managerServer (wrapped in proxy) as the Manager service.
func New(managerServer ManagerServer, opts ...grpc.ServerOption) *grpc.Server {
	grpcServer := grpc.NewServer(append(defaultServerOptions, opts...)...)
	manager.RegisterManagerServer(grpcServer, &proxy{server: managerServer})
	return grpcServer
}

// GetCDN delegates to the wrapped ManagerServer.
func (p *proxy) GetCDN(ctx context.Context, req *manager.GetCDNRequest) (*manager.CDN, error) {
	return p.server.GetCDN(ctx, req)
}

// UpdateCDN delegates to the wrapped ManagerServer.
func (p *proxy) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*manager.CDN, error) {
	return p.server.UpdateCDN(ctx, req)
}

// GetScheduler delegates to the wrapped ManagerServer.
func (p *proxy) GetScheduler(ctx context.Context, req *manager.GetSchedulerRequest) (*manager.Scheduler, error) {
	return p.server.GetScheduler(ctx, req)
}

// UpdateScheduler delegates to the wrapped ManagerServer.
func (p *proxy) UpdateScheduler(ctx context.Context, req *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) {
	return p.server.UpdateScheduler(ctx, req)
}

// ListSchedulers delegates to the wrapped ManagerServer.
func (p *proxy) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error) {
	return p.server.ListSchedulers(ctx, req)
}

// KeepAlive delegates to the wrapped ManagerServer.
func (p *proxy) KeepAlive(req manager.Manager_KeepAliveServer) error {
	return p.server.KeepAlive(req)
}
- rpc.SetRegister(func(s *grpc.Server, impl interface{}) { - scheduler.RegisterSchedulerServer(s, &proxy{server: impl.(SchedulerServer)}) - }) -} - -type proxy struct { - server SchedulerServer - scheduler.UnimplementedSchedulerServer -} - -// SchedulerServer scheduler.SchedulerServer +// SchedulerServer refer to scheduler.SchedulerServer type SchedulerServer interface { - // RegisterPeerTask register a peer to scheduler + // RegisterPeerTask registers a peer into one task. RegisterPeerTask(context.Context, *scheduler.PeerTaskRequest) (*scheduler.RegisterResult, error) - // ReportPieceResult report piece result to scheduler + // ReportPieceResult reports piece results and receives peer packets. ReportPieceResult(scheduler.Scheduler_ReportPieceResultServer) error - // ReportPeerResult report peer download result to scheduler + // ReportPeerResult reports downloading result for the peer task. ReportPeerResult(context.Context, *scheduler.PeerResult) error - // LeaveTask leave peer from scheduler + // LeaveTask makes the peer leaving from scheduling overlay for the task. LeaveTask(context.Context, *scheduler.PeerTarget) error } -func (p *proxy) RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest) (rr *scheduler.RegisterResult, err error) { - rr, err = p.server.RegisterPeerTask(ctx, ptr) +type proxy struct { + server SchedulerServer + scheduler.UnimplementedSchedulerServer +} - var taskID = "unknown" - var suc bool - var code base.Code +func New(schedulerServer SchedulerServer, opts ...grpc.ServerOption) *grpc.Server { + grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...) 
+ scheduler.RegisterSchedulerServer(grpcServer, &proxy{server: schedulerServer}) + return grpcServer +} - if err == nil && rr != nil { - taskID = rr.TaskId - suc = true +func (p *proxy) RegisterPeerTask(ctx context.Context, req *scheduler.PeerTaskRequest) (*scheduler.RegisterResult, error) { + metrics.RegisterPeerTaskCount.Inc() + taskID := "unknown" + isSuccess := false + resp, err := p.server.RegisterPeerTask(ctx, req) + if err != nil { + taskID = resp.TaskId + isSuccess = true + metrics.RegisterPeerTaskFailureCount.Inc() } - - peerHost := ptr.PeerHost - - logger.StatPeerLogger.Info("register peer task", - zap.Bool("success", suc), - zap.String("taskID", taskID), - zap.String("url", ptr.Url), - zap.String("peerIp", peerHost.Ip), - zap.String("securityDomain", peerHost.SecurityDomain), - zap.String("idc", peerHost.Idc), - zap.String("schedulerIp", iputils.HostIP), - zap.String("schedulerName", iputils.HostName), - zap.Int32("code", int32(code))) - - return + metrics.PeerTaskCounter.WithLabelValues(resp.SizeScope.String()).Inc() + + peerHost := req.PeerHost + logger.StatPeerLogger.Info("Register Peer Task", + zap.Bool("Success", isSuccess), + zap.String("TaskID", taskID), + zap.String("URL", req.Url), + zap.String("PeerIP", peerHost.Ip), + zap.String("PeerHostName", peerHost.HostName), + zap.String("SecurityDomain", peerHost.SecurityDomain), + zap.String("IDC", peerHost.Idc), + zap.String("SchedulerIP", iputils.HostIP), + zap.String("SchedulerHostName", iputils.HostName), + ) + + return resp, err } func (p *proxy) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { + metrics.ConcurrentScheduleGauge.Inc() + defer metrics.ConcurrentScheduleGauge.Dec() + return p.server.ReportPieceResult(stream) } -func (p *proxy) ReportPeerResult(ctx context.Context, pr *scheduler.PeerResult) (*empty.Empty, error) { - err := p.server.ReportPeerResult(ctx, pr) - - logger.StatPeerLogger.Info("finish peer task", - zap.Bool("success", pr.Success), - 
// ReportPeerResult records download metrics for the finished peer task,
// forwards the result to the wrapped server, and writes a peer stat log.
// Traffic and duration metrics are only recorded for successful downloads;
// failures increment DownloadFailureCount instead.
func (p *proxy) ReportPeerResult(ctx context.Context, req *scheduler.PeerResult) (*empty.Empty, error) {
	metrics.DownloadCount.Inc()
	if req.Success {
		metrics.P2PTraffic.Add(float64(req.Traffic))
		// Cost is observed as-is; the histogram buckets are defined in milliseconds.
		metrics.PeerTaskDownloadDuration.Observe(float64(req.Cost))
	} else {
		metrics.DownloadFailureCount.Inc()
	}

	err := p.server.ReportPeerResult(ctx, req)

	logger.StatPeerLogger.Info("Finish Peer Task",
		zap.Bool("Success", req.Success),
		zap.String("TaskID", req.TaskId),
		zap.String("PeerID", req.PeerId),
		zap.String("URL", req.Url),
		zap.String("PeerIP", req.SrcIp),
		zap.String("SecurityDomain", req.SecurityDomain),
		zap.String("IDC", req.Idc),
		zap.String("SchedulerIP", iputils.HostIP),
		zap.String("SchedulerHostName", iputils.HostName),
		zap.String("ContentLength", unit.Bytes(req.ContentLength).String()),
		zap.String("Traffic", unit.Bytes(uint64(req.Traffic)).String()),
		zap.Duration("Cost", time.Duration(int64(req.Cost))),
		zap.Int32("Code", int32(req.Code)))

	return new(empty.Empty), err
}
"d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc/base/common" - "d7y.io/dragonfly/v2/pkg/util/fileutils" - "d7y.io/dragonfly/v2/pkg/util/stringutils" -) -type RegisterFunc func(*grpc.Server, interface{}) - -var ( - register RegisterFunc - tcpServer *grpc.Server - unixServer *grpc.Server - mutex sync.Mutex + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" ) -func SetRegister(f RegisterFunc) { - if f == nil { - return - } - - mutex.Lock() - defer mutex.Unlock() - - if register != nil { - panic("duplicated service register") - } - - register = f -} - -var serverOpts = []grpc.ServerOption{ +var DefaultServerOptions = []grpc.ServerOption{ grpc.ConnectionTimeout(10 * time.Second), grpc.InitialConnWindowSize(8 * 1024 * 1024), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ @@ -75,143 +42,20 @@ var serverOpts = []grpc.ServerOption{ MaxConnectionIdle: 5 * time.Minute, }), grpc.MaxConcurrentStreams(100), - grpc.StreamInterceptor(streamServerInterceptor), - grpc.UnaryInterceptor(unaryServerInterceptor), -} - -var sp = struct { - port int - ch chan struct{} - once sync.Once -}{ch: make(chan struct{})} - -func GetTCPServerPort() int { - sp.once.Do(func() { - <-sp.ch - }) - - return sp.port -} - -// for client, start tcp first and then start unix on server process -func StartTCPServer(incrementPort int, upLimit int, impl interface{}, opts ...grpc.ServerOption) error { - for { - if incrementPort > upLimit { - return errors.New("no ports available") - } - - netAddr := dfnet.NetAddr{ - Type: dfnet.TCP, - Addr: fmt.Sprintf(":%d", incrementPort), - } - - if err := startServer(netAddr, impl, opts); err != nil && !isErrAddrInuse(err) { - return err - } else if err == nil { 
- return nil - } - - incrementPort++ - } -} - -func StartUnixServer(sockPath string, impl interface{}, opts ...grpc.ServerOption) error { - _ = fileutils.DeleteFile(sockPath) - - netAddr := dfnet.NetAddr{ - Type: dfnet.UNIX, - Addr: sockPath, - } - - if err := startServer(netAddr, impl, opts); err != nil { - return err - } - - return nil -} - -// start server with addr and register source -func startServer(netAddr dfnet.NetAddr, impl interface{}, opts []grpc.ServerOption) error { - lis, err := net.Listen(string(netAddr.Type), netAddr.Addr) - - if err != nil { - return err - } - logger.GrpcLogger.Infof("rpc listening on: %s", netAddr.GetEndpoint()) - server := grpc.NewServer(append(serverOpts, opts...)...) - - switch netAddr.Type { - case dfnet.UNIX: - unixServer = server - case dfnet.TCP: - tcpServer = server - - addr := lis.Addr().String() - index := strings.LastIndex(addr, ":") - p, err := strconv.Atoi(stringutils.SubString(addr, index+1, len(addr))) - if err != nil { - return err - } - - sp.port = p - close(sp.ch) - } - - register(server, impl) - - return server.Serve(lis) -} - -func StopServer() { - if unixServer != nil { - unixServer.GracefulStop() - } - - if tcpServer != nil { - tcpServer.GracefulStop() - } -} - -func isErrAddrInuse(err error) bool { - if ope, ok := err.(*net.OpError); ok { - if sse, ok := ope.Err.(*os.SyscallError); ok { - if errno, ok := sse.Err.(syscall.Errno); ok { - return errno == syscall.EADDRINUSE - } - } - } - - return false -} - -type wrappedServerStream struct { - grpc.ServerStream - method string -} - -func (w *wrappedServerStream) RecvMsg(m interface{}) error { - err := w.ServerStream.RecvMsg(m) - if err != nil && err != io.EOF { - logger.GrpcLogger.Errorf("server receive a message: %T error: %v for method: %s", m, err, w.method) - } - - return err -} - -func (w *wrappedServerStream) SendMsg(m interface{}) error { - err := w.ServerStream.SendMsg(m) - if err != nil { - logger.GrpcLogger.Errorf("server send a message: %T error: %v 
for method: %s", m, err, w.method) - } - - return err + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + streamServerInterceptor, + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), + )), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + unaryServerInterceptor, + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), + )), } func streamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - err := handler(srv, &wrappedServerStream{ - ServerStream: ss, - method: info.FullMethod, - }) + err := handler(srv, ss) if err != nil { err = convertServerError(err) logger.GrpcLogger.Errorf("do stream server error: %v for method: %s", err, info.FullMethod) diff --git a/pkg/rpc/server_listen.go b/pkg/rpc/server_listen.go index b67c68a76b3..e2040e44b42 100644 --- a/pkg/rpc/server_listen.go +++ b/pkg/rpc/server_listen.go @@ -19,8 +19,8 @@ package rpc import ( "fmt" "net" - - "google.golang.org/grpc" + "os" + "syscall" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/basic/dfnet" @@ -49,7 +49,7 @@ func ListenWithPortRange(listen string, startPort, endPort int) (net.Listener, i if err == nil && listener != nil { return listener, listener.Addr().(*net.TCPAddr).Port, nil } - if isErrAddrInuse(err) { + if isErrAddr(err) { logger.Warnf("listen port %s:%d is in used, sys error: %s", listen, port, err) continue } else if err != nil { @@ -60,18 +60,14 @@ func ListenWithPortRange(listen string, startPort, endPort int) (net.Listener, i return nil, -1, fmt.Errorf("no available port to listen, port: %d - %d", startPort, endPort) } -type Server interface { - Serve(net.Listener) error - Stop() - GracefulStop() -} +func isErrAddr(err error) bool { + if ope, ok := err.(*net.OpError); ok { + if sse, ok := ope.Err.(*os.SyscallError); ok { + if errno, ok := sse.Err.(syscall.Errno); ok { + 
return errno == syscall.EADDRINUSE + } + } + } -// NewServer returns a Server with impl -// Example: -// s := NewServer(impl, ...) -// s.Serve(listener) -func NewServer(impl interface{}, opts ...grpc.ServerOption) Server { - server := grpc.NewServer(append(serverOpts, opts...)...) - register(server, impl) - return server + return false } diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 567a1588ac8..a05bfdc87b8 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -35,6 +35,7 @@ type Config struct { Manager *ManagerConfig `yaml:"manager" mapstructure:"manager"` Host *HostConfig `yaml:"host" mapstructure:"host"` Job *JobConfig `yaml:"job" mapstructure:"job"` + Metrics *RestConfig `yaml:"metrics" mapstructure:"metrics"` DisableCDN bool `yaml:"disableCDN" mapstructure:"disableCDN"` } @@ -210,6 +211,10 @@ type GCConfig struct { TaskTTI time.Duration `yaml:"taskTTI" mapstructure:"taskTTI"` } +type RestConfig struct { + Addr string `yaml:"addr" mapstructure:"addr"` +} + type HostConfig struct { // Location for scheduler Location string `mapstructure:"location" yaml:"location"` diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go index ed2459bec44..efe24203e06 100644 --- a/scheduler/config/config_test.go +++ b/scheduler/config/config_test.go @@ -49,7 +49,6 @@ func TestSchedulerConfig_Load(t *testing.T) { Host: "foo", Port: 8002, }, - Manager: &ManagerConfig{ Addr: "127.0.0.1:65003", SchedulerClusterID: 1, @@ -73,6 +72,10 @@ func TestSchedulerConfig_Load(t *testing.T) { BackendDB: 2, }, }, + Metrics: &RestConfig{ + Addr: ":8000", + }, + DisableCDN: true, } schedulerConfigYAML := &Config{} diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml index 6060942e7be..7d1268af47e 100644 --- a/scheduler/config/testdata/scheduler.yaml +++ b/scheduler/config/testdata/scheduler.yaml @@ -48,3 +48,8 @@ job: password: "password" brokerDB: 1 backendDB: 2 + +metrics: + addr: 
":8000" + +disableCDN: true diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go new file mode 100644 index 00000000000..56871ba0185 --- /dev/null +++ b/scheduler/metrics/metrics.go @@ -0,0 +1,101 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "net/http" + + "d7y.io/dragonfly/v2/internal/constants" + "d7y.io/dragonfly/v2/scheduler/config" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc" +) + +// Variables declared for metrics. 
var (
	// Total number of peer-task registration attempts (success and failure).
	RegisterPeerTaskCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "register_peer_task_total",
		Help:      "Counter of the number of the register peer task.",
	})

	// Number of peer-task registrations that returned an error.
	RegisterPeerTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "register_peer_task_failure_total",
		Help:      "Counter of the number of failed of the register peer task.",
	})

	// Total number of reported peer downloads.
	DownloadCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "download_total",
		Help:      "Counter of the number of the downloading.",
	})

	// Number of reported peer downloads that failed.
	DownloadFailureCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "download_failure_total",
		Help:      "Counter of the number of failed of the downloading.",
	})

	// Accumulated peer-to-peer traffic reported by peers.
	P2PTraffic = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "p2p_traffic",
		Help:      "Counter of the number of p2p traffic.",
	})

	// Peer tasks partitioned by size scope (the "type" label).
	PeerTaskCounter = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "peer_task_total",
		Help:      "Counter of the number of peer task.",
	}, []string{"type"})

	// Download duration distribution; buckets are in milliseconds.
	PeerTaskDownloadDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "peer_task_download_duration_milliseconds",
		Help:      "Histogram of the time each peer task downloading.",
		Buckets:   []float64{100, 200, 500, 1000, 1500, 2 * 1000, 3 * 1000, 5 * 1000, 10 * 1000, 20 * 1000, 60 * 1000, 120 * 1000, 300 * 1000},
	})

	// Number of scheduling (ReportPieceResult) streams currently open.
	ConcurrentScheduleGauge = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: constants.MetricsNamespace,
		Subsystem: constants.SchedulerMetricsName,
		Name:      "concurrent_schedule_total",
		Help:      "Gauger of the number of concurrent of the scheduling.",
	})
)

// New registers grpcServer's metrics with the default prometheus
// registry and returns an HTTP server exposing them on /metrics at
// cfg.Addr. The caller is responsible for starting (ListenAndServe)
// and shutting it down.
func New(cfg *config.RestConfig, grpcServer *grpc.Server) *http.Server {
	grpc_prometheus.Register(grpcServer)

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	return &http.Server{
		Addr:    cfg.Addr,
		Handler: mux,
	}
}
+ return svr.Server, nil } -func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTaskRequest) (resp *scheduler.RegisterResult, err error) { +func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTaskRequest) (resp *scheduler.RegisterResult, err error) { defer func() { logger.Debugf("peer %s register result %v, err: %v", request.PeerId, resp, err) }() @@ -116,7 +121,7 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul } } -func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { +func (s *server) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { var span trace.Span ctx, span := tracer.Start(stream.Context(), config.SpanReportPieceResult, trace.WithSpanKind(trace.SpanKindServer)) defer span.End() @@ -175,7 +180,7 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie } } -func (s *SchedulerServer) ReportPeerResult(ctx context.Context, result *scheduler.PeerResult) (err error) { +func (s *server) ReportPeerResult(ctx context.Context, result *scheduler.PeerResult) (err error) { logger.Debugf("report peer result %v", result) var span trace.Span ctx, span = tracer.Start(ctx, config.SpanReportPeerResult, trace.WithSpanKind(trace.SpanKindServer)) @@ -193,7 +198,7 @@ func (s *SchedulerServer) ReportPeerResult(ctx context.Context, result *schedule return s.service.HandlePeerResult(ctx, peer, result) } -func (s *SchedulerServer) LeaveTask(ctx context.Context, target *scheduler.PeerTarget) (err error) { +func (s *server) LeaveTask(ctx context.Context, target *scheduler.PeerTarget) (err error) { logger.Debugf("leave task %v", target) var span trace.Span ctx, span = tracer.Start(ctx, config.SpanPeerLeave, trace.WithSpanKind(trace.SpanKindServer)) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index d97d02e706f..f263609cdda 100644 --- a/scheduler/scheduler.go +++ 
b/scheduler/scheduler.go @@ -18,6 +18,8 @@ package scheduler import ( "context" + "net/http" + "time" "d7y.io/dragonfly/v2/cmd/dependency" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -26,24 +28,44 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/manager" managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" - "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" "d7y.io/dragonfly/v2/pkg/util/net/iputils" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core" "d7y.io/dragonfly/v2/scheduler/job" + "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/rpcserver" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" ) +const ( + gracefulStopTimeout = 10 * time.Second +) + type Server struct { - config *config.Config - schedulerServer server.SchedulerServer - schedulerService *core.SchedulerService - managerClient managerclient.Client - dynConfig config.DynconfigInterface - job job.Job - gc gc.GC + // Server configuration + config *config.Config + + // GRPC server + grpcServer *grpc.Server + + // Metrics server + metricsServer *http.Server + + // Scheduler service + service *core.SchedulerService + + // Manager client + managerClient managerclient.Client + + // Dynamic config + dynConfig config.DynconfigInterface + + // Async job + job job.Job + + // GC server + gc gc.GC } func New(cfg *config.Config) (*Server, error) { @@ -94,22 +116,31 @@ func New(cfg *config.Config) (*Server, error) { if cfg.Options.Telemetry.Jaeger != "" { openTel = true } - schedulerService, err := core.NewSchedulerService(cfg.Scheduler, dynConfig, s.gc, core.WithDisableCDN(cfg.DisableCDN), core.WithOpenTel(openTel)) + service, err := core.NewSchedulerService(cfg.Scheduler, dynConfig, s.gc, core.WithDisableCDN(cfg.DisableCDN), core.WithOpenTel(openTel)) if err != nil { return nil, err } - s.schedulerService = schedulerService + s.service = service // Initialize grpc service - 
schedulerServer, err := rpcserver.NewSchedulerServer(schedulerService) + var opts []grpc.ServerOption + if s.config.Options.Telemetry.Jaeger != "" { + opts = append(opts, grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor())) + } + grpcServer, err := rpcserver.New(s.service, opts...) if err != nil { return nil, err } - s.schedulerServer = schedulerServer + s.grpcServer = grpcServer + + // Initialize prometheus + if cfg.Metrics != nil { + s.metricsServer = metrics.New(cfg.Metrics, grpcServer) + } // Initialize job service if cfg.Job.Redis.Host != "" { - s.job, err = job.New(context.Background(), cfg.Job, iputils.HostName, s.schedulerService) + s.job, err = job.New(context.Background(), cfg.Job, iputils.HostName, s.service) if err != nil { return nil, err } @@ -131,9 +162,9 @@ func (s *Server) Serve() error { s.gc.Serve() logger.Info("gc start successfully") - // Serve schedulerService + // Serve service go func() { - s.schedulerService.Serve() + s.service.Serve() logger.Info("scheduler service start successfully") }() @@ -147,6 +178,19 @@ func (s *Server) Serve() error { }() } + // Started metrics server + if s.metricsServer != nil { + go func() { + logger.Infof("started metrics server at %s", s.metricsServer.Addr) + if err := s.metricsServer.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + return + } + logger.Fatalf("metrics server closed unexpect: %+v", err) + } + }() + } + // Serve Keepalive if s.managerClient != nil { go func() { @@ -159,19 +203,17 @@ func (s *Server) Serve() error { }() } - // Serve GRPC - logger.Infof("start server at port %d", s.config.Server.Port) - var opts []grpc.ServerOption - if s.config.Options.Telemetry.Jaeger != "" { - opts = append(opts, grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor())) + // Generate GRPC listener + lis, _, err := 
// Stop shuts the scheduler server down: dynconfig, manager client, GC,
// scheduler service and metrics server are stopped in order, then the
// gRPC server is drained gracefully with a forced stop after
// gracefulStopTimeout.
func (s *Server) Stop() {
	// Stop dynamic server
	s.dynConfig.Stop()
	logger.Info("dynconfig client closed")

	// Stop manager client
	if s.managerClient != nil {
		if err := s.managerClient.Close(); err != nil {
			logger.Errorf("manager client failed to stop: %+v", err)
		}
		logger.Info("manager client closed")
	}

	// Stop GC
	s.gc.Stop()
	logger.Info("gc closed")

	// Stop scheduler service
	s.service.Stop()
	logger.Info("scheduler service closed")

	// Stop metrics server
	if s.metricsServer != nil {
		if err := s.metricsServer.Shutdown(context.Background()); err != nil {
			logger.Errorf("metrics server failed to stop: %+v", err)
		}
		logger.Info("metrics server closed under request")
	}

	// Stop GRPC server: give in-flight RPCs up to gracefulStopTimeout
	// to drain, then force-stop so shutdown cannot hang forever.
	stopped := make(chan struct{})
	go func() {
		s.grpcServer.GracefulStop()
		logger.Info("grpc server closed under request")
		close(stopped)
	}()

	t := time.NewTimer(gracefulStopTimeout)
	select {
	case <-t.C:
		s.grpcServer.Stop()
	case <-stopped:
		t.Stop()
	}
}