From 473c2834bc953819a030806712db1e49e3728064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Fri, 4 Jul 2025 18:01:50 +0200 Subject: [PATCH 1/3] wip - support streamable --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index ccb45ae5..8276ee0c 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -243,22 +243,36 @@ internal struct LambdaHTTPServer { requestHead = head case .body(let body): - requestBody.setOrWriteImmutableBuffer(body) + precondition(requestHead != nil, "Received .body without .head") + + // if this is a request from a Streaming Lambda Handler, + // stream the response instead of buffering it + if self.isStreamingResponse(requestHead) { + // we are receiving a chunked body, + // we can stream the response and not accumulate the chunks + print(String(buffer: body)) + } else { + requestBody.setOrWriteImmutableBuffer(body) + } case .end: precondition(requestHead != nil, "Received .end without .head") - // process the request - let response = try await self.processRequest( - head: requestHead, - body: requestBody, - logger: logger - ) - // send the responses - try await self.sendResponse( - response: response, - outbound: outbound, - logger: logger - ) + + // process the buffered response for non streaming requests + if !self.isStreamingResponse(requestHead) { + // process the complete request + let response = try await self.processCompleteRequest( + head: requestHead, + body: requestBody, + logger: logger + ) + // send the responses + try await self.sendCompleteResponse( + response: response, + outbound: outbound, + logger: logger + ) + } requestHead = nil requestBody = nil @@ -273,6 +287,15 @@ internal struct LambdaHTTPServer { } } + /// This function checks if the request is a streaming response request + /// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked" + private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool { + requestHead.method == .POST && + requestHead.uri.hasSuffix(Consts.postResponseURLSuffix) && + requestHead.headers.contains(name: "Transfer-Encoding") && + requestHead.headers["Transfer-Encoding"].contains("chunked") + } + /// This function process the URI request sent by the client and by the Lambda function /// /// It enqueues the client invocation and iterate over the invocation queue when the Lambda function sends /next request @@ -283,7 +306,7 @@ internal struct LambdaHTTPServer { /// - body: the HTTP request body /// - Throws: /// - Returns: the response to send back to the client or the Lambda function - private func processRequest( + private func processCompleteRequest( head: HTTPRequestHead, body: ByteBuffer?, logger: Logger @@ -406,7 +429,7 @@ internal struct LambdaHTTPServer { } } - private func sendResponse( + private func sendCompleteResponse( response: LocalServerResponse, outbound: NIOAsyncChannelOutboundWriter, logger: Logger From 4828dd9190cb6343fdbcd9542b5b324e0929b7c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sat, 12 Jul 2025 10:58:13 +0200 Subject: [PATCH 2/3] minor changes in tests --- ...eTests.swift => Lambda+CodableTests.swift} | 28 +++++++++---------- .../LambdaRunLoopTests.swift | 2 +- .../LambdaRuntimeTests.swift | 4 +-- ...ockClient.swift => MockLambdaClient.swift} | 10 +++---- 4 files changed, 22 insertions(+), 22 deletions(-) rename Tests/AWSLambdaRuntimeTests/{NewLambda+CodableTests.swift => Lambda+CodableTests.swift} (82%) rename Tests/AWSLambdaRuntimeTests/{LambdaMockClient.swift => MockLambdaClient.swift} (97%) diff --git a/Tests/AWSLambdaRuntimeTests/NewLambda+CodableTests.swift b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift similarity index 82% rename from Tests/AWSLambdaRuntimeTests/NewLambda+CodableTests.swift rename to Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift index 35d56225..9d1cec21 100644 --- a/Tests/AWSLambdaRuntimeTests/NewLambda+CodableTests.swift +++ b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift @@ -77,24 +77,24 @@ struct JSONTests { let result = await writer.output #expect(result == ByteBuffer(string: #"{"bar":"baz"}"#)) } -} -final actor MockLambdaWriter: LambdaResponseStreamWriter { - private var _buffer: ByteBuffer? + final actor MockLambdaWriter: LambdaResponseStreamWriter { + private var _buffer: ByteBuffer? - var output: ByteBuffer? { - self._buffer - } + var output: ByteBuffer? { + self._buffer + } - func writeAndFinish(_ buffer: ByteBuffer) async throws { - self._buffer = buffer - } + func writeAndFinish(_ buffer: ByteBuffer) async throws { + self._buffer = buffer + } - func write(_ buffer: ByteBuffer) async throws { - fatalError("Unexpected call") - } + func write(_ buffer: ByteBuffer) async throws { + fatalError("Unexpected call") + } - func finish() async throws { - fatalError("Unexpected call") + func finish() async throws { + fatalError("Unexpected call") + } } } diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRunLoopTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRunLoopTests.swift index 3253238e..ec1c9265 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRunLoopTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRunLoopTests.swift @@ -48,7 +48,7 @@ struct LambdaRunLoopTests { } } - let mockClient = LambdaMockClient() + let mockClient = MockLambdaClient() let mockEchoHandler = MockEchoHandler() let failingHandler = FailingHandler() diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift index cd519d76..76057ddf 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift @@ -50,7 +50,7 @@ struct LambdaRuntimeTests { } // wait a small amount to ensure runtime1 task is started - try await Task.sleep(for: .seconds(1)) + try await Task.sleep(for: .seconds(0.5)) // Running the second runtime should trigger LambdaRuntimeError await #expect(throws: LambdaRuntimeError.self) { @@ -71,7 +71,7 @@ struct LambdaRuntimeTests { } // Set timeout and cancel the runtime 2 - try await Task.sleep(for: .seconds(2)) + try await Task.sleep(for: .seconds(1)) taskGroup.cancelAll() } } diff --git a/Tests/AWSLambdaRuntimeTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift similarity index 97% rename from Tests/AWSLambdaRuntimeTests/LambdaMockClient.swift rename to Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift index 7714c84a..b9a97933 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift @@ -22,10 +22,10 @@ import FoundationEssentials import Foundation #endif -struct LambdaMockWriter: LambdaRuntimeClientResponseStreamWriter { - var underlying: LambdaMockClient +struct MockLambdaWriter: LambdaRuntimeClientResponseStreamWriter { + var underlying: MockLambdaClient - init(underlying: LambdaMockClient) { + init(underlying: MockLambdaClient) { self.underlying = underlying } @@ -55,8 +55,8 @@ enum LambdaError: Error, Equatable { case handlerError } -final actor LambdaMockClient: LambdaRuntimeClientProtocol { - typealias Writer = LambdaMockWriter +final actor MockLambdaClient: LambdaRuntimeClientProtocol { + typealias Writer = MockLambdaWriter private struct StateMachine { private enum State { From c5fd0acfab09b103408123dda42c67c8edde786b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sat, 12 Jul 2025 11:01:24 +0200 Subject: [PATCH 3/3] revert wip on streaming local server --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 53 ++++++------------- 1 file changed, 15 insertions(+), 38 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 1dbc7a2b..bdf8f034 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -248,36 +248,22 @@ internal struct LambdaHTTPServer { requestHead = head case .body(let body): - precondition(requestHead != nil, "Received .body without .head") - - // if this is a request from a Streaming Lambda Handler, - // stream the response instead of buffering it - if self.isStreamingResponse(requestHead) { - // we are receiving a chunked body, - // we can stream the response and not accumulate the chunks - print(String(buffer: body)) - } else { - requestBody.setOrWriteImmutableBuffer(body) - } + requestBody.setOrWriteImmutableBuffer(body) case .end: precondition(requestHead != nil, "Received .end without .head") - - // process the buffered response for non streaming requests - if !self.isStreamingResponse(requestHead) { - // process the complete request - let response = try await self.processCompleteRequest( - head: requestHead, - body: requestBody, - logger: logger - ) - // send the responses - try await self.sendCompleteResponse( - response: response, - outbound: outbound, - logger: logger - ) - } + // process the request + let response = try await self.processRequest( + head: requestHead, + body: requestBody, + logger: logger + ) + // send the responses + try await self.sendResponse( + response: response, + outbound: outbound, + logger: logger + ) requestHead = nil requestBody = nil @@ -295,15 +281,6 @@ internal struct LambdaHTTPServer { } } - /// This function checks if the request is a streaming response request - /// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked" - private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool { - requestHead.method == .POST && - requestHead.uri.hasSuffix(Consts.postResponseURLSuffix) && - requestHead.headers.contains(name: "Transfer-Encoding") && - requestHead.headers["Transfer-Encoding"].contains("chunked") - } - /// This function process the URI request sent by the client and by the Lambda function /// /// It enqueues the client invocation and iterate over the invocation queue when the Lambda function sends /next request @@ -314,7 +291,7 @@ internal struct LambdaHTTPServer { /// - body: the HTTP request body /// - Throws: /// - Returns: the response to send back to the client or the Lambda function - private func processCompleteRequest( + private func processRequest( head: HTTPRequestHead, body: ByteBuffer?, logger: Logger @@ -437,7 +414,7 @@ internal struct LambdaHTTPServer { } } - private func sendCompleteResponse( + private func sendResponse( response: LocalServerResponse, outbound: NIOAsyncChannelOutboundWriter, logger: Logger