From 939b139db21b55e4d840fcc134710bf849de5dc2 Mon Sep 17 00:00:00 2001 From: Craig Johnston Date: Thu, 28 Jan 2021 15:47:52 -0800 Subject: [PATCH] added validation endpoint support --- amp.go | 187 +++++++++++++++++++++++++++++++++++++++++++++++++---- cmd/amp.go | 50 +++++++------- 2 files changed, 203 insertions(+), 34 deletions(-) diff --git a/amp.go b/amp.go index 7ac22cc..7673844 100644 --- a/amp.go +++ b/amp.go @@ -21,12 +21,20 @@ import ( "k8s.io/client-go/kubernetes" ) +type AdmissionReview string + +const ( + AdmisionReviewValidate AdmissionReview = "validate" + AdmisionReviewMutate AdmissionReview = "mutate" +) + // Config configures the API type Config struct { - Log *zap.Logger - HttpClient *http.Client - Cs *kubernetes.Clientset - EpAnnotation string + Log *zap.Logger + HttpClient *http.Client + Cs *kubernetes.Clientset + EpAnnotation string + EpValidatingAnnotation string } // Api @@ -82,8 +90,8 @@ func NewApi(cfg *Config) (*Api, error) { return a, nil } -// MutatePodsHandler -func (a *Api) MutatePodsHandler() gin.HandlerFunc { +// AdmissionReviewHandler +func (a *Api) AdmissionReviewHandler(admissionReview AdmissionReview) gin.HandlerFunc { return func(c *gin.Context) { rs, err := c.GetRawData() if err != nil { @@ -103,7 +111,7 @@ func (a *Api) MutatePodsHandler() gin.HandlerFunc { return } - a.Log.Info("handling request") + a.Log.Info("Handling AdmissionReview request", zap.Any("type", admissionReview)) // The AdmissionReview that was sent to the web hook requestedAdmissionReview := admissionv1.AdmissionReview{} @@ -116,8 +124,15 @@ func (a *Api) MutatePodsHandler() gin.HandlerFunc { a.Log.Error("decode error", zap.Error(err)) responseAdmissionReview.Response = toAdmissionResponse(err) } else { - // pass to mutatePod - responseAdmissionReview.Response = a.mutatePod(requestedAdmissionReview) + // mutate + if admissionReview == AdmisionReviewMutate { + responseAdmissionReview.Response = a.mutatePod(requestedAdmissionReview) + } + + // validate + if admissionReview == AdmisionReviewValidate { + responseAdmissionReview.Response = a.validatePod(requestedAdmissionReview) + } } // Return the same UID @@ -125,12 +140,160 @@ func (a *Api) MutatePodsHandler() gin.HandlerFunc { responseAdmissionReview.Kind = "AdmissionReview" responseAdmissionReview.APIVersion = "admission.k8s.io/v1" - a.Log.Info("sending response", zap.ByteString("value", responseAdmissionReview.Response.Patch)) + a.Log.Info("Returning response to Kubernetes", zap.ByteString("value", responseAdmissionReview.Response.Patch)) c.JSON(http.StatusOK, responseAdmissionReview) } } +// validatePod +func (a *Api) validatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { + a.Log.Info("started validatePod admission review", + zap.Bool("DryRun", *ar.Request.DryRun), + zap.String("Namespace", ar.Request.Namespace)) + + logInfo := []zap.Field{ + zap.String("namespace", ar.Request.Namespace), + zap.String("annotation", a.EpAnnotation), + } + + reviewResponse := admissionv1.AdmissionResponse{} + + // amp is for pods + podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + if ar.Request.Resource != podResource { + a.Log.Error("unexpected resource", + append(logInfo, + zap.String("expected", podResource.Resource), + zap.String("received", ar.Request.Resource.Resource), + )..., + ) + return nil + } + + raw := ar.Request.Object.Raw + pod := corev1.Pod{} + deserializer := codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { + a.Log.Error("deserializer failure", zap.Error(err)) + return &reviewResponse + } + logInfo = append(logInfo, zap.String("Pod", pod.Name)) + + a.Log.Info("Pod for validation review", + append(logInfo, + zap.Any("PodLabels", pod.Labels), + zap.Any("PodAnnotations", pod.Annotations), + zap.Any("PodNamespace", pod.Namespace), + )..., + ) + + ns, err := a.Cs.CoreV1().Namespaces().Get(ar.Request.Namespace, metav1.GetOptions{}) + if err != nil { + a.Log.Error("unable to get namespace", + append(logInfo, zap.Error(err))..., + ) + return &reviewResponse + } + + // lookup endpoint by namespace annotation + annotations := ns.GetAnnotations() + ep, ok := annotations[a.EpValidatingAnnotation] + if ok == false { + a.Log.Warn("DEFAULT ALLOW if no validation endpoint is configured for namespace.", logInfo...) + reviewResponse.Allowed = true + return &reviewResponse + } + + logInfo = append(logInfo, + zap.String("endpoint", ep), + zap.String("annotation", a.EpValidatingAnnotation), + ) + + a.Log.Info("got validation endpoint from namespace", logInfo...) + + body, err := json.Marshal(pod) + if err != nil { + a.Log.Info("unable to marshal pod", + append(logInfo, zap.Error(err))..., + ) + reviewResponse.Allowed = false + reviewResponse.Result = &metav1.Status{ + Code: 500, + Message: fmt.Sprintf("validatePod is unable to Marshal pod: %s", err.Error()), + } + return &reviewResponse + } + + req, err := http.NewRequest("POST", ep, bytes.NewBuffer(body)) + if err != nil { + a.Log.Error("Unable to build NewRequest", + append(logInfo, zap.Error(err))..., + ) + reviewResponse.Allowed = false + reviewResponse.Result = &metav1.Status{ + Code: 500, + Message: fmt.Sprintf("validatePod is unable to build NewRequest: %s", err.Error()), + } + return &reviewResponse + } + + resp, err := a.HttpClient.Do(req) + if err != nil { + a.Log.Error("Unable make endpoint request", + append(logInfo, zap.Error(err))..., + ) + reviewResponse.Allowed = false + reviewResponse.Result = &metav1.Status{ + Code: 500, + Message: fmt.Sprintf("validatePod is unable make endpoint request: %s", err.Error()), + } + return &reviewResponse + } + + if resp.StatusCode != http.StatusOK { + a.Log.Error("Endpoint request returned non-200 response", + append(logInfo, zap.Int("http_status_code", resp.StatusCode))..., + ) + reviewResponse.Allowed = false + reviewResponse.Result = &metav1.Status{ + Code: 500, + Message: fmt.Sprintf("validatePod endpoint returned non-200, got: %v", resp.StatusCode), + } + return &reviewResponse + } + + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + a.Log.Error("Error reading response body", + append(logInfo, zap.Error(err))..., + ) + reviewResponse.Allowed = false + reviewResponse.Result = &metav1.Status{ + Code: 500, + Message: fmt.Sprintf("validatePod is unable to read endpoint response body: %s", err.Error()), + } + return &reviewResponse + } + _ = resp.Body.Close() + + // unmarshal response body into admissionv1.AdmissionResponse + err = json.Unmarshal(respBody, &reviewResponse) + if err != nil { + a.Log.Error("unable to unmarshal response body into admissionv1.AdmissionResponse", + append(logInfo, zap.Error(err))..., + ) + reviewResponse.Allowed = false + reviewResponse.Result = &metav1.Status{ + Code: 500, + Message: fmt.Sprintf("validatePod is unable to unmarshal response body into admissionv1.AdmissionResponse: %s", err.Error()), + } + return &reviewResponse + } + + return &reviewResponse +} + // mutatePod func (a *Api) mutatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { a.Log.Info("started mutatePod admission review", @@ -143,7 +306,7 @@ func (a *Api) mutatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionRe } reviewResponse := admissionv1.AdmissionResponse{} - // always allow (amp is only for pod mutation) + // always allow, mutation happens first, validation can deny if it needs to reviewResponse.Allowed = true podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} @@ -166,7 +329,7 @@ func (a *Api) mutatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionRe } logInfo = append(logInfo, zap.String("Pod", pod.Name)) - a.Log.Info("Pod for review", + a.Log.Info("Pod for mutation review", append(logInfo, zap.Any("PodLabels", pod.Labels), zap.Any("PodAnnotations", pod.Annotations), diff --git a/cmd/amp.go b/cmd/amp.go index 69a1f22..7e8aed3 100644 --- a/cmd/amp.go +++ b/cmd/amp.go @@ -28,14 +28,15 @@ import ( ) var ( - ipEnv = getEnv("IP", "127.0.0.1") - portEnv = getEnv("PORT", "8070") - metricsPortEnv = getEnv("METRICS_PORT", "2112") - modeEnv = getEnv("MODE", "release") - httpReadTimeoutEnv = getEnv("HTTP_READ_TIMEOUT", "10") - httpWriteTimeoutEnv = getEnv("HTTP_WRITE_TIMEOUT", "10") - certPathEnv = getEnv("CERT_PATH", "") - epAnnotationEnv = getEnv("ANNOTATION_EP", "amp.txn2.com/ep") + ipEnv = getEnv("IP", "127.0.0.1") + portEnv = getEnv("PORT", "8070") + metricsPortEnv = getEnv("METRICS_PORT", "2112") + modeEnv = getEnv("MODE", "release") + httpReadTimeoutEnv = getEnv("HTTP_READ_TIMEOUT", "10") + httpWriteTimeoutEnv = getEnv("HTTP_WRITE_TIMEOUT", "10") + certPathEnv = getEnv("CERT_PATH", "") + epAnnotationEnv = getEnv("ANNOTATION_EP", "amp.txn2.com/ep") + epValidatingAnnotationEnv = getEnv("VALIDATING_ANNOTATION_EP", "amp.txn2.com/ep/validating") ) var Version = "0.0.0" @@ -71,14 +72,15 @@ func main() { } var ( - ip = flag.String("ip", ipEnv, "Server IP address to bind to.") - port = flag.String("port", portEnv, "Server port.") - certPath = flag.String("certPath", certPathEnv, "Cert path. If populated will serve TLS.") - metricsPort = flag.String("metricsPort", metricsPortEnv, "Metrics port.") - mode = flag.String("mode", modeEnv, "debug or release") - httpReadTimeout = flag.Int("httpReadTimeout", httpReadTimeoutInt, "HTTP read timeout") - httpWriteTimeout = flag.Int("httpWriteTimeout", httpWriteTimeoutInt, "HTTP write timeout") - epAnnotation = flag.String("epAnnotation", epAnnotationEnv, "Endpoint annotation") + ip = flag.String("ip", ipEnv, "Server IP address to bind to.") + port = flag.String("port", portEnv, "Server port.") + certPath = flag.String("certPath", certPathEnv, "Cert path. If populated will serve TLS.") + metricsPort = flag.String("metricsPort", metricsPortEnv, "Metrics port.") + mode = flag.String("mode", modeEnv, "debug or release") + httpReadTimeout = flag.Int("httpReadTimeout", httpReadTimeoutInt, "HTTP read timeout") + httpWriteTimeout = flag.Int("httpWriteTimeout", httpWriteTimeoutInt, "HTTP write timeout") + epAnnotation = flag.String("epAnnotation", epAnnotationEnv, "Endpoint annotation") + epValidatingAnnotation = flag.String("epValidatingAnnotation", epValidatingAnnotationEnv, "Endpoint annotation for validating") ) flag.Parse() @@ -169,10 +171,11 @@ func main() { // get api api, err := amp.NewApi(&.Config{ - Log: logger, - HttpClient: httpClient, - Cs: cs, - EpAnnotation: *epAnnotation, + Log: logger, + HttpClient: httpClient, + Cs: cs, + EpAnnotation: *epAnnotation, + EpValidatingAnnotation: *epValidatingAnnotation, }) if err != nil { logger.Fatal("Error getting API.", zap.Error(err)) @@ -181,8 +184,11 @@ func main() { // status r.GET("/", api.OkHandler(Version, *mode, Service)) - // status - r.POST("/mutate", api.MutatePodsHandler()) + // validate proxy + r.POST("/validate", api.AdmissionReviewHandler(amp.AdmisionReviewValidate)) + + // mutate proxy + r.POST("/mutate", api.AdmissionReviewHandler(amp.AdmisionReviewMutate)) // metrics server (run in go routine) go func() {