From d22ba6185c9b3f6f11d601ec427884b4b3293346 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 15 Nov 2021 14:58:30 +0800 Subject: [PATCH 1/4] feat: add scheduler validation Signed-off-by: Gaius --- pkg/rpc/scheduler/scheduler.proto | 1 - scheduler/rpcserver/rpcserver.go | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/rpc/scheduler/scheduler.proto b/pkg/rpc/scheduler/scheduler.proto index 287a1fe465f..395318ef6e0 100644 --- a/pkg/rpc/scheduler/scheduler.proto +++ b/pkg/rpc/scheduler/scheduler.proto @@ -39,7 +39,6 @@ message PeerTaskRequest{ } message RegisterResult{ - // task id string task_id = 2; // file content length scope for the url diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index b288a648c1f..b06c2103f89 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -63,6 +63,7 @@ func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTa ctx, span = tracer.Start(ctx, config.SpanPeerRegister, trace.WithSpanKind(trace.SpanKindServer)) span.SetAttributes(config.AttributePeerRegisterRequest.String(request.String())) defer span.End() + logger.Debugf("register peer task, req: %+v", request) resp = new(scheduler.RegisterResult) if verifyErr := validateParams(request); verifyErr != nil { @@ -71,6 +72,7 @@ func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTa span.RecordError(err) return } + taskID := s.service.GenerateTaskID(request.Url, request.UrlMeta, request.PeerId) span.SetAttributes(config.AttributeTaskID.String(taskID)) task := s.service.GetOrCreateTask(ctx, supervisor.NewTask(taskID, request.Url, request.UrlMeta)) From ad73ab098a4080b3ee5b12582edaf7027b8ef14f Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 15 Nov 2021 18:00:38 +0800 Subject: [PATCH 2/4] feat: idgen add peer id Signed-off-by: Gaius --- cdn/cdnutil/cdn_util.go | 7 -- cdn/cdnutil/cdn_util_test.go | 31 ------ cdn/rpcserver/rpcserver.go | 3 +- client/daemon/rpcserver/rpcserver.go | 3 +- client/daemon/transport/transport.go | 4 +- .../peer.go => internal/idgen/peer_id.go | 14 ++- internal/idgen/peer_id_test.go | 103 ++++++++++++++++++ pkg/rpc/scheduler/scheduler.proto | 2 +- pkg/rpc/server.go | 3 + 9 files changed, 120 insertions(+), 50 deletions(-) delete mode 100644 cdn/cdnutil/cdn_util_test.go rename client/clientutil/peer.go => internal/idgen/peer_id.go (74%) create mode 100644 internal/idgen/peer_id_test.go diff --git a/cdn/cdnutil/cdn_util.go b/cdn/cdnutil/cdn_util.go index 3112a1295d6..57ab262f256 100644 --- a/cdn/cdnutil/cdn_util.go +++ b/cdn/cdnutil/cdn_util.go @@ -17,16 +17,9 @@ package cdnutil import ( - "fmt" - "d7y.io/dragonfly/v2/cdn/config" - "d7y.io/dragonfly/v2/pkg/util/net/iputils" ) -func GenCDNPeerID(taskID string) string { - return fmt.Sprintf("%s-%s_%s", iputils.HostName, taskID, "CDN") -} - // ComputePieceSize computes the piece size with specified fileLength. // // If the fileLength<=0, which means failed to get fileLength diff --git a/cdn/cdnutil/cdn_util_test.go b/cdn/cdnutil/cdn_util_test.go deleted file mode 100644 index e1ff8ae141e..00000000000 --- a/cdn/cdnutil/cdn_util_test.go +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cdnutil - -import ( - "fmt" - "testing" - - "d7y.io/dragonfly/v2/pkg/util/net/iputils" -) - -func TestGenCdnPeerID(t *testing.T) { - var taskID = "123456" - if got, want := GenCDNPeerID(taskID), fmt.Sprint(iputils.HostName, "-", taskID, "_CDN"); got != want { - t.Errorf("GenCdnPeerID() = %v, want %v", got, want) - } -} diff --git a/cdn/rpcserver/rpcserver.go b/cdn/rpcserver/rpcserver.go index ae2d328ce6e..82201ce2268 100644 --- a/cdn/rpcserver/rpcserver.go +++ b/cdn/rpcserver/rpcserver.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - "d7y.io/dragonfly/v2/cdn/cdnutil" "d7y.io/dragonfly/v2/cdn/config" cdnerrors "d7y.io/dragonfly/v2/cdn/errors" "d7y.io/dragonfly/v2/cdn/supervisor" @@ -137,7 +136,7 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, span.RecordError(err) return err } - peerID := cdnutil.GenCDNPeerID(req.TaskId) + peerID := idgen.CDNPeerID(css.cfg.AdvertiseIP) for piece := range pieceChan { psc <- &cdnsystem.PieceSeed{ PeerId: peerID, diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 072af2477ea..1d899312dd1 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -33,6 +33,7 @@ import ( "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" dfdaemongrpc "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon" dfdaemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" @@ -137,7 +138,7 @@ func (m *server) Download(ctx context.Context, PeerTaskRequest: scheduler.PeerTaskRequest{ Url: req.Url, UrlMeta: req.UrlMeta, - PeerId: clientutil.GenPeerID(m.peerHost), + PeerId: idgen.PeerID(m.peerHost.Ip), PeerHost: m.peerHost, }, Output: req.Output, diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go index 323bd74c04b..800398b7058 100644 --- a/client/daemon/transport/transport.go +++ b/client/daemon/transport/transport.go @@ -28,10 +28,10 @@ import ( "github.com/go-http-utils/headers" - "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/peer" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/util/net/httputils" @@ -174,7 +174,7 @@ func NeedUseDragonfly(req *http.Request) bool { // download uses dragonfly to download. func (rt *transport) download(req *http.Request) (*http.Response, error) { url := req.URL.String() - peerID := clientutil.GenPeerID(rt.peerHost) + peerID := idgen.PeerID(rt.peerHost.Ip) log := logger.With("peer", peerID, "component", "transport") log.Infof("start download with url: %s", url) diff --git a/client/clientutil/peer.go b/internal/idgen/peer_id.go similarity index 74% rename from client/clientutil/peer.go rename to internal/idgen/peer_id.go index 8d0749f0751..42813f7ea3f 100644 --- a/client/clientutil/peer.go +++ b/internal/idgen/peer_id.go @@ -14,17 +14,19 @@ * limitations under the License. */ -package clientutil +package idgen import ( "fmt" "os" - "github.com/pborman/uuid" - - "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "github.com/google/uuid" ) -func GenPeerID(peerHost *scheduler.PeerHost) string { - return fmt.Sprintf("%s-%d-%s", peerHost.Ip, os.Getpid(), uuid.New()) +func CDNPeerID(ip string) string { + return fmt.Sprintf("%s-%s", PeerID(ip), "cdn") +} + +func PeerID(ip string) string { + return fmt.Sprintf("%s-%d-%s", ip, os.Getpid(), uuid.New()) } diff --git a/internal/idgen/peer_id_test.go b/internal/idgen/peer_id_test.go new file mode 100644 index 00000000000..6d6d70029d3 --- /dev/null +++ b/internal/idgen/peer_id_test.go @@ -0,0 +1,103 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package idgen + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPeerID(t *testing.T) { + tests := []struct { + name string + ip string + expect func(t *testing.T, d interface{}) + }{ + { + name: "generate PeerID with ipv4", + ip: "127.0.0.1", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Len(d, 52) + }, + }, + { + name: "generate PeerID with ipv6", + ip: "2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Len(d, 82) + }, + }, + { + name: "generate PeerID with empty string", + ip: "", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Len(d, 43) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + data := PeerID(tc.ip) + tc.expect(t, data) + }) + } +} + +func TestCDNPeerID(t *testing.T) { + tests := []struct { + name string + ip string + expect func(t *testing.T, d interface{}) + }{ + { + name: "generate id with ipv4", + ip: "127.0.0.1", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Len(d, 56) + }, + }, + { + name: "generate id with ipv6", + ip: "2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Len(d, 86) + }, + }, + { + name: "generate id with empty string", + ip: "", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Len(d, 47) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + data := CDNPeerID(tc.ip) + tc.expect(t, data) + }) + } +} diff --git a/pkg/rpc/scheduler/scheduler.proto b/pkg/rpc/scheduler/scheduler.proto index 395318ef6e0..1bd31acd6c1 100644 --- a/pkg/rpc/scheduler/scheduler.proto +++ b/pkg/rpc/scheduler/scheduler.proto @@ -25,7 +25,7 @@ option go_package = "d7y.io/dragonfly/v2/pkg/rpc/scheduler"; message PeerTaskRequest{ // universal resource locator for different kind of storage - string url = 1; + string url = 1 [(validate.rules).string.uri = true]; // url meta info base.UrlMeta url_meta = 2; // peer's id and must be global uniqueness diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 158de2be66f..7453e9b5e51 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -22,6 +22,7 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -44,11 +45,13 @@ var DefaultServerOptions = []grpc.ServerOption{ grpc.MaxConcurrentStreams(100), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( streamServerInterceptor, + grpc_validator.StreamServerInterceptor(), grpc_prometheus.StreamServerInterceptor, grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( unaryServerInterceptor, + grpc_validator.UnaryServerInterceptor(), grpc_prometheus.UnaryServerInterceptor, grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), )), From e4f12c50765b829bca016e59d8e3a87e221fd070 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 15 Nov 2021 18:01:42 +0800 Subject: [PATCH 3/4] feat: remove validation Signed-off-by: Gaius --- pkg/rpc/scheduler/scheduler.proto | 2 +- scheduler/rpcserver/rpcserver.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/rpc/scheduler/scheduler.proto b/pkg/rpc/scheduler/scheduler.proto index 1bd31acd6c1..395318ef6e0 100644 --- a/pkg/rpc/scheduler/scheduler.proto +++ b/pkg/rpc/scheduler/scheduler.proto @@ -25,7 +25,7 @@ option go_package = "d7y.io/dragonfly/v2/pkg/rpc/scheduler"; message PeerTaskRequest{ // universal resource locator for different kind of storage - string url = 1 [(validate.rules).string.uri = true]; + string url = 1; // url meta info base.UrlMeta url_meta = 2; // peer's id and must be global uniqueness diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index b06c2103f89..9e5d3adb5bb 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -63,7 +63,6 @@ func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTa ctx, span = tracer.Start(ctx, config.SpanPeerRegister, trace.WithSpanKind(trace.SpanKindServer)) span.SetAttributes(config.AttributePeerRegisterRequest.String(request.String())) defer span.End() - logger.Debugf("register peer task, req: %+v", request) resp = new(scheduler.RegisterResult) if verifyErr := validateParams(request); verifyErr != nil { From f26410ced32bef05d2578cda1723ad970cc705ef Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 15 Nov 2021 18:13:43 +0800 Subject: [PATCH 4/4] feat: remove grpc validate middleware Signed-off-by: Gaius --- pkg/rpc/server.go | 3 --- scheduler/rpcserver/rpcserver.go | 1 - 2 files changed, 4 deletions(-) diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 7453e9b5e51..158de2be66f 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -22,7 +22,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" - grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -45,13 +44,11 @@ var DefaultServerOptions = []grpc.ServerOption{ grpc.MaxConcurrentStreams(100), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( streamServerInterceptor, - grpc_validator.StreamServerInterceptor(), grpc_prometheus.StreamServerInterceptor, grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( unaryServerInterceptor, - grpc_validator.UnaryServerInterceptor(), grpc_prometheus.UnaryServerInterceptor, grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index 9e5d3adb5bb..b288a648c1f 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -71,7 +71,6 @@ func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTa span.RecordError(err) return } - taskID := s.service.GenerateTaskID(request.Url, request.UrlMeta, request.PeerId) span.SetAttributes(config.AttributeTaskID.String(taskID)) task := s.service.GetOrCreateTask(ctx, supervisor.NewTask(taskID, request.Url, request.UrlMeta))