diff --git a/go.mod b/go.mod index 27bcc2b931..83091cf8b9 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,10 @@ require ( go.uber.org/zap v1.10.0 google.golang.org/grpc v1.22.1 gopkg.in/yaml.v2 v2.2.2 + k8s.io/api v0.0.0-20190325185214-7544f9db76f6 + k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841 + k8s.io/client-go v8.0.0+incompatible + k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect ) go 1.13 diff --git a/go.sum b/go.sum index cfde0ef1bd..813496b6ee 100644 --- a/go.sum +++ b/go.sum @@ -14,10 +14,13 @@ github.com/Jeffail/gabs v1.1.0 h1:kw5zCcl9tlJNHTDme7qbi21fDHZmXrnjMoXos3Jw/NI= github.com/Jeffail/gabs v1.1.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= github.com/Microsoft/go-winio v0.4.3 h1:M3NHMuPgMSUPdE5epwNUHlRPSVzHs8HpRTrVXhR0myo= github.com/Microsoft/go-winio v0.4.3/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= +github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0= github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/SAP/go-hdb v0.12.0 h1:5hBQZ2jjyZ268qjDmoDZJuCyLzR6oRLI60eYzmTW9m4= github.com/SAP/go-hdb v0.12.0/go.mod h1:etBT+FAi1t5k3K3tf5vQTnosgYmhDkRi8jEnQqCnxF0= github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc h1:LkkwnbY+S8WmwkWq1SVyRWMH9nYWO1P5XN3OD1tts/w= @@ -86,6 +89,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbp github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/creasty/defaults v1.3.0 h1:uG+RAxYbJgOPCOdKEcec9ZJXeva7Y6mj/8egdzwmLtw= github.com/creasty/defaults v1.3.0/go.mod h1:CIEEvs7oIVZm30R8VxtFJs+4k201gReYyuYHJxZc68I= +github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -106,7 +110,6 @@ github.com/dubbogo/getty v1.3.3 h1:8m4zZBqFHO+NmhH7rMPlFuuYRVjcPD7cUhumevqMZZs= github.com/dubbogo/getty v1.3.3/go.mod h1:U92BDyJ6sW9Jpohr2Vlz8w2uUbIbNZ3d+6rJvFTSPp0= github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM= github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= -github.com/dubbogo/gost v1.5.1 h1:oG5dzaWf1KYynBaBoUIOkgT+YD0niHV6xxI0Odq7hDg= github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8= github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= @@ -114,6 +117,8 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIh github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633 h1:H2pdYOb3KQ1/YsqVWoWNLQO+fusocsw354rqGTZtAgw= +github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful/v3 v3.0.0 h1:Duxxa4x0WIHW3bYEDmoAPNjmy8Rbqn+utcF74dlF/G8= github.com/emicklei/go-restful/v3 v3.0.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.8.0 h1:uE6Fp4fOcAJdc1wTQXLJ+SYistkbG1dNoi6Zs1+Ybvk= @@ -126,7 +131,9 @@ github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7 h1:bGT+Ub6bpzHl7AAYQhBrZ5nYTAH2SF/848WducU0Ao4= github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= @@ -140,6 +147,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0= +github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= +github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= +github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE= github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck= @@ -162,6 +173,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= @@ -180,9 +192,11 @@ github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4r github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 h1:zLTLjkaOFEFIOxY5BWLFLwh+cL8vOBW4XJ2aqLE/Tf0= github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g= github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/gophercloud/gophercloud v0.0.0-20180828235145-f29afc2cceca h1:wobTb8SE189AuxzEKClyYxiI4nUGWlpVtl13eLiFlOE= @@ -281,6 +295,7 @@ github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 h1:O/pT5C1Q3mVXMyu github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443/go.mod h1:bEpDU35nTu0ey1EXjwNwPjI9xErAsoOCmcMb9GKvyxo= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d h1:kJCB4vdITiW1eC1vq2e6IsrXKrZit1bv/TDYFGMp4BQ= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -299,6 +314,7 @@ github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/joyent/triton-go v0.0.0-20180628001255-830d2b111e62 h1:JHCT6xuyPUrbbgAPE/3dqlvUKzRHMNuTBKKUb6OeR/k= github.com/joyent/triton-go v0.0.0-20180628001255-830d2b111e62/go.mod h1:U+RSyWxWd04xTqnuOQxnai7XGS2PrPY2cfGoDKtMHjA= +github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= @@ -329,6 +345,7 @@ github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5Wu github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= @@ -358,9 +375,11 @@ github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI= github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= @@ -368,8 +387,12 @@ github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0= github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= +github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -393,6 +416,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1 h1:ccV59UEOTzVDnDUEFdT95ZzHVZ+5+158q8+SJb2QV5w= @@ -435,7 +459,6 @@ github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= -github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 h1:N8Bg45zpk/UcpNGnfJt2y/3lRWASHNTUET8owPYCgYI= github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= @@ -443,16 +466,16 @@ github.com/softlayer/softlayer-go v0.0.0-20180806151055-260589d94c7d h1:bVQRCxQv github.com/softlayer/softlayer-go v0.0.0-20180806151055-260589d94c7d/go.mod h1:Cw4GTlQccdRGSEf6KiMju767x0NEHE0YIVPJSaXjlsw= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= @@ -485,12 +508,12 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c h1:Vj5n4GlwjmQteupaxJ9+0FNOmBrHfq7vN4btdGoDZgI= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -499,7 +522,6 @@ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -511,6 +533,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -523,7 +546,7 @@ golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -532,11 +555,11 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZe golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/api v0.0.0-20180829000535-087779f1d2c9 h1:z1TeLUmxf9ws9KLICfmX+KGXTs+rjm+aGWzfsv7MZ9w= google.golang.org/api v0.0.0-20180829000535-087779f1d2c9/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= @@ -549,13 +572,13 @@ google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= -gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d h1:TxyelI5cVkbREznMhfzycHdkp5cLA7DpE+GKjSslYhM= gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= @@ -569,6 +592,7 @@ gopkg.in/ory-am/dockertest.v3 v3.3.4/go.mod h1:s9mmoLkaGeAh97qygnNj4xWkiN7e1SKek gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -587,3 +611,9 @@ k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841 h1:Q4RZrHNtlC/mSdC1sTrcZ5 k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= k8s.io/client-go v8.0.0+incompatible h1:tTI4hRmb1DRMl4fG6Vclfdi6nTM82oIrTT7HfitmxC4= k8s.io/client-go v8.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= +k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= +k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU= +k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= +sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= +sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index f9b046a2c5..51fdf21f5d 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -38,9 +38,9 @@ type dataListener struct { listener config_center.ConfigurationListener } -// NewRegistryDataListener ... +// NewRegistryDataListener func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { - return &dataListener{listener: listener, interestedURL: []*common.URL{}} + return &dataListener{listener: listener} } func (l *dataListener) AddInterestedURL(url *common.URL) { @@ -49,7 +49,12 @@ func (l *dataListener) AddInterestedURL(url *common.URL) { func (l *dataListener) DataChange(eventType remoting.Event) bool { - url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):] + index := strings.Index(eventType.Path, "/providers/") + if index == -1 { + logger.Warnf("Listen with no url, event.path={%v}", eventType.Path) + return false + } + url := eventType.Path[index+len("/providers/"):] serviceURL, err := common.NewURL(url) if err != nil { logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) @@ -68,7 +73,6 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool { return true } } - return false } @@ -97,7 +101,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { case e := <-l.events: logger.Infof("got etcd event %#v", e) - if e.ConfigType == remoting.EventTypeDel { + if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() { select { case <-l.registry.Done(): logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go index 928e3fa83d..e691ae3cf1 100644 --- a/registry/etcdv3/listener_test.go +++ b/registry/etcdv3/listener_test.go @@ -18,10 +18,9 @@ package etcdv3 import ( + "os" "testing" "time" - - "github.com/apache/dubbo-go/config_center" ) import ( @@ -32,6 +31,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/remoting" ) @@ -40,13 +40,16 @@ type RegistryTestSuite struct { etcd *embed.Etcd } +const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd" + // start etcd server func (suite *RegistryTestSuite) SetupSuite() { t := suite.T() cfg := embed.NewConfig() - cfg.Dir = "/tmp/default.etcd" + // avoid conflict with default etcd work-dir + cfg.Dir = defaultEtcdV3WorkDir e, err := embed.StartEtcd(cfg) if err != nil { t.Fatal(err) @@ -66,6 +69,10 @@ func (suite *RegistryTestSuite) SetupSuite() { // stop etcd server func (suite *RegistryTestSuite) TearDownSuite() { suite.etcd.Close() + // clean the etcd workdir + if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil { + suite.FailNow(err.Error()) + } } func (suite *RegistryTestSuite) TestDataChange() { diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index 6e26a8f3fc..dc4e382979 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -98,7 +98,7 @@ func (suite *RegistryTestSuite) TestSubscribe() { assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) } -func (suite *RegistryTestSuite) TestConsumerDestory() { +func (suite *RegistryTestSuite) TestConsumerDestroy() { t := suite.T() url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) @@ -117,7 +117,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() { } -func (suite *RegistryTestSuite) TestProviderDestory() { +func (suite *RegistryTestSuite) TestProviderDestroy() { t := suite.T() reg := initRegistry(t) diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go new file mode 100644 index 0000000000..f8869fea7b --- /dev/null +++ b/registry/kubernetes/listener.go @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "strings" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" +) + +type dataListener struct { + interestedURL []*common.URL + listener config_center.ConfigurationListener +} + +// NewRegistryDataListener +func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { + return &dataListener{listener: listener} +} + +// AddInterestedURL +func (l *dataListener) AddInterestedURL(url *common.URL) { + l.interestedURL = append(l.interestedURL, url) +} + +// DataChange +// notify listen, when interest event +func (l *dataListener) DataChange(eventType remoting.Event) bool { + + index := strings.Index(eventType.Path, "/providers/") + if index == -1 { + logger.Warnf("Listen with no url, event.path={%v}", eventType.Path) + return false + } + url := eventType.Path[index+len("/providers/"):] + serviceURL, err := common.NewURL(url) + if err != nil { + logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) + return false + } + + for _, v := range l.interestedURL { + if serviceURL.URLEqual(*v) { + l.listener.Process( + &config_center.ConfigChangeEvent{ + Key: eventType.Path, + Value: serviceURL, + ConfigType: eventType.Action, + }, + ) + return true + } + } + return false +} + +type configurationListener struct { + registry *kubernetesRegistry + events chan *config_center.ConfigChangeEvent +} + +// NewConfigurationListener for listening the event of kubernetes. +func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { + // add a new waiter + reg.WaitGroup().Add(1) + return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} +} + +func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { + l.events <- configType +} + +func (l *configurationListener) Next() (*registry.ServiceEvent, error) { + for { + select { + case <-l.registry.Done(): + logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.") + return nil, perrors.New("listener stopped") + + case e := <-l.events: + logger.Infof("got kubernetes event %#v", e) + if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() { + select { + case <-l.registry.Done(): + logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) + default: + } + continue + } + return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil + } + } +} +func (l *configurationListener) Close() { + l.registry.WaitGroup().Done() +} diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go new file mode 100644 index 0000000000..c50b5b670a --- /dev/null +++ b/registry/kubernetes/listener_test.go @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "encoding/json" + "os" + "strconv" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" +) + +var clientPodJsonData = `{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": { + "dubbo.io/annotation": "W3siayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzL2NvbnN1bWVyJTNBJTJGJTJGMTcyLjE3LjAuOCUyRlVzZXJQcm92aWRlciUzRmNhdGVnb3J5JTNEY29uc3VtZXJzJTI2ZHViYm8lM0RkdWJib2dvLWNvbnN1bWVyLTIuNi4wJTI2cHJvdG9jb2wlM0RkdWJibyIsInYiOiIifV0=" + }, + "creationTimestamp": "2020-03-13T03:38:57Z", + "labels": { + "dubbo.io/label": "dubbo.io-value" + }, + "name": "client", + "namespace": "default", + "resourceVersion": "2449700", + "selfLink": "/api/v1/namespaces/default/pods/client", + "uid": "3ec394f5-dcc6-49c3-8061-57b4b2b41344" + }, + "spec": { + "containers": [ + { + "env": [ + { + "name": "NAMESPACE", + "valueFrom": { + "fieldRef": { + "apiVersion": "v1", + "fieldPath": "metadata.namespace" + } + } + } + ], + "image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client", + "imagePullPolicy": "Always", + "name": "client", + "resources": {}, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "volumeMounts": [ + { + "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount", + "name": "dubbo-sa-token-l2lzh", + "readOnly": true + } + ] + } + ], + "dnsPolicy": "ClusterFirst", + "enableServiceLinks": true, + "nodeName": "minikube", + "priority": 0, + "restartPolicy": "Never", + "schedulerName": "default-scheduler", + "securityContext": {}, + "serviceAccount": "dubbo-sa", + "serviceAccountName": "dubbo-sa", + "terminationGracePeriodSeconds": 30, + "tolerations": [ + { + "effect": "NoExecute", + "key": "node.kubernetes.io/not-ready", + "operator": "Exists", + "tolerationSeconds": 300 + }, + { + "effect": "NoExecute", + "key": "node.kubernetes.io/unreachable", + "operator": "Exists", + "tolerationSeconds": 300 + } + ], + "volumes": [ + { + "name": "dubbo-sa-token-l2lzh", + "secret": { + "defaultMode": 420, + "secretName": "dubbo-sa-token-l2lzh" + } + } + ] + }, + "status": { + "conditions": [ + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:38:57Z", + "status": "True", + "type": "Initialized" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:40:18Z", + "status": "True", + "type": "Ready" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:40:18Z", + "status": "True", + "type": "ContainersReady" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:38:57Z", + "status": "True", + "type": "PodScheduled" + } + ], + "containerStatuses": [ + { + "containerID": "docker://2870d6abc19ca7fe22ca635ebcfac5d48c6d5550a659bafd74fb48104f6dfe3c", + "image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client:latest", + "imageID": "docker-pullable://registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client@sha256:1f075131f708a0d400339e81549d7c4d4ed917ab0b6bd38ef458dd06ad25a559", + "lastState": {}, + "name": "client", + "ready": true, + "restartCount": 0, + "state": { + "running": { + "startedAt": "2020-03-13T03:40:17Z" + } + } + } + ], + "hostIP": "10.0.2.15", + "phase": "Running", + "podIP": "172.17.0.8", + "qosClass": "BestEffort", + "startTime": "2020-03-13T03:38:57Z" + } +} +` + +func Test_DataChange(t *testing.T) { + listener := NewRegistryDataListener(&MockDataListener{}) + url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") + listener.AddInterestedURL(&url) + int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) + assert.Equal(t, true, int) +} + +type MockDataListener struct{} + +func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {} + +type KubernetesRegistryTestSuite struct { + suite.Suite + + currentPod v1.Pod +} + +func (s *KubernetesRegistryTestSuite) initRegistry() *kubernetesRegistry { + + t := s.T() + + regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + if err != nil { + t.Fatal(err) + } + + mock, err := newMockKubernetesRegistry(®url, s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) { + + out := fake.NewSimpleClientset() + + // mock current pod + if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil { + t.Fatal(err) + } + return out, nil + }) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Second) + return mock.(*kubernetesRegistry) +} + +func (s *KubernetesRegistryTestSuite) SetupSuite() { + + t := s.T() + + const ( + // kubernetes inject the var + podNameKey = "HOSTNAME" + nameSpaceKey = "NAMESPACE" + ) + + // 1. install test data + if err := json.Unmarshal([]byte(clientPodJsonData), &s.currentPod); err != nil { + t.Fatal(err) + } + + // 2. set downward-api inject env + if err := os.Setenv(podNameKey, s.currentPod.GetName()); err != nil { + t.Fatal(err) + } + if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil { + t.Fatal(err) + } + +} + +func (s *KubernetesRegistryTestSuite) TestDataChange() { + + t := s.T() + + listener := NewRegistryDataListener(&MockDataListener{}) + url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") + listener.AddInterestedURL(&url) + if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) { + t.Fatal("data change not ok") + } +} + +func TestKubernetesRegistrySuite(t *testing.T) { + suite.Run(t, &KubernetesRegistryTestSuite{}) +} diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go new file mode 100644 index 0000000000..7212a83d63 --- /dev/null +++ b/registry/kubernetes/registry.go @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "fmt" + "os" + "path" + "strings" + "sync" + "time" +) + +import ( + "github.com/dubbogo/getty" + "github.com/dubbogo/gost/net" + perrors "github.com/pkg/errors" + k8s "k8s.io/client-go/kubernetes" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/kubernetes" +) + +var ( + processID = "" + localIP = "" +) + +const ( + Name = "kubernetes" + ConnDelay = 3 + MaxFailTimes = 15 +) + +func init() { + processID = fmt.Sprintf("%d", os.Getpid()) + localIP, _ = gxnet.GetLocalIP() + extension.SetRegistry(Name, newKubernetesRegistry) +} + +type kubernetesRegistry struct { + registry.BaseRegistry + cltLock sync.RWMutex + client *kubernetes.Client + listenerLock sync.Mutex + listener *kubernetes.EventListener + dataListener *dataListener + configListener *configurationListener +} + +func (r *kubernetesRegistry) Client() *kubernetes.Client { + r.cltLock.RLock() + client := r.client + r.cltLock.RUnlock() + return client +} +func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { + r.cltLock.Lock() + r.client = client + r.cltLock.Unlock() +} + +func (r *kubernetesRegistry) CloseAndNilClient() { + r.client.Close() + r.client = nil +} + +func (r *kubernetesRegistry) CloseListener() { + + r.cltLock.Lock() + l := r.configListener + r.cltLock.Unlock() + if l != nil { + l.Close() + } + r.configListener = nil +} + +func (r *kubernetesRegistry) CreatePath(k string) error { + if err := r.client.Create(k, ""); err != nil { + return perrors.WithMessagef(err, "create path %s in kubernetes", k) + } + return nil +} + +func (r *kubernetesRegistry) DoRegister(root string, node string) error { + return r.client.Create(path.Join(root, node), "") +} + +func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { + + var ( + configListener *configurationListener + ) + + r.listenerLock.Lock() + configListener = r.configListener + r.listenerLock.Unlock() + if r.listener == nil { + r.cltLock.Lock() + client := r.client + r.cltLock.Unlock() + if client == nil { + return nil, perrors.New("kubernetes client broken") + } + + r.listenerLock.Lock() + if r.listener == nil { + // double check + r.listener = kubernetes.NewEventListener(r.client) + } + r.listenerLock.Unlock() + } + + //register the svc to dataListener + r.dataListener.AddInterestedURL(svc) + for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") { + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener) + } + + return configListener, nil +} + +func (r *kubernetesRegistry) InitListeners() { + r.listener = kubernetes.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) +} + +func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { + + // actually, kubernetes use in-cluster config, + r := &kubernetesRegistry{} + + r.InitBaseRegistry(url, r) + + if err := kubernetes.ValidateClient(r); err != nil { + return nil, perrors.WithStack(err) + } + + r.WaitGroup().Add(1) + go r.HandleClientRestart() + r.InitListeners() + + logger.Debugf("the kubernetes registry started") + + return r, nil +} + +func newMockKubernetesRegistry( + url *common.URL, + namespace string, + clientGeneratorFunc func() (k8s.Interface, error), +) (registry.Registry, error) { + + var err error + + r := &kubernetesRegistry{} + + r.InitBaseRegistry(url, r) + r.client, err = kubernetes.NewMockClient(namespace, clientGeneratorFunc) + if err != nil { + return nil, perrors.WithMessage(err, "new mock client") + } + r.InitListeners() + return r, nil +} + +func (r *kubernetesRegistry) HandleClientRestart() { + + var ( + err error + failTimes int + ) + + defer r.WaitGroup() +LOOP: + for { + select { + case <-r.Done(): + logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...") + break LOOP + // re-register all services + case <-r.Client().Done(): + r.Client().Close() + r.SetClient(nil) + + // try to connect to kubernetes, + failTimes = 0 + for { + select { + case <-r.Done(): + logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent + } + err = kubernetes.ValidateClient(r) + logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err)) + + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} diff --git a/registry/kubernetes/registry_test.go b/registry/kubernetes/registry_test.go new file mode 100644 index 0000000000..ea6d7663a9 --- /dev/null +++ b/registry/kubernetes/registry_test.go @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "strconv" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func (s *KubernetesRegistryTestSuite) TestRegister() { + + t := s.T() + + r := s.initRegistry() + defer r.Destroy() + + url, _ := common.NewURL( + "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", + common.WithParamsValue(constant.CLUSTER_KEY, "mock"), + common.WithMethods([]string{"GetUser", "AddUser"}), + ) + + err := r.Register(url) + assert.NoError(t, err) + _, _, err = r.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + if err != nil { + t.Fatal(err) + } +} + +func (s *KubernetesRegistryTestSuite) TestSubscribe() { + + t := s.T() + + r := s.initRegistry() + defer r.Destroy() + + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + + listener, err := r.DoSubscribe(&url) + if err != nil { + t.Fatal(err) + } + time.Sleep(1e9) + + go func() { + err := r.Register(url) + if err != nil { + t.Fatal(err) + } + }() + + serviceEvent, err := listener.Next() + if err != nil { + t.Fatal(err) + } + + t.Logf("got event %s", serviceEvent) +} + +func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() { + + t := s.T() + + r := s.initRegistry() + + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", + common.WithParamsValue(constant.CLUSTER_KEY, "mock"), + common.WithMethods([]string{"GetUser", "AddUser"})) + + _, err := r.DoSubscribe(&url) + if err != nil { + t.Fatal(err) + } + + //listener.Close() + time.Sleep(1e9) + r.Destroy() + + assert.Equal(t, false, r.IsAvailable()) + +} + +func (s *KubernetesRegistryTestSuite) TestProviderDestroy() { + + t := s.T() + + r := s.initRegistry() + + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", + common.WithParamsValue(constant.CLUSTER_KEY, "mock"), + common.WithMethods([]string{"GetUser", "AddUser"})) + err := r.Register(url) + assert.NoError(t, err) + + time.Sleep(1e9) + r.Destroy() + assert.Equal(t, false, r.IsAvailable()) +} + +func (s *KubernetesRegistryTestSuite) TestNewRegistry() { + + t := s.T() + + regUrl, err := common.NewURL("registry://127.0.0.1:443", + common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + if err != nil { + t.Fatal(err) + } + _, err = newKubernetesRegistry(®Url) + if err == nil { + t.Fatal("not in cluster, should be a err") + } +} + +func (s *KubernetesRegistryTestSuite) TestHandleClientRestart() { + + r := s.initRegistry() + r.WaitGroup().Add(1) + go r.HandleClientRestart() + time.Sleep(timeSecondDuration(1)) + r.client.Close() +} diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index fe8e42db9f..bef1760e04 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -43,7 +43,7 @@ type RegistryDataListener struct { // NewRegistryDataListener ... func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener { - return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}} + return &RegistryDataListener{listener: listener} } // AddInterestedURL ... @@ -65,13 +65,19 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path) return false } + for _, v := range l.interestedURL { if serviceURL.URLEqual(*v) { - l.listener.Process(&config_center.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action}) + l.listener.Process( + &config_center.ConfigChangeEvent{ + Key: eventType.Path, + Value: serviceURL, + ConfigType: eventType.Action, + }, + ) return true } } - return false } diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go index d9166fc8ea..895cc2954a 100644 --- a/remoting/etcdv3/client_test.go +++ b/remoting/etcdv3/client_test.go @@ -18,8 +18,8 @@ package etcdv3 import ( - "fmt" "net/url" + "os" "path" "reflect" "strings" @@ -37,6 +37,8 @@ import ( "google.golang.org/grpc/connectivity" ) +const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd" + // tests dataset var tests = []struct { input struct { @@ -92,7 +94,7 @@ func (suite *ClientTestSuite) SetupSuite() { cfg := embed.NewConfig() cfg.LPUrls = []url.URL{*lpurl} cfg.LCUrls = []url.URL{*lcurl} - cfg.Dir = "/tmp/default.etcd" + cfg.Dir = defaultEtcdV3WorkDir e, err := embed.StartEtcd(cfg) if err != nil { t.Fatal(err) @@ -112,6 +114,9 @@ func (suite *ClientTestSuite) SetupSuite() { // stop etcd server func (suite *ClientTestSuite) TearDownSuite() { suite.etcd.Close() + if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil { + suite.FailNow(err.Error()) + } } func (suite *ClientTestSuite) setUpClient() *Client { @@ -135,8 +140,6 @@ func (suite *ClientTestSuite) SetupTest() { func (suite *ClientTestSuite) TestClientClose() { - fmt.Println("called client close") - c := suite.client t := suite.T() @@ -148,8 +151,6 @@ func (suite *ClientTestSuite) TestClientClose() { func (suite *ClientTestSuite) TestClientValid() { - fmt.Println("called client valid") - c := suite.client t := suite.T() diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index a51a68bce7..e3cb74e4f6 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -53,7 +53,6 @@ func NewEventListener(client *Client) *EventListener { // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() for { wc, err := l.client.Watch(key) @@ -138,8 +137,6 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin // ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { - - l.wg.Add(1) defer l.wg.Done() for { wc, err := l.client.WatchWithPrefix(prefix) @@ -202,7 +199,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis keyList, valueList, err := l.client.getChildren(key) if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) + logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) } logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) @@ -217,12 +214,14 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + l.wg.Add(1) go func(key string, listener remoting.DataListener) { l.ListenServiceNodeEventWithPrefix(key, listener) logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) }(key, listener) logger.Infof("listen dubbo service key{%s}", key) + l.wg.Add(1) go func(key string) { if l.ListenServiceNodeEvent(key) { listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go new file mode 100644 index 0000000000..0c9ffd2b91 --- /dev/null +++ b/remoting/kubernetes/client.go @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "context" + "encoding/base64" + "encoding/json" + "os" + "sync" + "time" +) + +import ( + perrors "github.com/pkg/errors" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +import ( + "github.com/apache/dubbo-go/common/logger" +) + +const ( + // kubernetes inject the var + podNameKey = "HOSTNAME" + nameSpaceKey = "NAMESPACE" + // all pod annotation key + DubboIOAnnotationKey = "dubbo.io/annotation" + + DubboIOLabelKey = "dubbo.io/label" + DubboIOLabelValue = "dubbo.io-value" +) + +var ( + ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist") +) + +type Client struct { + + // kubernetes connection config + cfg *rest.Config + + // the kubernetes interface + rawClient kubernetes.Interface + + // current pod config + currentPodName string + + ns string + + // current resource version + lastResourceVersion string + + // the memory watcherSet + watcherSet WatcherSet + + // protect the wg && currentPod + lock sync.RWMutex + // current pod status + currentPod *v1.Pod + // protect the watchPods loop && watcher + wg sync.WaitGroup + + // manage the client lifecycle + ctx context.Context + cancel context.CancelFunc +} + +// load CurrentPodName +func getCurrentPodName() (string, error) { + + v := os.Getenv(podNameKey) + if len(v) == 0 { + return "", perrors.New("read value from env by key (HOSTNAME)") + } + return v, nil +} + +// load CurrentNameSpace +func getCurrentNameSpace() (string, error) { + + v := os.Getenv(nameSpaceKey) + if len(v) == 0 { + return "", perrors.New("read value from env by key (NAMESPACE)") + } + return v, nil +} + +// NewMockClient +// export for registry package test +func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { + return newMockClient(namespace, mockClientGenerator) +} + +// newMockClient +// new a client for test +func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { + + rawClient, err := mockClientGenerator() + if err != nil { + return nil, perrors.WithMessage(err, "call mock generator") + } + + currentPodName, err := getCurrentPodName() + if err != nil { + return nil, perrors.WithMessage(err, "get pod name") + } + + ctx, cancel := context.WithCancel(context.Background()) + + c := &Client{ + currentPodName: currentPodName, + ns: namespace, + rawClient: rawClient, + ctx: ctx, + watcherSet: newWatcherSet(ctx), + cancel: cancel, + } + + currentPod, err := c.initCurrentPod() + if err != nil { + return nil, perrors.WithMessage(err, "init current pod") + } + + // record current status + c.currentPod = currentPod + + // init the watcherSet by current pods + if err := c.initWatchSet(); err != nil { + return nil, perrors.WithMessage(err, "init watcherSet") + } + + c.lastResourceVersion = c.currentPod.GetResourceVersion() + + // start kubernetes watch loop + if err := c.watchPods(); err != nil { + return nil, perrors.WithMessage(err, "watch pods") + } + + logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) + return c, nil +} + +// newClient +// new a client for registry +func newClient(namespace string) (*Client, error) { + + cfg, err := rest.InClusterConfig() + if err != nil { + return nil, perrors.WithMessage(err, "get in-cluster config") + } + + rawClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config") + } + + currentPodName, err := getCurrentPodName() + if err != nil { + return nil, perrors.WithMessage(err, "get pod name") + } + + ctx, cancel := context.WithCancel(context.Background()) + + c := &Client{ + currentPodName: currentPodName, + ns: namespace, + cfg: cfg, + rawClient: rawClient, + ctx: ctx, + watcherSet: newWatcherSet(ctx), + cancel: cancel, + } + + currentPod, err := c.initCurrentPod() + if err != nil { + return nil, perrors.WithMessage(err, "init current pod") + } + + // record current status + c.currentPod = currentPod + + // init the watcherSet by current pods + if err := c.initWatchSet(); err != nil { + return nil, perrors.WithMessage(err, "init watcherSet") + } + + // start kubernetes watch loop + if err := c.watchPods(); err != nil { + return nil, perrors.WithMessage(err, "watch pods") + } + + logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) + return c, nil +} + +// initCurrentPod +// 1. get current pod +// 2. give the dubbo-label for this pod +func (c *Client) initCurrentPod() (*v1.Pod, error) { + + // read the current pod status + currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{}) + if err != nil { + return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns) + } + + oldPod, newPod, err := c.assembleDUBBOLabel(currentPod) + if err != nil { + if err != ErrDubboLabelAlreadyExist { + return nil, perrors.WithMessage(err, "assemble dubbo label") + } + // current pod don't have label + } + + p, err := c.getPatch(oldPod, newPod) + if err != nil { + return nil, perrors.WithMessage(err, "get patch") + } + + currentPod, err = c.patchCurrentPod(p) + if err != nil { + return nil, perrors.WithMessage(err, "patch to current pod") + } + + return currentPod, nil +} + +// initWatchSet +// 1. get all with dubbo label pods +// 2. put every element to watcherSet +func (c *Client) initWatchSet() error { + + pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), + }) + if err != nil { + return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns) + } + + // set resource version + c.lastResourceVersion = pods.GetResourceVersion() + + for _, pod := range pods.Items { + logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations()) + c.handleWatchedPodEvent(&pod, watch.Added) + } + + return nil +} + +// watchPods +// try to watch kubernetes pods +func (c *Client) watchPods() error { + + // try once + watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), + Watch: true, + ResourceVersion: c.lastResourceVersion, + }) + if err != nil { + return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns) + } + + watcher.Stop() + + c.wg.Add(1) + // add wg, grace close the client + go c.watchPodsLoop() + return nil +} + +type resourceVersionGetter interface { + GetResourceVersion() string +} + +// watchPods +// try to notify +func (c *Client) watchPodsLoop() { + + defer func() { + // notify other goroutine, this loop over + c.wg.Done() + logger.Info("watchPodsLoop goroutine game over") + }() + + for { + onceWatch: + wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), + Watch: true, + ResourceVersion: c.lastResourceVersion, + }) + if err != nil { + logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err) + time.Sleep(2 * time.Second) + continue + } + + logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion) + + for { + select { + // double check ctx + case <-c.ctx.Done(): + logger.Infof("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan())) + return + + // get one element from result-chan + case event, ok := <-wc.ResultChan(): + if !ok { + wc.Stop() + logger.Info("kubernetes watch chan die, create new") + goto onceWatch + } + + if event.Type == watch.Error { + // watched a error event + logger.Warnf("kubernetes watch api report err (%#v)", event) + continue + } + + o, ok := event.Object.(resourceVersionGetter) + if !ok { + logger.Warnf("kubernetes response object not a versioned object, its real type %T", event.Object) + continue + } + + // record the last resource version avoid to sync all pod + c.lastResourceVersion = o.GetResourceVersion() + logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion) + + // check event object type + p, ok := event.Object.(*v1.Pod) + if !ok { + logger.Warnf("kubernetes response object not a Pod, its real type %T", event.Object) + continue + } + + logger.Debugf("kubernetes got pod %#v", p) + // handle the watched pod + go c.handleWatchedPodEvent(p, event.Type) + } + } + } +} + +// handleWatchedPodEvent +// handle watched pod event +func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { + + for ak, av := range p.GetAnnotations() { + + // not dubbo interest annotation + if ak != DubboIOAnnotationKey { + continue + } + + ol, err := c.unmarshalRecord(av) + if err != nil { + logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err) + return + } + + for _, o := range ol { + + switch eventType { + case watch.Added: + // if pod is added, the record always be create + o.EventType = Create + case watch.Modified: + o.EventType = Update + case watch.Deleted: + o.EventType = Delete + default: + logger.Errorf("no valid kubernetes event-type (%s) ", eventType) + return + } + + logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o) + + if err := c.watcherSet.Put(o); err != nil { + logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err) + return + } + + } + + } +} + +// unmarshalRecord +// unmarshal the kubernetes dubbo annotation value +func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) { + + if len(record) == 0 { + // []*WatcherEvent is nil. + return nil, nil + } + + rawMsg, err := base64.URLEncoding.DecodeString(record) + if err != nil { + return nil, perrors.WithMessagef(err, "decode record (%s)", record) + } + + var out []*WatcherEvent + if err := json.Unmarshal(rawMsg, &out); err != nil { + return nil, perrors.WithMessage(err, "decode json") + } + return out, nil +} + +// marshalRecord +// marshal the kubernetes dubbo annotation value +func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) { + + msg, err := json.Marshal(ol) + if err != nil { + return "", perrors.WithMessage(err, "json encode object list") + } + return base64.URLEncoding.EncodeToString(msg), nil +} + +// readCurrentPod +// read the current pod status from kubernetes api +func (c *Client) readCurrentPod() (*v1.Pod, error) { + + currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{}) + if err != nil { + return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns) + } + return currentPod, nil +} + +// Create +// create k/v pair in watcher-set +func (c *Client) Create(k, v string) error { + + // the read current pod must be lock, protect every + // create operation can be atomic + c.lock.Lock() + defer c.lock.Unlock() + + // 1. accord old pod && (k, v) assemble new pod dubbo annotion v + // 2. get patch data + // 3. PATCH the pod + currentPod, err := c.readCurrentPod() + if err != nil { + return perrors.WithMessage(err, "read current pod") + } + + oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod) + if err != nil { + return perrors.WithMessage(err, "assemble") + } + + patchBytes, err := c.getPatch(oldPod, newPod) + if err != nil { + return perrors.WithMessage(err, "get patch") + } + + updatedPod, err := c.patchCurrentPod(patchBytes) + if err != nil { + return perrors.WithMessage(err, "patch current pod") + } + + c.currentPod = updatedPod + logger.Debugf("put the @key = %s @value = %s success", k, v) + // not update the watcherSet, the watcherSet should be write by the watchPodsLoop + return nil +} + +// patch current pod +// write new meta for current pod +func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) { + + updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch) + if err != nil { + return nil, perrors.WithMessage(err, "patch in kubernetes pod ") + } + return updatedPod, nil +} + +// assemble the dubbo kubernetes label +// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label +func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) { + + var ( + oldPod = &v1.Pod{} + newPod = &v1.Pod{} + ) + + oldPod.Labels = make(map[string]string, 8) + newPod.Labels = make(map[string]string, 8) + + if currentPod.GetLabels() != nil { + + if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue { + // already have label + return nil, nil, ErrDubboLabelAlreadyExist + } + } + + // copy current pod labels to oldPod && newPod + for k, v := range currentPod.GetLabels() { + oldPod.Labels[k] = v + newPod.Labels[k] = v + } + // assign new label for current pod + newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue + return oldPod, newPod, nil +} + +// assemble the dubbo kubernetes annotations +// accord the current pod && (k,v) assemble the old-pod, new-pod +func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) { + + oldPod = &v1.Pod{} + newPod = &v1.Pod{} + oldPod.Annotations = make(map[string]string, 8) + newPod.Annotations = make(map[string]string, 8) + + for k, v := range currentPod.GetAnnotations() { + oldPod.Annotations[k] = v + newPod.Annotations[k] = v + } + + al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey]) + if err != nil { + err = perrors.WithMessage(err, "unmarshal record") + return + } + + newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v})) + if err != nil { + err = perrors.WithMessage(err, "marshal record") + return + } + + newPod.Annotations[DubboIOAnnotationKey] = newAnnotations + return +} + +// getPatch +// get the kubernetes pod patch bytes +func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) { + + oldData, err := json.Marshal(oldPod) + if err != nil { + return nil, perrors.WithMessage(err, "marshal old pod") + } + + newData, err := json.Marshal(newPod) + if err != nil { + return nil, perrors.WithMessage(err, "marshal newPod pod") + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) + if err != nil { + return nil, perrors.WithMessage(err, "create two-way-merge-patch") + } + return patchBytes, nil +} + +// GetChildren +// get k children list from kubernetes-watcherSet +func (c *Client) GetChildren(k string) ([]string, []string, error) { + + objectList, err := c.watcherSet.Get(k, true) + if err != nil { + return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k) + } + + var kList []string + var vList []string + + for _, o := range objectList { + kList = append(kList, o.Key) + vList = append(vList, o.Value) + } + + return kList, vList, nil +} + +// Watch +// watch on spec key +func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) { + + w, err := c.watcherSet.Watch(k, false) + if err != nil { + return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k) + } + + return w.ResultChan(), w.done(), nil +} + +// Watch +// watch on spec prefix +func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) { + + w, err := c.watcherSet.Watch(prefix, true) + if err != nil { + return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix) + } + + return w.ResultChan(), w.done(), nil +} + +// Valid +// Valid the client +// if return false, the client is die +func (c *Client) Valid() bool { + + select { + case <-c.Done(): + return false + default: + } + c.lock.RLock() + defer c.lock.RUnlock() + return c.rawClient != nil +} + +// Done +// read the client status +func (c *Client) Done() <-chan struct{} { + return c.ctx.Done() +} + +// Stop +// read the client status +func (c *Client) Close() { + + select { + case <-c.ctx.Done(): + //already stopped + return + default: + } + c.cancel() + + // the client ctx be canceled + // will trigger the watcherSet watchers all stopped + // so, just wait + c.wg.Wait() +} + +// ValidateClient +// validate the kubernetes client +func ValidateClient(container clientFacade) error { + + client := container.Client() + + // new Client + if client == nil || client.Valid() { + ns, err := getCurrentNameSpace() + if err != nil { + return perrors.WithMessage(err, "get current namespace") + } + newClient, err := newClient(ns) + if err != nil { + logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err) + return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns) + } + container.SetClient(newClient) + } + + return nil +} diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go new file mode 100644 index 0000000000..342285b345 --- /dev/null +++ b/remoting/kubernetes/client_test.go @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "runtime" + "strings" + "sync" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/suite" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +// tests dataset +var tests = []struct { + input struct { + k string + v string + } +}{ + {input: struct { + k string + v string + }{k: "name", v: "scott.wang"}}, + {input: struct { + k string + v string + }{k: "namePrefix", v: "prefix.scott.wang"}}, + {input: struct { + k string + v string + }{k: "namePrefix1", v: "prefix1.scott.wang"}}, + {input: struct { + k string + v string + }{k: "age", v: "27"}}, +} + +// test dataset prefix +const prefix = "name" + +var clientPodJsonData = `{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": { + "dubbo.io/annotation": "W3siayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzL2NvbnN1bWVyJTNBJTJGJTJGMTcyLjE3LjAuOCUyRlVzZXJQcm92aWRlciUzRmNhdGVnb3J5JTNEY29uc3VtZXJzJTI2ZHViYm8lM0RkdWJib2dvLWNvbnN1bWVyLTIuNi4wJTI2cHJvdG9jb2wlM0RkdWJibyIsInYiOiIifV0=" + }, + "creationTimestamp": "2020-03-13T03:38:57Z", + "labels": { + "dubbo.io/label": "dubbo.io-value" + }, + "name": "client", + "namespace": "default", + "resourceVersion": "2449700", + "selfLink": "/api/v1/namespaces/default/pods/client", + "uid": "3ec394f5-dcc6-49c3-8061-57b4b2b41344" + }, + "spec": { + "containers": [ + { + "env": [ + { + "name": "NAMESPACE", + "valueFrom": { + "fieldRef": { + "apiVersion": "v1", + "fieldPath": "metadata.namespace" + } + } + } + ], + "image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client", + "imagePullPolicy": "Always", + "name": "client", + "resources": {}, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "volumeMounts": [ + { + "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount", + "name": "dubbo-sa-token-l2lzh", + "readOnly": true + } + ] + } + ], + "dnsPolicy": "ClusterFirst", + "enableServiceLinks": true, + "nodeName": "minikube", + "priority": 0, + "restartPolicy": "Never", + "schedulerName": "default-scheduler", + "securityContext": {}, + "serviceAccount": "dubbo-sa", + "serviceAccountName": "dubbo-sa", + "terminationGracePeriodSeconds": 30, + "tolerations": [ + { + "effect": "NoExecute", + "key": "node.kubernetes.io/not-ready", + "operator": "Exists", + "tolerationSeconds": 300 + }, + { + "effect": "NoExecute", + "key": "node.kubernetes.io/unreachable", + "operator": "Exists", + "tolerationSeconds": 300 + } + ], + "volumes": [ + { + "name": "dubbo-sa-token-l2lzh", + "secret": { + "defaultMode": 420, + "secretName": "dubbo-sa-token-l2lzh" + } + } + ] + }, + "status": { + "conditions": [ + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:38:57Z", + "status": "True", + "type": "Initialized" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:40:18Z", + "status": "True", + "type": "Ready" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:40:18Z", + "status": "True", + "type": "ContainersReady" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2020-03-13T03:38:57Z", + "status": "True", + "type": "PodScheduled" + } + ], + "containerStatuses": [ + { + "containerID": "docker://2870d6abc19ca7fe22ca635ebcfac5d48c6d5550a659bafd74fb48104f6dfe3c", + "image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client:latest", + "imageID": "docker-pullable://registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client@sha256:1f075131f708a0d400339e81549d7c4d4ed917ab0b6bd38ef458dd06ad25a559", + "lastState": {}, + "name": "client", + "ready": true, + "restartCount": 0, + "state": { + "running": { + "startedAt": "2020-03-13T03:40:17Z" + } + } + } + ], + "hostIP": "10.0.2.15", + "phase": "Running", + "podIP": "172.17.0.8", + "qosClass": "BestEffort", + "startTime": "2020-03-13T03:38:57Z" + } +} +` + +type KubernetesClientTestSuite struct { + suite.Suite + + currentPod v1.Pod +} + +func (s *KubernetesClientTestSuite) initClient() *Client { + + t := s.T() + + client, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) { + + out := fake.NewSimpleClientset() + + // mock current pod + if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil { + t.Fatal(err) + } + return out, nil + }) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Second) + return client +} + +func (s *KubernetesClientTestSuite) SetupSuite() { + + runtime.GOMAXPROCS(1) + + t := s.T() + + // 1. install test data + if err := json.Unmarshal([]byte(clientPodJsonData), &s.currentPod); err != nil { + t.Fatal(err) + } + + // 2. set downward-api inject env + if err := os.Setenv(podNameKey, s.currentPod.GetName()); err != nil { + t.Fatal(err) + } + if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil { + t.Fatal(err) + } + + go http.ListenAndServe(":6061", nil) + +} + +func (s *KubernetesClientTestSuite) TestReadCurrentPodName() { + t := s.T() + + n, err := getCurrentPodName() + if err != nil { + t.Fatal(err) + } + + if n != s.currentPod.GetName() { + t.Fatalf("expect %s but got %s", s.currentPod.GetName(), n) + } + +} +func (s *KubernetesClientTestSuite) TestReadCurrentNameSpace() { + t := s.T() + + ns, err := getCurrentNameSpace() + if err != nil { + t.Fatal(err) + } + + if ns != s.currentPod.GetNamespace() { + t.Fatalf("expect %s but got %s", s.currentPod.GetNamespace(), ns) + } + +} +func (s *KubernetesClientTestSuite) TestClientValid() { + + t := s.T() + + client := s.initClient() + defer client.Close() + + if client.Valid() != true { + t.Fatal("client is not valid") + } + + client.Close() + if client.Valid() != false { + t.Fatal("client is valid") + } +} + +func (s *KubernetesClientTestSuite) TestClientDone() { + + t := s.T() + + client := s.initClient() + + go func() { + time.Sleep(time.Second) + client.Close() + }() + + <-client.Done() + + if client.Valid() == true { + t.Fatal("client should be invalid then") + } +} + +func (s *KubernetesClientTestSuite) TestClientCreateKV() { + + t := s.T() + + client := s.initClient() + defer client.Close() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if err := client.Create(k, v); err != nil { + t.Fatal(err) + } + + } +} + +func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() { + + t := s.T() + + client := s.initClient() + defer client.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + + syncDataComplete := make(chan struct{}) + + go func() { + + wc, done, err := client.WatchWithPrefix(prefix) + if err != nil { + t.Fatal(err) + } + + wg.Done() + i := 0 + + for { + select { + case e := <-wc: + i++ + fmt.Printf("got event %v k %s v %s\n", e.EventType, e.Key, e.Value) + if i == 3 { + // already sync all event + syncDataComplete <- struct{}{} + return + } + case <-done: + t.Log("the watcherSet watcher was stopped") + return + } + } + }() + + // wait the watch goroutine start + wg.Wait() + + expect := make(map[string]string) + got := make(map[string]string) + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if strings.Contains(k, prefix) { + expect[k] = v + } + + if err := client.Create(k, v); err != nil { + t.Fatal(err) + } + } + + <-syncDataComplete + + // start get all children + kList, vList, err := client.GetChildren(prefix) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < len(kList); i++ { + got[kList[i]] = vList[i] + } + + for expectK, expectV := range expect { + + if got[expectK] != expectV { + t.Fatalf("expect {%s: %s} but got {%s: %v}", expectK, expectV, expectK, got[expectK]) + } + } + +} + +func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { + + t := s.T() + + client := s.initClient() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + + wc, done, err := client.WatchWithPrefix(prefix) + if err != nil { + t.Fatal(err) + } + + wg.Done() + + for { + select { + case e := <-wc: + t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + case <-done: + t.Log("the watcherSet watcher was stopped") + return + } + } + }() + + // must wait the watch goroutine work + wg.Wait() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if err := client.Create(k, v); err != nil { + t.Fatal(err) + } + } + + client.Close() +} + +func (s *KubernetesClientTestSuite) TestNewClient() { + + t := s.T() + + _, err := newClient(s.currentPod.GetNamespace()) + if err == nil { + t.Fatal("the out of cluster test should fail") + } + +} + +func (s *KubernetesClientTestSuite) TestClientWatch() { + + t := s.T() + + client := s.initClient() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + + wc, done, err := client.Watch(prefix) + if err != nil { + t.Fatal(err) + } + wg.Done() + + for { + select { + case e := <-wc: + t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + case <-done: + t.Log("the watcherSet watcher was stopped") + return + } + } + + }() + + // must wait the watch goroutine already start the watch goroutine + wg.Wait() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if err := client.Create(k, v); err != nil { + t.Fatal(err) + } + } + + client.Close() +} + +func TestKubernetesClient(t *testing.T) { + suite.Run(t, new(KubernetesClientTestSuite)) +} diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go new file mode 100644 index 0000000000..dd15c918b4 --- /dev/null +++ b/remoting/kubernetes/facade.go @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +type clientFacade interface { + Client() *Client + SetClient(*Client) +} diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go new file mode 100644 index 0000000000..024264ffde --- /dev/null +++ b/remoting/kubernetes/facade_test.go @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "sync" +) +import ( + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +type mockFacade struct { + client *Client + cltLock sync.Mutex + done chan struct{} +} + +func (r *mockFacade) Client() *Client { + return r.client +} + +func (r *mockFacade) SetClient(client *Client) { + r.client = client +} + +func (s *KubernetesClientTestSuite) Test_Facade() { + + t := s.T() + + mockClient, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) { + + out := fake.NewSimpleClientset() + + // mock current pod + if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil { + t.Fatal(err) + } + return out, nil + }) + if err != nil { + t.Fatal(err) + } + + m := &mockFacade{ + client: mockClient, + } + + if err := ValidateClient(m); err == nil { + t.Fatal("out of cluster should err") + } + mockClient.Close() +} diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go new file mode 100644 index 0000000000..4c198c66cc --- /dev/null +++ b/remoting/kubernetes/listener.go @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "sync" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" +) + +type EventListener struct { + client *Client + keyMapLock sync.RWMutex + keyMap map[string]struct{} + wg sync.WaitGroup +} + +func NewEventListener(client *Client) *EventListener { + return &EventListener{ + client: client, + keyMap: make(map[string]struct{}, 8), + } +} + +// Listen on a spec key +// this method will return true when spec key deleted, +// this method will return false when deep layer connection lose +func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { + defer l.wg.Done() + for { + wc, done, err := l.client.Watch(key) + if err != nil { + logger.Warnf("watch exist{key:%s} = error{%v}", key, err) + return false + } + + select { + + // client stopped + case <-l.client.Done(): + logger.Warnf("kubernetes client stopped") + return false + + // watcherSet watcher stopped + case <-done: + logger.Warnf("kubernetes watcherSet watcher stopped") + return false + + // handle kubernetes-watcherSet events + case e, ok := <-wc: + if !ok { + logger.Warnf("kubernetes-watcherSet watch-chan closed") + return false + } + + if l.handleEvents(e, listener...) { + // if event is delete + return true + } + } + } + + return false +} + +// return true mean the event type is DELETE +// return false mean the event type is CREATE || UPDATE +func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool { + + logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key) + + switch event.EventType { + case Create: + for _, listener := range listeners { + logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Key), + Action: remoting.EventTypeAdd, + Content: string(event.Value), + }) + } + return false + case Update: + for _, listener := range listeners { + logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Key), + Action: remoting.EventTypeUpdate, + Content: string(event.Value), + }) + } + return false + case Delete: + logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key) + return true + default: + return false + } +} + +// Listen on a set of key with spec prefix +func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { + + defer l.wg.Done() + for { + wc, done, err := l.client.WatchWithPrefix(prefix) + if err != nil { + logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) + } + + select { + // client stopped + case <-l.client.Done(): + logger.Warnf("kubernetes client stopped") + return + + // watcher stopped + case <-done: + logger.Warnf("kubernetes watcherSet watcher stopped") + return + + // kuberentes-watcherSet event stream + case e, ok := <-wc: + + if !ok { + logger.Warnf("kubernetes-watcherSet watch-chan closed") + return + } + + l.handleEvents(e, listener...) + } + } +} + +// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener +// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent +// | +// --------> ListenServiceNodeEvent +func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { + + l.keyMapLock.RLock() + _, ok := l.keyMap[key] + l.keyMapLock.RUnlock() + if ok { + logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key) + return + } + + l.keyMapLock.Lock() + // double check + if _, ok := l.keyMap[key]; ok { + // another goroutine already set it + l.keyMapLock.Unlock() + return + } + l.keyMap[key] = struct{}{} + l.keyMapLock.Unlock() + + keyList, valueList, err := l.client.GetChildren(key) + if err != nil { + logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) + } + + logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) + + for i, k := range keyList { + logger.Infof("got children list key -> %s", k) + listener.DataChange(remoting.Event{ + Path: k, + Action: remoting.EventTypeAdd, + Content: valueList[i], + }) + } + + logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key) + + l.wg.Add(1) + go func(key string, listener remoting.DataListener) { + l.ListenServiceNodeEventWithPrefix(key, listener) + logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) + }(key, listener) + + logger.Infof("listen dubbo service key{%s}", key) + l.wg.Add(1) + go func(key string) { + if l.ListenServiceNodeEvent(key) { + listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) + } + logger.Warnf("listenSelf(kubernetes key{%s}) goroutine exit now", key) + }(key) +} + +func (l *EventListener) Close() { + l.wg.Wait() +} diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go new file mode 100644 index 0000000000..a9446782a5 --- /dev/null +++ b/remoting/kubernetes/listener_test.go @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/remoting" +) + +var changedData = ` + dubbo.consumer.request_timeout=3s + dubbo.consumer.connect_timeout=5s + dubbo.application.organization=ikurento.com + dubbo.application.name=BDTService + dubbo.application.module=dubbogo user-info server + dubbo.application.version=0.0.1 + dubbo.application.owner=ZX + dubbo.application.environment=dev + dubbo.registries.hangzhouzk.protocol=zookeeper + dubbo.registries.hangzhouzk.timeout=3s + dubbo.registries.hangzhouzk.address=127.0.0.1:2181 + dubbo.registries.shanghaizk.protocol=zookeeper + dubbo.registries.shanghaizk.timeout=3s + dubbo.registries.shanghaizk.address=127.0.0.1:2182 + dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo + dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider + dubbo.service.com.ikurento.user.UserProvider.loadbalance=random + dubbo.service.com.ikurento.user.UserProvider.warmup=100 + dubbo.service.com.ikurento.user.UserProvider.cluster=failover +` + +type mockDataListener struct { + eventList []remoting.Event + client *Client + changedData string + + rc chan remoting.Event +} + +func (m *mockDataListener) DataChange(eventType remoting.Event) bool { + m.eventList = append(m.eventList, eventType) + if eventType.Content == m.changedData { + m.rc <- eventType + } + return true +} + +func (s *KubernetesClientTestSuite) TestListener() { + + t := s.T() + + var tests = []struct { + input struct { + k string + v string + } + }{ + {input: struct { + k string + v string + }{k: "/dubbo", v: changedData}}, + } + + c := s.initClient() + defer c.Close() + + listener := NewEventListener(c) + dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} + listener.ListenServiceEvent("/dubbo", dataListener) + + // NOTICE: direct listen will lose create msg + time.Sleep(time.Second) + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + + } + msg := <-dataListener.rc + assert.Equal(t, changedData, msg.Content) +} diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go new file mode 100644 index 0000000000..c99a3ebcc0 --- /dev/null +++ b/remoting/kubernetes/watch.go @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "context" + "strconv" + "strings" + "sync" +) + +import ( + perrors "github.com/pkg/errors" +) + +var ( + ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped") + ErrKVPairNotFound = perrors.New("k/v pair not found") +) + +const ( + defaultWatcherChanSize = 100 +) + +type eventType int + +const ( + Create eventType = iota + Update + Delete +) + +func (e eventType) String() string { + + switch e { + case Create: + return "CREATE" + case Update: + return "UPDATE" + case Delete: + return "DELETE" + default: + return "UNKNOWN" + } +} + +// WatcherEvent +// watch event is element in watcherSet +type WatcherEvent struct { + // event-type + EventType eventType `json:"-"` + // the dubbo-go should consume the key + Key string `json:"k"` + // the dubbo-go should consume the value + Value string `json:"v"` +} + +// Watchable WatcherSet +type WatcherSet interface { + + // put the watch event to the watch set + Put(object *WatcherEvent) error + // if prefix is false, + // the len([]*WatcherEvent) == 1 + Get(key string, prefix bool) ([]*WatcherEvent, error) + // watch the spec key or key prefix + Watch(key string, prefix bool) (Watcher, error) + // check the watcher set status + Done() <-chan struct{} +} + +// Watcher +type Watcher interface { + // the watcher's id + ID() string + // result stream + ResultChan() <-chan *WatcherEvent + // Stop the watcher + stop() + // check the watcher status + done() <-chan struct{} +} + +// the watch set implement +type watcherSetImpl struct { + + // Client's ctx, client die, the watch set will die too + ctx context.Context + + // protect watcher-set and watchers + lock sync.RWMutex + + // the key is dubbo-go interest meta + cache map[string]*WatcherEvent + + currentWatcherId uint64 + watchers map[uint64]*watcher +} + +// closeWatchers +// when the watcher-set was closed +func (s *watcherSetImpl) closeWatchers() { + + select { + case <-s.ctx.Done(): + + // parent ctx be canceled, close the watch-set's watchers + s.lock.Lock() + watchers := s.watchers + s.lock.Unlock() + + for _, w := range watchers { + // stop data stream + // close(w.ch) + // stop watcher + w.stop() + } + } +} + +// Watch +// watch on spec key, with or without prefix +func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) { + return s.addWatcher(key, prefix) +} + +// Done +// get the watcher-set status +func (s *watcherSetImpl) Done() <-chan struct{} { + return s.ctx.Done() +} + +// Put +// put the watch event to watcher-set +func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error { + + sendMsg := func(object *WatcherEvent, w *watcher) { + + select { + case <-w.done(): + // the watcher already stop + case w.ch <- object: + // block send the msg + } + } + + s.lock.Lock() + defer s.lock.Unlock() + + if err := s.valid(); err != nil { + return err + } + + // put to watcher-set + if watcherEvent.EventType == Delete { + delete(s.cache, watcherEvent.Key) + } else { + + old, ok := s.cache[watcherEvent.Key] + if ok { + if old.Value == watcherEvent.Value { + // already have this k/v pair + return nil + } + } + + // refresh the watcherEvent + s.cache[watcherEvent.Key] = watcherEvent + } + + // notify watcher + for _, w := range s.watchers { + + w := w + + if !strings.Contains(watcherEvent.Key, w.interested.key) { + // this watcher no interest in this element + continue + } + + if !w.interested.prefix { + if watcherEvent.Key == w.interested.key { + go sendMsg(watcherEvent, w) + } + // not interest + continue + } + go sendMsg(watcherEvent, w) + } + return nil +} + +// valid +func (s *watcherSetImpl) valid() error { + select { + case <-s.ctx.Done(): + return ErrWatcherSetAlreadyStopped + default: + return nil + } +} + +// addWatcher +func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) { + + if err := s.valid(); err != nil { + return nil, err + } + + s.lock.Lock() + defer s.lock.Unlock() + + // increase the watcher-id + s.currentWatcherId++ + + w := &watcher{ + id: s.currentWatcherId, + watcherSet: s, + interested: struct { + key string + prefix bool + }{key: key, prefix: prefix}, + ch: make(chan *WatcherEvent, defaultWatcherChanSize), + exit: make(chan struct{}), + } + s.watchers[s.currentWatcherId] = w + return w, nil +} + +// Get +// get elements from watcher-set +func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) { + + s.lock.RLock() + defer s.lock.RUnlock() + + if err := s.valid(); err != nil { + return nil, err + } + + if !prefix { + for k, v := range s.cache { + if k == key { + return []*WatcherEvent{v}, nil + } + } + // object + return nil, ErrKVPairNotFound + } + + var out []*WatcherEvent + + for k, v := range s.cache { + if strings.Contains(k, key) { + out = append(out, v) + } + } + + if len(out) == 0 { + return nil, ErrKVPairNotFound + } + + return out, nil +} + +// the watcher-set watcher +type watcher struct { + id uint64 + + // the underlay watcherSet + watcherSet *watcherSetImpl + + // the interest topic + interested struct { + key string + prefix bool + } + ch chan *WatcherEvent + + closeOnce sync.Once + exit chan struct{} +} + +// ResultChan +func (w *watcher) ResultChan() <-chan *WatcherEvent { + return w.ch +} + +// ID +// the watcher's id +func (w *watcher) ID() string { + return strconv.FormatUint(w.id, 10) +} + +// stop +// stop the watcher +func (w *watcher) stop() { + + // double close will panic + w.closeOnce.Do(func() { + close(w.exit) + }) +} + +// done +// check watcher status +func (w *watcher) done() <-chan struct{} { + return w.exit +} + +// newWatcherSet +// new watcher set from parent context +func newWatcherSet(ctx context.Context) WatcherSet { + s := &watcherSetImpl{ + ctx: ctx, + cache: map[string]*WatcherEvent{}, + watchers: map[uint64]*watcher{}, + } + go s.closeWatchers() + return s +} diff --git a/remoting/kubernetes/watch_test.go b/remoting/kubernetes/watch_test.go new file mode 100644 index 0000000000..8889103be2 --- /dev/null +++ b/remoting/kubernetes/watch_test.go @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "context" + "strconv" + "sync" + "testing" + "time" +) + +func TestWatchSet(t *testing.T) { + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + s := newWatcherSet(ctx) + + wg := sync.WaitGroup{} + + for i := 0; i < 2; i++ { + + wg.Add(1) + + go func() { + defer wg.Done() + w, err := s.Watch("key-1", false) + if err != nil { + t.Fatal(err) + } + for { + select { + case e := <-w.ResultChan(): + t.Logf("consumer %s got %s\n", w.ID(), e.Key) + + case <-w.done(): + t.Logf("consumer %s stopped", w.ID()) + return + } + } + }() + } + for i := 2; i < 3; i++ { + + wg.Add(1) + go func() { + + defer wg.Done() + w, err := s.Watch("key", true) + if err != nil { + t.Fatal(err) + } + + for { + select { + case e := <-w.ResultChan(): + t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key) + + case <-w.done(): + t.Logf("prefix consumer %s stopped", w.ID()) + return + } + } + }() + } + + for i := 0; i < 5; i++ { + go func(i int) { + if err := s.Put(&WatcherEvent{ + Key: "key-" + strconv.Itoa(i), + Value: strconv.Itoa(i), + }); err != nil { + t.Fatal(err) + } + }(i) + } + + wg.Wait() +} diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 77aa05ee9e..eaf259f441 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -59,7 +59,6 @@ func (l *ZkEventListener) SetClient(client *ZookeeperClient) { // ListenServiceNodeEvent ... func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() var zkEvent zk.Event for { @@ -145,6 +144,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li continue } // listen l service node + l.wg.Add(1) go func(node string, zkPath string, listener remoting.DataListener) { logger.Infof("delete zkNode{%s}", node) if l.ListenServiceNodeEvent(node, listener) { @@ -174,7 +174,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { - l.wg.Add(1) defer l.wg.Done() var ( @@ -250,6 +249,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi continue } logger.Infof("listen dubbo service key{%s}", dubboPath) + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { if l.ListenServiceNodeEvent(zkPath) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) @@ -261,6 +261,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi //if zkPath is end of "providers/ & consumers/" we do not listen children dir if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 && strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 { + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) @@ -292,6 +293,7 @@ func timeSecondDuration(sec int) time.Duration { // --------> ListenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { logger.Infof("listen dubbo path{%s}", zkPath) + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)