Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
grac3gao committed Dec 10, 2020
1 parent 31527c0 commit 67ccacb
Show file tree
Hide file tree
Showing 26 changed files with 431 additions and 214 deletions.
3 changes: 2 additions & 1 deletion cmd/broker/fanout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/handler"
authcheckclient "github.com/google/knative-gcp/pkg/gclient/authcheck"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
Expand Down Expand Up @@ -103,7 +104,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create fanout sync pool", zap.Error(err))
}
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType, authcheckclient.NewAuthCheckClient()); err != nil {
logger.Fatalw("Failed to start fanout sync pool", zap.Error(err))
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/broker/retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"flag"
"net/http"
"time"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -101,7 +102,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to get retry sync pool", zap.Error(err))
}
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType, http.DefaultClient); err != nil {
logger.Fatal("Failed to start retry sync pool", zap.Error(err))
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/intevents/v1/pullsubscription_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,20 @@ func (s *PullSubscriptionStatus) MarkDeployedUnknown(reason, messageFormat strin

// PropagateDeploymentAvailability uses the availability of the provided Deployment to determine if
// PullSubscriptionConditionDeployed should be marked as true or false.
// For authentication check purpose, this method will return true if a false condition
// For authentication check purpose, this method will return false if a false condition
// is caused by deployment's replicaset unavailable.
// ReplicaSet unavailable is a sign for potential authentication problems.
func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) bool {
deploymentAvailableFound := false
replicaUnavailable := false
replicaAvailable := true
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
deploymentAvailableFound = true
if cond.Status == corev1.ConditionTrue {
pullSubscriptionCondSet.Manage(s).MarkTrue(PullSubscriptionConditionDeployed)
} else if cond.Status == corev1.ConditionFalse {
if cond.Reason == replicaUnavailableReason {
replicaUnavailable = true
replicaAvailable = false
}
pullSubscriptionCondSet.Manage(s).MarkFalse(PullSubscriptionConditionDeployed, cond.Reason, cond.Message)
} else if cond.Status == corev1.ConditionUnknown {
Expand All @@ -119,5 +119,5 @@ func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deplo
if !deploymentAvailableFound {
pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name)
}
return replicaUnavailable
return replicaAvailable
}
2 changes: 1 addition & 1 deletion pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func TestPullSubscriptionStatusGetCondition(t *testing.T) {
func TestPropagateDeploymentAvailability(t *testing.T) {
s := &PullSubscriptionStatus{}
got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment)
want := true
want := false
if diff := cmp.Diff(got, want); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/apis/intevents/v1alpha1/brokercell_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,26 @@ func (bs *BrokerCellStatus) InitializeConditions() {
// PropagateIngressAvailability uses the availability of the provided Endpoints
// to determine if BrokerCellConditionIngress should be marked as true or
// false.
// For authentication check purpose, this method will return true if a false condition
// For authentication check purpose, this method will return false if a false condition
// is caused by deployment's replicaset unavailable.
// ReplicaSet unavailable is a sign for potential authentication problems.
func (bs *BrokerCellStatus) PropagateIngressAvailability(ep *corev1.Endpoints, ind *appsv1.Deployment) bool {
replicaUnavailable := false
replicaAvailable := true
if duck.EndpointsAreAvailable(ep) {
brokerCellCondSet.Manage(bs).MarkTrue(BrokerCellConditionIngress)
} else {
for _, cond := range ind.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
if cond.Status == corev1.ConditionFalse {
if cond.Reason == replicaUnavailableReason {
replicaUnavailable = true
replicaAvailable = false
}
}
}
}
brokerCellCondSet.Manage(bs).MarkFalse(BrokerCellConditionIngress, "EndpointsUnavailable", "Endpoints %q is unavailable.", ep.Name)
}
return replicaUnavailable
return replicaAvailable
}

func (bs *BrokerCellStatus) MarkIngressFailed(reason, format string, args ...interface{}) {
Expand All @@ -110,20 +110,20 @@ func (bs *BrokerCellStatus) MarkIngressUnknown(reason, format string, args ...in
// PropagateFanoutAvailability uses the availability of the provided Deployment
// to determine if BrokerCellConditionFanout should be marked as true or
// false.
// For authentication check purpose, this method will return true if a false condition
// For authentication check purpose, this method will return false if a false condition
// is caused by deployment's replicaset unavailable.
// ReplicaSet unavailable is a sign for potential authentication problems.
func (bs *BrokerCellStatus) PropagateFanoutAvailability(d *appsv1.Deployment) bool {
deploymentAvailableFound := false
replicaUnavailable := false
replicaAvailable := true
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
deploymentAvailableFound = true
if cond.Status == corev1.ConditionTrue {
brokerCellCondSet.Manage(bs).MarkTrue(BrokerCellConditionFanout)
} else if cond.Status == corev1.ConditionFalse {
if cond.Reason == replicaUnavailableReason {
replicaUnavailable = true
replicaAvailable = false
}
brokerCellCondSet.Manage(bs).MarkFalse(BrokerCellConditionFanout, cond.Reason, cond.Message)
} else if cond.Status == corev1.ConditionUnknown {
Expand All @@ -134,7 +134,7 @@ func (bs *BrokerCellStatus) PropagateFanoutAvailability(d *appsv1.Deployment) bo
if !deploymentAvailableFound {
brokerCellCondSet.Manage(bs).MarkUnknown(BrokerCellConditionFanout, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name)
}
return replicaUnavailable
return replicaAvailable
}

func (bs *BrokerCellStatus) MarkFanoutFailed(reason, format string, args ...interface{}) {
Expand All @@ -153,15 +153,15 @@ func (bs *BrokerCellStatus) MarkFanoutUnknown(reason, format string, args ...int
// ReplicaSet unavailable is a sign for potential authentication problems.
func (bs *BrokerCellStatus) PropagateRetryAvailability(d *appsv1.Deployment) bool {
deploymentAvailableFound := false
replicaUnavailable := false
replicaAvailable := true
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
deploymentAvailableFound = true
if cond.Status == corev1.ConditionTrue {
brokerCellCondSet.Manage(bs).MarkTrue(BrokerCellConditionRetry)
} else if cond.Status == corev1.ConditionFalse {
if cond.Reason == replicaUnavailableReason {
replicaUnavailable = true
replicaAvailable = false
}
brokerCellCondSet.Manage(bs).MarkFalse(BrokerCellConditionRetry, cond.Reason, cond.Message)
} else if cond.Status == corev1.ConditionUnknown {
Expand All @@ -172,7 +172,7 @@ func (bs *BrokerCellStatus) PropagateRetryAvailability(d *appsv1.Deployment) boo
if !deploymentAvailableFound {
brokerCellCondSet.Manage(bs).MarkUnknown(BrokerCellConditionRetry, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name)
}
return replicaUnavailable
return replicaAvailable
}

func (bs *BrokerCellStatus) MarkRetryFailed(reason, format string, args ...interface{}) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/intevents/v1alpha1/brokercell_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
t.Run("propagate ingress availability", func(t *testing.T) {
s := &BrokerCellStatus{}
got := s.PropagateIngressAvailability(&corev1.Endpoints{}, replicaUnavailableDeployment)
want := true
want := false
if diff := cmp.Diff(got, want); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
Expand All @@ -447,7 +447,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
t.Run("propagate fanout availability", func(t *testing.T) {
s := &BrokerCellStatus{}
got := s.PropagateFanoutAvailability(replicaUnavailableDeployment)
want := true
want := false
if diff := cmp.Diff(got, want); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
Expand All @@ -456,7 +456,7 @@ func TestPropagateDeploymentAvailability(t *testing.T) {
t.Run("propagate retry availability", func(t *testing.T) {
s := &BrokerCellStatus{}
got := s.PropagateRetryAvailability(replicaUnavailableDeployment)
want := true
want := false
if diff := cmp.Diff(got, want); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/intevents/v1beta1/pullsubscription_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ func (s *PullSubscriptionStatus) MarkNoSubscription(reason, messageFormat string

// PropagateDeploymentAvailability uses the availability of the provided Deployment to determine if
// PullSubscriptionConditionDeployed should be marked as true or false.
// For authentication check purpose, this method will return true if a false condition
// For authentication check purpose, this method will return false if a false condition
// is caused by deployment's replicaset unavailable.
// ReplicaSet unavailable is a sign for potential authentication problems.
func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) bool {
deploymentAvailableFound := false
replicaUnavailable := false
replicaAvailable := true
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
deploymentAvailableFound = true
if cond.Status == corev1.ConditionTrue {
pullSubscriptionCondSet.Manage(s).MarkTrue(PullSubscriptionConditionDeployed)
} else if cond.Status == corev1.ConditionFalse {
if cond.Reason == replicaUnavailableReason {
replicaUnavailable = true
replicaAvailable = false
}
pullSubscriptionCondSet.Manage(s).MarkFalse(PullSubscriptionConditionDeployed, cond.Reason, cond.Message)
} else if cond.Status == corev1.ConditionUnknown {
Expand All @@ -111,5 +111,5 @@ func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deplo
if !deploymentAvailableFound {
pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name)
}
return replicaUnavailable
return replicaAvailable
}
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func TestPullSubscriptionStatusGetCondition(t *testing.T) {
func TestPropagateDeploymentAvailability(t *testing.T) {
s := &PullSubscriptionStatus{}
got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment)
want := true
want := false
if diff := cmp.Diff(got, want); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/broker/handler/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/eventutil"
handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
authchecktesting "github.com/google/knative-gcp/pkg/gclient/authcheck/testing"
reportertest "github.com/google/knative-gcp/pkg/metrics/testing"

_ "knative.dev/pkg/metrics/testing"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestFanoutWatchAndSync(t *testing.T) {
}

t.Run("start sync pool creates no handler", func(t *testing.T) {
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "")
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted))
if err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, ""); err != nil {
if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted)); err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/broker/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

authcheckclient "github.com/google/knative-gcp/pkg/gclient/authcheck"
"github.com/google/knative-gcp/pkg/logging"
"github.com/google/knative-gcp/pkg/utils/authcheck"

Expand All @@ -44,6 +45,7 @@ type probeChecker struct {
lastReportTime time.Time
maxStaleDuration time.Duration
port int
authCheckClient authcheckclient.Client
authType authcheck.AuthType
}

Expand Down Expand Up @@ -85,7 +87,11 @@ func (c *probeChecker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
// Perform Authentication check.
authcheck.AuthenticationCheck(req.Context(), c.logger, c.authType, w)
if err := authcheck.AuthenticationCheck(req.Context(), c.authType, c.authCheckClient); err != nil {
c.logger.Error("authentication check failed", zap.Error(err))
w.WriteHeader(http.StatusUnauthorized)
return
}

// Zero maxStaleDuration means infinite.
if c.maxStaleDuration == 0 {
Expand All @@ -106,6 +112,7 @@ func StartSyncPool(
maxStaleDuration time.Duration,
probeCheckPort int,
authType authcheck.AuthType,
authCheckClient authcheckclient.Client,
) (SyncPool, error) {

if err := syncPool.SyncOnce(ctx); err != nil {
Expand All @@ -116,6 +123,7 @@ func StartSyncPool(
maxStaleDuration: maxStaleDuration,
port: probeCheckPort,
authType: authType,
authCheckClient: authCheckClient,
}
go c.start(ctx)
if syncSignal != nil {
Expand Down
14 changes: 10 additions & 4 deletions pkg/broker/handler/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"

"github.com/google/go-cmp/cmp"

authchecktesting "github.com/google/knative-gcp/pkg/gclient/authcheck/testing"
"github.com/google/knative-gcp/pkg/utils/authcheck"
)

func TestSyncPool(t *testing.T) {
Expand All @@ -42,7 +45,7 @@ func TestSyncPool(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

_, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, "")
_, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, authcheck.WorkloadIdentityGSA, authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted))
if gotErr == nil {
t.Error("StartSyncPool got unexpected result")
}
Expand All @@ -65,7 +68,7 @@ func TestSyncPool(t *testing.T) {
}

ch := make(chan struct{})
if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, ""); err != nil {
if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, authcheck.WorkloadIdentityGSA, authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted)); err != nil {
t.Errorf("StartSyncPool got unexpected error: %v", err)
}
syncPool.verifySyncOnceCalled(t)
Expand All @@ -74,10 +77,13 @@ func TestSyncPool(t *testing.T) {

ch <- struct{}{}
syncPool.verifySyncOnceCalled(t)
// False because authentication check will fail.
assertProbeCheckResult(t, p, false, "healthz")
assertProbeCheckResult(t, p, true, "healthz")
// False because path is not healthz.
assertProbeCheckResult(t, p, false, "empty")

time.Sleep(time.Second)
// False because it exceeds StaleDuration.
assertProbeCheckResult(t, p, false, "healthz")
})
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/broker/handler/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handler

import (
"context"
"net/http"
"testing"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/eventutil"
handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
authchecktesting "github.com/google/knative-gcp/pkg/gclient/authcheck/testing"
reportertest "github.com/google/knative-gcp/pkg/metrics/testing"

_ "knative.dev/pkg/metrics/testing"
Expand Down Expand Up @@ -61,7 +63,7 @@ func TestRetryWatchAndSync(t *testing.T) {
}

t.Run("start sync pool creates no handler", func(t *testing.T) {
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "")
_, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted))
if err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}
Expand Down Expand Up @@ -157,7 +159,7 @@ func TestRetrySyncPoolE2E(t *testing.T) {
t.Fatalf("failed to get random free port: %v", err)
}

if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, ""); err != nil {
if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, "", authchecktesting.NewFakeAuthCheckClient(http.StatusAccepted)); err != nil {
t.Errorf("unexpected error from starting sync pool: %v", err)
}

Expand Down
Loading

0 comments on commit 67ccacb

Please sign in to comment.