From 7079c70484dd66b53878339b4c773a26341e2a23 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Fri, 21 Apr 2023 21:07:38 +0400 Subject: [PATCH 1/5] xftp: fix permanent error reported as temporary from http client - logs --- src/Simplex/FileTransfer/Agent.hs | 3 +++ src/Simplex/Messaging/Transport/HTTP2/Client.hs | 15 +++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 11681d7b9..61ac492a7 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -390,6 +390,8 @@ runXFTPSndPrepareWorker c doWork = do else pure sndFile maxRecipients <- asks (xftpMaxRecipientsPerRequest . config) let numRecipients' = min numRecipients maxRecipients + liftIO $ print "finished encrypting" + threadDelay 30000000 -- concurrently? forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients' withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading @@ -463,6 +465,7 @@ runXFTPSndWorker c srv doWork = do `catchError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e where retryLoop loop e replicaDelay = do + liftIO $ print $ "replica " <> show sndChunkReplicaId <> " temporary error" flip catchError (\_ -> pure ()) $ do notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) when notifyOnRetry $ notify c sndFileEntityId $ SFERR e diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index 66d88f6c9..90b036b6d 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -108,16 +108,19 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien atomically $ do writeTVar (connected c) True putTMVar cVar (Right c') - process c' sendReq `E.finally` disconnected + process c' sendReq `E.finally` (putStrLn "process error" >> disconnected) process :: HTTP2Client -> H.Client HTTP2Response process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do (req, respVar) <- atomically $ readTBQueue reqQ - sendReq req $ \r -> do - respBody <- getHTTP2Body r bodyHeadSize - let resp = HTTP2Response {response = r, respBody} - atomically $ putTMVar respVar resp - pure resp + do + ( sendReq req $ \r -> do + respBody <- getHTTP2Body r bodyHeadSize + let resp = HTTP2Response {response = r, respBody} + atomically $ putTMVar respVar resp + pure resp + ) + `E.finally` print "sendReq error" -- | Disconnects client from the server and terminates client threads. closeHTTP2Client :: HTTP2Client -> IO () From 9edefb5a5638adffb1b486c757e848b3d9962969 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 24 Apr 2023 16:11:51 +0400 Subject: [PATCH 2/5] return error from request --- src/Simplex/FileTransfer/Agent.hs | 2 - src/Simplex/FileTransfer/Client.hs | 21 ++++---- src/Simplex/Messaging/Agent/Client.hs | 1 + src/Simplex/Messaging/Client.hs | 2 + src/Simplex/Messaging/Notifications/Server.hs | 1 + .../Notifications/Server/Push/APNS.hs | 13 +++-- .../Messaging/Transport/HTTP2/Client.hs | 48 +++++++++++-------- 7 files changed, 51 insertions(+), 37 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 61ac492a7..d78511d78 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -390,8 +390,6 @@ runXFTPSndPrepareWorker c doWork = do else pure sndFile maxRecipients <- asks (xftpMaxRecipientsPerRequest . config) let numRecipients' = min numRecipients maxRecipients - liftIO $ print "finished encrypting" - threadDelay 30000000 -- concurrently? forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients' withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index fa93776d4..e54c6e2f0 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -135,15 +135,18 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd) let req = H.requestStreaming N.methodPost "/" [] $ streamBody t reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_ - HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout - when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK - -- TODO validate that the file ID is the same as in the request? - (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead - case respOrErr of - Right r -> case protocolError r of - Just e -> throwError $ PCEProtocolError e - _ -> pure (r, body) - Left e -> throwError $ PCEResponseError e + res <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout + case res of + HTTP2RequestResponse HTTP2Response {respBody = body@HTTP2Body {bodyHead}} -> do + when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK + -- TODO validate that the file ID is the same as in the request? + (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead + case respOrErr of + Right r -> case protocolError r of + Just e -> throwError $ PCEProtocolError e + _ -> pure (r, body) + Left e -> throwError $ PCEResponseError e + HTTP2RequestError e -> throwError $ PCEInternalError $ show e where streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO () streamBody t send done = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index e2d650bd6..cb15f3d8b 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -702,6 +702,7 @@ protocolClientError protocolError_ host = \case PCETransportError e -> BROKER host $ TRANSPORT e e@PCECryptoError {} -> INTERNAL $ show e PCEIOError {} -> BROKER host NETWORK + PCEInternalError e -> INTERNAL e data ProtocolTestStep = TSConnect diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 02fcbfc2d..1e6663f1d 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -417,6 +417,8 @@ data ProtocolClientError err PCECryptoError C.CryptoError | -- | IO Error PCEIOError IOException + | -- | Internal error + PCEInternalError String deriving (Eq, Show, Exception) type SMPClientError = ProtocolClientError ErrorType diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index ee5b3f508..e69e6cbc6 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -233,6 +233,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth PCEProtocolError e -> updateErr "SMP error " e PCEIOError e -> updateErr "IOError " e + PCEInternalError e -> updateErr "InternalError " e PCEResponseError e -> updateErr "ResponseError " e PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r PCETransportError e -> updateErr "TransportError " e diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 0b4b0f572..188ecc722 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -344,11 +344,14 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf -- TODO when HTTP2 client is thread-safe, we can use sendRequestDirect - HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing - let status = H.responseStatus response - reason' = maybe "" reason $ J.decodeStrict' bodyHead - logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason' - result status reason' + res <- liftHTTPS2 $ sendRequest http2 req Nothing + case res of + HTTP2RequestResponse HTTP2Response {response, respBody = HTTP2Body {bodyHead}} -> do + let status = H.responseStatus response + reason' = maybe "" reason $ J.decodeStrict' bodyHead + logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason' + result status reason' + HTTP2RequestError _e -> throwError PPPermanentError where result :: Maybe Status -> Text -> ExceptT PushProviderError IO () result status reason' diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index 90b036b6d..c7245d534 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -29,10 +29,10 @@ import UnliftIO.STM import UnliftIO.Timeout data HTTP2Client = HTTP2Client - { action :: Maybe (Async HTTP2Response), + { action :: Maybe (Async HTTP2RequestResult), sessionId :: SessionId, sessionTs :: UTCTime, - sendReq :: Request -> (Response -> IO HTTP2Response) -> IO HTTP2Response, + sendReq :: Request -> (Response -> IO HTTP2RequestResult) -> IO HTTP2RequestResult, client_ :: HClient } @@ -42,9 +42,13 @@ data HClient = HClient host :: TransportHost, port :: ServiceName, config :: HTTP2ClientConfig, - reqQ :: TBQueue (Request, TMVar HTTP2Response) + reqQ :: TBQueue (Request, TMVar HTTP2RequestResult) } +data HTTP2RequestResult + = HTTP2RequestResponse HTTP2Response + | HTTP2RequestError E.SomeException + data HTTP2Response = HTTP2Response { response :: Response, respBody :: HTTP2Body @@ -101,40 +105,42 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien Just (Left e) -> Left e Nothing -> Left HCNetworkError - client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2Response + client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2RequestResult client c cVar sessionId sendReq = do sessionTs <- getCurrentTime let c' = HTTP2Client {action = Nothing, client_ = c, sendReq, sessionId, sessionTs} atomically $ do writeTVar (connected c) True putTMVar cVar (Right c') - process c' sendReq `E.finally` (putStrLn "process error" >> disconnected) + process c' sendReq `E.finally` disconnected - process :: HTTP2Client -> H.Client HTTP2Response + process :: HTTP2Client -> H.Client HTTP2RequestResult process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do - (req, respVar) <- atomically $ readTBQueue reqQ - do - ( sendReq req $ \r -> do - respBody <- getHTTP2Body r bodyHeadSize - let resp = HTTP2Response {response = r, respBody} - atomically $ putTMVar respVar resp - pure resp - ) - `E.finally` print "sendReq error" + (req, resVar) <- atomically $ readTBQueue reqQ + sendReq req (processResp resVar) `E.catch` \e -> do + let res = HTTP2RequestError e + atomically $ putTMVar resVar res + pure res + where + processResp resVar r = do + respBody <- getHTTP2Body r bodyHeadSize + let res = HTTP2RequestResponse $ HTTP2Response {response = r, respBody} + atomically $ putTMVar resVar res + pure res -- | Disconnects client from the server and terminates client threads. closeHTTP2Client :: HTTP2Client -> IO () closeHTTP2Client = mapM_ uninterruptibleCancel . action -sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response) +sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult) sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req reqTimeout_ = do - resp <- newEmptyTMVarIO - atomically $ writeTBQueue reqQ (req, resp) + resVar <- newEmptyTMVarIO + atomically $ writeTBQueue reqQ (req, resVar) let reqTimeout = http2RequestTimeout config reqTimeout_ - maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resp)) + maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resVar)) -- | this function should not be used until HTTP2 is thread safe, use sendRequest -sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response) +sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult) sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req reqTimeout_ = do let reqTimeout = http2RequestTimeout config reqTimeout_ reqTimeout `timeout` try (sendReq req process) >>= \case @@ -144,7 +150,7 @@ sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq where process r = do respBody <- getHTTP2Body r $ bodyHeadSize config - pure HTTP2Response {response = r, respBody} + pure $ HTTP2RequestResponse $ HTTP2Response {response = r, respBody} http2RequestTimeout :: HTTP2ClientConfig -> Maybe Int -> Int http2RequestTimeout HTTP2ClientConfig {connTimeout} = maybe connTimeout (connTimeout +) From 4a64061aab3ad0aefb342f0806c71b3fd08c23b2 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 24 Apr 2023 18:42:30 +0400 Subject: [PATCH 3/5] wip - logs --- src/Simplex/FileTransfer/Agent.hs | 8 ++++++-- src/Simplex/FileTransfer/Client.hs | 4 +++- src/Simplex/Messaging/Transport/HTTP2/Client.hs | 1 + 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index d78511d78..b2417aa8c 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -390,6 +390,8 @@ runXFTPSndPrepareWorker c doWork = do else pure sndFile maxRecipients <- asks (xftpMaxRecipientsPerRequest . config) let numRecipients' = min numRecipients maxRecipients + liftIO $ print "finished encrypting" + threadDelay 10000000 -- concurrently? forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients' withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading @@ -463,7 +465,7 @@ runXFTPSndWorker c srv doWork = do `catchError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e where retryLoop loop e replicaDelay = do - liftIO $ print $ "replica " <> show sndChunkReplicaId <> " temporary error" + liftIO $ print $ "replica " <> show sndChunkReplicaId <> " temporary error" <> show e flip catchError (\_ -> pure ()) $ do notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) when notifyOnRetry $ notify c sndFileEntityId $ SFERR e @@ -471,7 +473,9 @@ runXFTPSndWorker c srv doWork = do withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay atomically $ assertAgentForeground c loop - retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) + retryDone e = do + liftIO $ print $ "replica " <> show sndChunkReplicaId <> " permanent error" <> show e + sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m () uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index e54c6e2f0..8d244e9a7 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -146,7 +146,9 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} Just e -> throwError $ PCEProtocolError e _ -> pure (r, body) Left e -> throwError $ PCEResponseError e - HTTP2RequestError e -> throwError $ PCEInternalError $ show e + HTTP2RequestError e -> do + liftIO $ print $ "in sendXFTPCommand HTTP2RequestError" <> show e + throwError $ PCEInternalError $ show e where streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO () streamBody t send done = do diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index c7245d534..e572f6d10 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -118,6 +118,7 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do (req, resVar) <- atomically $ readTBQueue reqQ sendReq req (processResp resVar) `E.catch` \e -> do + liftIO $ print $ "in process catch e: " <> show e let res = HTTP2RequestError e atomically $ putTMVar resVar res pure res From 50ee2df48deda2b37fcce550654f9285a6b26567 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Tue, 25 Apr 2023 13:16:15 +0400 Subject: [PATCH 4/5] catch in streamBody --- src/Simplex/FileTransfer/Client.hs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 8d244e9a7..8d1ca7938 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -9,6 +9,7 @@ module Simplex.FileTransfer.Client where +import qualified Control.Exception as E import Control.Monad.Except import Data.Bifunctor (first) import Data.ByteString.Builder (Builder, byteString) @@ -154,9 +155,14 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} streamBody t send done = do send $ byteString t forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} -> - withFile filePath ReadMode $ \h -> do - hSeek h AbsoluteSeek $ fromIntegral chunkOffset - sendFile h send $ fromIntegral chunkSize + do + ( withFile filePath ReadMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + sendFile h send $ fromIntegral chunkSize + ) + `E.catch` \(e :: SomeException) -> do + print $ "in sendXFTPCommand streamBody " <> show e + E.throw e done createXFTPChunk :: From df1786213c1e1b2b6f30aab56dac21407c0bf048 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Wed, 26 Apr 2023 13:51:47 +0400 Subject: [PATCH 5/5] catch in withHTTP2 --- src/Simplex/Messaging/Transport/HTTP2.hs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Transport/HTTP2.hs b/src/Simplex/Messaging/Transport/HTTP2.hs index f258f9dc9..e85cc95ff 100644 --- a/src/Simplex/Messaging/Transport/HTTP2.hs +++ b/src/Simplex/Messaging/Transport/HTTP2.hs @@ -1,4 +1,5 @@ {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Transport.HTTP2 where @@ -23,7 +24,16 @@ defaultHTTP2BufferSize :: BufferSize defaultHTTP2BufferSize = 32768 withHTTP2 :: BufferSize -> (Config -> SessionId -> IO a) -> TLS -> IO a -withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c) +withHTTP2 sz run c = + E.bracket + (allocHTTP2Config c sz) + freeSimpleConfig + ( \cfg -> + run cfg (tlsUniq c) `E.catch` \(e :: E.SomeException) -> + do + print $ "withHTTP2 e: " <> show e + E.throwIO e + ) allocHTTP2Config :: TLS -> BufferSize -> IO Config allocHTTP2Config c sz = do