Skip to content

[core] Implement Lambda streaming with custom HTTP headers #521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ Package.resolved
.vscode
Makefile
.devcontainer
.amazonq
.amazonq
.kiro
52 changes: 48 additions & 4 deletions Examples/Streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,59 @@ The sample code creates a `SendNumbersWithPause` struct that conforms to the `St

The `handle(...)` method of this protocol receives incoming events as a Swift NIO `ByteBuffer` and returns the output as a `ByteBuffer`.

The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly written before
finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not
stream the response by calling `writeAndFinish(_:)`.
The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function.

### Setting HTTP Status Code and Headers

Before streaming the response body, you can set the HTTP status code and headers using the `writeStatusAndHeaders(_:)` method:

```swift
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 200,
headers: [
"Content-Type": "text/plain",
"x-my-custom-header": "streaming-example"
],
multiValueHeaders: [
"Set-Cookie": ["session=abc123", "theme=dark"]
]
)
)
```

The `StreamingLambdaStatusAndHeadersResponse` structure allows you to specify:
- **statusCode**: HTTP status code (e.g., 200, 404, 500)
- **headers**: Dictionary of single-value HTTP headers (optional)
- **multiValueHeaders**: Dictionary of multi-value HTTP headers like Set-Cookie (optional)

### Streaming the Response Body

After setting headers, you can stream the response body by calling the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`.

```swift
// Stream data in chunks
for i in 1...10 {
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
try await Task.sleep(for: .milliseconds(1000))
}

// Close the response stream
try await responseWriter.finish()
```

An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`.

### Example Usage Patterns

The example includes two handler implementations:

1. **SendNumbersWithPause**: Demonstrates basic streaming with headers, sending numbers with delays
2. **ConditionalStreamingHandler**: Shows how to handle different response scenarios, including error responses with appropriate status codes

The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`.

Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.
Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.

## Build & Package

Expand Down
91 changes: 88 additions & 3 deletions Examples/Streaming/Sources/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,107 @@
import AWSLambdaRuntime
import NIOCore

#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif

struct SendNumbersWithPause: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {

// Send HTTP status code and headers before streaming the response body
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 200,
headers: [
"Content-Type": "text/plain",
"x-my-custom-header": "streaming-example",
],
multiValueHeaders: [
"Set-Cookie": ["session=abc123", "theme=dark"]
]
)
)

// Stream numbers with pauses to demonstrate streaming functionality
for i in 1...10 {
// Send partial data
try await responseWriter.write(ByteBuffer(string: "\(i)\n"))
// Perform some long asynchronous work
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
// Perform some long asynchronous work to simulate processing
try await Task.sleep(for: .milliseconds(1000))
}

// Send final message
try await responseWriter.write(ByteBuffer(string: "Streaming complete!\n"))

// All data has been sent. Close off the response stream.
try await responseWriter.finish()
}
}

let runtime = LambdaRuntime.init(handler: SendNumbersWithPause())
// Example of a more complex streaming handler that demonstrates different response scenarios
struct ConditionalStreamingHandler: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {

// Parse the event to determine response type
let eventString = String(buffer: event)
let shouldError = eventString.contains("error")

if shouldError {
// Send error response with appropriate status code
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 400,
headers: [
"Content-Type": "application/json",
"x-error-type": "client-error",
]
)
)

try await responseWriter.writeAndFinish(
ByteBuffer(string: #"{"error": "Bad request", "message": "Error requested in input"}"#)
)
} else {
// Send successful response with streaming data
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 200,
headers: [
"Content-Type": "application/json",
"Cache-Control": "no-cache",
]
)
)

// Stream JSON array elements
try await responseWriter.write(ByteBuffer(string: "["))

for i in 1...5 {
if i > 1 {
try await responseWriter.write(ByteBuffer(string: ","))
}
try await responseWriter.write(
ByteBuffer(string: #"{"id": \#(i), "timestamp": "\#(Date().timeIntervalSince1970)"}"#)
)
try await Task.sleep(for: .milliseconds(500))
}

try await responseWriter.write(ByteBuffer(string: "]"))
try await responseWriter.finish()
}
}
}

// Use the simple example by default
let runtime = LambdaRuntime(handler: SendNumbersWithPause())
try await runtime.run()
100 changes: 100 additions & 0 deletions Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif

/// A response structure specifically designed for streaming Lambda responses that contains
/// HTTP status code and headers without body content.
///
/// This structure is used with `LambdaResponseStreamWriter.writeStatusAndHeaders(_:)` to send
/// HTTP response metadata before streaming the response body.
public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable {
/// The HTTP status code for the response (e.g., 200, 404, 500)
public let statusCode: Int

/// Dictionary of single-value HTTP headers
public let headers: [String: String]?

/// Dictionary of multi-value HTTP headers (e.g., Set-Cookie headers)
public let multiValueHeaders: [String: [String]]?

/// Creates a new streaming Lambda response with status code and optional headers
///
/// - Parameters:
/// - statusCode: The HTTP status code for the response
/// - headers: Optional dictionary of single-value HTTP headers
/// - multiValueHeaders: Optional dictionary of multi-value HTTP headers
public init(
statusCode: Int,
headers: [String: String]? = nil,
multiValueHeaders: [String: [String]]? = nil
) {
self.statusCode = statusCode
self.headers = headers
self.multiValueHeaders = multiValueHeaders
}
}

extension LambdaResponseStreamWriter {
/// Writes the HTTP status code and headers to the response stream.
///
/// This method serializes the status and headers as JSON and writes them to the stream,
/// followed by eight null bytes as a separator before the response body.
///
/// - Parameters:
/// - response: The status and headers response to write
/// - encoder: The encoder to use for serializing the response,
/// - Throws: An error if JSON serialization or writing fails
public func writeStatusAndHeaders<Encoder: LambdaOutputEncoder>(
_ response: StreamingLambdaStatusAndHeadersResponse,
encoder: Encoder
) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse {

// Convert Data to ByteBuffer
var buffer = ByteBuffer()
try encoder.encode(response, into: &buffer)

// Write the JSON data
try await write(buffer)

// Write eight null bytes as separator
var separatorBuffer = ByteBuffer()
separatorBuffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
try await write(separatorBuffer)
}
}

extension LambdaResponseStreamWriter {
/// Writes the HTTP status code and headers to the response stream.
///
/// This method serializes the status and headers as JSON and writes them to the stream,
/// followed by eight null bytes as a separator before the response body.
///
/// - Parameters:
/// - response: The status and headers response to write
/// - encoder: The encoder to use for serializing the response, use JSONEncoder by default
/// - Throws: An error if JSON serialization or writing fails
public func writeStatusAndHeaders(
_ response: StreamingLambdaStatusAndHeadersResponse,
encoder: JSONEncoder = JSONEncoder()
) async throws {
try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder))
}
}
Loading