Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Drop non-event messages #1212

Merged
merged 7 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion pkg/broker/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handler

import (
"context"
"errors"
"sync/atomic"
"time"

Expand Down Expand Up @@ -74,8 +75,15 @@ func (h *Handler) IsAlive() bool {
func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) {
ctx = metrics.StartEventProcessing(ctx)
event, err := binding.ToEvent(ctx, cepubsub.NewMessage(msg))
if isNonRetryable(err) {
logEventConversionError(ctx, msg, err, "failed to convert received message to an event, check the msg format")
// Ack the message so it won't be retried.
// TODO Should this go to the DLQ once DLQ is implemented?
msg.Ack()
return
}
if err != nil {
logging.FromContext(ctx).Error("failed to convert received message to an event", zap.Any("message", msg), zap.Error(err))
logEventConversionError(ctx, msg, err, "unknown error when converting the received message to an event")
msg.Nack()
return
}
Expand All @@ -92,3 +100,22 @@ func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) {
}
msg.Ack()
}

func isNonRetryable(err error) bool {
// The following errors can be returned by ToEvent and are not retryable.
// TODO Should binding.ToEvent consolidate them and return the generic ErrCannotConvertToEvent?
return errors.Is(err, binding.ErrCannotConvertToEvent) || errors.Is(err, binding.ErrNotStructured) || errors.Is(err, binding.ErrUnknownEncoding) || errors.Is(err, binding.ErrNotBinary)
}

// Log the full message in debug level and a truncated version as an error in case the message is too big (can be as big as 10MB),
func logEventConversionError(ctx context.Context, pm *pubsub.Message, err error, msg string) {
maxLen := 2000
truncated := pm
if len(pm.Data) > maxLen {
copy := *pm
copy.Data = copy.Data[:maxLen]
truncated = &copy
}
logging.FromContext(ctx).Debug(msg, zap.Any("message", pm), zap.Error(err))
logging.FromContext(ctx).Error(msg, zap.Any("message", truncated), zap.Error(err))
liu-cong marked this conversation as resolved.
Show resolved Hide resolved
}
38 changes: 29 additions & 9 deletions pkg/broker/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
"github.com/google/knative-gcp/pkg/broker/handler/processors"
)

const (
testProjectID = "test-testProjectID"
testTopic = "test-testTopic"
testSub = "test-testSub"
)

func testPubsubClient(ctx context.Context, t *testing.T, projectID string) (*pubsub.Client, func()) {
t.Helper()
srv := pstest.NewServer()
Expand All @@ -53,14 +59,14 @@ func testPubsubClient(ctx context.Context, t *testing.T, projectID string) (*pub

func TestHandler(t *testing.T) {
ctx := context.Background()
c, close := testPubsubClient(ctx, t, "test-project")
c, close := testPubsubClient(ctx, t, testProjectID)
defer close()

topic, err := c.CreateTopic(ctx, "test-topic")
topic, err := c.CreateTopic(ctx, testTopic)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
sub, err := c.CreateSubscription(ctx, "test-sub", pubsub.SubscriptionConfig{
sub, err := c.CreateSubscription(ctx, testSub, pubsub.SubscriptionConfig{
Topic: topic,
})
if err != nil {
Expand All @@ -69,8 +75,8 @@ func TestHandler(t *testing.T) {

p, err := cepubsub.New(context.Background(),
cepubsub.WithClient(c),
cepubsub.WithProjectID("test-project"),
cepubsub.WithTopicID("test-topic"),
cepubsub.WithProjectID(testProjectID),
cepubsub.WithTopicID(testTopic),
)
if err != nil {
t.Fatalf("failed to create cloudevents pubsub protocol: %v", err)
Expand All @@ -97,7 +103,7 @@ func TestHandler(t *testing.T) {

t.Run("handle event success", func(t *testing.T) {
if err := p.Send(ctx, binding.ToMessage(&testEvent)); err != nil {
t.Errorf("failed to seed event to pubsub: %v", err)
t.Fatalf("failed to seed event to pubsub: %v", err)
}
gotEvent := nextEventWithTimeout(eventCh)
if diff := cmp.Diff(&testEvent, gotEvent); diff != "" {
Expand All @@ -110,7 +116,7 @@ func TestHandler(t *testing.T) {
processor.OneTimeErr = true
unlock()
if err := p.Send(ctx, binding.ToMessage(&testEvent)); err != nil {
t.Errorf("failed to seed event to pubsub: %v", err)
t.Fatalf("failed to seed event to pubsub: %v", err)
}
// On failure, the handler should nack the pubsub message.
// And we should expect two deliveries.
Expand All @@ -122,12 +128,26 @@ func TestHandler(t *testing.T) {
}
})

t.Run("message is not an event", func(t *testing.T) {
res := topic.Publish(context.Background(), &pubsub.Message{ID: "testid"})
ctx, _ := context.WithTimeout(context.Background(), 50*time.Millisecond)
if _, err := res.Get(ctx); err != nil {
t.Fatalf("Failed to publish a msg to topic: %v", err)
}

gotEvent := nextEventWithTimeout(eventCh)
// The message should be Acked and should not reach the processor
if gotEvent != nil {
t.Errorf("processor should receive 0 events but got: %+v", gotEvent)
}
})

t.Run("timeout on event processing", func(t *testing.T) {
unlock := processor.Lock()
processor.BlockUntilCancel = true
unlock()
if err := p.Send(ctx, binding.ToMessage(&testEvent)); err != nil {
t.Errorf("failed to seed event to pubsub: %v", err)
t.Fatalf("failed to seed event to pubsub: %v", err)
}
gotEvent := nextEventWithTimeout(eventCh)
if diff := cmp.Diff(&testEvent, gotEvent); diff != "" {
Expand All @@ -143,7 +163,7 @@ func TestHandler(t *testing.T) {

func nextEventWithTimeout(eventCh <-chan *event.Event) *event.Event {
select {
case <-time.After(30 * time.Second):
case <-time.After(time.Second):
return nil
case got := <-eventCh:
return got
Expand Down