Skip to content

Commit

Permalink
added validation endpoint support
Browse files Browse the repository at this point in the history
  • Loading branch information
cjimti committed Jan 28, 2021
1 parent e5e3134 commit 939b139
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 34 deletions.
187 changes: 175 additions & 12 deletions amp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ import (
"k8s.io/client-go/kubernetes"
)

type AdmissionReview string

const (
AdmisionReviewValidate AdmissionReview = "validate"
AdmisionReviewMutate AdmissionReview = "mutate"
)

// Config configures the API
type Config struct {
Log *zap.Logger
HttpClient *http.Client
Cs *kubernetes.Clientset
EpAnnotation string
Log *zap.Logger
HttpClient *http.Client
Cs *kubernetes.Clientset
EpAnnotation string
EpValidatingAnnotation string
}

// Api
Expand Down Expand Up @@ -82,8 +90,8 @@ func NewApi(cfg *Config) (*Api, error) {
return a, nil
}

// MutatePodsHandler
func (a *Api) MutatePodsHandler() gin.HandlerFunc {
// AdmissionReviewHandler
func (a *Api) AdmissionReviewHandler(admissionReview AdmissionReview) gin.HandlerFunc {
return func(c *gin.Context) {
rs, err := c.GetRawData()
if err != nil {
Expand All @@ -103,7 +111,7 @@ func (a *Api) MutatePodsHandler() gin.HandlerFunc {
return
}

a.Log.Info("handling request")
a.Log.Info("Handling AdmissionReview request", zap.Any("type", admissionReview))

// The AdmissionReview that was sent to the web hook
requestedAdmissionReview := admissionv1.AdmissionReview{}
Expand All @@ -116,21 +124,176 @@ func (a *Api) MutatePodsHandler() gin.HandlerFunc {
a.Log.Error("decode error", zap.Error(err))
responseAdmissionReview.Response = toAdmissionResponse(err)
} else {
// pass to mutatePod
responseAdmissionReview.Response = a.mutatePod(requestedAdmissionReview)
// mutate
if admissionReview == AdmisionReviewMutate {
responseAdmissionReview.Response = a.mutatePod(requestedAdmissionReview)
}

// validate
if admissionReview == AdmisionReviewValidate {
responseAdmissionReview.Response = a.validatePod(requestedAdmissionReview)
}
}

// Return the same UID
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
responseAdmissionReview.Kind = "AdmissionReview"
responseAdmissionReview.APIVersion = "admission.k8s.io/v1"

a.Log.Info("sending response", zap.ByteString("value", responseAdmissionReview.Response.Patch))
a.Log.Info("Returning response to Kubernetes", zap.ByteString("value", responseAdmissionReview.Response.Patch))

c.JSON(http.StatusOK, responseAdmissionReview)
}
}

// validatePod
func (a *Api) validatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
a.Log.Info("started validatePod admission review",
zap.Bool("DryRun", *ar.Request.DryRun),
zap.String("Namespace", ar.Request.Namespace))

logInfo := []zap.Field{
zap.String("namespace", ar.Request.Namespace),
zap.String("annotation", a.EpAnnotation),
}

reviewResponse := admissionv1.AdmissionResponse{}

// amp is for pods
podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
if ar.Request.Resource != podResource {
a.Log.Error("unexpected resource",
append(logInfo,
zap.String("expected", podResource.Resource),
zap.String("received", ar.Request.Resource.Resource),
)...,
)
return nil
}

raw := ar.Request.Object.Raw
pod := corev1.Pod{}
deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
a.Log.Error("deserializer failure", zap.Error(err))
return &reviewResponse
}
logInfo = append(logInfo, zap.String("Pod", pod.Name))

a.Log.Info("Pod for validation review",
append(logInfo,
zap.Any("PodLabels", pod.Labels),
zap.Any("PodAnnotations", pod.Annotations),
zap.Any("PodNamespace", pod.Namespace),
)...,
)

ns, err := a.Cs.CoreV1().Namespaces().Get(ar.Request.Namespace, metav1.GetOptions{})
if err != nil {
a.Log.Error("unable to get namespace",
append(logInfo, zap.Error(err))...,
)
return &reviewResponse
}

// lookup endpoint by namespace annotation
annotations := ns.GetAnnotations()
ep, ok := annotations[a.EpValidatingAnnotation]
if ok == false {
a.Log.Warn("DEFAULT ALLOW if no validation endpoint is configured for namespace.", logInfo...)
reviewResponse.Allowed = true
return &reviewResponse
}

logInfo = append(logInfo,
zap.String("endpoint", ep),
zap.String("annotation", a.EpValidatingAnnotation),
)

a.Log.Info("got validation endpoint from namespace", logInfo...)

body, err := json.Marshal(pod)
if err != nil {
a.Log.Info("unable to marshal pod",
append(logInfo, zap.Error(err))...,
)
reviewResponse.Allowed = false
reviewResponse.Result = &metav1.Status{
Code: 500,
Message: fmt.Sprintf("validatePod is unable to Marshal pod: %s", err.Error()),
}
return &reviewResponse
}

req, err := http.NewRequest("POST", ep, bytes.NewBuffer(body))
if err != nil {
a.Log.Error("Unable to build NewRequest",
append(logInfo, zap.Error(err))...,
)
reviewResponse.Allowed = false
reviewResponse.Result = &metav1.Status{
Code: 500,
Message: fmt.Sprintf("validatePod is unable to build NewRequest: %s", err.Error()),
}
return &reviewResponse
}

resp, err := a.HttpClient.Do(req)
if err != nil {
a.Log.Error("Unable make endpoint request",
append(logInfo, zap.Error(err))...,
)
reviewResponse.Allowed = false
reviewResponse.Result = &metav1.Status{
Code: 500,
Message: fmt.Sprintf("validatePod is unable make endpoint request: %s", err.Error()),
}
return &reviewResponse
}

if resp.StatusCode != http.StatusOK {
a.Log.Error("Endpoint request returned non-200 response",
append(logInfo, zap.Int("http_status_code", resp.StatusCode))...,
)
reviewResponse.Allowed = false
reviewResponse.Result = &metav1.Status{
Code: 500,
Message: fmt.Sprintf("validatePod endpoint returned non-200, got: %v", resp.StatusCode),
}
return &reviewResponse
}

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
a.Log.Error("Error reading response body",
append(logInfo, zap.Error(err))...,
)
reviewResponse.Allowed = false
reviewResponse.Result = &metav1.Status{
Code: 500,
Message: fmt.Sprintf("validatePod is unable to read endpoint response body: %s", err.Error()),
}
return &reviewResponse
}
_ = resp.Body.Close()

// unmarshal response body into admissionv1.AdmissionResponse
err = json.Unmarshal(respBody, &reviewResponse)
if err != nil {
a.Log.Error("unable to unmarshal response body into admissionv1.AdmissionResponse",
append(logInfo, zap.Error(err))...,
)
reviewResponse.Allowed = false
reviewResponse.Result = &metav1.Status{
Code: 500,
Message: fmt.Sprintf("validatePod is unable to unmarshal response body into admissionv1.AdmissionResponse: %s", err.Error()),
}
return &reviewResponse
}

return &reviewResponse
}

// mutatePod
func (a *Api) mutatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
a.Log.Info("started mutatePod admission review",
Expand All @@ -143,7 +306,7 @@ func (a *Api) mutatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionRe
}

reviewResponse := admissionv1.AdmissionResponse{}
// always allow (amp is only for pod mutation)
// always allow, mutation happens first, validation can deny if it needs to
reviewResponse.Allowed = true

podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
Expand All @@ -166,7 +329,7 @@ func (a *Api) mutatePod(ar admissionv1.AdmissionReview) *admissionv1.AdmissionRe
}
logInfo = append(logInfo, zap.String("Pod", pod.Name))

a.Log.Info("Pod for review",
a.Log.Info("Pod for mutation review",
append(logInfo,
zap.Any("PodLabels", pod.Labels),
zap.Any("PodAnnotations", pod.Annotations),
Expand Down
50 changes: 28 additions & 22 deletions cmd/amp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ import (
)

var (
ipEnv = getEnv("IP", "127.0.0.1")
portEnv = getEnv("PORT", "8070")
metricsPortEnv = getEnv("METRICS_PORT", "2112")
modeEnv = getEnv("MODE", "release")
httpReadTimeoutEnv = getEnv("HTTP_READ_TIMEOUT", "10")
httpWriteTimeoutEnv = getEnv("HTTP_WRITE_TIMEOUT", "10")
certPathEnv = getEnv("CERT_PATH", "")
epAnnotationEnv = getEnv("ANNOTATION_EP", "amp.txn2.com/ep")
ipEnv = getEnv("IP", "127.0.0.1")
portEnv = getEnv("PORT", "8070")
metricsPortEnv = getEnv("METRICS_PORT", "2112")
modeEnv = getEnv("MODE", "release")
httpReadTimeoutEnv = getEnv("HTTP_READ_TIMEOUT", "10")
httpWriteTimeoutEnv = getEnv("HTTP_WRITE_TIMEOUT", "10")
certPathEnv = getEnv("CERT_PATH", "")
epAnnotationEnv = getEnv("ANNOTATION_EP", "amp.txn2.com/ep")
epValidatingAnnotationEnv = getEnv("VALIDATING_ANNOTATION_EP", "amp.txn2.com/ep/validating")
)

var Version = "0.0.0"
Expand Down Expand Up @@ -71,14 +72,15 @@ func main() {
}

var (
ip = flag.String("ip", ipEnv, "Server IP address to bind to.")
port = flag.String("port", portEnv, "Server port.")
certPath = flag.String("certPath", certPathEnv, "Cert path. If populated will serve TLS.")
metricsPort = flag.String("metricsPort", metricsPortEnv, "Metrics port.")
mode = flag.String("mode", modeEnv, "debug or release")
httpReadTimeout = flag.Int("httpReadTimeout", httpReadTimeoutInt, "HTTP read timeout")
httpWriteTimeout = flag.Int("httpWriteTimeout", httpWriteTimeoutInt, "HTTP write timeout")
epAnnotation = flag.String("epAnnotation", epAnnotationEnv, "Endpoint annotation")
ip = flag.String("ip", ipEnv, "Server IP address to bind to.")
port = flag.String("port", portEnv, "Server port.")
certPath = flag.String("certPath", certPathEnv, "Cert path. If populated will serve TLS.")
metricsPort = flag.String("metricsPort", metricsPortEnv, "Metrics port.")
mode = flag.String("mode", modeEnv, "debug or release")
httpReadTimeout = flag.Int("httpReadTimeout", httpReadTimeoutInt, "HTTP read timeout")
httpWriteTimeout = flag.Int("httpWriteTimeout", httpWriteTimeoutInt, "HTTP write timeout")
epAnnotation = flag.String("epAnnotation", epAnnotationEnv, "Endpoint annotation")
epValidatingAnnotation = flag.String("epValidatingAnnotation", epValidatingAnnotationEnv, "Endpoint annotation for validating")
)
flag.Parse()

Expand Down Expand Up @@ -169,10 +171,11 @@ func main() {

// get api
api, err := amp.NewApi(&amp.Config{
Log: logger,
HttpClient: httpClient,
Cs: cs,
EpAnnotation: *epAnnotation,
Log: logger,
HttpClient: httpClient,
Cs: cs,
EpAnnotation: *epAnnotation,
EpValidatingAnnotation: *epValidatingAnnotation,
})
if err != nil {
logger.Fatal("Error getting API.", zap.Error(err))
Expand All @@ -181,8 +184,11 @@ func main() {
// status
r.GET("/", api.OkHandler(Version, *mode, Service))

// status
r.POST("/mutate", api.MutatePodsHandler())
// validate proxy
r.POST("/validate", api.AdmissionReviewHandler(amp.AdmisionReviewValidate))

// mutate proxy
r.POST("/mutate", api.AdmissionReviewHandler(amp.AdmisionReviewMutate))

// metrics server (run in go routine)
go func() {
Expand Down

0 comments on commit 939b139

Please sign in to comment.