Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Authentication check running inside the Pod for Source and BrokerCell Fanout and Retry #1982

Merged
merged 6 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/broker/handler/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestFanoutWatchAndSync(t *testing.T) {
}

t.Run("start sync pool creates no handler", func(t *testing.T) {
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{NoError: true})
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{})
if err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{NoError: true}); err != nil {
if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{}); err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/handler/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestSyncPool(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

_, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, &authcheck.FakeAuthenticationCheck{NoError: true})
_, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, &authcheck.FakeAuthenticationCheck{})
if gotErr == nil {
t.Error("StartSyncPool got unexpected result")
}
Expand All @@ -67,7 +67,7 @@ func TestSyncPool(t *testing.T) {
}

ch := make(chan struct{})
if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, &authcheck.FakeAuthenticationCheck{NoError: true}); err != nil {
if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, &authcheck.FakeAuthenticationCheck{}); err != nil {
t.Errorf("StartSyncPool got unexpected error: %v", err)
}
syncPool.verifySyncOnceCalled(t)
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/handler/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestRetryWatchAndSync(t *testing.T) {
}

t.Run("start sync pool creates no handler", func(t *testing.T) {
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{NoError: true})
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{})
if err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestRetrySyncPoolE2E(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{NoError: true}); err != nil {
if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, &authcheck.FakeAuthenticationCheck{}); err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/brokercell/brokercell.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, bc *intv1alpha1.BrokerCe
if replicaAvailable := bc.Status.PropagateIngressAvailability(endpoints, ind); !replicaAvailable {
podList, err := authcheck.GetPodList(ctx, resources.GetLabelSelector(bc.Name, resources.IngressName), r.KubeClientSet, bc.Namespace)
if err != nil {
logging.FromContext(ctx).Error("Failed to propagate authentication check message from ingress component", zap.Any("namespace", bc.Namespace), zap.Any("name", bc.Name), zap.Error(err))
logging.FromContext(ctx).Error("Failed to propagate authentication check message from ingress component", zap.Error(err))
return err
}
if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" {
Expand Down Expand Up @@ -207,7 +207,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, bc *intv1alpha1.BrokerCe
if replicaAvailable := bc.Status.PropagateFanoutAvailability(fd); !replicaAvailable {
podList, err := authcheck.GetPodList(ctx, resources.GetLabelSelector(bc.Name, resources.FanoutName), r.KubeClientSet, bc.Namespace)
if err != nil {
logging.FromContext(ctx).Error("Failed to propagate authentication check message from fanout component", zap.Any("namespace", bc.Namespace), zap.Any("name", bc.Name), zap.Error(err))
logging.FromContext(ctx).Error("Failed to propagate authentication check message from fanout component", zap.Error(err))
return err
}
if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" {
Expand All @@ -232,7 +232,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, bc *intv1alpha1.BrokerCe
if replicaAvailable := bc.Status.PropagateRetryAvailability(rd); !replicaAvailable {
podList, err := authcheck.GetPodList(ctx, resources.GetLabelSelector(bc.Name, resources.RetryName), r.KubeClientSet, bc.Namespace)
if err != nil {
logging.FromContext(ctx).Error("Failed to propagate authentication check message from retry component", zap.Any("namespace", bc.Namespace), zap.Any("name", bc.Name), zap.Error(err))
logging.FromContext(ctx).Error("Failed to propagate authentication check message from retry component", zap.Error(err))
return err
}
if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" {
Expand Down
11 changes: 6 additions & 5 deletions pkg/utils/authcheck/authcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ type AuthenticationCheck interface {
Check(ctx context.Context) error
}

type DefaultAuthenticationCheck struct {
type defaultAuthenticationCheck struct {
authType AuthType
client *http.Client
url string
}

func NewDefault(authType AuthType) AuthenticationCheck {
return &DefaultAuthenticationCheck{
return &defaultAuthenticationCheck{
authType: authType,
client: http.DefaultClient,
url: resource,
}
}

// AuthenticationCheck performs the authentication check running in the Pod.
func (ac *DefaultAuthenticationCheck) Check(ctx context.Context) error {
func (ac *defaultAuthenticationCheck) Check(ctx context.Context) error {
var err error
switch ac.authType {
case Secret:
Expand Down Expand Up @@ -109,8 +109,9 @@ func AuthenticationCheckForWorkloadIdentityGSA(resource string, client *http.Cli
defer resp.Body.Close()
// Check if we can successfully get the token.
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
return errors.New("the Pod is not fully authenticated, " +
"probably due to corresponding k8s service account and google service account do not establish a correct relationship")
return fmt.Errorf("the Pod is not fully authenticated, "+
"probably due to corresponding k8s service account and google service account do not establish a correct relationship, "+
"request returns status code: %d", resp.StatusCode)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/authcheck/authcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestAuthenticationCheck(t *testing.T) {

defer server.Close()

authCheck := &DefaultAuthenticationCheck{
authCheck := &defaultAuthenticationCheck{
authType: tc.authType,
client: server.Client(),
url: server.URL,
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/authcheck/authtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func getAuthTypeForWorkloadIdentity(ctx context.Context, serviceAccountLister co
return "", fmt.Errorf("error getting Kubernetes Service Account: %w", err)
} else if email := kServiceAccount.Annotations[resources.WorkloadIdentityKey]; email != "" {
// Check if email is a valid google service account email.
if match := emailRegexp.FindStringSubmatch(email); len(match) == 0 {
if match := emailRegexp.FindString(email); match == "" {
return "", fmt.Errorf("%s is not a valid Google Service Account as the value of Kubernetes Service Account %s for annotation %s",
email, args.ServiceAccountName, resources.WorkloadIdentityKey)
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/utils/authcheck/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ package authcheck

import (
"context"
"errors"
)

type FakeAuthenticationCheck struct {
NoError bool
Err error
}

func (ac *FakeAuthenticationCheck) Check(ctx context.Context) error {
if ac.NoError {
return nil
}
return errors.New("induced error")
return ac.Err
}
32 changes: 20 additions & 12 deletions pkg/utils/authcheck/probechecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package authcheck

import (
"context"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -30,17 +31,17 @@ import (
func TestProbeCheckResult(t *testing.T) {
testCases := []struct {
name string
noError bool
err error
wantStatusCode int
}{
{
name: "probe check got a failure result",
noError: false,
err: errors.New("induced error"),
wantStatusCode: http.StatusInternalServerError,
},
{
name: "probe check got a success result",
noError: true,
err: nil,
grac3gao-zz marked this conversation as resolved.
Show resolved Hide resolved
wantStatusCode: http.StatusOK,
},
}
Expand All @@ -51,22 +52,16 @@ func TestProbeCheckResult(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Get a free port.
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
port, err := getFreePort()
if err != nil {
t.Fatal("Failed to resolve TCP address:", err)
t.Fatal(err)
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
t.Fatal("Failed to listen TCP:", err)
}
l.Close()
port := l.Addr().(*net.TCPAddr).Port

logger := logging.FromContext(ctx)
probeChecker := ProbeChecker{
logger: logger,
port: port,
authCheck: &FakeAuthenticationCheck{NoError: tc.noError},
authCheck: &FakeAuthenticationCheck{Err: tc.err},
}
go probeChecker.Start(ctx)

Expand All @@ -90,3 +85,16 @@ func TestProbeCheckResult(t *testing.T) {
})
}
}

func getFreePort() (int, error) {
grac3gao-zz marked this conversation as resolved.
Show resolved Hide resolved
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, fmt.Errorf("failed to resolve TCP address: %w", err)
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, fmt.Errorf("failed to listen TCP: %w", err)
}
l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}