diff --git a/cmd/broker/fanout/main.go b/cmd/broker/fanout/main.go index 20fd81ad32..ca44a0469a 100644 --- a/cmd/broker/fanout/main.go +++ b/cmd/broker/fanout/main.go @@ -27,6 +27,7 @@ import ( "github.com/google/knative-gcp/pkg/metrics" "github.com/google/knative-gcp/pkg/utils" "github.com/google/knative-gcp/pkg/utils/appcredentials" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" "github.com/google/knative-gcp/pkg/utils/mainhelper" @@ -45,6 +46,9 @@ type envConfig struct { HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"` MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"` + // Environment variable containing the authType, which represents the authentication configuration mode the Pod is using. + AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""` + // MaxStaleDuration is the max duration of the handler pool without being synced. // With the internal pool resync period being 15s, it requires at least 4 // continuous sync failures (or no sync at all) to be stale. @@ -99,7 +103,7 @@ func main() { if err != nil { logger.Fatal("Failed to create fanout sync pool", zap.Error(err)) } - if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort); err != nil { + if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType); err != nil { logger.Fatalw("Failed to start fanout sync pool", zap.Error(err)) } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 725e18fbfd..23a78c760e 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -20,6 +20,7 @@ import ( "github.com/google/knative-gcp/pkg/metrics" "github.com/google/knative-gcp/pkg/utils" "github.com/google/knative-gcp/pkg/utils/appcredentials" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" "github.com/google/knative-gcp/pkg/utils/mainhelper" @@ -31,6 +32,9 @@ type envConfig struct { PodName string `envconfig:"POD_NAME" required:"true"` Port int `envconfig:"PORT" default:"8080"` + // Environment variable containing the authType, which represents the authentication configuration mode the Pod is using. + AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""` + // Default 300Mi. PublishBufferedByteLimit int `envconfig:"PUBLISH_BUFFERED_BYTES_LIMIT" default:"314572800"` } @@ -66,6 +70,7 @@ func main() { metrics.PodName(env.PodName), metrics.ContainerName(component), publishSetting(logger.Desugar(), env), + env.AuthType, ) if err != nil { logger.Desugar().Fatal("Unable to create ingress handler: ", zap.Error(err)) diff --git a/cmd/broker/ingress/wire.go b/cmd/broker/ingress/wire.go index f72c07e99f..4c3253ff34 100644 --- a/cmd/broker/ingress/wire.go +++ b/cmd/broker/ingress/wire.go @@ -25,6 +25,7 @@ import ( "github.com/google/knative-gcp/pkg/broker/config/volume" "github.com/google/knative-gcp/pkg/broker/ingress" "github.com/google/knative-gcp/pkg/metrics" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" "github.com/google/wire" ) @@ -36,6 +37,7 @@ func InitializeHandler( podName metrics.PodName, containerName metrics.ContainerName, publishSettings pubsub.PublishSettings, + authType authcheck.AuthType, ) (*ingress.Handler, error) { panic(wire.Build( ingress.HandlerSet, diff --git a/cmd/broker/ingress/wire_gen.go b/cmd/broker/ingress/wire_gen.go index ea881832f8..5c59f4c340 100644 --- a/cmd/broker/ingress/wire_gen.go +++ b/cmd/broker/ingress/wire_gen.go @@ -11,12 +11,13 @@ import ( "github.com/google/knative-gcp/pkg/broker/config/volume" "github.com/google/knative-gcp/pkg/broker/ingress" "github.com/google/knative-gcp/pkg/metrics" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" ) // Injectors from wire.go: -func InitializeHandler(ctx context.Context, port clients.Port, projectID clients.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, publishSettings pubsub.PublishSettings) (*ingress.Handler, error) { +func InitializeHandler(ctx context.Context, port clients.Port, projectID clients.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, publishSettings pubsub.PublishSettings, authType authcheck.AuthType) (*ingress.Handler, error) { httpMessageReceiver := clients.NewHTTPMessageReceiver(port) v := _wireValue readonlyTargets, err := volume.NewTargetsFromFile(v...) @@ -32,7 +33,7 @@ func InitializeHandler(ctx context.Context, port clients.Port, projectID clients if err != nil { return nil, err } - handler := ingress.NewHandler(ctx, httpMessageReceiver, multiTopicDecoupleSink, ingressReporter) + handler := ingress.NewHandler(ctx, httpMessageReceiver, multiTopicDecoupleSink, ingressReporter, authType) return handler, nil } diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go index 7f7b28f5a4..2d0facc22a 100644 --- a/cmd/broker/retry/main.go +++ b/cmd/broker/retry/main.go @@ -29,6 +29,7 @@ import ( "github.com/google/knative-gcp/pkg/metrics" "github.com/google/knative-gcp/pkg/utils" "github.com/google/knative-gcp/pkg/utils/appcredentials" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" "github.com/google/knative-gcp/pkg/utils/mainhelper" ) @@ -44,6 +45,9 @@ type envConfig struct { TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"` HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"` + // Environment variable containing the authType, which represents the authentication configuration mode the Pod is using. + AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""` + // Outstanding messages effectively limits how many connections we will create to each subscriber. // If such connections are long, it will consume a lot of memory (aggregated) without limiting. OutstandingMessagesPerSub int `envconfig:"OUTSTANDING_MESSAGES_PER_SUB" default:"200"` @@ -97,7 +101,7 @@ func main() { if err != nil { logger.Fatal("Failed to get retry sync pool", zap.Error(err)) } - if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort); err != nil { + if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, env.AuthType); err != nil { logger.Fatal("Failed to start retry sync pool", zap.Error(err)) } diff --git a/cmd/pubsub/publisher/main.go b/cmd/pubsub/publisher/main.go index 0d38618c08..31ef26e50e 100644 --- a/cmd/pubsub/publisher/main.go +++ b/cmd/pubsub/publisher/main.go @@ -21,22 +21,27 @@ import ( "flag" "log" + "github.com/kelseyhightower/envconfig" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "knative.dev/pkg/tracing" + . "github.com/google/knative-gcp/pkg/pubsub/publisher" "github.com/google/knative-gcp/pkg/testing/testloggingutil" tracingconfig "github.com/google/knative-gcp/pkg/tracing" "github.com/google/knative-gcp/pkg/utils" "github.com/google/knative-gcp/pkg/utils/appcredentials" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" - "github.com/kelseyhightower/envconfig" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "knative.dev/pkg/tracing" ) type envConfig struct { // Environment variable containing the port for the publisher. Port int `envconfig:"PORT" default:"8080"` + // Environment variable containing the authType, which represents the authentication configuration mode the Pod is using. + AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""` + // Topic is the environment variable containing the PubSub Topic being // subscribed to's name. In the form that is unique within the project. // E.g. 'laconia', not 'projects/my-gcp-project/topics/laconia'. @@ -95,6 +100,7 @@ func main() { clients.Port(env.Port), clients.ProjectID(projectID), TopicID(topicID), + env.AuthType, ) if err != nil { diff --git a/cmd/pubsub/publisher/wire.go b/cmd/pubsub/publisher/wire.go index 8c0e95f988..5464e574d8 100644 --- a/cmd/pubsub/publisher/wire.go +++ b/cmd/pubsub/publisher/wire.go @@ -22,6 +22,7 @@ import ( "context" "github.com/google/knative-gcp/pkg/pubsub/publisher" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" "github.com/google/wire" @@ -32,6 +33,7 @@ func InitializePublisher( port clients.Port, projectID clients.ProjectID, topicID publisher.TopicID, + authType authcheck.AuthType, ) (*publisher.Publisher, error) { panic(wire.Build( publisher.PublisherSet, diff --git a/cmd/pubsub/publisher/wire_gen.go b/cmd/pubsub/publisher/wire_gen.go index 8110e6af06..3bbf09fe7f 100644 --- a/cmd/pubsub/publisher/wire_gen.go +++ b/cmd/pubsub/publisher/wire_gen.go @@ -8,18 +8,19 @@ package main import ( "context" "github.com/google/knative-gcp/pkg/pubsub/publisher" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" ) // Injectors from wire.go: -func InitializePublisher(ctx context.Context, port clients.Port, projectID clients.ProjectID, topicID publisher.TopicID) (*publisher.Publisher, error) { +func InitializePublisher(ctx context.Context, port clients.Port, projectID clients.ProjectID, topicID publisher.TopicID, authType authcheck.AuthType) (*publisher.Publisher, error) { httpMessageReceiver := clients.NewHTTPMessageReceiver(port) client, err := clients.NewPubsubClient(ctx, projectID) if err != nil { return nil, err } topic := publisher.NewPubSubTopic(ctx, client, topicID) - publisherPublisher := publisher.NewPublisher(ctx, httpMessageReceiver, topic) + publisherPublisher := publisher.NewPublisher(ctx, httpMessageReceiver, topic, authType) return publisherPublisher, nil } diff --git a/cmd/pubsub/receive_adapter/main.go b/cmd/pubsub/receive_adapter/main.go index b2317899f3..f0a518083a 100644 --- a/cmd/pubsub/receive_adapter/main.go +++ b/cmd/pubsub/receive_adapter/main.go @@ -22,6 +22,7 @@ import ( "time" "github.com/google/knative-gcp/pkg/testing/testloggingutil" + "github.com/google/knative-gcp/pkg/utils/authcheck" "go.uber.org/zap" "knative.dev/pkg/logging" @@ -48,6 +49,9 @@ const ( // TODO we should refactor this and reduce the number of environment variables. // most of them are due to metrics, which has to change anyways. type envConfig struct { + // Environment variable containing the authType, which represents the authentication configuration mode the Pod is using. + AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""` + // Environment variable containing the sink URI. Sink string `envconfig:"SINK_URI" required:"true"` @@ -173,6 +177,7 @@ func main() { SinkURI: env.Sink, TransformerURI: env.Transformer, Extensions: extensions, + AuthType: env.AuthType, } adapter, err := InitializeAdapter(ctx, diff --git a/go.mod b/go.mod index 632f951434..3a93f0e31f 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20201110031124-69a78807bb2b + golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 google.golang.org/api v0.34.0 google.golang.org/genproto v0.0.0-20200929141702-51c3e5b607fe diff --git a/pkg/apis/intevents/v1/pullsubscription_lifecycle.go b/pkg/apis/intevents/v1/pullsubscription_lifecycle.go index ae45fdcc37..5a2f8c3ca6 100644 --- a/pkg/apis/intevents/v1/pullsubscription_lifecycle.go +++ b/pkg/apis/intevents/v1/pullsubscription_lifecycle.go @@ -22,6 +22,8 @@ import ( "knative.dev/pkg/apis" ) +const replicaUnavailableReason = "MinimumReplicasUnavailable" + // GetCondition returns the condition currently associated with the given type, or nil. func (s *PullSubscriptionStatus) GetCondition(t apis.ConditionType) *apis.Condition { return pullSubscriptionCondSet.Manage(s).GetCondition(t) @@ -93,14 +95,21 @@ func (s *PullSubscriptionStatus) MarkDeployedUnknown(reason, messageFormat strin // PropagateDeploymentAvailability uses the availability of the provided Deployment to determine if // PullSubscriptionConditionDeployed should be marked as true or false. -func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) { +// For authentication check purpose, this method will return true if a false condition +// is caused by deployment's replicaset unavailable. +// ReplicaSet unavailable is a sign for potential authentication problems. +func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) bool { deploymentAvailableFound := false + replicaUnavailable := false for _, cond := range d.Status.Conditions { if cond.Type == appsv1.DeploymentAvailable { deploymentAvailableFound = true if cond.Status == corev1.ConditionTrue { pullSubscriptionCondSet.Manage(s).MarkTrue(PullSubscriptionConditionDeployed) } else if cond.Status == corev1.ConditionFalse { + if cond.Reason == replicaUnavailableReason { + replicaUnavailable = true + } pullSubscriptionCondSet.Manage(s).MarkFalse(PullSubscriptionConditionDeployed, cond.Reason, cond.Message) } else if cond.Status == corev1.ConditionUnknown { pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, cond.Reason, cond.Message) @@ -110,4 +119,5 @@ func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deplo if !deploymentAvailableFound { pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name) } + return replicaUnavailable } diff --git a/pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go b/pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go index 443b61d56d..b99eaed73c 100644 --- a/pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go +++ b/pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go @@ -69,6 +69,22 @@ var ( }, }, } + + replicaUnavailableDeployment = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Status: appsv1.DeploymentStatus{ + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionFalse, + Reason: "MinimumReplicasUnavailable", + Message: "False Status", + }, + }, + }, + } ) func TestPullSubscriptionStatusIsReady(t *testing.T) { @@ -506,3 +522,12 @@ func TestPullSubscriptionStatusGetCondition(t *testing.T) { }) } } + +func TestPropagateDeploymentAvailability(t *testing.T) { + s := &PullSubscriptionStatus{} + got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment) + want := true + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } +} diff --git a/pkg/apis/intevents/v1alpha1/brokercell_lifecycle.go b/pkg/apis/intevents/v1alpha1/brokercell_lifecycle.go index f9219ecf68..ab2d26464f 100644 --- a/pkg/apis/intevents/v1alpha1/brokercell_lifecycle.go +++ b/pkg/apis/intevents/v1alpha1/brokercell_lifecycle.go @@ -50,6 +50,8 @@ const ( // BrokerCellConditionTargetsConfig reports the readiness of the // BrokerCell's targets configmap. BrokerCellConditionTargetsConfig apis.ConditionType = "TargetsConfigReady" + + replicaUnavailableReason = "MinimumReplicasUnavailable" ) // GetCondition returns the condition currently associated with the given type, or nil. @@ -75,12 +77,26 @@ func (bs *BrokerCellStatus) InitializeConditions() { // PropagateIngressAvailability uses the availability of the provided Endpoints // to determine if BrokerCellConditionIngress should be marked as true or // false. -func (bs *BrokerCellStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { +// For authentication check purpose, this method will return true if a false condition +// is caused by deployment's replicaset unavailable. +// ReplicaSet unavailable is a sign for potential authentication problems. +func (bs *BrokerCellStatus) PropagateIngressAvailability(ep *corev1.Endpoints, ind *appsv1.Deployment) bool { + replicaUnavailable := false if duck.EndpointsAreAvailable(ep) { brokerCellCondSet.Manage(bs).MarkTrue(BrokerCellConditionIngress) } else { + for _, cond := range ind.Status.Conditions { + if cond.Type == appsv1.DeploymentAvailable { + if cond.Status == corev1.ConditionFalse { + if cond.Reason == replicaUnavailableReason { + replicaUnavailable = true + } + } + } + } brokerCellCondSet.Manage(bs).MarkFalse(BrokerCellConditionIngress, "EndpointsUnavailable", "Endpoints %q is unavailable.", ep.Name) } + return replicaUnavailable } func (bs *BrokerCellStatus) MarkIngressFailed(reason, format string, args ...interface{}) { @@ -94,14 +110,21 @@ func (bs *BrokerCellStatus) MarkIngressUnknown(reason, format string, args ...in // PropagateFanoutAvailability uses the availability of the provided Deployment // to determine if BrokerCellConditionFanout should be marked as true or // false. -func (bs *BrokerCellStatus) PropagateFanoutAvailability(d *appsv1.Deployment) { +// For authentication check purpose, this method will return true if a false condition +// is caused by deployment's replicaset unavailable. +// ReplicaSet unavailable is a sign for potential authentication problems. +func (bs *BrokerCellStatus) PropagateFanoutAvailability(d *appsv1.Deployment) bool { deploymentAvailableFound := false + replicaUnavailable := false for _, cond := range d.Status.Conditions { if cond.Type == appsv1.DeploymentAvailable { deploymentAvailableFound = true if cond.Status == corev1.ConditionTrue { brokerCellCondSet.Manage(bs).MarkTrue(BrokerCellConditionFanout) } else if cond.Status == corev1.ConditionFalse { + if cond.Reason == replicaUnavailableReason { + replicaUnavailable = true + } brokerCellCondSet.Manage(bs).MarkFalse(BrokerCellConditionFanout, cond.Reason, cond.Message) } else if cond.Status == corev1.ConditionUnknown { brokerCellCondSet.Manage(bs).MarkUnknown(BrokerCellConditionFanout, cond.Reason, cond.Message) @@ -111,6 +134,7 @@ func (bs *BrokerCellStatus) PropagateFanoutAvailability(d *appsv1.Deployment) { if !deploymentAvailableFound { brokerCellCondSet.Manage(bs).MarkUnknown(BrokerCellConditionFanout, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name) } + return replicaUnavailable } func (bs *BrokerCellStatus) MarkFanoutFailed(reason, format string, args ...interface{}) { @@ -124,14 +148,21 @@ func (bs *BrokerCellStatus) MarkFanoutUnknown(reason, format string, args ...int // PropagateRetryAvailability uses the availability of the provided Deployment // to determine if BrokerCellConditionRetry should be marked as true or // unknown. -func (bs *BrokerCellStatus) PropagateRetryAvailability(d *appsv1.Deployment) { +// For authentication check purpose, this method will return true if a false condition +// is caused by deployment's replicaset unavailable. +// ReplicaSet unavailable is a sign for potential authentication problems. +func (bs *BrokerCellStatus) PropagateRetryAvailability(d *appsv1.Deployment) bool { deploymentAvailableFound := false + replicaUnavailable := false for _, cond := range d.Status.Conditions { if cond.Type == appsv1.DeploymentAvailable { deploymentAvailableFound = true if cond.Status == corev1.ConditionTrue { brokerCellCondSet.Manage(bs).MarkTrue(BrokerCellConditionRetry) } else if cond.Status == corev1.ConditionFalse { + if cond.Reason == replicaUnavailableReason { + replicaUnavailable = true + } brokerCellCondSet.Manage(bs).MarkFalse(BrokerCellConditionRetry, cond.Reason, cond.Message) } else if cond.Status == corev1.ConditionUnknown { brokerCellCondSet.Manage(bs).MarkUnknown(BrokerCellConditionRetry, cond.Reason, cond.Message) @@ -141,6 +172,7 @@ func (bs *BrokerCellStatus) PropagateRetryAvailability(d *appsv1.Deployment) { if !deploymentAvailableFound { brokerCellCondSet.Manage(bs).MarkUnknown(BrokerCellConditionRetry, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name) } + return replicaUnavailable } func (bs *BrokerCellStatus) MarkRetryFailed(reason, format string, args ...interface{}) { diff --git a/pkg/apis/intevents/v1alpha1/brokercell_lifecycle_test.go b/pkg/apis/intevents/v1alpha1/brokercell_lifecycle_test.go index a55f8ab82c..7d2e3c2228 100644 --- a/pkg/apis/intevents/v1alpha1/brokercell_lifecycle_test.go +++ b/pkg/apis/intevents/v1alpha1/brokercell_lifecycle_test.go @@ -23,10 +23,29 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) +var ( + replicaUnavailableDeployment = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Status: appsv1.DeploymentStatus{ + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionFalse, + Reason: "MinimumReplicasUnavailable", + Message: "False Status", + }, + }, + }, + } +) + var ( brokerCellConditionReady = apis.Condition{ Type: BrokerCellConditionReady, @@ -296,9 +315,9 @@ func TestBrokerCellConditionStatus(t *testing.T) { bs.PropagateFanoutAvailability(&appsv1.Deployment{}) } if test.ingressStatus != nil { - bs.PropagateIngressAvailability(test.ingressStatus) + bs.PropagateIngressAvailability(test.ingressStatus, &appsv1.Deployment{}) } else { - bs.PropagateIngressAvailability(&corev1.Endpoints{}) + bs.PropagateIngressAvailability(&corev1.Endpoints{}, &appsv1.Deployment{}) } if test.retryStatus != nil { bs.PropagateRetryAvailability(test.retryStatus) @@ -414,3 +433,32 @@ func TestMarkBrokerCellStatus(t *testing.T) { }) } } + +func TestPropagateDeploymentAvailability(t *testing.T) { + t.Run("propagate ingress availability", func(t *testing.T) { + s := &BrokerCellStatus{} + got := s.PropagateIngressAvailability(&corev1.Endpoints{}, replicaUnavailableDeployment) + want := true + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + + t.Run("propagate fanout availability", func(t *testing.T) { + s := &BrokerCellStatus{} + got := s.PropagateFanoutAvailability(replicaUnavailableDeployment) + want := true + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + + t.Run("propagate retry availability", func(t *testing.T) { + s := &BrokerCellStatus{} + got := s.PropagateRetryAvailability(replicaUnavailableDeployment) + want := true + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) +} diff --git a/pkg/apis/intevents/v1alpha1/test_helper.go b/pkg/apis/intevents/v1alpha1/test_helper.go index 5594abfcad..ce3e16c303 100644 --- a/pkg/apis/intevents/v1alpha1/test_helper.go +++ b/pkg/apis/intevents/v1alpha1/test_helper.go @@ -88,7 +88,7 @@ func (t testHelper) UnknownDeployment() *appsv1.Deployment { func (t testHelper) ReadyBrokerCellStatus() *BrokerCellStatus { bs := &BrokerCellStatus{} - bs.PropagateIngressAvailability(t.AvailableEndpoints()) + bs.PropagateIngressAvailability(t.AvailableEndpoints(), t.AvailableDeployment()) bs.SetIngressTemplate("http://localhost") bs.PropagateFanoutAvailability(t.AvailableDeployment()) bs.PropagateRetryAvailability(t.AvailableDeployment()) diff --git a/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle.go b/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle.go index dbaeacfdb1..42ae9af4d9 100644 --- a/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle.go +++ b/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle.go @@ -22,6 +22,8 @@ import ( "knative.dev/pkg/apis" ) +const replicaUnavailableReason = "MinimumReplicasUnavailable" + // GetCondition returns the condition currently associated with the given type, or nil. func (s *PullSubscriptionStatus) GetCondition(t apis.ConditionType) *apis.Condition { return pullSubscriptionCondSet.Manage(s).GetCondition(t) @@ -85,14 +87,21 @@ func (s *PullSubscriptionStatus) MarkNoSubscription(reason, messageFormat string // PropagateDeploymentAvailability uses the availability of the provided Deployment to determine if // PullSubscriptionConditionDeployed should be marked as true or false. -func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) { +// For authentication check purpose, this method will return true if a false condition +// is caused by deployment's replicaset unavailable. +// ReplicaSet unavailable is a sign for potential authentication problems. +func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) bool { deploymentAvailableFound := false + replicaUnavailable := false for _, cond := range d.Status.Conditions { if cond.Type == appsv1.DeploymentAvailable { deploymentAvailableFound = true if cond.Status == corev1.ConditionTrue { pullSubscriptionCondSet.Manage(s).MarkTrue(PullSubscriptionConditionDeployed) } else if cond.Status == corev1.ConditionFalse { + if cond.Reason == replicaUnavailableReason { + replicaUnavailable = true + } pullSubscriptionCondSet.Manage(s).MarkFalse(PullSubscriptionConditionDeployed, cond.Reason, cond.Message) } else if cond.Status == corev1.ConditionUnknown { pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, cond.Reason, cond.Message) @@ -102,4 +111,5 @@ func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deplo if !deploymentAvailableFound { pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name) } + return replicaUnavailable } diff --git a/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle_test.go b/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle_test.go index 8d11e0553f..b2f3ba2788 100644 --- a/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle_test.go +++ b/pkg/apis/intevents/v1beta1/pullsubscription_lifecycle_test.go @@ -69,6 +69,22 @@ var ( }, }, } + + replicaUnavailableDeployment = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Status: appsv1.DeploymentStatus{ + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionFalse, + Reason: "MinimumReplicasUnavailable", + Message: "False Status", + }, + }, + }, + } ) func TestPullSubscriptionStatusIsReady(t *testing.T) { @@ -506,3 +522,12 @@ func TestPullSubscriptionStatusGetCondition(t *testing.T) { }) } } + +func TestPropagateDeploymentAvailability(t *testing.T) { + s := &PullSubscriptionStatus{} + got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment) + want := true + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } +} diff --git a/pkg/broker/handler/fanout_test.go b/pkg/broker/handler/fanout_test.go index 875c8b7641..de4aa6f007 100644 --- a/pkg/broker/handler/fanout_test.go +++ b/pkg/broker/handler/fanout_test.go @@ -62,7 +62,7 @@ func TestFanoutWatchAndSync(t *testing.T) { } t.Run("start sync pool creates no handler", func(t *testing.T) { - _, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p) + _, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "") if err != nil { t.Errorf("unexpected error from starting sync pool: %v", err) } @@ -155,7 +155,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) { t.Fatalf("failed to get random free port: %v", err) } - if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p); err != nil { + if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, ""); err != nil { t.Errorf("unexpected error from starting sync pool: %v", err) } diff --git a/pkg/broker/handler/pool.go b/pkg/broker/handler/pool.go index 48a2c22f4c..c2ac0cab22 100644 --- a/pkg/broker/handler/pool.go +++ b/pkg/broker/handler/pool.go @@ -24,6 +24,8 @@ import ( "time" "github.com/google/knative-gcp/pkg/logging" + "github.com/google/knative-gcp/pkg/utils/authcheck" + "go.uber.org/zap" ) @@ -37,10 +39,12 @@ type SyncPool interface { } type probeChecker struct { + logger *zap.Logger mux sync.RWMutex lastReportTime time.Time maxStaleDuration time.Duration port int + authType authcheck.AuthType } func (c *probeChecker) reportHealth() { @@ -80,6 +84,9 @@ func (c *probeChecker) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusNotFound) return } + // Perform Authentication check. + authcheck.AuthenticationCheck(req.Context(), c.logger, c.authType, w) + // Zero maxStaleDuration means infinite. if c.maxStaleDuration == 0 { w.WriteHeader(http.StatusOK) @@ -89,7 +96,6 @@ func (c *probeChecker) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) return } - w.WriteHeader(http.StatusOK) } // StartSyncPool starts the sync pool. @@ -99,14 +105,17 @@ func StartSyncPool( syncSignal <-chan struct{}, maxStaleDuration time.Duration, probeCheckPort int, + authType authcheck.AuthType, ) (SyncPool, error) { if err := syncPool.SyncOnce(ctx); err != nil { return nil, err } c := &probeChecker{ + logger: logging.FromContext(ctx), maxStaleDuration: maxStaleDuration, port: probeCheckPort, + authType: authType, } go c.start(ctx) if syncSignal != nil { diff --git a/pkg/broker/handler/pool_test.go b/pkg/broker/handler/pool_test.go index 50856d0372..0cd366ec9d 100644 --- a/pkg/broker/handler/pool_test.go +++ b/pkg/broker/handler/pool_test.go @@ -42,7 +42,7 @@ func TestSyncPool(t *testing.T) { t.Fatalf("failed to get random free port: %v", err) } - _, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p) + _, gotErr := StartSyncPool(ctx, syncPool, make(chan struct{}), 30*time.Second, p, "") if gotErr == nil { t.Error("StartSyncPool got unexpected result") } @@ -65,7 +65,7 @@ func TestSyncPool(t *testing.T) { } ch := make(chan struct{}) - if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p); err != nil { + if _, err := StartSyncPool(ctx, syncPool, ch, time.Second, p, ""); err != nil { t.Errorf("StartSyncPool got unexpected error: %v", err) } syncPool.verifySyncOnceCalled(t) @@ -74,17 +74,16 @@ func TestSyncPool(t *testing.T) { ch <- struct{}{} syncPool.verifySyncOnceCalled(t) - assertProbeCheckResult(t, p, true) - - // Intentionally causing a failed check. - time.Sleep(time.Second) - assertProbeCheckResult(t, p, false) + // False because authentication check will fail. + assertProbeCheckResult(t, p, false, "healthz") + // False because path is not healthz. + assertProbeCheckResult(t, p, false, "empty") }) } -func assertProbeCheckResult(t *testing.T, port int, ok bool) { +func assertProbeCheckResult(t *testing.T, port int, ok bool, path string) { t.Helper() - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/healthz", port), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/%s", port, path), nil) if err != nil { t.Fatal("Failed to create probe check request:", err) } diff --git a/pkg/broker/handler/retry_test.go b/pkg/broker/handler/retry_test.go index b73f134ae5..3db8e2c48e 100644 --- a/pkg/broker/handler/retry_test.go +++ b/pkg/broker/handler/retry_test.go @@ -61,7 +61,7 @@ func TestRetryWatchAndSync(t *testing.T) { } t.Run("start sync pool creates no handler", func(t *testing.T) { - _, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p) + _, err = StartSyncPool(ctx, syncPool, signal, time.Minute, p, "") if err != nil { t.Errorf("unexpected error from starting sync pool: %v", err) } @@ -157,7 +157,7 @@ func TestRetrySyncPoolE2E(t *testing.T) { t.Fatalf("failed to get random free port: %v", err) } - if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p); err != nil { + if _, err := StartSyncPool(ctx, syncPool, signal, time.Minute, p, ""); err != nil { t.Errorf("unexpected error from starting sync pool: %v", err) } diff --git a/pkg/broker/ingress/handler.go b/pkg/broker/ingress/handler.go index 293ee899fa..1e0d891a44 100644 --- a/pkg/broker/ingress/handler.go +++ b/pkg/broker/ingress/handler.go @@ -28,10 +28,6 @@ import ( ceclient "github.com/cloudevents/sdk-go/v2/client" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/protocol/http" - "github.com/google/knative-gcp/pkg/logging" - "github.com/google/knative-gcp/pkg/metrics" - "github.com/google/knative-gcp/pkg/tracing" - "github.com/google/knative-gcp/pkg/utils/clients" "github.com/google/wire" "go.opencensus.io/resource" "go.opencensus.io/trace" @@ -43,6 +39,12 @@ import ( "knative.dev/eventing/pkg/kncloudevents" kntracing "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/metrics/metricskey" + + "github.com/google/knative-gcp/pkg/logging" + "github.com/google/knative-gcp/pkg/metrics" + "github.com/google/knative-gcp/pkg/tracing" + "github.com/google/knative-gcp/pkg/utils/authcheck" + "github.com/google/knative-gcp/pkg/utils/clients" ) const ( @@ -97,15 +99,17 @@ type Handler struct { decouple DecoupleSink logger *zap.Logger reporter *metrics.IngressReporter + authType authcheck.AuthType } // NewHandler creates a new ingress handler. -func NewHandler(ctx context.Context, httpReceiver HttpMessageReceiver, decouple DecoupleSink, reporter *metrics.IngressReporter) *Handler { +func NewHandler(ctx context.Context, httpReceiver HttpMessageReceiver, decouple DecoupleSink, reporter *metrics.IngressReporter, authType authcheck.AuthType) *Handler { return &Handler{ httpReceiver: httpReceiver, decouple: decouple, reporter: reporter, logger: logging.FromContext(ctx), + authType: authType, } } diff --git a/pkg/broker/ingress/handler_test.go b/pkg/broker/ingress/handler_test.go index 7722eda421..c2db594210 100644 --- a/pkg/broker/ingress/handler_test.go +++ b/pkg/broker/ingress/handler_test.go @@ -415,7 +415,7 @@ func runIngressHandlerBenchmark(b *testing.B, eventSize int, targetCounts int) { if err != nil { b.Fatal(err) } - h := NewHandler(ctx, nil, decouple, statsReporter) + h := NewHandler(ctx, nil, decouple, statsReporter, "") if _, err := psClient.CreateTopic(ctx, topicID); err != nil { b.Fatal(err) @@ -523,7 +523,7 @@ func createAndStartIngress(ctx context.Context, t testing.TB, psSrv *pstest.Serv if err != nil { t.Fatal(err) } - h := NewHandler(ctx, receiver, decouple, statsReporter) + h := NewHandler(ctx, receiver, decouple, statsReporter, "") errCh := make(chan error, 1) go func() { diff --git a/pkg/pubsub/adapter/adapter.go b/pkg/pubsub/adapter/adapter.go index d4409502c7..8aa69208e4 100644 --- a/pkg/pubsub/adapter/adapter.go +++ b/pkg/pubsub/adapter/adapter.go @@ -27,15 +27,17 @@ import ( "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/extensions" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "go.opencensus.io/trace" + "k8s.io/apimachinery/pkg/types" + kntracing "knative.dev/eventing/pkg/tracing" + "github.com/google/knative-gcp/pkg/apis/messaging" "github.com/google/knative-gcp/pkg/logging" . "github.com/google/knative-gcp/pkg/pubsub/adapter/context" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/tracing" + "github.com/google/knative-gcp/pkg/utils/authcheck" "github.com/google/knative-gcp/pkg/utils/clients" - "go.opencensus.io/trace" - "k8s.io/apimachinery/pkg/types" - kntracing "knative.dev/eventing/pkg/tracing" ) // AdapterArgs has a bundle of arguments needed to create an Adapter. @@ -55,6 +57,9 @@ type AdapterArgs struct { // ConverterType use to select which converter to use. ConverterType converters.ConverterType + + // AuthType is the authentication configuration mode the Pod uses. + AuthType authcheck.AuthType } // Adapter implements the Pub/Sub adapter to deliver Pub/Sub messages from a @@ -123,6 +128,9 @@ func (a *Adapter) Start(ctx context.Context) error { ctx = WithTopicKey(ctx, a.args.TopicID) ctx = WithSubscriptionKey(ctx, a.subscription.ID()) + // Initialize probe checker to run authentication check. + pc := authcheck.NewProbeChecker(logging.FromContext(ctx), a.args.AuthType) + go pc.Start(ctx) return a.subscription.Receive(ctx, a.receive) } diff --git a/pkg/pubsub/publisher/publisher.go b/pkg/pubsub/publisher/publisher.go index 7fede6df46..7b723525c6 100644 --- a/pkg/pubsub/publisher/publisher.go +++ b/pkg/pubsub/publisher/publisher.go @@ -28,6 +28,8 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/transformer" "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/google/knative-gcp/pkg/logging" + "github.com/google/knative-gcp/pkg/utils/authcheck" + "go.uber.org/zap" cev2 "github.com/cloudevents/sdk-go/v2" @@ -60,14 +62,18 @@ type Publisher struct { topic *pubsub.Topic logger *zap.Logger + // AuthType is the authentication configuration mode the Pod uses. + authType authcheck.AuthType } // NewPublisher creates a new publisher. -func NewPublisher(ctx context.Context, inbound HttpMessageReceiver, topic *pubsub.Topic) *Publisher { +func NewPublisher(ctx context.Context, inbound HttpMessageReceiver, topic *pubsub.Topic, authType authcheck.AuthType) *Publisher { return &Publisher{ inbound: inbound, topic: topic, logger: logging.FromContext(ctx), + // AuthType is the authentication configuration mode the Pod uses. + authType: authType, } } diff --git a/pkg/reconciler/brokercell/brokercell.go b/pkg/reconciler/brokercell/brokercell.go index e38c41d044..3681b16c6f 100644 --- a/pkg/reconciler/brokercell/brokercell.go +++ b/pkg/reconciler/brokercell/brokercell.go @@ -175,7 +175,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, bc *intv1alpha1.BrokerCe bc.Status.MarkIngressFailed("IngressServiceFailed", "Failed to reconcile ingress service: %v", err) return err } - bc.Status.PropagateIngressAvailability(endpoints) + // If deployment has replicaUnavailable error, it potentially has authentication configuration issues. + if replicaUnavailable := bc.Status.PropagateIngressAvailability(endpoints, ind); replicaUnavailable { + podList, err := authcheck.GetPodList(ctx, resources.GetLabelSelector(bc.Name, resources.IngressName), r.KubeClientSet, bc.Namespace) + if err != nil { + logging.FromContext(ctx).Error("Failed to propagate authentication check message from ingress component", zap.Any("namespace", bc.Namespace), zap.Any("name", bc.Name), zap.Error(err)) + return err + } + if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" { + bc.Status.MarkIngressUnknown(authcheck.AuthenticationCheckUnknownReason, authenticationCheckMessage) + } + } hostName := network.GetServiceHostname(endpoints.GetName(), endpoints.GetNamespace()) bc.Status.IngressTemplate = fmt.Sprintf("http://%s/{namespace}/{name}", hostName) @@ -193,8 +203,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, bc *intv1alpha1.BrokerCe bc.Status.MarkFanoutFailed("HorizontalPodAutoscalerFailed", "Failed to reconcile fanout HorizontalPodAutoscaler: %v", err) return err } - bc.Status.PropagateFanoutAvailability(fd) - + // If deployment has replicaUnavailable error, it potentially has authentication configuration issues. + if replicaUnavailable := bc.Status.PropagateFanoutAvailability(fd); replicaUnavailable { + podList, err := authcheck.GetPodList(ctx, resources.GetLabelSelector(bc.Name, resources.FanoutName), r.KubeClientSet, bc.Namespace) + if err != nil { + logging.FromContext(ctx).Error("Failed to propagate authentication check message from fanout component", zap.Any("namespace", bc.Namespace), zap.Any("name", bc.Name), zap.Error(err)) + return err + } + if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" { + bc.Status.MarkFanoutUnknown(authcheck.AuthenticationCheckUnknownReason, authenticationCheckMessage) + } + } // Reconcile retry deployment and HPA. rd, err := r.deploymentRec.ReconcileDeployment(ctx, bc, resources.MakeRetryDeployment(r.makeRetryArgs(bc, authType))) if err != nil { @@ -209,7 +228,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, bc *intv1alpha1.BrokerCe bc.Status.MarkRetryFailed("HorizontalPodAutoscalerFailed", "Failed to reconcile retry HorizontalPodAutoscaler: %v", err) return err } - bc.Status.PropagateRetryAvailability(rd) + // If deployment has replicaUnavailable error, it potentially has authentication configuration issues. + if replicaUnavailable := bc.Status.PropagateRetryAvailability(rd); replicaUnavailable { + podList, err := authcheck.GetPodList(ctx, resources.GetLabelSelector(bc.Name, resources.RetryName), r.KubeClientSet, bc.Namespace) + if err != nil { + logging.FromContext(ctx).Error("Failed to propagate authentication check message from retry component", zap.Any("namespace", bc.Namespace), zap.Any("name", bc.Name), zap.Error(err)) + return err + } + if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" { + bc.Status.MarkRetryUnknown(authcheck.AuthenticationCheckUnknownReason, authenticationCheckMessage) + } + } bc.Status.ObservedGeneration = bc.Generation return pkgreconciler.NewEvent(corev1.EventTypeNormal, "BrokerCellReconciled", "BrokerCell reconciled: \"%s/%s\"", bc.Namespace, bc.Name) diff --git a/pkg/reconciler/brokercell/resources/args.go b/pkg/reconciler/brokercell/resources/args.go index 5a569f28de..9a1e4d0b03 100644 --- a/pkg/reconciler/brokercell/resources/args.go +++ b/pkg/reconciler/brokercell/resources/args.go @@ -19,6 +19,7 @@ package resources import ( "fmt" + "k8s.io/apimachinery/pkg/labels" "knative.dev/pkg/kmeta" intv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" @@ -111,3 +112,8 @@ func CommonLabels(brokerCellName string) map[string]string { func Name(brokerCellName, componentName string) string { return kmeta.ChildName(fmt.Sprintf("%s-brokercell-", brokerCellName), componentName) } + +// GetLabelSelector get a label selector with brokercell's common labels. +func GetLabelSelector(brokerCellName, componentName string) labels.Selector { + return labels.SelectorFromSet(Labels(brokerCellName, componentName)) +} diff --git a/pkg/reconciler/brokercell/resources/args_test.go b/pkg/reconciler/brokercell/resources/args_test.go new file mode 100644 index 0000000000..c780298b3f --- /dev/null +++ b/pkg/reconciler/brokercell/resources/args_test.go @@ -0,0 +1,59 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/labels" + _ "knative.dev/pkg/system/testing" +) + +func TestLabels(t *testing.T) { + wantLabels := map[string]string{ + "app": "cloud-run-events", + "brokerCell": "default", + "role": "fanout", + } + gotLabels := Labels("default", "fanout") + + if diff := cmp.Diff(wantLabels, gotLabels); diff != "" { + t.Fatalf("Unexpected labels (-want, +got): %s", diff) + } +} + +func TestName(t *testing.T) { + wantLabels := "default-brokercell-fanout" + gotLabels := Name("default", "fanout") + + if diff := cmp.Diff(wantLabels, gotLabels); diff != "" { + t.Fatalf("Unexpected name (-want, +got): %s", diff) + } +} + +func TestGetLabelSelector(t *testing.T) { + gotLabelSelector := GetLabelSelector("default", "fanout") + wantLabelSelector := labels.SelectorFromSet(map[string]string{ + "app": "cloud-run-events", + "brokerCell": "default", + "role": "fanout", + }) + if diff := cmp.Diff(gotLabelSelector.String(), wantLabelSelector.String()); diff != "" { + t.Fatalf("Unexpected labelSelector (-want, +got): %s", diff) + } +} diff --git a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go index 525012a598..0a5c5d6bee 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go @@ -25,6 +25,9 @@ import ( pullsubscriptionreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/keda/resources" + psresources "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" + "github.com/google/knative-gcp/pkg/utils/authcheck" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -87,7 +90,17 @@ func (r *Reconciler) ReconcileScaledObject(ctx context.Context, ra *appsv1.Deplo } } - src.Status.PropagateDeploymentAvailability(existing) + // If deployment has replicaUnavailable error, it potentially has authentication configuration issues. + if replicaUnavailable := src.Status.PropagateDeploymentAvailability(existing); replicaUnavailable { + podList, err := authcheck.GetPodList(ctx, psresources.GetLabelSelector(r.ControllerAgentName, src.Name), r.KubeClientSet, src.Namespace) + if err != nil { + logging.FromContext(ctx).Error("Error propagating authentication check message", zap.Error(err)) + return err + } + if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" { + src.Status.MarkDeployedUnknown(authcheck.AuthenticationCheckUnknownReason, authenticationCheckMessage) + } + } // Now we reconcile the ScaledObject. gvr, _ := meta.UnsafeGuessKindToResource(resources.ScaledObjectGVK) diff --git a/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter.go b/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter.go index 9a8e8b5b7d..ea24ff10bf 100644 --- a/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter.go +++ b/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/util/intstr" + "github.com/google/knative-gcp/pkg/testing/testloggingutil" "github.com/google/knative-gcp/pkg/utils/authcheck" @@ -164,7 +166,24 @@ func makeReceiveAdapterPodSpec(ctx context.Context, args *ReceiveAdapterArgs) *c Ports: []corev1.ContainerPort{{ Name: "metrics", ContainerPort: 9090, + }, { + Name: "http", + ContainerPort: authcheck.DefaultProbeCheckPort, }}, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt(authcheck.DefaultProbeCheckPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + FailureThreshold: 3, + PeriodSeconds: 15, + InitialDelaySeconds: 5, + SuccessThreshold: 1, + TimeoutSeconds: 5, + }, } // This is added purely for the TestCloudLogging E2E tests, which verify that the log line is diff --git a/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter_test.go b/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter_test.go index 625f155b36..eb39af015c 100644 --- a/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter_test.go +++ b/pkg/reconciler/intevents/pullsubscription/resources/receive_adapter_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/util/intstr" "github.com/google/knative-gcp/pkg/apis/duck" gcpduckv1 "github.com/google/knative-gcp/pkg/apis/duck/v1" @@ -125,6 +126,20 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { corev1.ResourceCPU: resource.MustParse("400m"), }, }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt(authcheck.DefaultProbeCheckPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + FailureThreshold: 3, + PeriodSeconds: 15, + InitialDelaySeconds: 5, + SuccessThreshold: 1, + TimeoutSeconds: 5, + }, Env: []corev1.EnvVar{{ Name: "PROJECT_ID", Value: "eventing-name", @@ -179,7 +194,9 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { Name: credsVolume, MountPath: credsMountPath, }}, - Ports: []corev1.ContainerPort{{Name: "metrics", ContainerPort: 9090}}, + Ports: []corev1.ContainerPort{ + {Name: "metrics", ContainerPort: 9090}, + {Name: "http", ContainerPort: 8080}}, }}, Volumes: []corev1.Volume{{ Name: credsVolume, @@ -294,6 +311,20 @@ func TestMakeFullReceiveAdapter(t *testing.T) { corev1.ResourceCPU: resource.MustParse("400m"), }, }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt(authcheck.DefaultProbeCheckPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + FailureThreshold: 3, + PeriodSeconds: 15, + InitialDelaySeconds: 5, + SuccessThreshold: 1, + TimeoutSeconds: 5, + }, Env: []corev1.EnvVar{{ Name: "PROJECT_ID", Value: "eventing-name", @@ -350,10 +381,9 @@ func TestMakeFullReceiveAdapter(t *testing.T) { Name: credsVolume, MountPath: credsMountPath, }}, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, - }}, + Ports: []corev1.ContainerPort{ + {Name: "metrics", ContainerPort: 9090}, + {Name: "http", ContainerPort: 8080}}, }}, Volumes: []corev1.Volume{{ Name: credsVolume, @@ -468,6 +498,20 @@ func TestMakeReceiveAdapterWithServiceAccount(t *testing.T) { corev1.ResourceCPU: resource.MustParse("400m"), }, }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt(authcheck.DefaultProbeCheckPort), + Scheme: corev1.URISchemeHTTP, + }, + }, + FailureThreshold: 3, + PeriodSeconds: 15, + InitialDelaySeconds: 5, + SuccessThreshold: 1, + TimeoutSeconds: 5, + }, Env: []corev1.EnvVar{{ Name: "PROJECT_ID", Value: "eventing-name", @@ -514,10 +558,9 @@ func TestMakeReceiveAdapterWithServiceAccount(t *testing.T) { Name: "K_GCP_AUTH_TYPE", Value: string(authcheck.WorkloadIdentityGSA), }}, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, - }}, + Ports: []corev1.ContainerPort{ + {Name: "metrics", ContainerPort: 9090}, + {Name: "http", ContainerPort: 8080}}, }}, }, }, diff --git a/pkg/reconciler/intevents/pullsubscription/static/pullsubscription.go b/pkg/reconciler/intevents/pullsubscription/static/pullsubscription.go index 7d43565c5a..aad021d75a 100644 --- a/pkg/reconciler/intevents/pullsubscription/static/pullsubscription.go +++ b/pkg/reconciler/intevents/pullsubscription/static/pullsubscription.go @@ -24,6 +24,9 @@ import ( v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" pullsubscriptionreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription" + psresources "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" + "github.com/google/knative-gcp/pkg/utils/authcheck" + appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,7 +61,18 @@ func (r *Reconciler) ReconcileDeployment(ctx context.Context, ra *appsv1.Deploym } } - src.Status.PropagateDeploymentAvailability(existing) + // If deployment has replicaUnavailable error, it potentially has authentication configuration issues. + if replicaUnavailable := src.Status.PropagateDeploymentAvailability(existing); replicaUnavailable { + podList, err := authcheck.GetPodList(ctx, psresources.GetLabelSelector(r.ControllerAgentName, src.Name), r.KubeClientSet, src.Namespace) + if err != nil { + logging.FromContext(ctx).Error("Error propagating authentication check message", zap.Error(err)) + return err + } + if authenticationCheckMessage := authcheck.GetTerminationLogFromPodList(podList); authenticationCheckMessage != "" { + src.Status.MarkDeployedUnknown(authcheck.AuthenticationCheckUnknownReason, authenticationCheckMessage) + } + } + return nil } diff --git a/pkg/reconciler/testing/brokercell.go b/pkg/reconciler/testing/brokercell.go index 46107c63ba..0a0f7d8f1a 100644 --- a/pkg/reconciler/testing/brokercell.go +++ b/pkg/reconciler/testing/brokercell.go @@ -100,7 +100,7 @@ func WithBrokerCellIngressUnknown(reason, msg string) BrokerCellOption { func WithBrokerCellIngressAvailable() BrokerCellOption { return func(bc *intv1alpha1.BrokerCell) { - bc.Status.PropagateIngressAvailability(v1alpha1.TestHelper.AvailableEndpoints()) + bc.Status.PropagateIngressAvailability(v1alpha1.TestHelper.AvailableEndpoints(), v1alpha1.TestHelper.AvailableDeployment()) } } diff --git a/pkg/utils/authcheck/authcheck.go b/pkg/utils/authcheck/authcheck.go new file mode 100644 index 0000000000..2da98af469 --- /dev/null +++ b/pkg/utils/authcheck/authcheck.go @@ -0,0 +1,114 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package authcheck provides utilities to check authentication configuration for data plane resources. +// File authcheck contains functions to run customized checks inside of a Pod. +package authcheck + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + nethttp "net/http" + + "go.uber.org/zap" + "golang.org/x/oauth2/google" +) + +const ( + // Resource is used as the path to get the default token from metadata server. + // In workload-identity-gsa mode, this path will return a token if + // corresponding k8s service account and google service account establish a correct relationship. + resource = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" + // Scope is used as the scope to get token from default credential. + scope = "https://www.googleapis.com/auth/cloud-platform" +) + +// AuthenticationCheck performs the authentication check running in the Pod. +func AuthenticationCheck(ctx context.Context, logger *zap.Logger, authType AuthType, response nethttp.ResponseWriter) { + var err error + if authType == Secret { + err = AuthenticationCheckForSecret(ctx) + } else if authType == WorkloadIdentityGSA { + err = AuthenticationCheckForWorkloadIdentityGSA(resource) + } else { + logger.Error(fmt.Sprint("unknown auth type: ", authType)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + + if err != nil { + // Transfer the error into a string message, otherwise, marshalling error may return nil unexpectedly. + message := fmt.Sprintf("using %s mode, when checking authentication, get error: %s", authType, err.Error()) + b, err := json.Marshal(map[string]interface{}{ + "error": message, + }) + if err != nil { + logger.Error(fmt.Sprint("error marshalling the message: ", message), zap.Error(err)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + errs := ioutil.WriteFile("/dev/termination-log", b, 0644) + if errs != nil { + logger.Error(fmt.Sprintf("error writing the message: %s into termination log", message), zap.Error(err)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + logger.Info(message) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + response.WriteHeader(nethttp.StatusOK) +} + +// AuthenticationCheckForSecret performs the authentication check for Pod in secret mode. +func AuthenticationCheckForSecret(ctx context.Context) error { + cred, err := google.FindDefaultCredentials(ctx, scope) + if err != nil { + return fmt.Errorf("error finding the default credential: %w", err) + } + s, err := cred.TokenSource.Token() + if err != nil { + return fmt.Errorf("error getting the token, probably due to the key stored in the Kubernetes Secret is expired or revoked: %w", err) + } + if !s.Valid() { + return errors.New("token is not valid") + } + return nil +} + +// AuthenticationCheckForWorkloadIdentityGSA performs the authentication check for Pod in workload-identity-gsa mode. +func AuthenticationCheckForWorkloadIdentityGSA(resource string) error { + req, err := http.NewRequest(http.MethodGet, resource, nil) + if err != nil { + return fmt.Errorf("error setting up the http request: %w", err) + } + req.Header.Set("Metadata-Flavor", "Google") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("error getting the http response: %w", err) + } + defer resp.Body.Close() + // Check if we can successfully get the token. + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest { + return errors.New("the Pod is not fully authenticated, " + + "probably due to corresponding k8s service account and google service account do not establish a correct relationship") + } + return nil +} diff --git a/pkg/utils/authcheck/authchek_test.go b/pkg/utils/authcheck/authchek_test.go new file mode 100644 index 0000000000..208baac69c --- /dev/null +++ b/pkg/utils/authcheck/authchek_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package authcheck + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + + "github.com/google/knative-gcp/pkg/logging" +) + +func TestAuthenticationCheck(t *testing.T) { + testCases := []struct { + name string + authType AuthType + wantStatusCode int + setEnv bool + }{ + { + name: "authentication check uses empty authType", + authType: "", + wantStatusCode: http.StatusUnauthorized, + setEnv: false, + }, + { + name: "authentication check uses secret authType", + authType: Secret, + wantStatusCode: http.StatusUnauthorized, + setEnv: true, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := logging.FromContext(ctx) + response := httptest.NewRecorder() + + if tc.setEnv { + os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "empty") + } + + AuthenticationCheck(ctx, logger, tc.authType, response) + + if diff := cmp.Diff(tc.wantStatusCode, response.Result().StatusCode); diff != "" { + t.Error("unexpected status (-want, +got) = ", diff) + } + }) + } +} + +func TestAuthenticationCheckForWorkloadIdentityGSA(t *testing.T) { + resource := "http:/empty" + wantErr := errors.New("error getting the http response: Get \"http:///empty\": http: no Host in request URL") + gotErr := AuthenticationCheckForWorkloadIdentityGSA(resource) + if diff := cmp.Diff(gotErr.Error(), wantErr.Error()); diff != "" { + t.Error("unexpected status (-want, +got) = ", diff) + } +} diff --git a/pkg/utils/authcheck/authtype.go b/pkg/utils/authcheck/authtype.go index f30b8244c2..791b5fb1ad 100644 --- a/pkg/utils/authcheck/authtype.go +++ b/pkg/utils/authcheck/authtype.go @@ -14,12 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package authcheck provides utilities to check authentication configuration for data plane resources. +// File authtype contains functions to differentiate authentication mode. package authcheck import ( "context" "errors" "fmt" + "regexp" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -52,12 +55,15 @@ const ( BrokerServiceAccountName = "broker" ) -var BrokerSecret = &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "google-broker-key", - }, - Key: "key.json", -} +var ( + // Regex for a valid google service account email. + emailRegexp = regexp.MustCompile(`^[a-z][a-z0-9-]{5,29}@[a-z][a-z0-9-]{5,29}\.iam\.gserviceaccount.com$`) + + BrokerSecret = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "google-broker-key"}, + Key: "key.json", + } +) // GetAuthTypeForBrokerCell will get authType for BrokerCell. func GetAuthTypeForBrokerCell(ctx context.Context, serviceAccountLister corev1listers.ServiceAccountLister, @@ -114,7 +120,19 @@ func getAuthTypeForWorkloadIdentity(ctx context.Context, serviceAccountLister co args.ServiceAccountName) } return "", fmt.Errorf("error getting Kubernetes Service Account: %w", err) - } else if kServiceAccount.Annotations[resources.WorkloadIdentityKey] != "" { + } else if email := kServiceAccount.Annotations[resources.WorkloadIdentityKey]; email != "" { + // Check if email is a valid google service account email. + if match := emailRegexp.FindStringSubmatch(email); len(match) == 0 { + // The format of google service account email is service-account-name@project-id.iam.gserviceaccount.com + + // Service account name must be between 6 and 30 characters (inclusive), + // must begin with a lowercase letter, and consist of lowercase alphanumeric characters that can be separated by hyphens. + + // Project IDs must start with a lowercase letter and can have lowercase ASCII letters, digits or hyphens, + // must be between 6 and 30 characters. + return "", fmt.Errorf("the annotation %s of Kubernetes Service Account %s does not contain a valid Google Service Account", + resources.WorkloadIdentityKey, args.ServiceAccountName) + } return WorkloadIdentityGSA, nil } // Once workload-identity new gen lands, we should also include the annotation check for it. diff --git a/pkg/utils/authcheck/authtype_test.go b/pkg/utils/authcheck/authtype_test.go index 99f146a368..9c602f4e3b 100644 --- a/pkg/utils/authcheck/authtype_test.go +++ b/pkg/utils/authcheck/authtype_test.go @@ -78,13 +78,25 @@ func TestGetAuthTypeForSources(t *testing.T) { name: "successfully get authType for workload identity", objects: []runtime.Object{ pkgtesting.NewServiceAccount(serviceAccountName, testNS, - pkgtesting.WithServiceAccountAnnotation("name"), + pkgtesting.WithServiceAccountAnnotation("service-account-name@project-id.iam.gserviceaccount.com"), ), }, args: serviceAccountArgs, wantAuthType: WorkloadIdentityGSA, wantError: nil, }, + { + name: "error get authType, invalid service account", + objects: []runtime.Object{ + pkgtesting.NewServiceAccount(serviceAccountName, testNS, + pkgtesting.WithServiceAccountAnnotation("name"), + ), + }, + args: serviceAccountArgs, + wantAuthType: "", + wantError: fmt.Errorf("using Workload Identity for authentication configuration: the annotation iam.gke.io/gcp-service-account "+ + "of Kubernetes Service Account %s does not contain a valid Google Service Account", serviceAccountName), + }, { name: "error get authType, service account doesn't exist", objects: []runtime.Object{}, diff --git a/pkg/utils/authcheck/enqueue.go b/pkg/utils/authcheck/enqueue.go index 24a0d3fd63..4abcc0f821 100644 --- a/pkg/utils/authcheck/enqueue.go +++ b/pkg/utils/authcheck/enqueue.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package authcheck provides utilities to check authentication configuration for data plane resources. +// enqueue.go contains customized EventHandlers to enqueue resources for authentication check. package authcheck import ( diff --git a/pkg/utils/authcheck/list.go b/pkg/utils/authcheck/list.go new file mode 100644 index 0000000000..7260787a9c --- /dev/null +++ b/pkg/utils/authcheck/list.go @@ -0,0 +1,66 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package authcheck provides utilities to check authentication configuration for data plane resources. +// list.go contains functions to get a list of resources based on label selector and get information from a list of resources. +package authcheck + +import ( + "context" + "strings" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" + + "knative.dev/pkg/logging" +) + +// GetPodList get a list of Pod in a certain namespace with certain label selector. +func GetPodList(ctx context.Context, ls labels.Selector, kubeClientSet kubernetes.Interface, namespace string) (*corev1.PodList, error) { + pl, err := kubeClientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: ls.String(), + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + }) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Unable to list pod", zap.Error(err)) + return nil, err + } + return pl, nil +} + +// GetTerminationLogFromPodList get termination log from Pod. +func GetTerminationLogFromPodList(pl *corev1.PodList) string { + for _, pod := range pl.Items { + for _, cs := range pod.Status.ContainerStatuses { + if cs.State.Terminated != nil && isAuthMessage(cs.State.Terminated.Message) { + return cs.State.Terminated.Message + } else if cs.LastTerminationState.Terminated != nil && isAuthMessage(cs.LastTerminationState.Terminated.Message) { + return cs.LastTerminationState.Terminated.Message + } + } + } + return "" +} + +func isAuthMessage(message string) bool { + return strings.Contains(message, "checking authentication") +} diff --git a/pkg/utils/authcheck/list_test.go b/pkg/utils/authcheck/list_test.go new file mode 100644 index 0000000000..d0461f048c --- /dev/null +++ b/pkg/utils/authcheck/list_test.go @@ -0,0 +1,202 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package authcheck + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + fakeKubeClient "k8s.io/client-go/kubernetes/fake" + pkgtesting "knative.dev/pkg/reconciler/testing" +) + +func TestGetPodList(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + objects []runtime.Object + labels labels.Selector + wantNames []string + }{ + { + name: "using correct labels to get podlist", + objects: []runtime.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: testNS, + Labels: map[string]string{ + "label1": "val1", + "label2": "val2", + }, + }, + }, + }, + labels: labels.SelectorFromSet(map[string]string{ + "label1": "val1", + "label2": "val2", + }), + wantNames: []string{ + "pod-1", + }, + }, + { + name: "using incorrect labels to get podlist", + objects: []runtime.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: testNS, + Labels: map[string]string{ + "label2": "val2", + }, + }, + }, + }, + labels: labels.SelectorFromSet(map[string]string{ + "label1": "val1", + "label2": "val2", + }), + wantNames: []string{}, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + // Sets up the the Context and the fake informers for the tests. + ctx, cancel, _ := pkgtesting.SetupFakeContextWithCancel(t) + defer cancel() + + // Form a fake Kube Client from tc.objects. + cs := fakeKubeClient.NewSimpleClientset(tc.objects...) + + pl, _ := GetPodList(ctx, tc.labels, cs, testNS) + + for _, pod := range pl.Items { + found := false + for _, wantName := range tc.wantNames { + if pod.Name == wantName { + found = true + } + if !found { + t.Error("Unexpected pod", pod.Name) + } + } + } + if len(pl.Items) != len(tc.wantNames) { + t.Errorf("Diffenrent Podlist length, want %v, got %v ", len(pl.Items), len(tc.wantNames)) + } + }) + } +} + +func TestGetTerminationLogFromPodList(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + podlist *corev1.PodList + wantMessage string + }{ + { + name: "qualified message in Pod State", + podlist: &corev1.PodList{ + Items: []corev1.Pod{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: testNS, + Labels: map[string]string{ + "label1": "val1", + "label2": "val2", + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{{ + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + Message: "checking authentication", + }, + }}}, + }, + }}, + }, + wantMessage: "checking authentication", + }, + { + name: "qualified message in Pod LastTerminationState", + podlist: &corev1.PodList{ + Items: []corev1.Pod{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: testNS, + Labels: map[string]string{ + "label1": "val1", + "label2": "val2", + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{{ + LastTerminationState: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + Message: "checking authentication", + }, + }}}, + }}, + }, + }, + wantMessage: "checking authentication", + }, + { + name: "un-qualified message", + podlist: &corev1.PodList{ + Items: []corev1.Pod{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: testNS, + Labels: map[string]string{ + "label1": "val1", + "label2": "val2", + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{{ + LastTerminationState: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + Message: "non-checking", + }, + }}}, + }, + }}, + }, + wantMessage: "", + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + getMessage := GetTerminationLogFromPodList(tc.podlist) + if diff := cmp.Diff(getMessage, tc.wantMessage); diff != "" { + t.Error("unexpected termination message (-want, +got) = ", diff) + } + }) + } +} diff --git a/pkg/utils/authcheck/probechecker.go b/pkg/utils/authcheck/probechecker.go new file mode 100644 index 0000000000..048a79f87d --- /dev/null +++ b/pkg/utils/authcheck/probechecker.go @@ -0,0 +1,73 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package authcheck provides utilities to check authentication configuration for data plane resources. +// probechecker.go utilities to perform a probe check for liviness and readiness. +package authcheck + +import ( + "context" + "net/http" + "strconv" + + "go.uber.org/zap" +) + +// DefaultProbeCheckPort is the default port for checking sync pool health. +const DefaultProbeCheckPort = 8080 + +type ProbeChecker struct { + logger *zap.Logger + port int + authType AuthType +} + +// NewProbeChecker returns ProbeChecker with default probe checker port. +func NewProbeChecker(logger *zap.Logger, authType AuthType) ProbeChecker { + return ProbeChecker{ + logger: logger, + port: DefaultProbeCheckPort, + authType: authType, + } +} + +// Start will initialize s http serve and start to listen. +func (pc *ProbeChecker) Start(ctx context.Context) { + srv := &http.Server{ + Addr: ":" + strconv.Itoa(pc.port), + Handler: pc, + } + + go func() { + pc.logger.Info("Starting probe checker...") + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + pc.logger.Error("the probe checker has stopped unexpectedly", zap.Error(err)) + } + }() + + <-ctx.Done() + if err := srv.Shutdown(ctx); err != nil { + pc.logger.Error("failed to shutdown the pubsub probe checker", zap.Error(err)) + } +} + +// ServerHTTP will perform the authentication check if the request path is /healthz. +func (pc *ProbeChecker) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/healthz" { + // Perform Authentication check. + AuthenticationCheck(req.Context(), pc.logger, pc.authType, w) + } +} diff --git a/pkg/utils/authcheck/probechecker_test.go b/pkg/utils/authcheck/probechecker_test.go new file mode 100644 index 0000000000..018f003612 --- /dev/null +++ b/pkg/utils/authcheck/probechecker_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package authcheck + +import ( + "context" + "fmt" + "net" + "net/http" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/google/knative-gcp/pkg/logging" +) + +func TestProbeCheckResult(t *testing.T) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + + // Get a free port. + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + t.Fatal("Failed to resolve TCP address:", err) + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + t.Fatal("Failed to listen TCP:", err) + } + l.Close() + port := l.Addr().(*net.TCPAddr).Port + + logger := logging.FromContext(ctx) + probeChecker := ProbeChecker{ + logger: logger, + port: port, + authType: "", + } + go probeChecker.Start(ctx) + + time.Sleep(1 * time.Second) + + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/healthz", port), nil) + if err != nil { + t.Fatal("Failed to create probe check request:", err) + } + + client := http.DefaultClient + + resp, err := client.Do(req) + if err != nil { + t.Fatal("Failed to execute probe check:", err) + return + } + if diff := cmp.Diff(resp.StatusCode, http.StatusUnauthorized); diff != "" { + t.Error("unexpected probe check result (-want, +got) = ", diff) + } + cancel() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 96b04f56f4..18530e2981 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -398,6 +398,7 @@ golang.org/x/net/internal/timeseries golang.org/x/net/netutil golang.org/x/net/trace # golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 +## explicit golang.org/x/oauth2 golang.org/x/oauth2/google golang.org/x/oauth2/internal