Skip to content

Commit

Permalink
Webhook enhancements (#543)
Browse files Browse the repository at this point in the history
* Cert configuration and reloading

* Add support for strict webhook error handling

* Improve webhook error handling

* Don't deregister the webhook when failure policy is strict

* standard error message capitalization

* have the webhook parse its own configuration from flags

* clean up cert provider code

* Add explanation for skipping deregistration

* Tests and fixes
  • Loading branch information
kevin hogeland authored and liyinan926 committed Aug 15, 2019
1 parent 79ec720 commit 249860f
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 103 deletions.
4 changes: 2 additions & 2 deletions docs/quick-start-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ The operator also sets both `WebUIAddress` which is accessible from within the c

The Kubernetes Operator for Apache Spark comes with an optional mutating admission webhook for customizing Spark driver and executor pods based on the specification in `SparkApplication` objects, e.g., mounting user-specified ConfigMaps and volumes, setting pod affinity/anti-affinity, and adding tolerations.

The webhook requires a X509 certificate for TLS for pod admission requests and responses between the Kubernetes API server and the webhook server running inside the operator. For that, the certificate and key files must be accessible by the webhook server.
The Kubernetes Operator for Spark ships with a tool at `hack/gencerts.sh` for generating the CA and server certificate and putting the certificate and key files into a secret named `spark-webhook-certs` in the namespace `spark-operator`. This secret will be mounted into the operator pod.
The webhook requires an X.509 certificate for TLS for pod admission requests and responses between the Kubernetes API server and the webhook server running inside the operator. For that, the certificate and key files must be accessible by the webhook server. The location of these certs is configurable, and they are reloaded at a configurable interval.
The Kubernetes Operator for Spark ships with a tool at `hack/gencerts.sh` for generating the CA and server certificate and putting the certificate and key files into a secret named `spark-webhook-certs` in the namespace `spark-operator`. This secret will be mounted into the operator pod.

Run the following command to create the secret with a certificate and key files using a batch Job, and install the operator Deployment with the mutating admission webhook:

Expand Down
36 changes: 15 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,18 @@ import (
)

var (
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
installCRDs = flag.Bool("install-crds", true, "Whether to install CRDs")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
webhookConfigName = flag.String("webhook-config-name", "spark-webhook-config", "The name of the MutatingWebhookConfiguration object to create.")
webhookCertDir = flag.String("webhook-cert-dir", "/etc/webhook-certs", "The directory where x509 certificate and key files are stored.")
webhookSvcNamespace = flag.String("webhook-svc-namespace", "spark-operator", "The namespace of the Service for the webhook server.")
webhookSvcName = flag.String("webhook-svc-name", "spark-webhook", "The name of the Service for the webhook server.")
webhookPort = flag.Int("webhook-port", 8080, "Service port of the webhook server.")
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressUrlFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
installCRDs = flag.Bool("install-crds", true, "Whether to install CRDs")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressUrlFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
)

func main() {
Expand Down Expand Up @@ -142,13 +137,12 @@ func main() {

var hook *webhook.WebHook
if *enableWebhook {
var err error
hook, err = webhook.New(kubeClient, crInformerFactory, *webhookCertDir, *webhookSvcNamespace, *webhookSvcName, *webhookPort, *namespace)
hook, err = webhook.New(kubeClient, crInformerFactory, *namespace)
if err != nil {
glog.Fatal(err)
}

if err = hook.Start(*webhookConfigName); err != nil {
if err = hook.Start(); err != nil {
glog.Fatal(err)
}
}
Expand All @@ -163,7 +157,7 @@ func main() {
applicationController.Stop()
scheduledApplicationController.Stop()
if *enableWebhook {
if err := hook.Stop(*webhookConfigName); err != nil {
if err := hook.Stop(); err != nil {
glog.Fatal(err)
}
}
Expand Down
73 changes: 64 additions & 9 deletions pkg/webhook/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,79 @@ package webhook
import (
"crypto/tls"
"io/ioutil"
"sync"
"time"

"github.com/golang/glog"
)

// certBundle is a container of a X509 certificate file and a corresponding key file for the
// certProvider is a container of an X509 certificate file and a corresponding key file for the
// webhook server, and a CA certificate file for the API server to verify the server certificate.
type certBundle struct {
serverCertFile string
serverKeyFile string
caCertFile string
type certProvider struct {
// serverCertFile and serverKeyFile are the paths handed to tls.LoadX509KeyPair,
// both when the provider is created and on every reload.
serverCertFile string
serverKeyFile string
// caCertFile is stored by NewCertProvider but is not read by any method visible here.
// NOTE(review): presumably consumed elsewhere in the package — confirm.
caCertFile string
// reloadInterval is the period passed to time.NewTicker in NewCertProvider.
reloadInterval time.Duration
// ticker drives reloading: each tick makes the goroutine launched by Start call updateCert.
ticker *time.Ticker
// stopChannel is closed by Stop to make the goroutine launched by Start return.
stopChannel chan interface{}
// currentCert is the most recently loaded key pair; it is returned by the GetCertificate
// callback built in tlsConfig and replaced by updateCert.
currentCert *tls.Certificate
// certPointerMutex guards currentCert: read-locked in GetCertificate, write-locked in updateCert.
certPointerMutex *sync.RWMutex
}

// configServerTLS configures TLS for the admission webhook server.
func configServerTLS(certBundle *certBundle) (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(certBundle.serverCertFile, certBundle.serverKeyFile)
// invalidReloadIntervalError reports a certificate reload interval that cannot
// be used to drive a ticker.
type invalidReloadIntervalError time.Duration

// Error implements the error interface.
func (e invalidReloadIntervalError) Error() string {
	return "certificate reload interval must be positive, got " + time.Duration(e).String()
}

// NewCertProvider loads the server key pair from serverCertFile and
// serverKeyFile and returns a provider that can reload it every
// reloadInterval once Start is called.
//
// It returns an error if reloadInterval is not positive or if the key pair
// cannot be loaded. caCertFile is only recorded on the provider; it is not
// read here.
//
// The returned provider owns a running ticker, so callers should call Stop
// when they are done with it.
func NewCertProvider(serverCertFile, serverKeyFile, caCertFile string, reloadInterval time.Duration) (*certProvider, error) {
	// time.NewTicker panics on a non-positive duration. The interval is
	// user-configurable, so report it as an ordinary error instead of crashing.
	if reloadInterval <= 0 {
		return nil, invalidReloadIntervalError(reloadInterval)
	}

	cert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
	if err != nil {
		return nil, err
	}

	return &certProvider{
		serverCertFile:   serverCertFile,
		serverKeyFile:    serverKeyFile,
		caCertFile:       caCertFile,
		reloadInterval:   reloadInterval,
		currentCert:      &cert,
		stopChannel:      make(chan interface{}),
		ticker:           time.NewTicker(reloadInterval),
		certPointerMutex: &sync.RWMutex{},
	}, nil
}

// Start launches a background goroutine that calls updateCert on every tick
// of the provider's ticker and returns once stopChannel is closed (see Stop).
// Start itself returns immediately.
func (c *certProvider) Start() {
	reloadLoop := func() {
		for {
			select {
			case <-c.ticker.C:
				c.updateCert()
			case <-c.stopChannel:
				return
			}
		}
	}
	go reloadLoop()
}

return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
// tlsConfig returns a TLS configuration whose GetCertificate callback hands
// out the provider's current certificate. The callback reads currentCert
// under the read lock, so a certificate swapped in by updateCert is picked up
// by later handshakes.
func (c *certProvider) tlsConfig() *tls.Config {
	cfg := &tls.Config{}
	cfg.GetCertificate = func(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
		c.certPointerMutex.RLock()
		current := c.currentCert
		c.certPointerMutex.RUnlock()
		return current, nil
	}
	return cfg
}

// Stop ends periodic certificate reloading. Closing stopChannel makes the
// goroutine launched by Start return; the ticker is then stopped.
// Stop must be called at most once: a second call would close an already
// closed channel, which panics.
func (c *certProvider) Stop() {
close(c.stopChannel)
c.ticker.Stop()
}

// updateCert re-reads the server key pair from disk and, on success, swaps it
// in as the current certificate under the write lock. If loading fails, the
// error is logged and the previously loaded certificate stays in place.
func (c *certProvider) updateCert() {
	certFile, keyFile := c.serverCertFile, c.serverKeyFile

	reloaded, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		glog.Errorf("could not reload certificate %s (key %s): %v", certFile, keyFile, err)
		return
	}

	c.certPointerMutex.Lock()
	defer c.certPointerMutex.Unlock()
	c.currentCert = &reloaded
}

func readCertFile(certFile string) ([]byte, error) {
Expand Down
Loading

0 comments on commit 249860f

Please sign in to comment.