Skip to content
This repository has been archived by the owner on Dec 9, 2024. It is now read-only.

Include authorization in the event #456

Merged
merged 1 commit into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New(eventType TypeName, mimeType string, payload interface{}) *Event {
Data: payload,
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformed": "true",
"transformation-version": TransformationVersion,
},
},
Expand Down Expand Up @@ -136,7 +136,7 @@ func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("contentType", e.ContentType)
}
if e.Extensions != nil {
e.Extensions.MarshalLogObject(enc)
enc.AddObject("extensions", e.Extensions)
}

payload, _ := json.Marshal(e.Data)
Expand Down
10 changes: 5 additions & 5 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var newTests = []struct {
Data: []byte("test"),
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformed": "true",
"transformation-version": eventpkg.TransformationVersion,
},
},
Expand All @@ -99,7 +99,7 @@ var newTests = []struct {
Data: eventpkg.SystemEventReceivedData{},
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformed": "true",
"transformation-version": eventpkg.TransformationVersion,
},
},
Expand Down Expand Up @@ -223,7 +223,7 @@ var fromRequestTests = []struct {
ContentType: "application/octet-stream",
Data: []byte("hey there"),
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{"transformed": true, "transformation-version": "0.1"}},
"eventgateway": map[string]interface{}{"transformed": "true", "transformation-version": "0.1"}},
},
},
{
Expand Down Expand Up @@ -264,7 +264,7 @@ var fromRequestTests = []struct {
ContentType: "application/json",
Data: map[string]interface{}{"eventType": "user.created"},
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{"transformed": true, "transformation-version": "0.1"}},
"eventgateway": map[string]interface{}{"transformed": "true", "transformation-version": "0.1"}},
},
},
{
Expand All @@ -286,7 +286,7 @@ var fromRequestTests = []struct {
Body: []byte("hey there"),
},
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{"transformed": true, "transformation-version": "0.1"}},
"eventgateway": map[string]interface{}{"transformed": "true", "transformation-version": "0.1"}},
},
},
}
3 changes: 2 additions & 1 deletion internal/zap/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zap

import (
"encoding/json"

"go.uber.org/zap/zapcore"
)

Expand All @@ -23,7 +24,7 @@ type MapStringInterface map[string]interface{}
func (msi MapStringInterface) MarshalLogObject(enc zapcore.ObjectEncoder) error {
for key, val := range msi {
v, err := json.Marshal(val)
if err != nil {
if err == nil {
enc.AddString(key, string(v))
} else {
return err
Expand Down
39 changes: 26 additions & 13 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"

"github.com/jinzhu/copier"
"github.com/rs/cors"
"go.uber.org/zap"

Expand Down Expand Up @@ -87,10 +88,10 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {

syncSubscriber := router.targetCache.SyncSubscriber(r.Method, path, event.EventType)
if syncSubscriber != nil { // There is sync subscriber and possibly async subscribers also
router.handleSyncSubscription(path, event, *syncSubscriber, w, r)
router.handleSyncSubscription(path, *event, *syncSubscriber, w, r)
}

router.handleAsyncSubscriptions(r.Method, path, event, r)
router.handleAsyncSubscriptions(r.Method, path, *event, r)
if syncSubscriber == nil {
w.WriteHeader(http.StatusAccepted)
}
Expand Down Expand Up @@ -148,19 +149,19 @@ var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleSyncSubscription(path string, event *eventpkg.Event, subscriber SyncSubscriber, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleSyncSubscription(path string, event eventpkg.Event, subscriber SyncSubscriber, w http.ResponseWriter, r *http.Request) {
// metrics & logs
metricEventsReceived.WithLabelValues(subscriber.Space, string(event.EventType)).Inc()
router.log.Debug("Event received.", zap.String("path", path), zap.String("space", subscriber.Space), zap.Object("event", event))
err := router.emitSystemEventReceived(path, *event, r.Header)
err := router.emitSystemEventReceived(path, event, r.Header)
if err != nil {
router.log.Debug("Event processing stopped because sync plugin subscription returned an error.",
zap.Object("event", event),
zap.Error(err))
return
}

err = router.authorizeEventType(subscriber.Space, event, r)
err = router.authorizeEventType(subscriber.Space, &event, r)
if err != nil {
w.WriteHeader(http.StatusForbidden)
return
Expand All @@ -172,7 +173,7 @@ func (router *Router) handleSyncSubscription(path string, event *eventpkg.Event,
httpRequestData.Params = subscriber.Params
event.Data = httpRequestData
}
router.httpRequestHandler(subscriber.Space, subscriber.FunctionID, event)(w, r)
router.httpRequestHandler(subscriber.Space, subscriber.FunctionID, &event)(w, r)

metricEventsProcessed.WithLabelValues(subscriber.Space, string(event.EventType)).Inc()
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func (router *Router) httpRequestHandler(space string, backingFunction function.
}

// handleAsyncSubscriptions fetched events subscribers, runs authorization and enqueues event in the queue
func (router *Router) handleAsyncSubscriptions(method, path string, event *eventpkg.Event, r *http.Request) {
func (router *Router) handleAsyncSubscriptions(method, path string, event eventpkg.Event, r *http.Request) {
if event.IsSystem() {
router.log.Debug("System event received.", zap.Object("event", event))
}
Expand All @@ -228,9 +229,11 @@ func (router *Router) handleAsyncSubscriptions(method, path string, event *event
for _, subscriber := range subscribers {
metricEventsReceived.WithLabelValues(subscriber.Space, "custom").Inc()

err := router.authorizeEventType(subscriber.Space, event, r)
subEvent := eventpkg.Event{}
copier.Copy(&subEvent, &event)
err := router.authorizeEventType(subscriber.Space, &subEvent, r)
if err == nil {
router.enqueueWork(method, path, subscriber.Space, subscriber.FunctionID, *event)
router.enqueueWork(method, path, subscriber.Space, subscriber.FunctionID, subEvent)
}
}
}
Expand Down Expand Up @@ -331,6 +334,16 @@ func (router *Router) authorizeEventType(space string, event *eventpkg.Event, r
zap.Object("event", event))
return errors.New(authorizerResponse.AuthorizationError.Message)
}

if egExternsions, ok := event.Extensions["eventgateway"]; ok {
egExternsions.(map[string]interface{})["authorization"] = authorizerResponse.Authorization
} else {
event.Extensions = map[string]interface{}{
"eventgateway": map[string]interface{}{
"authorization": authorizerResponse.Authorization,
},
}
}
}

return nil
Expand Down Expand Up @@ -425,7 +438,7 @@ func (router *Router) emitSystemEventReceived(path string, event eventpkg.Event,
mimeJSON,
eventpkg.SystemEventReceivedData{Path: path, Event: event, Headers: ihttp.FlattenHeader(header)},
)
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)
return router.plugins.React(system)
}

Expand All @@ -435,7 +448,7 @@ func (router *Router) emitSystemFunctionInvoking(space string, functionID functi
mimeJSON,
eventpkg.SystemFunctionInvokingData{Space: space, FunctionID: functionID, Event: event},
)
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokingType)).Inc()

Expand All @@ -447,7 +460,7 @@ func (router *Router) emitSystemFunctionInvoked(space string, functionID functio
eventpkg.SystemFunctionInvokedType,
mimeJSON,
eventpkg.SystemFunctionInvokedData{Space: space, FunctionID: functionID, Event: event, Result: result})
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokedType)).Inc()

Expand All @@ -460,7 +473,7 @@ func (router *Router) emitSystemFunctionInvocationFailed(space string, functionI
eventpkg.SystemFunctionInvocationFailedType,
mimeJSON,
eventpkg.SystemFunctionInvocationFailedData{Space: space, FunctionID: functionID, Event: event, Error: err})
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvocationFailedType)).Inc()
}
Expand Down
Loading