Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Authentication check running inside the Pod for Source and BrokerCell…
Browse files Browse the repository at this point in the history
… Fanout and Retry (#1982)

* running auth check in Pod

* address comments

* address comments

* address comments

* address comments

* address comments
  • Loading branch information
grac3gao-zz authored Dec 17, 2020
1 parent 3aaedf5 commit 980eb70
Show file tree
Hide file tree
Showing 44 changed files with 1,235 additions and 65 deletions.
6 changes: 5 additions & 1 deletion cmd/broker/fanout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
"github.com/google/knative-gcp/pkg/utils/authcheck"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/knative-gcp/pkg/utils/mainhelper"

Expand All @@ -45,6 +46,9 @@ type envConfig struct {
HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`
MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"`

// Environment variable containing the authType, which represents the authentication configuration mode the Pod is using.
AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""`

// MaxStaleDuration is the max duration of the handler pool without being synced.
// With the internal pool resync period being 15s, it requires at least 4
// continuous sync failures (or no sync at all) to be stale.
Expand Down Expand Up @@ -99,7 +103,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create fanout sync pool", zap.Error(err))
}
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, authcheck.NewDefault(env.AuthType)); err != nil {
logger.Fatalw("Failed to start fanout sync pool", zap.Error(err))
}

Expand Down
5 changes: 5 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
"github.com/google/knative-gcp/pkg/utils/authcheck"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/knative-gcp/pkg/utils/mainhelper"

Expand All @@ -31,6 +32,9 @@ type envConfig struct {
PodName string `envconfig:"POD_NAME" required:"true"`
Port int `envconfig:"PORT" default:"8080"`

// Environment variable containing the authType, which represents the authentication configuration mode the Pod is using.
AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""`

// Default 300Mi.
PublishBufferedByteLimit int `envconfig:"PUBLISH_BUFFERED_BYTES_LIMIT" default:"314572800"`
}
Expand Down Expand Up @@ -66,6 +70,7 @@ func main() {
metrics.PodName(env.PodName),
metrics.ContainerName(component),
publishSetting(logger.Desugar(), env),
env.AuthType,
)
if err != nil {
logger.Desugar().Fatal("Unable to create ingress handler: ", zap.Error(err))
Expand Down
2 changes: 2 additions & 0 deletions cmd/broker/ingress/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/ingress"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils/authcheck"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/wire"
)
Expand All @@ -36,6 +37,7 @@ func InitializeHandler(
podName metrics.PodName,
containerName metrics.ContainerName,
publishSettings pubsub.PublishSettings,
authType authcheck.AuthType,
) (*ingress.Handler, error) {
panic(wire.Build(
ingress.HandlerSet,
Expand Down
5 changes: 3 additions & 2 deletions cmd/broker/ingress/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion cmd/broker/retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
"github.com/google/knative-gcp/pkg/utils/authcheck"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/knative-gcp/pkg/utils/mainhelper"
)
Expand All @@ -44,6 +45,9 @@ type envConfig struct {
TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`

// Environment variable containing the authType, which represents the authentication configuration mode the Pod is using.
AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""`

// Outstanding messages effectively limits how many connections we will create to each subscriber.
// If such connections are long, it will consume a lot of memory (aggregated) without limiting.
OutstandingMessagesPerSub int `envconfig:"OUTSTANDING_MESSAGES_PER_SUB" default:"200"`
Expand Down Expand Up @@ -97,7 +101,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to get retry sync pool", zap.Error(err))
}
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultProbeCheckPort, authcheck.NewDefault(env.AuthType)); err != nil {
logger.Fatal("Failed to start retry sync pool", zap.Error(err))
}

Expand Down
14 changes: 10 additions & 4 deletions cmd/pubsub/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,27 @@ import (
"flag"
"log"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"knative.dev/pkg/tracing"

. "github.com/google/knative-gcp/pkg/pubsub/publisher"
"github.com/google/knative-gcp/pkg/testing/testloggingutil"
tracingconfig "github.com/google/knative-gcp/pkg/tracing"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
"github.com/google/knative-gcp/pkg/utils/authcheck"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"knative.dev/pkg/tracing"
)

type envConfig struct {
// Environment variable containing the port for the publisher.
Port int `envconfig:"PORT" default:"8080"`

// Environment variable containing the authType, which represents the authentication configuration mode the Pod is using.
AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""`

// Topic is the environment variable containing the PubSub Topic being
// subscribed to's name. In the form that is unique within the project.
// E.g. 'laconia', not 'projects/my-gcp-project/topics/laconia'.
Expand Down Expand Up @@ -95,6 +100,7 @@ func main() {
clients.Port(env.Port),
clients.ProjectID(projectID),
TopicID(topicID),
env.AuthType,
)

if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/pubsub/publisher/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"

"github.com/google/knative-gcp/pkg/pubsub/publisher"
"github.com/google/knative-gcp/pkg/utils/authcheck"
"github.com/google/knative-gcp/pkg/utils/clients"

"github.com/google/wire"
Expand All @@ -32,6 +33,7 @@ func InitializePublisher(
port clients.Port,
projectID clients.ProjectID,
topicID publisher.TopicID,
authType authcheck.AuthType,
) (*publisher.Publisher, error) {
panic(wire.Build(
publisher.PublisherSet,
Expand Down
5 changes: 3 additions & 2 deletions cmd/pubsub/publisher/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cmd/pubsub/receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/google/knative-gcp/pkg/testing/testloggingutil"
"github.com/google/knative-gcp/pkg/utils/authcheck"

"go.uber.org/zap"
"knative.dev/pkg/logging"
Expand All @@ -48,6 +49,9 @@ const (
// TODO we should refactor this and reduce the number of environment variables.
// most of them are due to metrics, which has to change anyways.
type envConfig struct {
// Environment variable containing the authType, which represents the authentication configuration mode the Pod is using.
AuthType authcheck.AuthType `envconfig:"K_GCP_AUTH_TYPE" default:""`

// Environment variable containing the sink URI.
Sink string `envconfig:"SINK_URI" required:"true"`

Expand Down Expand Up @@ -173,6 +177,7 @@ func main() {
SinkURI: env.Sink,
TransformerURI: env.Transformer,
Extensions: extensions,
AuthType: env.AuthType,
}

adapter, err := InitializeAdapter(ctx,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
Expand Down
12 changes: 11 additions & 1 deletion pkg/apis/intevents/v1/pullsubscription_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"knative.dev/pkg/apis"
)

const replicaUnavailableReason = "MinimumReplicasUnavailable"

// GetCondition returns the condition currently associated with the given type, or nil.
func (s *PullSubscriptionStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return pullSubscriptionCondSet.Manage(s).GetCondition(t)
Expand Down Expand Up @@ -93,14 +95,21 @@ func (s *PullSubscriptionStatus) MarkDeployedUnknown(reason, messageFormat strin

// PropagateDeploymentAvailability uses the availability of the provided Deployment to determine if
// PullSubscriptionConditionDeployed should be marked as true or false.
func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) {
// For authentication check purpose, this method will return false if a false condition
// is caused by deployment's replicaset unavailable.
// ReplicaSet unavailable is a sign for potential authentication problems.
func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) bool {
deploymentAvailableFound := false
replicaAvailable := true
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
deploymentAvailableFound = true
if cond.Status == corev1.ConditionTrue {
pullSubscriptionCondSet.Manage(s).MarkTrue(PullSubscriptionConditionDeployed)
} else if cond.Status == corev1.ConditionFalse {
if cond.Reason == replicaUnavailableReason {
replicaAvailable = false
}
pullSubscriptionCondSet.Manage(s).MarkFalse(PullSubscriptionConditionDeployed, cond.Reason, cond.Message)
} else if cond.Status == corev1.ConditionUnknown {
pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, cond.Reason, cond.Message)
Expand All @@ -110,4 +119,5 @@ func (s *PullSubscriptionStatus) PropagateDeploymentAvailability(d *appsv1.Deplo
if !deploymentAvailableFound {
pullSubscriptionCondSet.Manage(s).MarkUnknown(PullSubscriptionConditionDeployed, "DeploymentUnavailable", "Deployment %q is unavailable.", d.Name)
}
return replicaAvailable
}
25 changes: 25 additions & 0 deletions pkg/apis/intevents/v1/pullsubscription_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ var (
},
},
}

replicaUnavailableDeployment = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
Status: appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionFalse,
Reason: "MinimumReplicasUnavailable",
Message: "False Status",
},
},
},
}
)

func TestPullSubscriptionStatusIsReady(t *testing.T) {
Expand Down Expand Up @@ -506,3 +522,12 @@ func TestPullSubscriptionStatusGetCondition(t *testing.T) {
})
}
}

func TestPropagateDeploymentAvailability(t *testing.T) {
s := &PullSubscriptionStatus{}
got := s.PropagateDeploymentAvailability(replicaUnavailableDeployment)
want := false
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected condition (-want, +got) =", diff)
}
}
Loading

0 comments on commit 980eb70

Please sign in to comment.