Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Moving to cev2 - publisher, receive adapter - Implementing KRShaped #1296

Merged
merged 34 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7b4cbcf
moving to cev2 publisher and receive adapter
nachocano Jun 17, 2020
55aa08b
updating wire gen and moving push to pubsub converter
nachocano Jun 17, 2020
ab6cb04
update-codegen
nachocano Jun 17, 2020
33bae7f
injecting converter for better UTs
nachocano Jun 17, 2020
d0afb5c
update
nachocano Jun 17, 2020
06eb406
fixing storage
nachocano Jun 18, 2020
4a9b3cb
using converter type for every source instead of just some.
nachocano Jun 18, 2020
85325c3
attempt to update tracing
nachocano Jun 18, 2020
cc27ea4
adding pull converter to maintain pullsubscription functionality
nachocano Jun 18, 2020
65c5d9a
Merge remote-tracking branch 'upstream/master' into cev2
nachocano Jun 18, 2020
a55ab95
another try at tracing
nachocano Jun 18, 2020
471d094
merge conflicts
nachocano Jun 18, 2020
4a339db
merging master
nachocano Jun 22, 2020
9bd1f17
updates after code review
nachocano Jun 23, 2020
06cf523
removing unnecessary stuff
nachocano Jun 23, 2020
bcce673
better shutdown
nachocano Jun 23, 2020
a25046a
removing good_client and cesdk-v1 sender
nachocano Jun 23, 2020
a9ad6a2
moving e2e tests to cev2
nachocano Jun 23, 2020
0d2bd1d
updates to tests
nachocano Jun 23, 2020
d44ee4c
update-codegen
nachocano Jun 23, 2020
e3ae81e
code review comments, and updating deps, trying to get rid of cev1...
nachocano Jun 23, 2020
9e6d298
Merge remote-tracking branch 'upstream/master' into cev2
nachocano Jun 23, 2020
f047a3c
implementing krshaped
nachocano Jun 23, 2020
9cb134c
removing unused genreconciler clients.
nachocano Jun 24, 2020
02611c4
commenting out adapter tests for now.. will do in a follow up
nachocano Jun 24, 2020
223698b
address code review comment
nachocano Jun 24, 2020
a11f042
Fix ut
liu-cong Jun 24, 2020
6b57a47
leaving e2e tests images with cev1 for now..
nachocano Jun 24, 2020
9ef7b16
updating deps
nachocano Jun 24, 2020
c3a8d8c
Merge remote-tracking branch 'upstream/master' into cev2
nachocano Jun 24, 2020
cd2b40c
sync start
nachocano Jun 24, 2020
be6fa45
Merge remote-tracking branch 'upstream/master' into cev2
nachocano Jun 24, 2020
5f9afd0
nits after code review
nachocano Jun 24, 2020
014802a
no need to delete attributes
nachocano Jun 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package main

import (
"github.com/google/knative-gcp/pkg/broker/ingress"
metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/knative-gcp/pkg/utils/mainhelper"

"go.uber.org/zap"
Expand Down Expand Up @@ -59,8 +59,8 @@ func main() {

ingress, err := InitializeHandler(
ctx,
ingress.Port(env.Port),
ingress.ProjectID(projectID),
clients.Port(env.Port),
clients.ProjectID(projectID),
metrics.PodName(env.PodName),
metrics.ContainerName(component),
)
Expand Down
5 changes: 3 additions & 2 deletions cmd/broker/ingress/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/ingress"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/google/wire"
)

func InitializeHandler(
ctx context.Context,
port ingress.Port,
projectID ingress.ProjectID,
port clients.Port,
projectID clients.ProjectID,
podName metrics.PodName,
containerName metrics.ContainerName,
) (*ingress.Handler, error) {
Expand Down
7 changes: 4 additions & 3 deletions cmd/broker/ingress/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 32 additions & 17 deletions cmd/pubsub/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@ import (
"flag"
"log"

"cloud.google.com/go/compute/metadata"
metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
. "github.com/google/knative-gcp/pkg/pubsub/publisher"
tracingconfig "github.com/google/knative-gcp/pkg/tracing"
"github.com/google/knative-gcp/pkg/utils"
"github.com/google/knative-gcp/pkg/utils/appcredentials"
"github.com/google/knative-gcp/pkg/utils/clients"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"knative.dev/eventing/pkg/tracing"

"github.com/google/knative-gcp/pkg/pubsub/publisher"
tracingconfig "github.com/google/knative-gcp/pkg/tracing"
)

type envConfig struct {
// Environment variable containing the port for the publisher.
Port int `envconfig:"PORT" default:"8080"`

// Environment variable containing project id.
Project string `envconfig:"PROJECT_ID"`

Expand All @@ -47,6 +52,8 @@ type envConfig struct {
}

func main() {
appcredentials.MustExistOrUnsetEnv()

flag.Parse()

ctx := context.Background()
Expand All @@ -62,15 +69,15 @@ func main() {
logger.Fatal("Failed to process env var", zap.Error(err))
}

if env.Project == "" {
project, err := metadata.ProjectID()
if err != nil {
logger.Fatal("failed to find project id. ", zap.Error(err))
}
env.Project = project
projectID, err := utils.ProjectID(env.Project, metadataClient.NewDefaultMetadataClient())
if err != nil {
logger.Fatal("Failed to retrieve project id", zap.Error(err))
}

logger.Info("Using project.", zap.String("project", env.Project))
topicID := env.Topic
if topicID == "" {
logger.Fatal("Failed to retrieve topic id", zap.Error(err))
}

tracingConfig, err := tracingconfig.JSONToConfig(env.TracingConfigJson)
if err != nil {
Expand All @@ -80,13 +87,21 @@ func main() {
logger.Error("Failed to setup tracing", zap.Error(err), zap.Any("tracingConfig", tracingConfig))
}

startable := &publisher.Publisher{
ProjectID: env.Project,
TopicID: env.Topic,
logger.Info("Initializing publisher", zap.String("Project ID", projectID), zap.String("Topic ID", topicID))

publisher, err := InitializePublisher(
ctx,
clients.Port(env.Port),
clients.ProjectID(projectID),
TopicID(topicID),
)

if err != nil {
logger.Fatal("Unable to create publisher", zap.Error(err))
}

logger.Info("Starting Pub/Sub Publisher.", zap.Any("publisher", startable))
if err := startable.Start(ctx); err != nil {
logger.Fatal("failed to start publisher: ", zap.Error(err))
logger.Info("Starting publisher", zap.Any("publisher", publisher))
if err := publisher.Start(ctx); err != nil {
logger.Fatal("Failed to start publisher", zap.Error(err))
nachocano marked this conversation as resolved.
Show resolved Hide resolved
}
}
39 changes: 39 additions & 0 deletions cmd/pubsub/publisher/wire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// +build wireinject

/*
Copyright 2020 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"

"github.com/google/knative-gcp/pkg/pubsub/publisher"
"github.com/google/knative-gcp/pkg/utils/clients"

"github.com/google/wire"
)

func InitializePublisher(
ctx context.Context,
port clients.Port,
projectID clients.ProjectID,
topicID publisher.TopicID,
) (*publisher.Publisher, error) {
panic(wire.Build(
publisher.PublisherSet,
))
}
25 changes: 25 additions & 0 deletions cmd/pubsub/publisher/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading