Skip to content

Commit

Permalink
add delivery info to event payload (close #1476) (#1517)
Browse files Browse the repository at this point in the history
Adds the following to the event payload:
```
    "delivery_info": {
        "max_retries": 0,
        "current_retry": 0
    }
```
  • Loading branch information
tirumaraiselvan authored and shahidhk committed Feb 7, 2019
1 parent 572a563 commit 9a6fa7f
Showing 1 changed file with 50 additions and 20 deletions.
70 changes: 50 additions & 20 deletions server/src-lib/Hasura/Events/Lib.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ data TriggerMeta

$(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''TriggerMeta)

data DeliveryInfo
= DeliveryInfo
{ diCurrentRetry :: Int
, diMaxRetries :: Int
} deriving (Show, Eq)

$(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''DeliveryInfo)

data Event
= Event
{ eId :: EventId
Expand All @@ -75,18 +83,29 @@ data Event
, eCreatedAt :: Time.UTCTime
} deriving (Show, Eq)

instance ToJSON Event where
toJSON (Event eid (QualifiedObject sn tn) trigger event _ created)=
object [ "id" .= eid
, "table" .= object [ "schema" .= sn
, "name" .= tn
]
, "trigger" .= trigger
, "event" .= event
, "created_at" .= created
$(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event)

newtype QualifiedTableStrict = QualifiedTableStrict
{ getQualifiedTable :: QualifiedTable
} deriving (Show, Eq)

instance ToJSON QualifiedTableStrict where
toJSON (QualifiedTableStrict (QualifiedObject sn tn)) =
object [ "schema" .= sn
, "name" .= tn
]

$(deriveFromJSON (aesonDrop 1 snakeCase){omitNothingFields=True} ''Event)
data EventPayload
= EventPayload
{ epId :: EventId
, epTable :: QualifiedTableStrict
, epTrigger :: TriggerMeta
, epEvent :: Value
, epDeliveryInfo :: DeliveryInfo
, epCreatedAt :: Time.UTCTime
} deriving (Show, Eq)

$(deriveToJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''EventPayload)

data WebhookRequest
= WebhookRequest
Expand Down Expand Up @@ -266,6 +285,17 @@ tryWebhook logenv pool e = do
eventId = eId e
headerInfos = etiHeaders eti
headers = map encodeHeader headerInfos
eventPayload = EventPayload
{ epId = eId e
, epTable = QualifiedTableStrict { getQualifiedTable = eTable e}
, epTrigger = eTrigger e
, epEvent = eEvent e
, epDeliveryInfo = DeliveryInfo
{ diCurrentRetry = eTries e
, diMaxRetries = rcNumRetries $ etiRetryConf eti
}
, epCreatedAt = eCreatedAt e
}
eeCtx <- asks getter
-- wait for counter and then increment beforing making http
liftIO $ atomically $ do
Expand All @@ -276,7 +306,7 @@ tryWebhook logenv pool e = do
else modifyTVar' c (+1)
let options = addHeaders headers W.defaults
decodedHeaders = map (decodeHeader headerInfos) $ options CL.^. W.headers
eitherResp <- runExceptT $ runHTTP options (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON e)) (Just (ExtraContext createdAt eventId))
eitherResp <- runExceptT $ runHTTP options (mkAnyHTTPPost (T.unpack webhook) (Just $ toJSON eventPayload)) (Just (ExtraContext createdAt eventId))

--decrement counter once http is done
liftIO $ atomically $ do
Expand All @@ -287,32 +317,32 @@ tryWebhook logenv pool e = do
Left err ->
case err of
HClient excp -> let errMsg = TBS.fromLBS $ encode $ show excp
in runFailureQ pool $ mkInvo e 1000 decodedHeaders errMsg []
in runFailureQ pool $ mkInvo eventPayload 1000 decodedHeaders errMsg []
HParse _ detail -> let errMsg = TBS.fromLBS $ encode detail
in runFailureQ pool $ mkInvo e 1001 decodedHeaders errMsg []
in runFailureQ pool $ mkInvo eventPayload 1001 decodedHeaders errMsg []
HStatus errResp -> let respPayload = hrsBody errResp
respHeaders = hrsHeaders errResp
respStatus = hrsStatus errResp
in runFailureQ pool $ mkInvo e respStatus decodedHeaders respPayload respHeaders
in runFailureQ pool $ mkInvo eventPayload respStatus decodedHeaders respPayload respHeaders
HOther detail -> let errMsg = (TBS.fromLBS $ encode detail)
in runFailureQ pool $ mkInvo e 500 decodedHeaders errMsg []
in runFailureQ pool $ mkInvo eventPayload 500 decodedHeaders errMsg []
Right resp -> let respPayload = hrsBody resp
respHeaders = hrsHeaders resp
respStatus = hrsStatus resp
in runSuccessQ pool e $ mkInvo e respStatus decodedHeaders respPayload respHeaders
in runSuccessQ pool e $ mkInvo eventPayload respStatus decodedHeaders respPayload respHeaders
case finally of
Left err -> liftIO $ logger $ L.toEngineLog $ EventInternalErr err
Right _ -> return ()
return eitherResp
where
mkInvo :: Event -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> Invocation
mkInvo e' status reqHeaders respBody respHeaders
mkInvo :: EventPayload -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf] -> Invocation
mkInvo ep status reqHeaders respBody respHeaders
= let resp = if isInitError status then mkErr respBody else mkResp status respBody respHeaders
in
Invocation
(eId e')
(epId ep)
status
(mkWebhookReq (toJSON e) reqHeaders)
(mkWebhookReq (toJSON ep) reqHeaders)
resp
addHeaders :: [(N.HeaderName, BS.ByteString)] -> W.Options -> W.Options
addHeaders headers opts = foldl (\acc h -> acc CL.& W.header (fst h) CL..~ [snd h] ) opts headers
Expand Down

0 comments on commit 9a6fa7f

Please sign in to comment.