-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathRuntimeClient.hs
262 lines (222 loc) · 11 KB
/
RuntimeClient.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
{-|
Module : AWS.Lambda.RuntimeClient
Description : HTTP-related machinery for talking to the AWS Lambda Custom Runtime interface.
Copyright : (c) Nike, Inc., 2018
License : BSD3
Maintainer : nathan.fairhurst@nike.com, fernando.freire@nike.com
Stability : stable
-}
module AWS.Lambda.RuntimeClient (
RuntimeClientConfig,
getRuntimeClientConfig,
getNextData,
getNextEvent,
sendEventSuccess,
sendEventError,
) where
import AWS.Lambda.Context (LambdaContext)
import AWS.Lambda.Internal (StaticContext, getStaticContext)
import AWS.Lambda.RuntimeClient.Internal (eventResponseToNextData)
import Control.Applicative ((<*>))
import Control.Concurrent (threadDelay)
import Control.Exception (IOException, displayException,
                          throw, throwIO, try)
import Control.Monad (unless)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (Value, encode)
import Data.Aeson.Parser (value')
import Data.Aeson.Types (ToJSON)
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSW
import Data.Conduit (ConduitM, runConduit, yield,
                     (.|))
import Data.Conduit.Attoparsec (sinkParser)
import Data.Semigroup ((<>))
import GHC.Generics (Generic (..))
import Network.HTTP.Client (BodyReader, HttpException,
                            Manager, Request,
                            RequestBody (RequestBodyLBS),
                            Response, brRead,
                            defaultManagerSettings,
                            httpNoBody,
                            managerConnCount,
                            managerIdleConnectionCount,
                            managerResponseTimeout,
                            managerSetProxy, method,
                            newManager, noProxy,
                            parseRequest, path,
                            requestBody, requestHeaders,
                            responseBody,
                            responseStatus,
                            responseTimeoutNone,
                            setRequestCheckStatus,
                            withResponse)
import Network.HTTP.Types (HeaderName)
import Network.HTTP.Types.Status (Status, status403,
                                  status413,
                                  statusIsSuccessful)
import System.Environment (getEnv)
-- | Error document that this module posts back to AWS when something fails.
-- The 'ToJSON' instance below has no hand-written methods, so the encoding
-- comes from the class defaults for this 'Generic' type.
data LambdaError = LambdaError
  { errorMessage :: String   -- ^ Description of what went wrong.
  , errorType    :: String   -- ^ Error category; this module always sends @"User"@.
  , stackTrace   :: [String] -- ^ Stack frames; this module always sends an empty list.
  } deriving (Show, Generic)

instance ToJSON LambdaError
-- | Bundle of everything the client functions need: the base 'Request'
-- (built from the @AWS_LAMBDA_RUNTIME_API@ host), the HTTP 'Manager' used for
-- every call, and the 'StaticContext' obtained from 'getStaticContext'.
data RuntimeClientConfig = RuntimeClientConfig Request Manager StaticContext
-- Exposed Handlers
-- TODO: It would be interesting if we could make the interface a sort of
-- "chained" callback API. So instead of getting back a base request to kick
-- things off we get a 'getNextEvent' handler and then the 'getNextEvent'
-- handler returns both the 'success' and 'error' handlers. So things like
-- baseRequest and reqId are pre-injected.
-- | Build the 'RuntimeClientConfig' used by every other function here.
--
-- Reads the API host from @AWS_LAMBDA_RUNTIME_API@, creates the HTTP manager
-- and loads the static context.  If loading the static context raises an
-- 'IOException', its message is reported through 'sendInitError' and the
-- function then aborts with 'error' carrying the same message.
getRuntimeClientConfig :: IO RuntimeClientConfig
getRuntimeClientConfig = do
  apiHost <- getEnv "AWS_LAMBDA_RUNTIME_API"
  baseReq <- parseRequest ("http://" ++ apiHost)
  -- In the off chance that a proxy value is set, we don't want to use it.
  -- There's also no reason to spend time reading env vars.
  manager <- newManager (managerSetProxy noProxy settings)
  staticCtxOrMsg <- first ioErrorText <$> try getStaticContext
  case staticCtxOrMsg of
    Right staticCtx -> pure (RuntimeClientConfig baseReq manager staticCtx)
    Left msg -> sendInitError baseReq manager msg >> error msg
  where
    settings =
      defaultManagerSettings
        { -- This is the most important setting, we must not timeout requests
          managerResponseTimeout = responseTimeoutNone
          -- We only ever need a single connection, because we'll never make
          -- concurrent requests and never talk to more than one host.
        , managerConnCount = 1
        , managerIdleConnectionCount = 1
        }

    -- Pins the exception type caught by 'try' to 'IOException'.
    ioErrorText :: IOException -> String
    ioErrorText = displayException
-- | Fetch the next event with 'getNextEvent' and convert the raw response
-- with 'eventResponseToNextData', supplying the static context stored in the
-- config.
getNextData :: RuntimeClientConfig -> IO (BS.ByteString, Value, Either String LambdaContext)
getNextData cfg@(RuntimeClientConfig _ _ staticCtx) = do
  nextEvent <- getNextEvent cfg
  pure (eventResponseToNextData staticCtx nextEvent)
-- | Ask the runtime API for the next invocation event, retrying via
-- 'runtimeClientRetryTry'.
--
-- AWS Lambda guarantees that we will get valid JSON, so parsing is
-- guaranteed to succeed.
--
-- When the request ends in an 'HttpException' or a non-successful status,
-- the message is reported with 'sendInitError' and the function aborts with
-- 'error'.
getNextEvent :: RuntimeClientConfig -> IO (Response Value)
getNextEvent (RuntimeClientConfig baseRuntimeRequest manager _) = do
  outcome <- runtimeClientRetryTry (httpValue (toNextEventRequest baseRuntimeRequest) manager)
  case outcome of
    Left ex -> giveUp (displayException (ex :: HttpException))
    Right res
      | statusIsSuccessful (getResponseStatus res) -> return res
      | otherwise -> giveUp "Unexpected Runtime Error: Could not retrieve next event."
  where
    -- Report the failure to the runtime API, then abort.
    giveUp msg = do
      _ <- sendInitError baseRuntimeRequest manager msg
      error msg
-- | Post a successful handler result for the invocation identified by
-- @reqId@.  The request is sent through 'runtimeClientRetryTry' and the
-- outcome is handled as follows:
--
-- * Successful status: nothing more to do.
-- * @413@ (recoverable): the result was rejected as too large, so the
--   invocation is reported as failed through 'sendEventError'.
-- * 'HttpException', or any other non-successful status (non-recoverable):
--   abort with 'error'.
--
-- Previously the last two cases were dispatched the wrong way round: the
-- recoverable 413 aborted, while the non-recoverable failures tried to post
-- an invocation error.
sendEventSuccess :: ToJSON a => RuntimeClientConfig -> BS.ByteString -> a -> IO ()
sendEventSuccess rcc@(RuntimeClientConfig baseRuntimeRequest manager _) reqId json = do
  resOrEx <- runtimeClientRetryTry $ flip httpNoBody manager $ toEventSuccessRequest reqId json baseRuntimeRequest
  case resOrEx of
    -- Non-recoverable: the request itself failed.
    Left ex ->
      error $ displayException (ex :: HttpException)
    Right res
      -- Recoverable: tell the runtime API this invocation failed instead.
      | status == status413 ->
          -- TODO Get the real error info from the response
          -- If an exception occurs here, we want that to propagate
          sendEventError rcc reqId "Payload Too Large"
      -- Non-recoverable: any other non-successful status.
      | not (statusIsSuccessful status) ->
          error "Unexpected Runtime Error: Could not post handler result."
      -- Success
      | otherwise -> return ()
      where
        status = getResponseStatus res
-- | Post the error message @e@ for the invocation @reqId@ through
-- 'runtimeClientRetry'.  The response itself is discarded.
sendEventError :: RuntimeClientConfig -> BS.ByteString -> String -> IO ()
sendEventError (RuntimeClientConfig baseRuntimeRequest manager _) reqId e =
  () <$ runtimeClientRetry (httpNoBody errorRequest manager)
  where
    errorRequest = toEventErrorRequest reqId e baseRuntimeRequest
-- | Post the error message @e@ to the init-error endpoint through
-- 'runtimeClientRetry'.  The response itself is discarded.
sendInitError :: Request -> Manager -> String -> IO ()
sendInitError baseRuntimeRequest manager e =
  () <$ runtimeClientRetry (httpNoBody initErrorRequest manager)
  where
    initErrorRequest = toInitErrorRequest e baseRuntimeRequest
-- Helpers for Requests with JSON Bodies
-- | Run a request and parse the streamed response body as a JSON 'Value'.
-- The returned 'Response' is the one handed over by 'withResponse' with its
-- body replaced by the parsed value.
httpValue :: Request -> Manager -> IO (Response Value)
httpValue request manager =
  withResponse request manager $ \streamingRes -> do
    parsed <- runConduit (bodyReaderSource (responseBody streamingRes) .| sinkParser value')
    pure (parsed <$ streamingRes)
-- | Turn a 'BodyReader' into a conduit source: read with 'brRead' and yield
-- each chunk downstream, stopping at the first empty chunk.
bodyReaderSource :: MonadIO m
                 => BodyReader
                 -> ConduitM i BS.ByteString m ()
bodyReaderSource br = pump
  where
    pump =
      liftIO (brRead br) >>= \chunk ->
        unless (BS.null chunk) (yield chunk >> pump)
-- Retry Helpers
-- | Run @f@, retrying on failure with an exponentially growing pause.
--
-- @retries@ is the number of retries already spent and @maxRetries@ the
-- budget.  Once the budget is used up, @f@ is run one last time and whatever
-- happens is returned as is.  Before that, an 'HttpException' or a @403@
-- response triggers a pause of @500 * 2 ^ retries@ (passed to 'threadDelay')
-- followed by another attempt.
--
-- The stop condition uses @>=@ rather than @==@ so that a call starting with
-- @retries@ above @maxRetries@ (or a negative @maxRetries@) still terminates
-- instead of retrying indefinitely.
runtimeClientRetryTry' :: Int -> Int -> IO (Response a) -> IO (Either HttpException (Response a))
runtimeClientRetryTry' retries maxRetries f
  | retries >= maxRetries = try f
  | otherwise = do
      resOrEx <- try f
      let retry =
            threadDelay (500 * 2 ^ retries)
              >> runtimeClientRetryTry' (retries + 1) maxRetries f
      case resOrEx of
        Left (_ :: HttpException) -> retry
        Right res ->
          -- TODO: Explore this further.
          -- Before ~July 22nd 2020 it seemed that if a next event request reached
          -- the runtime before a new event was available that there would be a
          -- network error.  After it appears that a 403 is returned.
          if getResponseStatus res == status403 then retry
          else return $ Right res
-- | 'runtimeClientRetryTry'' starting from zero spent retries with a budget
-- of ten.
runtimeClientRetryTry :: IO (Response a) -> IO (Either HttpException (Response a))
runtimeClientRetryTry action = runtimeClientRetryTry' spentRetries retryBudget action
  where
    spentRetries = 0
    retryBudget = 10
-- | Like 'runtimeClientRetryTry', but rethrows the final 'HttpException'
-- instead of returning it.
--
-- The exception is raised with 'throwIO' as part of the 'IO' action.  The
-- previous @fmap (either throw id)@ only placed a lazy 'throw' inside the
-- result, so callers that discard the response (@fmap (const ())@) never
-- forced it and the failure was silently dropped.
runtimeClientRetry :: IO (Response a) -> IO (Response a)
runtimeClientRetry f = runtimeClientRetryTry f >>= either throwIO pure
-- Request Transformers
-- | Point the base request at the "next invocation" endpoint.
toNextEventRequest :: Request -> Request
toNextEventRequest baseReq =
  setRequestPath "2018-06-01/runtime/invocation/next" baseReq
-- | Build the @POST@ request that delivers the JSON-encoded result @json@
-- to the response endpoint of invocation @reqId@.
toEventSuccessRequest :: ToJSON a => BS.ByteString -> a -> Request -> Request
toEventSuccessRequest reqId json baseReq =
  setRequestBodyJSON json (setRequestMethod "POST" (setRequestPath responsePath baseReq))
  where
    responsePath = "2018-06-01/runtime/invocation/" <> reqId <> "/response"
-- | Shared part of every error report: a status-checked @POST@ with the
-- Lambda error content type, whose body is a JSON-encoded 'LambdaError'
-- carrying the message @e@, the type @"User"@ and an empty stack trace.
toBaseErrorRequest :: String -> Request -> Request
toBaseErrorRequest e baseReq =
  setRequestBodyLBS errorBody
    $ setRequestHeader "Content-Type" ["application/vnd.aws.lambda.error+json"]
    $ setRequestMethod "POST"
    $ setRequestCheckStatus baseReq
  where
    errorBody =
      encode
        ( LambdaError
            { errorMessage = e
            , errorType = "User"
            , stackTrace = []
            }
        )
-- | Error report aimed at the error endpoint of invocation @reqId@.
toEventErrorRequest :: BS.ByteString -> String -> Request -> Request
toEventErrorRequest reqId e baseReq =
  setRequestPath errorPath (toBaseErrorRequest e baseReq)
  where
    errorPath = "2018-06-01/runtime/invocation/" <> reqId <> "/error"
-- | Error report aimed at the init-error endpoint.
toInitErrorRequest :: String -> Request -> Request
toInitErrorRequest e baseReq =
  setRequestPath "2018-06-01/runtime/init/error" (toBaseErrorRequest e baseReq)
-- HTTP Client Type Helpers
-- | Status of a response; a local alias for 'responseStatus'.
getResponseStatus :: Response a -> Status
getResponseStatus res = responseStatus res
-- | Replace the request body with the JSON encoding of the given value.
setRequestBodyJSON :: ToJSON a => a -> Request -> Request
setRequestBodyJSON payload = setRequestBodyLBS (encode payload)
-- | Replace the request body with the given lazy 'BSW.ByteString'.
setRequestBodyLBS :: BSW.ByteString -> Request -> Request
setRequestBodyLBS lazyBody request =
  request { requestBody = RequestBodyLBS lazyBody }
-- | Set a header to the given values: every existing entry with that name is
-- removed, and one entry per value is placed in front of the remaining
-- headers.
setRequestHeader :: HeaderName -> [BS.ByteString] -> Request -> Request
setRequestHeader headerName values req =
  req { requestHeaders = replacements <> retained }
  where
    retained = [ hdr | hdr@(name, _) <- requestHeaders req, headerName /= name ]
    replacements = [ (headerName, v) | v <- values ]
-- | Replace the HTTP method of a request.
setRequestMethod :: BS.ByteString -> Request -> Request
setRequestMethod newMethod request =
  request { method = newMethod }
-- | Replace the path of a request.
setRequestPath :: BS.ByteString -> Request -> Request
setRequestPath newPath request =
  request { path = newPath }