-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathrelay_endpoints.go
332 lines (298 loc) · 12.3 KB
/
relay_endpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package relay
import (
"crypto/sha1" //nolint:gosec // we're not using SHA1 for encryption, just for generating an insecure hash
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"sort"
"strconv"
"time"
"github.com/launchdarkly/ld-relay/v8/internal/basictypes"
"github.com/launchdarkly/ld-relay/v8/internal/logging"
"github.com/launchdarkly/ld-relay/v8/internal/middleware"
"github.com/launchdarkly/ld-relay/v8/internal/relayenv"
"github.com/launchdarkly/ld-relay/v8/internal/streams"
"github.com/launchdarkly/ld-relay/v8/internal/util"
"github.com/launchdarkly/go-jsonstream/v3/jwriter"
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
ldevents "github.com/launchdarkly/go-sdk-events/v3"
"github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
"github.com/gorilla/mux"
)
// getClientSideContextProperties extracts the evaluation context from a client-side request and,
// where applicable, verifies its secure mode hash.
//
// For REPORT requests the context is decoded as JSON from the request body; the request must
// carry a Content-Type header of exactly "application/json". For any other method the context is
// decoded from the base64-encoded {context} route variable.
//
// If the environment is in secure mode and sdkKind is basictypes.JSClientSDK, the "h" query
// parameter must be non-empty and equal to the hash the environment's SDK client computes for
// the context.
//
// On failure an error response has already been written to w (415 for a wrong Content-Type,
// 400 for an unreadable/undecodable context or a hash mismatch) and the second return value is
// false. On success nothing has been written to w and the second return value is true.
func getClientSideContextProperties(
	clientCtx relayenv.EnvContext,
	sdkKind basictypes.SDKKind,
	req *http.Request,
	w http.ResponseWriter,
) (ldcontext.Context, bool) {
	var ldContext ldcontext.Context
	var contextDecodeErr error

	if req.Method == "REPORT" {
		if req.Header.Get("Content-Type") != "application/json" {
			w.WriteHeader(http.StatusUnsupportedMediaType)
			_, _ = w.Write([]byte("Content-Type must be application/json."))
			return ldContext, false
		}
		body, readErr := io.ReadAll(req.Body)
		if readErr != nil {
			// A failed or truncated read is reported as-is rather than being masked by
			// whatever JSON error the partial body would have produced.
			contextDecodeErr = readErr
		} else {
			contextDecodeErr = json.Unmarshal(body, &ldContext)
		}
	} else {
		base64Context := mux.Vars(req)["context"] // this assumes we have used {context} as a placeholder in the route
		ldContext, contextDecodeErr = middleware.ContextFromBase64(base64Context)
	}

	if contextDecodeErr != nil {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write(util.ErrorJSONMsg(contextDecodeErr.Error()))
		return ldContext, false
	}

	if clientCtx.IsSecureMode() && sdkKind == basictypes.JSClientSDK {
		hash := req.URL.Query().Get("h")
		// An absent hash is always invalid; the expected hash is only computed when one was sent.
		// NOTE(review): this is a plain string comparison; consider a constant-time comparison
		// if timing side channels are a concern for secure mode hashes.
		valid := hash != "" && hash == clientCtx.GetClient().SecureModeHash(ldContext)
		if !valid {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusBadRequest)
			_, _ = w.Write(util.ErrorJSONMsg("Environment is in secure mode, and context hash does not match."))
			return ldContext, false
		}
	}

	return ldContext, true
}
// pingStreamHandler serves the old stream endpoint that just sends "ping" events:
// clientstream.ld.com/mping (mobile) or clientstream.ld.com/ping/{envId} (JS).
//
// The environment and credential are taken from the request context; the actual streaming is
// delegated to the stream handler that the environment provides for streamProvider.
func pingStreamHandler(streamProvider streams.StreamProvider) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		envInfo := middleware.GetEnvContextInfo(req.Context())
		envInfo.Env.GetLoggers().Debug("Application requested client-side ping stream")

		stream := envInfo.Env.GetStreamHandler(streamProvider, envInfo.Credential)
		stream.ServeHTTP(w, req)
	}
	return http.HandlerFunc(serve)
}
// pingStreamHandlerWithContext serves client-side streaming endpoints that require context
// properties. The context is validated first via getClientSideContextProperties; once it has
// been accepted, the request is handled the same way as the plain ping stream.
//
// If validation fails, nothing further is done here and the stream is never started.
func pingStreamHandlerWithContext(sdkKind basictypes.SDKKind, streamProvider streams.StreamProvider) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		envInfo := middleware.GetEnvContextInfo(req.Context())
		envInfo.Env.GetLoggers().Debug("Application requested client-side ping stream")

		// The decoded context itself is not needed here, only whether it was acceptable.
		_, accepted := getClientSideContextProperties(envInfo.Env, sdkKind, req, w)
		if !accepted {
			return
		}
		envInfo.Env.GetStreamHandler(streamProvider, envInfo.Credential).ServeHTTP(w, req)
	}
	return http.HandlerFunc(serve)
}
// streamHandler is the multi-purpose streaming handler. All details of how a particular type of
// stream behaves are abstracted in StreamProvider and EnvStreams; this function only logs
// logMessage at debug level and hands the request to the environment's handler for
// streamProvider and the request's credential.
func streamHandler(streamProvider streams.StreamProvider, logMessage string) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		envInfo := middleware.GetEnvContextInfo(req.Context())
		env := envInfo.Env
		env.GetLoggers().Debug(logMessage)

		stream := env.GetStreamHandler(streamProvider, envInfo.Credential)
		stream.ServeHTTP(w, req)
	}
	return http.HandlerFunc(serve)
}
// pollAllFlagsHandler is the PHP SDK polling endpoint for all flags: app.ld.com/sdk/flags
//
// It reads every flag from the environment's data store, serializes them as one JSON object, and
// sends the result through writeCacheableJSONResponse with an etag computed from the flag keys
// and versions. A store read failure is logged and answered with a 500 and no body.
func pollAllFlagsHandler(w http.ResponseWriter, req *http.Request) {
	env := middleware.GetEnvContextInfo(req.Context()).Env

	flags, err := env.GetStore().GetAll(ldstoreimpl.Features())
	if err != nil {
		env.GetLoggers().Errorf("Error reading feature store: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// The body is serialized before the slice is sorted below, so the response keeps the order
	// in which the store returned the items.
	body := serializeFlagsAsMap(flags)

	// Compute an overall etag for the data set by hashing flag keys and versions. Sorting by key
	// first makes the hash independent of the order the store returned the items in.
	sort.Slice(flags, func(a, b int) bool { return flags[a].Key < flags[b].Key })
	digest := sha1.New() //nolint:gas,gosec // just used for insecure hashing
	for _, flag := range flags {
		_, _ = fmt.Fprintf(digest, "%s:%d", flag.Key, flag.Item.Version)
	}
	etag := hex.EncodeToString(digest.Sum(nil))[:15]

	writeCacheableJSONResponse(w, req, env, body, etag)
}
// PHP SDK polling endpoint for a flag: app.ld.com/sdk/flags/{key}
func pollFlagHandler(w http.ResponseWriter, req *http.Request) {
pollFlagOrSegment(middleware.GetEnvContextInfo(req.Context()).Env, ldstoreimpl.Features())(w, req)
}
// PHP SDK polling endpoint for a segment: app.ld.com/sdk/segments/{key}
func pollSegmentHandler(w http.ResponseWriter, req *http.Request) {
pollFlagOrSegment(middleware.GetEnvContextInfo(req.Context()).Env, ldstoreimpl.Segments())(w, req)
}
// bulkEventHandler returns the handler for the event-recorder endpoints:
// events.ld.com/bulk (server-side)
// events.ld.com/diagnostic (server-side diagnostic)
// events.ld.com/mobile, events.ld.com/mobile/events, events.ld.com/mobileevents/bulk (mobile)
// events.ld.com/mobile/events/diagnostic (mobile diagnostic)
// events.ld.com/events/bulk/{envId} (JS)
// events.ld.com/events/diagnostic/{envId} (JS)
//
// When offline is true, every request is acknowledged with 202 Accepted and its body is closed
// without being forwarded anywhere. Otherwise the request is passed to the handler that the
// environment's event dispatcher provides for sdkKind and eventsKind. If the environment has no
// dispatcher, or the dispatcher has no handler for that combination, a 503 with a JSON error
// body is returned.
func bulkEventHandler(sdkKind basictypes.SDKKind, eventsKind ldevents.EventDataKind, offline bool) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if offline {
			w.WriteHeader(http.StatusAccepted)
			if req.Body != nil {
				_ = req.Body.Close()
			}
			return
		}

		clientCtx := middleware.GetEnvContextInfo(req.Context())
		dispatcher := clientCtx.Env.GetEventDispatcher()
		if dispatcher == nil {
			// Declare the body as JSON explicitly, as the other JSON error responses in this
			// file do; otherwise net/http would sniff the content type from the body.
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusServiceUnavailable)
			_, _ = w.Write(util.ErrorJSONMsg("Event proxy is not enabled for this environment"))
			return
		}

		handler := dispatcher.GetHandler(sdkKind, eventsKind)
		if handler == nil {
			// Note, if this ever happens, it is a programming error since we are only supposed to
			// be using a fixed set of Endpoint values that the dispatcher knows about.
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusServiceUnavailable)
			_, _ = w.Write(util.ErrorJSONMsg("Internal error in event proxy"))
			logging.GetGlobalContextLoggers(req.Context()).Errorf("Tried to proxy %s events for %s but no handler was defined",
				eventsKind, sdkKind)
			return
		}

		handler(w, req)
	})
}
// evaluateAllFeatureFlags returns the handler function for the client-side evaluation endpoint,
// new schema with metadata:
// /sdk/evalx/{envId}/contexts/{context} (GET)
// /sdk/evalx/{envId}/context (REPORT)
// /sdk/evalx/{envId}/users/{context} (GET)
// /sdk/evalx/{envId}/user (REPORT)
// /sdk/evalx/users/{context} (GET - with SDK key auth; this is a Relay-only endpoint)
// /sdk/evalx/user (REPORT - with SDK key auth; this is a Relay-only endpoint)
//
// The returned function simply binds sdkKind and forwards each request to evaluateAllShared.
func evaluateAllFeatureFlags(sdkKind basictypes.SDKKind) func(w http.ResponseWriter, req *http.Request) {
	handle := func(rw http.ResponseWriter, r *http.Request) {
		evaluateAllShared(rw, r, sdkKind)
	}
	return handle
}
// evaluateAllShared evaluates flags for the context supplied in the request and writes the
// results as a single JSON object keyed by flag key.
//
// The context is obtained (and, if needed, hash-checked) by getClientSideContextProperties; if
// that fails, this function returns without doing anything else. Otherwise:
//   - if neither the SDK client nor the data store is initialized, it responds 503;
//   - if the context is a single context with an empty key, it responds 400;
//   - if the flags cannot be read from the store, it responds 500;
//   - otherwise it responds 200 with one entry per flag containing "value", "variation" and
//     "version", plus "trackEvents", "trackReason", "reason" and "debugEventsUntilDate" when
//     applicable.
//
// For basictypes.JSClientSDK only flags available via environment ID are included, and for
// basictypes.MobileSDK only flags available via mobile key; any other sdkKind gets all flags.
// Reasons are included when the "withReasons" query parameter is "true" or the evaluation is
// part of an experiment.
func evaluateAllShared(w http.ResponseWriter, req *http.Request, sdkKind basictypes.SDKKind) {
	env := middleware.GetEnvContextInfo(req.Context()).Env
	client := env.GetClient()
	store := env.GetStore()
	loggers := env.GetLoggers()

	ldContext, ok := getClientSideContextProperties(env, sdkKind, req, w)
	if !ok {
		return
	}

	withReasons := req.URL.Query().Get("withReasons") == "true"

	// Every response from here on, including errors, is JSON.
	w.Header().Set("Content-Type", "application/json")

	if !client.Initialized() {
		if !store.IsInitialized() {
			w.WriteHeader(http.StatusServiceUnavailable)
			loggers.Warn("Called before client initialization. Feature store not available")
			_, _ = w.Write(util.ErrorJSONMsg("Service not initialized"))
			return
		}
		loggers.Warn("Called before client initialization; using last known values from feature store")
	}

	// Only a single context is checked for a key here; a multi-context is let through.
	missingKey := !ldContext.Multiple() && ldContext.Key() == ""
	if missingKey {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write(util.ErrorJSONMsg("User must have a 'key' attribute"))
		return
	}

	loggers.Debugf("Application requested client-side flags (%s) for context: %s", sdkKind, ldContext.Key())

	items, err := store.GetAll(ldstoreimpl.Features())
	if err != nil {
		loggers.Warnf("Unable to fetch flags from feature store. Returning nil map. Error: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write(util.ErrorJSONMsgf("Error fetching flags from feature store: %s", err))
		return
	}

	evaluator := env.GetEvaluator()

	responseWriter := jwriter.NewWriter()
	responseObj := responseWriter.Object()
	for _, item := range items {
		flag, isFlag := item.Item.Item.(*ldmodel.FeatureFlag)
		if !isFlag {
			continue
		}

		// Skip flags that are not exposed to this kind of client-side SDK.
		if sdkKind == basictypes.JSClientSDK && !flag.ClientSideAvailability.UsingEnvironmentID {
			continue
		}
		if sdkKind == basictypes.MobileSDK && !flag.ClientSideAvailability.UsingMobileKey {
			continue
		}

		evalResult := evaluator.Evaluate(flag, ldContext, nil)
		detail, isExperiment := evalResult.Detail, evalResult.IsExperiment

		flagObj := responseObj.Name(flag.Key).Object()
		detail.Value.WriteToJSONWriter(flagObj.Name("value"))
		detail.VariationIndex.WriteToJSONWriter(flagObj.Name("variation"))
		flagObj.Name("version").Int(flag.Version)
		flagObj.Maybe("trackEvents", flag.TrackEvents || isExperiment).Bool(true)
		flagObj.Maybe("trackReason", isExperiment).Bool(true)
		if withReasons || isExperiment {
			detail.Reason.WriteToJSONWriter(flagObj.Name("reason"))
		}
		flagObj.Maybe("debugEventsUntilDate", flag.DebugEventsUntilDate != 0).
			Float64(float64(flag.DebugEventsUntilDate))
		flagObj.End()
	}
	responseObj.End()

	payload := responseWriter.Bytes()
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(payload)
}
// pollFlagOrSegment returns a handler function that looks up a single item of the given data
// kind, using the {key} route variable as the item key, in the environment's data store.
//
// The handler responds with:
//   - 500 (after logging) if the store lookup or the JSON marshaling fails;
//   - 404 if the store returns a descriptor with no item;
//   - otherwise the item as JSON via writeCacheableJSONResponse, using the item's version as
//     the etag value.
func pollFlagOrSegment(clientContext relayenv.EnvContext, kind ldstoretypes.DataKind) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, req *http.Request) {
		itemKey := mux.Vars(req)["key"]

		desc, err := clientContext.GetStore().Get(kind, itemKey)
		if err != nil {
			clientContext.GetLoggers().Errorf("Error reading feature store: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		if desc.Item == nil {
			w.WriteHeader(http.StatusNotFound)
			return
		}

		payload, err := json.Marshal(desc.Item)
		if err != nil {
			clientContext.GetLoggers().Errorf("Error marshaling JSON: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		writeCacheableJSONResponse(w, req, clientContext, payload, strconv.Itoa(desc.Version))
	}
}
// writeCacheableJSONResponse writes bytes as a 200 JSON response with an "Etag" header of
// "relay-" followed by etagValue, or a bare 304 Not Modified if the request's If-None-Match
// header is exactly that etag.
//
// When the environment has a positive TTL, "Vary: Authorization" and an "Expires" header set to
// the current time plus the TTL are added as well.
func writeCacheableJSONResponse(w http.ResponseWriter, req *http.Request, clientContext relayenv.EnvContext,
	bytes []byte, etagValue string) {
	// The prefix is just to make it extra clear that these are relay-specific etags.
	etag := "relay-" + etagValue

	// etag is never empty, so a missing If-None-Match header can never match it.
	if req.Header.Get("If-None-Match") == etag {
		w.WriteHeader(http.StatusNotModified)
		return
	}

	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	headers.Set("Etag", etag)

	if ttl := clientContext.GetTTL(); ttl > 0 {
		headers.Set("Vary", "Authorization")
		// We're setting "Expires:" instead of "Cache-Control:max-age=" so that if someone puts an
		// HTTP cache in front of ld-relay, multiple clients hitting the cache at different times
		// will all see the same expiration time.
		headers.Set("Expires", time.Now().UTC().Add(ttl).Format(http.TimeFormat))
	}

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(bytes)
}
// serializeFlagsAsMap serializes a collection of store items as a single JSON object whose
// property names are the item keys and whose values are the JSON representations of the flags.
//
// Items are written in the order they appear in coll. An item is omitted from the output if its
// payload is nil or is not a non-nil *ldmodel.FeatureFlag, so that one unexpected entry cannot
// cause a panic while building the response.
func serializeFlagsAsMap(coll []ldstoretypes.KeyedItemDescriptor) []byte {
	w := jwriter.NewWriter()
	obj := w.Object()
	for _, item := range coll {
		// The comma-ok assertion also rejects a nil payload; the explicit nil check covers a
		// typed nil pointer, which would otherwise be dereferenced below.
		flag, ok := item.Item.Item.(*ldmodel.FeatureFlag)
		if !ok || flag == nil {
			continue
		}
		ldmodel.MarshalFeatureFlagToJSONWriter(*flag, obj.Name(item.Key))
	}
	obj.End()
	return w.Bytes()
}