From 473c2834bc953819a030806712db1e49e3728064 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Fri, 4 Jul 2025 18:01:50 +0200
Subject: [PATCH 01/10] wip - support streamable

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 53 +++++++++++++------
 1 file changed, 38 insertions(+), 15 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index ccb45ae5..8276ee0c 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -243,22 +243,36 @@ internal struct LambdaHTTPServer {
                         requestHead = head
 
                     case .body(let body):
-                        requestBody.setOrWriteImmutableBuffer(body)
+                        precondition(requestHead != nil, "Received .body without .head")
+
+                        // if this is a request from a Streaming Lambda Handler,
+                        // stream the response instead of buffering it
+                        if self.isStreamingResponse(requestHead) {
+                            // we are receiving a chunked body,
+                            // we can stream the response and not accumulate the chunks
+                            print(String(buffer: body))
+                        } else {
+                            requestBody.setOrWriteImmutableBuffer(body)
+                        }
 
                     case .end:
                         precondition(requestHead != nil, "Received .end without .head")
-                        // process the request
-                        let response = try await self.processRequest(
-                            head: requestHead,
-                            body: requestBody,
-                            logger: logger
-                        )
-                        // send the responses
-                        try await self.sendResponse(
-                            response: response,
-                            outbound: outbound,
-                            logger: logger
-                        )
+
+                        // process the buffered response for non streaming requests
+                        if !self.isStreamingResponse(requestHead) {
+                            // process the complete request
+                            let response = try await self.processCompleteRequest(
+                                head: requestHead,
+                                body: requestBody,
+                                logger: logger
+                            )
+                            // send the responses
+                            try await self.sendCompleteResponse(
+                                response: response,
+                                outbound: outbound,
+                                logger: logger
+                            )
+                        }
 
                         requestHead = nil
                         requestBody = nil
@@ -273,6 +287,15 @@ internal struct LambdaHTTPServer {
         }
     }
 
+    /// This function checks if the request is a streaming response request
+    /// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked"
+    private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool {
+        requestHead.method == .POST &&
+        requestHead.uri.hasSuffix(Consts.postResponseURLSuffix) &&
+        requestHead.headers.contains(name: "Transfer-Encoding") &&
+        requestHead.headers["Transfer-Encoding"].contains("chunked")
+    }
+
     /// This function process the URI request sent by the client and by the Lambda function
     ///
     /// It enqueues the client invocation and iterate over the invocation queue when the Lambda function sends /next request
@@ -283,7 +306,7 @@ internal struct LambdaHTTPServer {
     ///   - body: the HTTP request body
     /// - Throws:
     /// - Returns: the response to send back to the client or the Lambda function
-    private func processRequest(
+    private func processCompleteRequest(
         head: HTTPRequestHead,
         body: ByteBuffer?,
         logger: Logger
@@ -406,7 +429,7 @@ internal struct LambdaHTTPServer {
         }
     }
 
-    private func sendResponse(
+    private func sendCompleteResponse(
         response: LocalServerResponse,
         outbound: NIOAsyncChannelOutboundWriter<HTTPServerResponsePart>,
         logger: Logger

From b2804555cced56adc7eeb5afd5bf0ed0fcddb5f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sat, 12 Jul 2025 13:13:24 +0200
Subject: [PATCH 02/10] implement streaming on the server side

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 163 +++++++++++-------
 1 file changed, 101 insertions(+), 62 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 1dbc7a2b..390d93d2 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -235,6 +235,7 @@ internal struct LambdaHTTPServer {
 
         var requestHead: HTTPRequestHead!
         var requestBody: ByteBuffer?
+        var requestId: String?
 
         // Note that this method is non-throwing and we are catching any error.
        // We do this since we don't want to tear down the whole server when a single connection
@@ -246,6 +247,17 @@ internal struct LambdaHTTPServer {
                     switch inboundData {
                     case .head(let head):
                         requestHead = head
+                        requestId = getRequestId(from: requestHead)
+
+                        // for streaming requests, push a partial head response
+                        if self.isStreamingResponse(requestHead) {
+                            await self.responsePool.push(
+                                LocalServerResponse(
+                                    id: requestId,
+                                    status: .ok
+                                )
+                            )
+                        }
 
                     case .body(let body):
                         precondition(requestHead != nil, "Received .body without .head")
@@ -253,9 +265,9 @@ internal struct LambdaHTTPServer {
                         // if this is a request from a Streaming Lambda Handler,
                         // stream the response instead of buffering it
                         if self.isStreamingResponse(requestHead) {
-                            // we are receiving a chunked body,
-                            // we can stream the response and not accumulate the chunks
-                            print(String(buffer: body))
+                            await self.responsePool.push(
+                                LocalServerResponse(id: requestId, body: body)
+                            )
                         } else {
                             requestBody.setOrWriteImmutableBuffer(body)
                         }
@@ -265,22 +277,23 @@ internal struct LambdaHTTPServer {
 
                     case .end:
                         precondition(requestHead != nil, "Received .end without .head")
-
+
                         // process the buffered response for non streaming requests
                         if !self.isStreamingResponse(requestHead) {
-                            // process the complete request
-                            let response = try await self.processCompleteRequest(
+                            // process the request and send the response
+                            try await self.processRequestAndSendResponse(
                                 head: requestHead,
                                 body: requestBody,
-                                logger: logger
-                            )
-                            // send the responses
-                            try await self.sendCompleteResponse(
-                                response: response,
                                 outbound: outbound,
                                 logger: logger
                             )
+                        } else {
+                            await self.responsePool.push(
+                                LocalServerResponse(id: requestId, final: true)
+                            )
+
                         }
 
                         requestHead = nil
                         requestBody = nil
+                        requestId = nil
                     }
                 }
             }
@@ -304,6 +317,11 @@ internal struct LambdaHTTPServer {
         requestHead.headers["Transfer-Encoding"].contains("chunked")
     }
 
+    /// This function pareses and returns the requestId or nil if the request is malformed
+    private func getRequestId(from head: HTTPRequestHead) -> String? {
+        let parts = head.uri.split(separator: "/")
+        return parts.count > 2 ? String(parts[parts.count - 2]) : nil
+    }
     /// This function process the URI request sent by the client and by the Lambda function
     ///
     /// It enqueues the client invocation and iterate over the invocation queue when the Lambda function sends /next request
@@ -314,19 +332,22 @@ internal struct LambdaHTTPServer {
     ///   - body: the HTTP request body
     /// - Throws:
     /// - Returns: the response to send back to the client or the Lambda function
-    private func processCompleteRequest(
+    private func processRequestAndSendResponse(
         head: HTTPRequestHead,
         body: ByteBuffer?,
+        outbound: NIOAsyncChannelOutboundWriter<HTTPServerResponsePart>,
         logger: Logger
-    ) async throws -> LocalServerResponse {
+    ) async throws {
+        var logger = logger
+        logger[metadataKey: "URI"] = "\(head.method) \(head.uri)"
         if let body {
             logger.trace(
                 "Processing request",
-                metadata: ["URI": "\(head.method) \(head.uri)", "Body": "\(String(buffer: body))"]
+                metadata: ["Body": "\(String(buffer: body))"]
             )
         } else {
-            logger.trace("Processing request", metadata: ["URI": "\(head.method) \(head.uri)"])
+            logger.trace("Processing request")
         }
 
         switch (head.method, head.uri) {
@@ -337,27 +358,32 @@ internal struct LambdaHTTPServer {
         // client POST /invoke
         case (.POST, let url) where url.hasSuffix(self.invocationEndpoint):
             guard let body else {
-                return .init(status: .badRequest, headers: [], body: nil)
+                return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
             }
             // we always accept the /invoke request and push them to the pool
             let requestId = "\(DispatchTime.now().uptimeNanoseconds)"
-            var logger = logger
             logger[metadataKey: "requestID"] = "\(requestId)"
-            logger.trace("/invoke received invocation")
+            logger.trace("/invoke received invocation, pushing it to the stack")
            await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
 
             // wait for the lambda function to process the request
             for try await response in self.responsePool {
-                logger.trace(
-                    "Received response to return to client",
-                    metadata: ["requestId": "\(response.requestId ?? "")"]
-                )
+                logger[metadataKey: "requestID"] = "\(requestId)"
+                logger.trace("Received response to return to client")
                 if response.requestId == requestId {
-                    return response
+                    logger.trace("/invoke requestId is valid, sending the response")
+                    // send the response to the client
+                    // if the response is final, we can send it and return
+                    // if the response is not final, we can send it and wait for the next response
+                    try await self.sendResponse(response, outbound: outbound, logger: logger)
+                    if response.final == true {
+                        logger.trace("/invoke returning")
+                        return // if the response is final, we can return and close the connection
+                    }
                 } else {
                     logger.error(
                         "Received response for a different request id",
-                        metadata: ["response requestId": "\(response.requestId ?? "")", "requestId": "\(requestId)"]
+                        metadata: ["response requestId": "\(response.requestId ?? "")"]
                     )
                     // should we return an error here ? Or crash as this is probably a programming error?
                 }
@@ -368,7 +394,7 @@ internal struct LambdaHTTPServer {
 
         // client uses incorrect HTTP method
         case (_, let url) where url.hasSuffix(self.invocationEndpoint):
-            return .init(status: .methodNotAllowed)
+            return try await sendResponse(.init(status: .methodNotAllowed), outbound: outbound, logger: logger)
 
         //
         // lambda invocations
@@ -381,9 +407,10 @@ internal struct LambdaHTTPServer {
             // pop the tasks from the queue
             logger.trace("/next waiting for /invoke")
             for try await invocation in self.invocationPool {
-                logger.trace("/next retrieved invocation", metadata: ["requestId": "\(invocation.requestId)"])
-                // this call also stores the invocation requestId into the response
-                return invocation.makeResponse(status: .accepted)
+                logger[metadataKey: "requestId"] = "\(invocation.requestId)"
+                logger.trace("/next retrieved invocation")
+                // tell the lambda function we accepted the invocation
+                return try await sendResponse(invocation.acceptedResponse(), outbound: outbound, logger: logger)
             }
             // What todo when there is no more tasks to process?
             // This should not happen as the async iterator blocks until there is a task to process
@@ -391,10 +418,9 @@ internal struct LambdaHTTPServer {
 
        // :requestID/response endpoint is called by the lambda posting the response
        case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix):
-            let parts = head.uri.split(separator: "/")
-            guard let requestID = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+            guard let requestID = getRequestId(from: head) else {
                // the request is malformed, since we were expecting a requestId in the path
-                return .init(status: .badRequest)
+                return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
             }
             // enqueue the lambda function response to be served as response to the client /invoke
             logger.trace("/:requestID/response received response", metadata: ["requestId": "\(requestID)"])
             await self.responsePool.push(
@@ -402,21 +428,20 @@ internal struct LambdaHTTPServer {
                 LocalServerResponse(
                     id: requestID,
                     status: .ok,
-                    headers: [("Content-Type", "application/json")],
+                    headers: HTTPHeaders([("Content-Type", "application/json")]),
                     body: body
                 )
             )
 
             // tell the Lambda function we accepted the response
-            return .init(id: requestID, status: .accepted)
+            return try await sendResponse(.init(id: requestID, status: .accepted), outbound: outbound, logger: logger)
 
        // :requestID/error endpoint is called by the lambda posting an error response
        // we accept all requestID and we do not handle the body, we just acknowledge the request
        case (.POST, let url) where url.hasSuffix(Consts.postErrorURLSuffix):
-            let parts = head.uri.split(separator: "/")
-            guard let requestID = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+            guard let requestID = getRequestId(from: head) else {
                // the request is malformed, since we were expecting a requestId in the path
-                return .init(status: .badRequest)
+                return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
             }
             // enqueue the lambda function response to be served as response to the client /invoke
             logger.trace("/:requestID/response received response", metadata: ["requestId": "\(requestID)"])
@@ -424,42 +449,55 @@ internal struct LambdaHTTPServer {
             await self.responsePool.push(
                 LocalServerResponse(
                     id: requestID,
                     status: .internalServerError,
-                    headers: [("Content-Type", "application/json")],
+                    headers: HTTPHeaders([("Content-Type", "application/json")]),
                     body: body
                 )
             )
 
-            return .init(status: .accepted)
+            return try await sendResponse(.init(status: .accepted), outbound: outbound, logger: logger)
 
         // unknown call
         default:
-            return .init(status: .notFound)
+            return try await sendResponse(.init(status: .notFound), outbound: outbound, logger: logger)
         }
     }
 
-    private func sendCompleteResponse(
-        response: LocalServerResponse,
+    private func sendResponse(
+        _ response: LocalServerResponse,
         outbound: NIOAsyncChannelOutboundWriter<HTTPServerResponsePart>,
         logger: Logger
     ) async throws {
-        var headers = HTTPHeaders(response.headers ?? [])
-        headers.add(name: "Content-Length", value: "\(response.body?.readableBytes ?? 0)")
-
-        logger.trace("Writing response", metadata: ["requestId": "\(response.requestId ?? "")"])
-        try await outbound.write(
-            HTTPServerResponsePart.head(
-                HTTPResponseHead(
-                    version: .init(major: 1, minor: 1),
-                    status: response.status,
-                    headers: headers
+        var logger = logger
+        logger[metadataKey: "requestId"] = "\(response.requestId ?? "nil")"
+        logger.trace("Writing response")
+
+        var headers = response.headers ?? HTTPHeaders()
+        if let body = response.body {
+            headers.add(name: "Content-Length", value: "\(body.readableBytes)")
+        }
+
+        if let status = response.status {
+            logger.trace("Sending status and headers")
+            try await outbound.write(
+                HTTPServerResponsePart.head(
+                    HTTPResponseHead(
+                        version: .init(major: 1, minor: 1),
+                        status: status,
+                        headers: headers
+                    )
                 )
             )
-        )
+        }
+
         if let body = response.body {
+            logger.trace("Sending body")
             try await outbound.write(HTTPServerResponsePart.body(.byteBuffer(body)))
         }
-        try await outbound.write(HTTPServerResponsePart.end(nil))
+
+        if response.final {
+            logger.trace("Sending end")
+            try await outbound.write(HTTPServerResponsePart.end(nil))
+        }
     }
 
     /// A shared data structure to store the current invocation or response requests and the continuation objects.
@@ -543,15 +581,16 @@ internal struct LambdaHTTPServer {
     private struct LocalServerResponse: Sendable {
         let requestId: String?
-        let status: HTTPResponseStatus
-        let headers: [(String, String)]?
+        let status: HTTPResponseStatus?
+        let headers: HTTPHeaders?
         let body: ByteBuffer?
-        init(id: String? = nil, status: HTTPResponseStatus, headers: [(String, String)]? = nil, body: ByteBuffer? = nil)
-        {
+        let final: Bool
+        init(id: String? = nil, status: HTTPResponseStatus? = nil, headers: HTTPHeaders? = nil, body: ByteBuffer? = nil, final: Bool = false) {
             self.requestId = id
             self.status = status
             self.headers = headers
             self.body = body
+            self.final = final
         }
     }
 
@@ -559,20 +598,20 @@ internal struct LambdaHTTPServer {
         let requestId: String
         let request: ByteBuffer
 
-        func makeResponse(status: HTTPResponseStatus) -> LocalServerResponse {
+        func acceptedResponse() -> LocalServerResponse {
             // required headers
-            let headers = [
+            let headers = HTTPHeaders([
                 (AmazonHeaders.requestID, self.requestId),
                 (
-                    AmazonHeaders.invokedFunctionARN,
-                    "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"
+                    AmazonHeaders.invokedFunctionARN,
+                    "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"
                 ),
                 (AmazonHeaders.traceID, "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=1"),
                 (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
-            ]
+            ])
 
-            return LocalServerResponse(id: self.requestId, status: status, headers: headers, body: self.request)
+            return LocalServerResponse(id: self.requestId, status: .accepted, headers: headers, body: self.request, final: true)
         }
     }
 }

From c3f034063c3280d3374b3433852edb70f3d3cccd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sat, 12 Jul 2025 13:16:28 +0200
Subject: [PATCH 03/10] swift format

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 39 ++++++++++++-------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 390d93d2..0709ddfe 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -274,7 +274,7 @@ internal struct LambdaHTTPServer {
 
                     case .end:
                         precondition(requestHead != nil, "Received .end without .head")
-
+
                         // process the buffered response for non streaming requests
                         if !self.isStreamingResponse(requestHead) {
                             // process the request and send the response
@@ -288,7 +288,7 @@ internal struct LambdaHTTPServer {
                             await self.responsePool.push(
                                 LocalServerResponse(id: requestId, final: true)
                             )
-
+
                         }
 
                         requestHead = nil
@@ -311,16 +311,15 @@ internal struct LambdaHTTPServer {
     /// This function checks if the request is a streaming response request
     /// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked"
     private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool {
-        requestHead.method == .POST &&
-        requestHead.uri.hasSuffix(Consts.postResponseURLSuffix) &&
-        requestHead.headers.contains(name: "Transfer-Encoding") &&
-        requestHead.headers["Transfer-Encoding"].contains("chunked")
+        requestHead.method == .POST && requestHead.uri.hasSuffix(Consts.postResponseURLSuffix)
+            && requestHead.headers.contains(name: "Transfer-Encoding")
+            && requestHead.headers["Transfer-Encoding"].contains("chunked")
     }
 
     /// This function pareses and returns the requestId or nil if the request is malformed
     private func getRequestId(from head: HTTPRequestHead) -> String? {
-        let parts = head.uri.split(separator: "/")
-        return parts.count > 2 ? String(parts[parts.count - 2]) : nil
+        let parts = head.uri.split(separator: "/")
+        return parts.count > 2 ? String(parts[parts.count - 2]) : nil
     }
     /// This function process the URI request sent by the client and by the Lambda function
     ///
@@ -378,7 +377,7 @@ internal struct LambdaHTTPServer {
                     try await self.sendResponse(response, outbound: outbound, logger: logger)
                     if response.final == true {
                         logger.trace("/invoke returning")
-                        return // if the response is final, we can return and close the connection
+                        return  // if the response is final, we can return and close the connection
                     }
                 } else {
                     logger.error(
@@ -497,7 +496,7 @@ internal struct LambdaHTTPServer {
         if response.final {
             logger.trace("Sending end")
             try await outbound.write(HTTPServerResponsePart.end(nil))
-        }
+        }
     }
 
     /// A shared data structure to store the current invocation or response requests and the continuation objects.
@@ -585,7 +584,13 @@ internal struct LambdaHTTPServer {
         let headers: HTTPHeaders?
         let body: ByteBuffer?
         let final: Bool
-        init(id: String? = nil, status: HTTPResponseStatus? = nil, headers: HTTPHeaders? = nil, body: ByteBuffer? = nil, final: Bool = false) {
+        init(
+            id: String? = nil,
+            status: HTTPResponseStatus? = nil,
+            headers: HTTPHeaders? = nil,
+            body: ByteBuffer? = nil,
+            final: Bool = false
+        ) {
             self.requestId = id
             self.status = status
             self.headers = headers
@@ -604,14 +609,20 @@ internal struct LambdaHTTPServer {
             let headers = HTTPHeaders([
                 (AmazonHeaders.requestID, self.requestId),
                 (
-                    AmazonHeaders.invokedFunctionARN,
-                    "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"
+                    AmazonHeaders.invokedFunctionARN,
+                    "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"
                 ),
                 (AmazonHeaders.traceID, "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=1"),
                 (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
             ])
 
-            return LocalServerResponse(id: self.requestId, status: .accepted, headers: headers, body: self.request, final: true)
+            return LocalServerResponse(
+                id: self.requestId,
+                status: .accepted,
+                headers: headers,
+                body: self.request,
+                final: true
+            )
         }
     }
 }

From f33ecaa2113d718751c82237b0f19691b8085a31 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sat, 12 Jul 2025 13:19:05 +0200
Subject: [PATCH 04/10] use the same if / else logic for .head, .body, and
 .end

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 0709ddfe..de3787a7 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -275,22 +275,22 @@ internal struct LambdaHTTPServer {
                     case .end:
                         precondition(requestHead != nil, "Received .end without .head")
 
-                        // process the buffered response for non streaming requests
-                        if !self.isStreamingResponse(requestHead) {
-                            // process the request and send the response
+                        if self.isStreamingResponse(requestHead) {
+                            // for streaming response, send the final response
+                            await self.responsePool.push(
+                                LocalServerResponse(id: requestId, final: true)
+                            )
+                        } else {
+                            // process the buffered response for non streaming requests
                             try await self.processRequestAndSendResponse(
                                 head: requestHead,
                                 body: requestBody,
                                 outbound: outbound,
                                 logger: logger
                             )
-                        } else {
-                            await self.responsePool.push(
-                                LocalServerResponse(id: requestId, final: true)
-                            )
-
                         }
 
+                        // reset the request state for next request
                         requestHead = nil
                         requestBody = nil
                         requestId = nil

From 9d5e4f32194378a3a0f9cd81c66c186ce40fca27 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sat, 12 Jul 2025 13:26:13 +0200
Subject: [PATCH 05/10] write requestId with lowercase d

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 37 ++++++++++---------
 1 file changed, 19 insertions(+), 18 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index de3787a7..bf912e7a 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -67,8 +67,8 @@ extension Lambda {
 ///
 /// It accepts three types of requests from the Lambda function (through the LambdaRuntimeClient):
 /// 1. GET /next - the lambda function polls this endpoint to get the next invocation request
-/// 2. POST /:requestID/response - the lambda function posts the response to the invocation request
-/// 3. POST /:requestID/error - the lambda function posts an error response to the invocation request
+/// 2. POST /:requestId/response - the lambda function posts the response to the invocation request
+/// 3. POST /:requestId/error - the lambda function posts an error response to the invocation request
 ///
 /// It also accepts one type of request from the client invoking the lambda function:
 /// 1. POST /invoke - the client posts the event to the lambda function
@@ -309,14 +309,15 @@ internal struct LambdaHTTPServer {
     }
 
     /// This function checks if the request is a streaming response request
-    /// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked"
+    /// verb = POST, uri = :requestId/response, HTTP Header contains "Transfer-Encoding: chunked"
     private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool {
         requestHead.method == .POST && requestHead.uri.hasSuffix(Consts.postResponseURLSuffix)
             && requestHead.headers.contains(name: "Transfer-Encoding")
-            && requestHead.headers["Transfer-Encoding"].contains("chunked")
+            && (requestHead.headers["Transfer-Encoding"].contains("chunked")
+                || requestHead.headers["Transfer-Encoding"].contains("Chunked"))
     }
 
-    /// This function pareses and returns the requestId or nil if the request is malformed
+    /// This function parses and returns the requestId or nil if the request doesn't contain a requestId
     private func getRequestId(from head: HTTPRequestHead) -> String? {
         let parts = head.uri.split(separator: "/")
         return parts.count > 2 ? String(parts[parts.count - 2]) : nil
@@ -324,7 +325,7 @@ internal struct LambdaHTTPServer {
     /// This function process the URI request sent by the client and by the Lambda function
     ///
     /// It enqueues the client invocation and iterate over the invocation queue when the Lambda function sends /next request
-    /// It answers the /:requestID/response and /:requestID/error requests sent by the Lambda function but do not process the body
+    /// It answers the /:requestId/response and /:requestId/error requests sent by the Lambda function but do not process the body
     ///
     /// - Parameters:
     ///   - head: the HTTP request head
@@ -361,13 +362,13 @@ internal struct LambdaHTTPServer {
             }
             // we always accept the /invoke request and push them to the pool
             let requestId = "\(DispatchTime.now().uptimeNanoseconds)"
-            logger[metadataKey: "requestID"] = "\(requestId)"
+            logger[metadataKey: "requestId"] = "\(requestId)"
             logger.trace("/invoke received invocation, pushing it to the stack")
             await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
 
             // wait for the lambda function to process the request
             for try await response in self.responsePool {
-                logger[metadataKey: "requestID"] = "\(requestId)"
+                logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")"
                 logger.trace("Received response to return to client")
                 if response.requestId == requestId {
@@ -415,17 +416,17 @@ internal struct LambdaHTTPServer {
             // This should not happen as the async iterator blocks until there is a task to process
             fatalError("No more invocations to process - the async for loop should not return")
 
-        // :requestID/response endpoint is called by the lambda posting the response
+        // :requestId/response endpoint is called by the lambda posting the response
         case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix):
-            guard let requestID = getRequestId(from: head) else {
+            guard let requestId = getRequestId(from: head) else {
                 // the request is malformed, since we were expecting a requestId in the path
                 return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
             }
             // enqueue the lambda function response to be served as response to the client /invoke
-            logger.trace("/:requestID/response received response", metadata: ["requestId": "\(requestID)"])
+            logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
             await self.responsePool.push(
                 LocalServerResponse(
-                    id: requestID,
+                    id: requestId,
                     status: .ok,
                     headers: HTTPHeaders([("Content-Type", "application/json")]),
                     body: body
@@ -433,20 +434,20 @@ internal struct LambdaHTTPServer {
                 )
             )
 
             // tell the Lambda function we accepted the response
-            return try await sendResponse(.init(id: requestID, status: .accepted), outbound: outbound, logger: logger)
+            return try await sendResponse(.init(id: requestId, status: .accepted), outbound: outbound, logger: logger)
 
-        // :requestID/error endpoint is called by the lambda posting an error response
-        // we accept all requestID and we do not handle the body, we just acknowledge the request
+        // :requestId/error endpoint is called by the lambda posting an error response
+        // we accept all requestId and we do not handle the body, we just acknowledge the request
         case (.POST, let url) where url.hasSuffix(Consts.postErrorURLSuffix):
-            guard let requestID = getRequestId(from: head) else {
+            guard let requestId = getRequestId(from: head) else {
                 // the request is malformed, since we were expecting a requestId in the path
                 return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
             }
             // enqueue the lambda function response to be served as response to the client /invoke
-            logger.trace("/:requestID/response received response", metadata: ["requestId": "\(requestID)"])
+            logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
             await self.responsePool.push(
                 LocalServerResponse(
-                    id: requestID,
+                    id: requestId,
                     status: .internalServerError,
                     headers: HTTPHeaders([("Content-Type", "application/json")]),
                     body: body

From 2b1cae5c2ddec5225fcbcfb9d11ce8a7725e9a81 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sat, 12 Jul 2025 14:45:04 +0200
Subject: [PATCH 06/10] fix error that prevented to call the server twice

---
 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index bf912e7a..2fad2048 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -363,7 +363,7 @@ internal struct LambdaHTTPServer {
             // we always accept the /invoke request and push them to the pool
             let requestId = "\(DispatchTime.now().uptimeNanoseconds)"
             logger[metadataKey: "requestId"] = "\(requestId)"
-            logger.trace("/invoke received invocation, pushing it to the stack")
+            logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response")
             await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
 
             // wait for the lambda function to process the request
@@ -429,12 +429,13 @@ internal struct LambdaHTTPServer {
                     id: requestId,
                     status: .ok,
                     headers: HTTPHeaders([("Content-Type", "application/json")]),
-                    body: body
+                    body: body,
+                    final: true
                 )
             )
 
             // tell the Lambda function we accepted the response
-            return try await sendResponse(.init(id: requestId, status: .accepted), outbound: outbound, logger: logger)
+            return try await sendResponse(.init(id: requestId, status: .accepted, final: true), outbound: outbound, logger: logger)
 
         // :requestId/error endpoint is called by the lambda posting an error response
         // we accept all requestId and we do not handle the body, we just acknowledge the request
@@ -469,7 +470,7 @@ internal struct LambdaHTTPServer {
     ) async throws {
         var logger = logger
         logger[metadataKey: "requestId"] = "\(response.requestId ?? "nil")"
-        logger.trace("Writing response")
+        logger.trace("Writing response for \(response.status?.code ?? 0)")
 
         var headers = response.headers ?? HTTPHeaders()
         if let body = response.body {

From 3897986ff4a4646514f78a9a261ddbcefe445da2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sat, 12 Jul 2025 14:47:27 +0200
Subject: [PATCH 07/10] add more final requests

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 37 +++++++++++++++----
 1 file changed, 29 insertions(+), 8 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 2fad2048..34ad2163 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -358,7 +358,11 @@ internal struct LambdaHTTPServer {
         // client POST /invoke
         case (.POST, let url) where url.hasSuffix(self.invocationEndpoint):
             guard let body else {
-                return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
+                return try await sendResponse(
+                    .init(status: .badRequest, final: true),
+                    outbound: outbound,
+                    logger: logger
+                )
             }
             // we always accept the /invoke request and push them to the pool
             let requestId = "\(DispatchTime.now().uptimeNanoseconds)"
@@ -394,7 +398,11 @@ internal struct LambdaHTTPServer {
 
         // client uses incorrect HTTP method
         case (_, let url) where url.hasSuffix(self.invocationEndpoint):
-            return try await sendResponse(.init(status: .methodNotAllowed), outbound: outbound, logger: logger)
+            return try await sendResponse(
+                .init(status: .methodNotAllowed, final: true),
+                outbound: outbound,
+                logger: logger
+            )
 
         //
         // lambda invocations
@@ -420,7 +428,11 @@ internal struct LambdaHTTPServer {
         case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix):
             guard let requestId = getRequestId(from: head) else {
                 // the request is malformed, since we were expecting a requestId in the path
-                return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
+                return try await sendResponse(
+                    .init(status: .badRequest, final: true),
+                    outbound: outbound,
+                    logger: logger
+                )
             }
             // enqueue the lambda function response to be served as response to the client /invoke
             logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
@@ -435,14 +447,22 @@ internal struct LambdaHTTPServer {
                 )
             )
 
             // tell the Lambda function we accepted the response
-            return try await sendResponse(.init(id: requestId, status: .accepted, final: true), outbound: outbound, logger: logger)
+            return try await sendResponse(
+                .init(id: requestId, status: .accepted, final: true),
+                outbound: outbound,
+                logger: logger
+            )
 
         // :requestId/error endpoint is called by the lambda posting an error response
         // we accept all requestId and we do not handle the body, we just acknowledge the request
         case (.POST, let url) where url.hasSuffix(Consts.postErrorURLSuffix):
             guard let requestId = getRequestId(from: head) else {
                 // the request is malformed, since we were expecting a requestId in the path
-                return try await sendResponse(.init(status: .badRequest), outbound: outbound, logger: logger)
+                return try await sendResponse(
+                    .init(status: .badRequest, final: true),
+                    outbound: outbound,
+                    logger: logger
+                )
             }
             // enqueue the lambda function response to be served as response to the client /invoke
             logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
@@ -451,15 +471,16 @@ internal struct LambdaHTTPServer {
                     id: requestId,
                     status: .internalServerError,
                     headers: HTTPHeaders([("Content-Type", "application/json")]),
-                    body: body
+                    body: body,
+                    final: true
                 )
             )
 
-            return try await sendResponse(.init(status: .accepted), outbound: outbound, logger: logger)
+            return try await sendResponse(.init(status: .accepted, final: true), outbound: outbound, logger: logger)
 
         // unknown call
         default:
-            return try await sendResponse(.init(status: .notFound), outbound: outbound, logger: logger)
+            return try await sendResponse(.init(status: .notFound, final: true), outbound: outbound, logger: logger)
         }
     }

From d48950455c2008ef2f663c6c9c7dffff93976092 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sun, 13 Jul 2025 08:38:17 +0200
Subject: [PATCH 08/10] do not hardcode header content-type on response

---
 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 34ad2163..03cebc59 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -440,7 +440,7 @@ internal struct LambdaHTTPServer {
                 LocalServerResponse(
                     id: requestId,
                     status: .ok,
-                    headers: HTTPHeaders([("Content-Type", "application/json")]),
+                    headers: HTTPHeaders(), // the local server has no mecanism to collect headers set by the lambda function
                     body: body,
                     final: true
                 )

From e687de050454453442286019fe69241374183dad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Sun, 13 Jul 2025 08:40:09 +0200
Subject: [PATCH 09/10] swift-format

---
 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 03cebc59..b129f6ac 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -440,7 +440,8 @@ internal struct LambdaHTTPServer {
                 LocalServerResponse(
                     id: requestId,
                     status: .ok,
-                    headers: HTTPHeaders(), // the local server has no mecanism to collect headers set by the lambda function
+                    // the local server has no mecanism to collect headers set by the lambda function
+                    headers: HTTPHeaders(),
                     body: body,
                     final: true
                 )

From b62a8ad9fc49c5198b671b93bfcc4cf9e1a573fc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?=
Date: Tue, 15 Jul 2025 11:54:49 +0200
Subject: [PATCH 10/10] add instruction for local testing in readme

---
 Examples/Streaming/README.md | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md
index 86a42754..26202da9 100644
--- a/Examples/Streaming/README.md
+++ b/Examples/Streaming/README.md
@@ -34,6 +34,20 @@ swift package archive --allow-network-connections docker
 ```
 
 If there is no error, there is a ZIP file ready to deploy. The ZIP file is located at `.build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingNumbers/StreamingNumbers.zip`
 
+## Test locally
+
+You can test the function locally before deploying:
+
+```bash
+swift run
+
+# In another terminal, test with curl:
+curl -v \
+  --header "Content-Type: application/json" \
+  --data '"this is not used"' \
+  http://127.0.0.1:7000/invoke
+```
+
 ## Deploy with the AWS CLI
 
 Here is how to deploy using the `aws` command line.