diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 36940fce4..e02a1b007 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -390,6 +390,8 @@ runXFTPSndPrepareWorker c doWork = do else pure sndFile maxRecipients <- asks (xftpMaxRecipientsPerRequest . config) let numRecipients' = min numRecipients maxRecipients + liftIO $ print "finished encrypting" + threadDelay 10000000 -- concurrently? forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients' withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading @@ -463,6 +465,7 @@ runXFTPSndWorker c srv doWork = do `catchError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e where retryLoop loop e replicaDelay = do + liftIO $ print $ "replica " <> show sndChunkReplicaId <> " temporary error" <> show e flip catchError (\_ -> pure ()) $ do notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) when notifyOnRetry $ notify c sndFileEntityId $ SFERR e @@ -470,7 +473,9 @@ runXFTPSndWorker c srv doWork = do withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay atomically $ assertAgentForeground c loop - retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) + retryDone e = do + liftIO $ print $ "replica " <> show sndChunkReplicaId <> " permanent error" <> show e + sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m () uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index fa93776d4..8d1ca7938 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -9,6 +9,7 @@ module Simplex.FileTransfer.Client where +import qualified Control.Exception as E import Control.Monad.Except import Data.Bifunctor (first) import Data.ByteString.Builder (Builder, byteString) @@ -135,23 +136,33 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd) let req = H.requestStreaming N.methodPost "/" [] $ streamBody t reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_ - HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout - when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK - -- TODO validate that the file ID is the same as in the request? - (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead - case respOrErr of - Right r -> case protocolError r of - Just e -> throwError $ PCEProtocolError e - _ -> pure (r, body) - Left e -> throwError $ PCEResponseError e + res <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout + case res of + HTTP2RequestResponse HTTP2Response {respBody = body@HTTP2Body {bodyHead}} -> do + when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK + -- TODO validate that the file ID is the same as in the request? + (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead + case respOrErr of + Right r -> case protocolError r of + Just e -> throwError $ PCEProtocolError e + _ -> pure (r, body) + Left e -> throwError $ PCEResponseError e + HTTP2RequestError e -> do + liftIO $ print $ "in sendXFTPCommand HTTP2RequestError" <> show e + throwError $ PCEInternalError $ show e where streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO () streamBody t send done = do send $ byteString t forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} -> - withFile filePath ReadMode $ \h -> do - hSeek h AbsoluteSeek $ fromIntegral chunkOffset - sendFile h send $ fromIntegral chunkSize + do + ( withFile filePath ReadMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + sendFile h send $ fromIntegral chunkSize + ) + `E.catch` \(e :: SomeException) -> do + print $ "in sendXFTPCommand streamBody " <> show e + E.throw e done createXFTPChunk :: diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index e2d650bd6..cb15f3d8b 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -702,6 +702,7 @@ protocolClientError protocolError_ host = \case PCETransportError e -> BROKER host $ TRANSPORT e e@PCECryptoError {} -> INTERNAL $ show e PCEIOError {} -> BROKER host NETWORK + PCEInternalError e -> INTERNAL e data ProtocolTestStep = TSConnect diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 02fcbfc2d..1e6663f1d 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -417,6 +417,8 @@ data ProtocolClientError err PCECryptoError C.CryptoError | -- | IO Error PCEIOError IOException + | -- | Internal error + PCEInternalError String deriving (Eq, Show, Exception) type SMPClientError = ProtocolClientError ErrorType diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index ee5b3f508..e69e6cbc6 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -233,6 +233,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth PCEProtocolError e -> updateErr "SMP error " e PCEIOError e -> updateErr "IOError " e + PCEInternalError e -> updateErr "InternalError " e PCEResponseError e -> updateErr "ResponseError " e PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r PCETransportError e -> updateErr "TransportError " e diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 0b4b0f572..188ecc722 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -344,11 +344,14 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf -- TODO when HTTP2 client is thread-safe, we can use sendRequestDirect - HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing - let status = H.responseStatus response - reason' = maybe "" reason $ J.decodeStrict' bodyHead - logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason' - result status reason' + res <- liftHTTPS2 $ sendRequest http2 req Nothing + case res of + HTTP2RequestResponse HTTP2Response {response, respBody = HTTP2Body {bodyHead}} -> do + let status = H.responseStatus response + reason' = maybe "" reason $ J.decodeStrict' bodyHead + logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason' + result status reason' + HTTP2RequestError _e -> throwError PPPermanentError where result :: Maybe Status -> Text -> ExceptT PushProviderError IO () result status reason' diff --git a/src/Simplex/Messaging/Transport/HTTP2.hs b/src/Simplex/Messaging/Transport/HTTP2.hs index f258f9dc9..e85cc95ff 100644 --- a/src/Simplex/Messaging/Transport/HTTP2.hs +++ b/src/Simplex/Messaging/Transport/HTTP2.hs @@ -1,4 +1,5 @@ {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Transport.HTTP2 where @@ -23,7 +24,16 @@ defaultHTTP2BufferSize :: BufferSize defaultHTTP2BufferSize = 32768 withHTTP2 :: BufferSize -> (Config -> SessionId -> IO a) -> TLS -> IO a -withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c) +withHTTP2 sz run c = + E.bracket + (allocHTTP2Config c sz) + freeSimpleConfig + ( \cfg -> + run cfg (tlsUniq c) `E.catch` \(e :: E.SomeException) -> + do + print $ "withHTTP2 e: " <> show e + E.throwIO e + ) allocHTTP2Config :: TLS -> BufferSize -> IO Config allocHTTP2Config c sz = do diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index 66d88f6c9..e572f6d10 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -29,10 +29,10 @@ import UnliftIO.STM import UnliftIO.Timeout data HTTP2Client = HTTP2Client - { action :: Maybe (Async HTTP2Response), + { action :: Maybe (Async HTTP2RequestResult), sessionId :: SessionId, sessionTs :: UTCTime, - sendReq :: Request -> (Response -> IO HTTP2Response) -> IO HTTP2Response, + sendReq :: Request -> (Response -> IO HTTP2RequestResult) -> IO HTTP2RequestResult, client_ :: HClient } @@ -42,9 +42,13 @@ data HClient = HClient host :: TransportHost, port :: ServiceName, config :: HTTP2ClientConfig, - reqQ :: TBQueue (Request, TMVar HTTP2Response) + reqQ :: TBQueue (Request, TMVar HTTP2RequestResult) } +data HTTP2RequestResult + = HTTP2RequestResponse HTTP2Response + | HTTP2RequestError E.SomeException + data HTTP2Response = HTTP2Response { response :: Response, respBody :: HTTP2Body @@ -101,7 +105,7 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien Just (Left e) -> Left e Nothing -> Left HCNetworkError - client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2Response + client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2RequestResult client c cVar sessionId sendReq = do sessionTs <- getCurrentTime let c' = HTTP2Client {action = Nothing, client_ = c, sendReq, sessionId, sessionTs} @@ -110,28 +114,34 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien putTMVar cVar (Right c') process c' sendReq `E.finally` disconnected - process :: HTTP2Client -> H.Client HTTP2Response + process :: HTTP2Client -> H.Client HTTP2RequestResult process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do - (req, respVar) <- atomically $ readTBQueue reqQ - sendReq req $ \r -> do - respBody <- getHTTP2Body r bodyHeadSize - let resp = HTTP2Response {response = r, respBody} - atomically $ putTMVar respVar resp - pure resp + (req, resVar) <- atomically $ readTBQueue reqQ + sendReq req (processResp resVar) `E.catch` \e -> do + liftIO $ print $ "in process catch e: " <> show e + let res = HTTP2RequestError e + atomically $ putTMVar resVar res + pure res + where + processResp resVar r = do + respBody <- getHTTP2Body r bodyHeadSize + let res = HTTP2RequestResponse $ HTTP2Response {response = r, respBody} + atomically $ putTMVar resVar res + pure res -- | Disconnects client from the server and terminates client threads. closeHTTP2Client :: HTTP2Client -> IO () closeHTTP2Client = mapM_ uninterruptibleCancel . action -sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response) +sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult) sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req reqTimeout_ = do - resp <- newEmptyTMVarIO - atomically $ writeTBQueue reqQ (req, resp) + resVar <- newEmptyTMVarIO + atomically $ writeTBQueue reqQ (req, resVar) let reqTimeout = http2RequestTimeout config reqTimeout_ - maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resp)) + maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resVar)) -- | this function should not be used until HTTP2 is thread safe, use sendRequest -sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response) +sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult) sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req reqTimeout_ = do let reqTimeout = http2RequestTimeout config reqTimeout_ reqTimeout `timeout` try (sendReq req process) >>= \case @@ -141,7 +151,7 @@ sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq where process r = do respBody <- getHTTP2Body r $ bodyHeadSize config - pure HTTP2Response {response = r, respBody} + pure $ HTTP2RequestResponse $ HTTP2Response {response = r, respBody} http2RequestTimeout :: HTTP2ClientConfig -> Maybe Int -> Int http2RequestTimeout HTTP2ClientConfig {connTimeout} = maybe connTimeout (connTimeout +)