Webhook enhancements #543

Merged: 16 commits, Aug 15, 2019
4 changes: 2 additions & 2 deletions docs/quick-start-guide.md
@@ -208,8 +208,8 @@ The operator also sets both `WebUIAddress` which is accessible from within the c

 The Kubernetes Operator for Apache Spark comes with an optional mutating admission webhook for customizing Spark driver and executor pods based on the specification in `SparkApplication` objects, e.g., mounting user-specified ConfigMaps and volumes, and setting pod affinity/anti-affinity, and adding tolerations.

-The webhook requires a X509 certificate for TLS for pod admission requests and responses between the Kubernetes API server and the webhook server running inside the operator. For that, the certificate and key files must be accessible by the webhook server.
-The Kubernetes Operator for Spark ships with a tool at `hack/gencerts.sh` for generating the CA and server certificate and putting the certificate and key files into a secret named `spark-webhook-certs` in the namespace `spark-operator`. This secret will be mounted into the operator pod.
+The webhook requires a X509 certificate for TLS for pod admission requests and responses between the Kubernetes API server and the webhook server running inside the operator. For that, the certificate and key files must be accessible by the webhook server. The location of these certs is configurable and they will be reloaded on a configurable period.
+The Kubernetes Operator for Spark ships with a tool at `hack/gencerts.sh` for generating the CA and server certificate and putting the certificate and key files into a secret named `spark-webhook-certs` in the namespace `spark-operator`. This secret will be mounted into the operator pod.

 Run the following command to create the secret with a certificate and key files using a batch Job, and install the operator Deployment with the mutating admission webhook:

36 changes: 15 additions & 21 deletions main.go
@@ -50,23 +50,18 @@ import (
 )

 var (
-	master              = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-	kubeConfig          = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
-	installCRDs         = flag.Bool("install-crds", true, "Whether to install CRDs")
-	controllerThreads   = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
-	resyncInterval      = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
-	namespace           = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
-	enableWebhook       = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
-	webhookConfigName   = flag.String("webhook-config-name", "spark-webhook-config", "The name of the MutatingWebhookConfiguration object to create.")
-	webhookCertDir      = flag.String("webhook-cert-dir", "/etc/webhook-certs", "The directory where x509 certificate and key files are stored.")
-	webhookSvcNamespace = flag.String("webhook-svc-namespace", "spark-operator", "The namespace of the Service for the webhook server.")
-	webhookSvcName      = flag.String("webhook-svc-name", "spark-webhook", "The name of the Service for the webhook server.")
-	webhookPort         = flag.Int("webhook-port", 8080, "Service port of the webhook server.")
-	enableMetrics       = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
-	metricsPort         = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
-	metricsEndpoint     = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
-	metricsPrefix       = flag.String("metrics-prefix", "", "Prefix for the metrics.")
-	ingressUrlFormat    = flag.String("ingress-url-format", "", "Ingress URL format.")
+	master            = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
+	kubeConfig        = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
+	installCRDs       = flag.Bool("install-crds", true, "Whether to install CRDs")
+	controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
+	resyncInterval    = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
+	namespace         = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
+	enableWebhook     = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
+	enableMetrics     = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
+	metricsPort       = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
+	metricsEndpoint   = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
+	metricsPrefix     = flag.String("metrics-prefix", "", "Prefix for the metrics.")
+	ingressUrlFormat  = flag.String("ingress-url-format", "", "Ingress URL format.")
 )

 func main() {
@@ -142,13 +137,12 @@ func main() {

 	var hook *webhook.WebHook
 	if *enableWebhook {
-		var err error
-		hook, err = webhook.New(kubeClient, crInformerFactory, *webhookCertDir, *webhookSvcNamespace, *webhookSvcName, *webhookPort, *namespace)
+		hook, err = webhook.New(kubeClient, crInformerFactory, *namespace)
 		if err != nil {
 			glog.Fatal(err)
 		}

-		if err = hook.Start(*webhookConfigName); err != nil {
+		if err = hook.Start(); err != nil {
 			glog.Fatal(err)
 		}
 	}
@@ -163,7 +157,7 @@ func main() {
 	applicationController.Stop()
 	scheduledApplicationController.Stop()
 	if *enableWebhook {
-		if err := hook.Stop(*webhookConfigName); err != nil {
+		if err := hook.Stop(); err != nil {
 			glog.Fatal(err)
 		}
 	}
73 changes: 64 additions & 9 deletions pkg/webhook/certs.go
@@ -19,24 +19,79 @@ package webhook
 import (
 	"crypto/tls"
 	"io/ioutil"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
 )

-// certBundle is a container of a X509 certificate file and a corresponding key file for the
+// certProvider is a container of a X509 certificate file and a corresponding key file for the
 // webhook server, and a CA certificate file for the API server to verify the server certificate.
-type certBundle struct {
-	serverCertFile string
-	serverKeyFile  string
-	caCertFile     string
+type certProvider struct {
+	serverCertFile   string
+	serverKeyFile    string
+	caCertFile       string
+	reloadInterval   time.Duration
+	ticker           *time.Ticker
+	stopChannel      chan interface{}
+	currentCert      *tls.Certificate
+	certPointerMutex *sync.RWMutex
 }

-// configServerTLS configures TLS for the admission webhook server.
-func configServerTLS(certBundle *certBundle) (*tls.Config, error) {
-	cert, err := tls.LoadX509KeyPair(certBundle.serverCertFile, certBundle.serverKeyFile)
+func NewCertProvider(serverCertFile, serverKeyFile, caCertFile string, reloadInterval time.Duration) (*certProvider, error) {
+	cert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
 	if err != nil {
 		return nil, err
 	}
+	return &certProvider{
+		serverCertFile:   serverCertFile,
+		serverKeyFile:    serverKeyFile,
+		caCertFile:       caCertFile,
+		reloadInterval:   reloadInterval,
+		currentCert:      &cert,
+		stopChannel:      make(chan interface{}),
+		ticker:           time.NewTicker(reloadInterval),
+		certPointerMutex: &sync.RWMutex{},
+	}, nil
+}
+
+func (c *certProvider) Start() {
+	go func() {
+		for {
+			select {
+			case <-c.stopChannel:
+				return
+			case <-c.ticker.C:
+				c.updateCert()
+			}
+		}
+	}()
+}

-	return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
+func (c *certProvider) tlsConfig() *tls.Config {
+	return &tls.Config{
+		GetCertificate: func(ch *tls.ClientHelloInfo) (*tls.Certificate, error) {
+			c.certPointerMutex.RLock()
+			defer c.certPointerMutex.RUnlock()
+			return c.currentCert, nil

Review comment (Collaborator): Do we need to use a mutex to guard currentCert, which may be read in tlsConfig and written in updateCert?

Reply (Contributor Author): It's probably a good idea, since this behavior is technically undefined (although AFAICT it's safe on all modern architectures).

+		},
+	}
+}
+
+func (c *certProvider) Stop() {
+	close(c.stopChannel)
+	c.ticker.Stop()
+}
+
+func (c *certProvider) updateCert() {
+	cert, err := tls.LoadX509KeyPair(c.serverCertFile, c.serverKeyFile)
+	if err != nil {
+		glog.Errorf("could not reload certificate %s (key %s): %v", c.serverCertFile, c.serverKeyFile, err)
+		return
+	}
+	c.certPointerMutex.Lock()
+	c.currentCert = &cert
+	c.certPointerMutex.Unlock()
 }

 func readCertFile(certFile string) ([]byte, error) {