From 9a6fa7fafc95f223f35cda9f76b41d69daf60a77 Mon Sep 17 00:00:00 2001 From: Tirumarai Selvan Date: Thu, 7 Feb 2019 18:07:28 +0530 Subject: [PATCH] add delivery info to event payload (close #1476) (#1517) Adds the following to the event payload: ``` "delivery_info": { "max_retries": 0, "current_retry": 0 } ``` --- server/src-lib/Hasura/Events/Lib.hs | 70 ++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs index 6313c317f08ce..f1c4fc4214f41 100644 --- a/server/src-lib/Hasura/Events/Lib.hs +++ b/server/src-lib/Hasura/Events/Lib.hs @@ -65,6 +65,14 @@ data TriggerMeta $(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''TriggerMeta) +data DeliveryInfo + = DeliveryInfo + { diCurrentRetry :: Int + , diMaxRetries :: Int + } deriving (Show, Eq) + +$(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''DeliveryInfo) + data Event = Event { eId :: EventId @@ -75,18 +83,29 @@ data Event , eCreatedAt :: Time.UTCTime } deriving (Show, Eq) -instance ToJSON Event where - toJSON (Event eid (QualifiedObject sn tn) trigger event _ created)= - object [ "id" .= eid - , "table" .= object [ "schema" .= sn - , "name" .= tn - ] - , "trigger" .= trigger - , "event" .= event - , "created_at" .= created +$(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event) + +newtype QualifiedTableStrict = QualifiedTableStrict + { getQualifiedTable :: QualifiedTable + } deriving (Show, Eq) + +instance ToJSON QualifiedTableStrict where + toJSON (QualifiedTableStrict (QualifiedObject sn tn)) = + object [ "schema" .= sn + , "name" .= tn ] -$(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event) +data EventPayload + = EventPayload + { epId :: EventId + , epTable :: QualifiedTableStrict + , epTrigger :: TriggerMeta + , epEvent :: Value + , epDeliveryInfo :: DeliveryInfo + , epCreatedAt :: Time.UTCTime + } deriving (Show, Eq) + +$(deriveToJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''EventPayload) data WebhookRequest = WebhookRequest @@ -266,6 +285,17 @@ tryWebhook logenv pool e = do eventId = eId e headerInfos = etiHeaders eti headers = map encodeHeader headerInfos + eventPayload = EventPayload + { epId = eId e + , epTable = QualifiedTableStrict { getQualifiedTable = eTable e} + , epTrigger = eTrigger e + , epEvent = eEvent e + , epDeliveryInfo = DeliveryInfo + { diCurrentRetry = eTries e + , diMaxRetries = rcNumRetries $ etiRetryConf eti + } + , epCreatedAt = eCreatedAt e + } eeCtx <- asks getter -- wait for counter and then increment beforing making http liftIO $ atomically $ do @@ -276,7 +306,7 @@ tryWebhook logenv pool e = do else modifyTVar' c (+1) let options = addHeaders headers W.defaults decodedHeaders = map (decodeHeader headerInfos) $ options CL.^. W.headers - eitherResp <- runExceptT $ runHTTP options (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId)) + eitherResp <- runExceptT $ runHTTP options (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON eventPayload)) (Just (ExtraContext createdAt eventId)) --decrement counter once http is done liftIO $ atomically $ do @@ -287,32 +317,32 @@ tryWebhook logenv pool e = do Left err -> case err of HClient excp -> let errMsg = TBS.fromLBS $ encode $ show excp - in runFailureQ pool $ mkInvo e 1000 decodedHeaders errMsg [] + in runFailureQ pool $ mkInvo eventPayload 1000 decodedHeaders errMsg [] HParse _ detail -> let errMsg = TBS.fromLBS $ encode detail - in runFailureQ pool $ mkInvo e 1001 decodedHeaders errMsg [] + in runFailureQ pool $ mkInvo eventPayload 1001 decodedHeaders errMsg [] HStatus errResp -> let respPayload = hrsBody errResp respHeaders = hrsHeaders errResp respStatus = hrsStatus errResp - in runFailureQ pool $ mkInvo e respStatus decodedHeaders respPayload respHeaders + in runFailureQ pool $ mkInvo eventPayload respStatus decodedHeaders respPayload respHeaders HOther detail -> let errMsg = (TBS.fromLBS $ encode detail) - in runFailureQ pool $ mkInvo e 500 decodedHeaders errMsg [] + in runFailureQ pool $ mkInvo eventPayload 500 decodedHeaders errMsg [] Right resp -> let respPayload = hrsBody resp respHeaders = hrsHeaders resp respStatus = hrsStatus resp - in runSuccessQ pool e $ mkInvo e respStatus decodedHeaders respPayload respHeaders + in runSuccessQ pool e $ mkInvo eventPayload respStatus decodedHeaders respPayload respHeaders case finally of Left err -> liftIO $ logger $ L.toEngineLog $ EventInternalErr err Right _ -> return () return eitherResp where - mkInvo :: Event -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> Invocation - mkInvo e' status reqHeaders respBody respHeaders + mkInvo :: EventPayload -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> Invocation + mkInvo ep status reqHeaders respBody respHeaders = let resp = if isInitError status then mkErr respBody else mkResp status respBody respHeaders in Invocation - (eId e') + (epId ep) status - (mkWebhookReq (toJSON e) reqHeaders) + (mkWebhookReq (toJSON ep) reqHeaders) resp addHeaders :: [(N.HeaderName, BS.ByteString)] -> W.Options -> W.Options addHeaders headers opts = foldl (\acc h -> acc CL.& W.header (fst h) CL..~ [snd h] ) opts headers