Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
grac3gao committed Dec 16, 2020
1 parent 9efbb64 commit 5b119af
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 180 deletions.
3 changes: 1 addition & 2 deletions cmd/broker/fanout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/handler"
authcheckclient "github.com/google/knative-gcp/pkg/gclient/authcheck"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
Expand Down Expand Up @@ -104,7 +103,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create fanout sync pool", zap.Error(err))
}
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType, authcheckclient.NewAuthCheckClient()); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, authcheck.NewDefaultAuthenticationCheck(env.AuthType)); err != nil {
logger.Fatalw("Failed to start fanout sync pool", zap.Error(err))
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/broker/retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"context"
"flag"
"net/http"
"time"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -102,7 +101,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to get retry sync pool", zap.Error(err))
}
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType, http.DefaultClient); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, authcheck.NewDefaultAuthenticationCheck(env.AuthType)); err != nil {
logger.Fatal("Failed to start retry sync pool", zap.Error(err))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
s := &PullSubscriptionStatus{}
got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment)
want := false
if diff := cmp.Diff(got, want); diff != "" {
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
}
6 changes: 3 additions & 3 deletions pkg/apis/intevents/v1alpha1/brokercell_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
s := &BrokerCellStatus{}
got := s.PropagateIngressAvailability(&corev1.Endpoints{}, replicaUnavailableDeployment)
want := false
if diff := cmp.Diff(got, want); diff != "" {
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
})
Expand All @@ -448,7 +448,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
s := &BrokerCellStatus{}
got := s.PropagateFanoutAvailability(replicaUnavailableDeployment)
want := false
if diff := cmp.Diff(got, want); diff != "" {
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
})
Expand All @@ -457,7 +457,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
s := &BrokerCellStatus{}
got := s.PropagateRetryAvailability(replicaUnavailableDeployment)
want := false
if diff := cmp.Diff(got, want); diff != "" {
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
s := &PullSubscriptionStatus{}
got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment)
want := false
if diff := cmp.Diff(got, want); diff != "" {
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
}
6 changes: 3 additions & 3 deletions pkg/broker/handler/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/eventutil"
handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
authchecktesting "github.com/google/knative-gcp/pkg/gclient/authcheck/testing"
reportertest "github.com/google/knative-gcp/pkg/metrics/testing"
"github.com/google/knative-gcp/pkg/utils/authcheck"

_ "knative.dev/pkg/metrics/testing"
)
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestFanoutWatchAndSync(t *testing.T) {
}

t.Run("start sync pool creates no handler", func(t *testing.T) {
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted))
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, NewFakeAuthenticationCheck(authcheck.WorkloadIdentity, true))
if err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted)); err != nil {
if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, NewFakeAuthenticationCheck(authcheck.WorkloadIdentity, true)); err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}

Expand Down
14 changes: 5 additions & 9 deletions pkg/broker/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync"
"time"

authcheckclient "github.com/google/knative-gcp/pkg/gclient/authcheck"
"github.com/google/knative-gcp/pkg/logging"
"github.com/google/knative-gcp/pkg/utils/authcheck"

Expand All @@ -45,8 +44,7 @@ type probeChecker struct {
lastReportTime time.Time
maxStaleDuration time.Duration
port int
authCheckClient authcheckclient.Client
authType authcheck.AuthType
authCheck authcheck.AuthenticationCheck
}

func (c *probeChecker) reportHealth() {
Expand Down Expand Up @@ -87,9 +85,9 @@ func (c *probeChecker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
// Perform Authentication check.
if err := authcheck.AuthenticationCheck(req.Context(), c.authType, c.authCheckClient); err != nil {
if err := c.authCheck.Check(req.Context()); err != nil {
c.logger.Error("authentication check failed", zap.Error(err))
w.WriteHeader(http.StatusUnauthorized)
w.WriteHeader(http.StatusInternalServerError)
return
}

Expand All @@ -111,8 +109,7 @@ func StartSyncPool(
syncSignal <-chan struct{},
maxStaleDuration time.Duration,
probeCheckPort int,
authType authcheck.AuthType,
authCheckClient authcheckclient.Client,
authCheck authcheck.AuthenticationCheck,
) (SyncPool, error) {

if err := syncPool.SyncOnce(ctx); err != nil {
Expand All @@ -122,8 +119,7 @@ func StartSyncPool(
logger: logging.FromContext(ctx),
maxStaleDuration: maxStaleDuration,
port: probeCheckPort,
authType: authType,
authCheckClient: authCheckClient,
authCheck: authCheck,
}
go c.start(ctx)
if syncSignal != nil {
Expand Down
25 changes: 22 additions & 3 deletions pkg/broker/handler/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handler

import (
"context"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -26,7 +27,6 @@ import (

"github.com/google/go-cmp/cmp"

authchecktesting "github.com/google/knative-gcp/pkg/gclient/authcheck/testing"
"github.com/google/knative-gcp/pkg/utils/authcheck"
)

Expand All @@ -45,7 +45,7 @@ func TestSyncPool(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

_, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, authcheck.WorkloadIdentityGSA, authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted))
_, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, NewFakeAuthenticationCheck(authcheck.WorkloadIdentity, true))
if gotErr == nil {
t.Error("StartSyncPool got unexpected result")
}
Expand All @@ -68,7 +68,7 @@ func TestSyncPool(t *testing.T) {
}

ch := make(chan struct{})
if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, authcheck.WorkloadIdentityGSA, authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted)); err != nil {
if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, NewFakeAuthenticationCheck(authcheck.WorkloadIdentity, true)); err != nil {
t.Errorf("StartSyncPool got unexpected error: %v", err)
}
syncPool.verifySyncOnceCalled(t)
Expand Down Expand Up @@ -143,3 +143,22 @@ func GetFreePort() (int, error) {
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}

type FakeAuthenticationCheck struct {
authType authcheck.AuthType
noError bool
}

func NewFakeAuthenticationCheck(authType authcheck.AuthType, noError bool) authcheck.AuthenticationCheck {
return &FakeAuthenticationCheck{
authType: authType,
noError: noError,
}
}

func (ac *FakeAuthenticationCheck) Check(ctx context.Context) error {
if ac.noError {
return nil
}
return errors.New("induced error")
}
7 changes: 3 additions & 4 deletions pkg/broker/handler/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package handler

import (
"context"
"net/http"
"testing"
"time"

Expand All @@ -29,8 +28,8 @@ import (
"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/eventutil"
handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
authchecktesting "github.com/google/knative-gcp/pkg/gclient/authcheck/testing"
reportertest "github.com/google/knative-gcp/pkg/metrics/testing"
"github.com/google/knative-gcp/pkg/utils/authcheck"

_ "knative.dev/pkg/metrics/testing"
)
Expand Down Expand Up @@ -63,7 +62,7 @@ func TestRetryWatchAndSync(t *testing.T) {
}

t.Run("start sync pool creates no handler", func(t *testing.T) {
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted))
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, NewFakeAuthenticationCheck(authcheck.WorkloadIdentity, true))
if err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}
Expand Down Expand Up @@ -159,7 +158,7 @@ func TestRetrySyncPoolE2E(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted)); err != nil {
if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, NewFakeAuthenticationCheck(authcheck.WorkloadIdentity, true)); err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}

Expand Down
38 changes: 0 additions & 38 deletions pkg/gclient/authcheck/client.go

This file was deleted.

29 changes: 0 additions & 29 deletions pkg/gclient/authcheck/interfaces.go

This file was deleted.

43 changes: 0 additions & 43 deletions pkg/gclient/authcheck/testing/fake.go

This file was deleted.

Loading

0 comments on commit 5b119af

Please sign in to comment.