Skip to content

Commit

Permalink
support server event streams
Browse files Browse the repository at this point in the history
* Server event streams
* Rename EventStreamInput to EventStreamSender
* Make event stream errors optional
* Pokemon service model updated
* Pokemon server event handler
* Pokemon client to test event streams
* EventStreamDecorator to make optional using SigV4 signing

Closes: #1157

Signed-off-by: Daniele Ahmed <ahmeddan@amazon.de>
  • Loading branch information
82marbag committed Jun 21, 2022
1 parent c3f3730 commit 92e95f4
Show file tree
Hide file tree
Showing 27 changed files with 570 additions and 99 deletions.
38 changes: 37 additions & 1 deletion CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,42 @@
# meta = { "breaking" = false, "tada" = false, "bug" = false }
# author = "rcoh"

[[smithy-rs]]
message = "Rename EventStreamInput<> to EventStreamSender<>"
references = ["smithy-rs#1157"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "82marbag"

[[smithy-rs]]
message = "Add ability to sign a request with all headers, or to change which headers are excluded from signing"
references = ["smithy-rs#1381"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "alonlud"

[[aws-sdk-rust]]
message = "Add method `ByteStream::into_async_read`. This makes it easy to convert `ByteStream`s into a struct implementing `tokio:io::AsyncRead`. Available on **crate feature** `rt-tokio` only."
references = ["smithy-rs#1390"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "Velfi"

[[smithy-rs]]
message = "Add method `ByteStream::into_async_read`. This makes it easy to convert `ByteStream`s into a struct implementing `tokio:io::AsyncRead`. Available on **crate feature** `rt-tokio` only."
references = ["smithy-rs#1390"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "Velfi"

[[smithy-rs]]
message = "Add ability to specify a different rust crate name than the one derived from the package name"
references = ["smithy-rs#1404"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "petrosagg"

[[smithy-rs]]
message = "Switch to [RustCrypto](https://github.com/RustCrypto)'s implementation of MD5."
references = ["smithy-rs#1404"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "petrosagg"

[[aws-sdk-rust]]
message = "Fix bug in profile file credential provider where a missing `default` profile lead to an unintended error."
references = ["aws-sdk-rust#547", "smithy-rs#1458"]
Expand All @@ -21,4 +57,4 @@ author = "rcoh"
message = "Add `Debug` implementation to several types in `aws-config`"
references = ["smithy-rs#1421"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "jdisanti"
author = "jdisanti"
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import software.amazon.smithy.rust.codegen.smithy.customizations.RetryConfigDeco
import software.amazon.smithy.rust.codegen.smithy.customizations.SleepImplDecorator
import software.amazon.smithy.rust.codegen.smithy.customizations.TimeoutConfigDecorator
import software.amazon.smithy.rust.codegen.smithy.customize.CombinedCodegenDecorator
import software.amazon.smithy.rust.codegen.smithy.customize.EventStreamDecorator
import software.amazon.smithy.rustsdk.customize.apigateway.ApiGatewayDecorator
import software.amazon.smithy.rustsdk.customize.auth.DisabledAuthDecorator
import software.amazon.smithy.rustsdk.customize.ec2.Ec2Decorator
Expand All @@ -24,7 +25,7 @@ val DECORATORS = listOf(
RegionDecorator(),
AwsEndpointDecorator(),
UserAgentDecorator(),
SigV4SigningDecorator(),
EventStreamDecorator(listOf(SigV4SigningDecorator())),
RetryPolicyDecorator(),
IntegrationTestDecorator(),
AwsFluentClientDecorator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import software.amazon.smithy.rust.codegen.rustlang.writable
import software.amazon.smithy.rust.codegen.smithy.CodegenContext
import software.amazon.smithy.rust.codegen.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.smithy.customize.EventStreamDecorator
import software.amazon.smithy.rust.codegen.smithy.customize.OperationCustomization
import software.amazon.smithy.rust.codegen.smithy.customize.OperationSection
import software.amazon.smithy.rust.codegen.smithy.customize.RustCodegenDecorator
import software.amazon.smithy.rust.codegen.smithy.generators.config.ConfigCustomization
import software.amazon.smithy.rust.codegen.smithy.generators.config.ServiceConfig
import software.amazon.smithy.rust.codegen.smithy.letIf
Expand All @@ -42,11 +42,11 @@ import software.amazon.smithy.rust.codegen.util.isInputEventStream
* - sets a default `OperationSigningConfig` A future enhancement will customize this for specific services that need
* different behavior.
*/
class SigV4SigningDecorator : RustCodegenDecorator {
class SigV4SigningDecorator : EventStreamDecorator(listOf()) {
override val name: String = "SigV4Signing"
override val order: Byte = 0

private fun applies(codegenContext: CodegenContext): Boolean = codegenContext.serviceShape.hasTrait<SigV4Trait>()
override fun applies(codegenContext: CodegenContext): Boolean = codegenContext.serviceShape.hasTrait<SigV4Trait>()

override fun configCustomizations(
codegenContext: CodegenContext,
Expand Down
40 changes: 39 additions & 1 deletion codegen-server-test/model/pokemon.smithy
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aws.protocols#restJson1
service PokemonService {
version: "2021-12-01",
resources: [PokemonSpecies],
operations: [GetServerStatistics, EmptyOperation],
operations: [GetServerStatistics, EmptyOperation, CapturePokemonOperation],
}

/// A Pokémon species forms the basis for at least one Pokémon.
Expand All @@ -22,6 +22,44 @@ resource PokemonSpecies {
read: GetPokemonSpecies,
}

/// Capture Pokémons via event streams
@http(uri: "/capture-pokemon-event", method: "POST")
operation CapturePokemonOperation {
input: CapturePokemonOperationEventsInput,
output: CapturePokemonOperationEventsOutput,
}

@input
structure CapturePokemonOperationEventsInput {
@httpPayload
events: AttemptCapturingPokemonEvent,
}

@output
structure CapturePokemonOperationEventsOutput {
@httpPayload
events: CapturePokemonEvents,
}

@streaming
union AttemptCapturingPokemonEvent {
event: CapturingEvent,
}

structure CapturingEvent {
name: String,
pokeball: String,
}

@streaming
union CapturePokemonEvents {
event: CaptureEvent,
}

structure CaptureEvent {
name: String,
}

/// Retrieve information about a Pokémon species.
@readonly
@http(uri: "/pokemon-species/{name}", method: "GET")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PythonCodegenServerPlugin : SmithyBuildPlugin {
override fun execute(context: PluginContext) {
// Suppress extremely noisy logs about reserved words
Logger.getLogger(ReservedWordSymbolProvider::class.java.name).level = Level.OFF
// Discover [RustCodegenDecorators] on the classpath. [RustCodegenDectorator] return different types of
// Discover [RustCodegenDecorators] on the classpath. [RustCodegenDecorator] return different types of
// customization. A customization is a function of:
// - location (e.g. the mutate section of an operation)
// - context (e.g. the of the operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import software.amazon.smithy.rust.codegen.smithy.StreamingShapeSymbolProvider
import software.amazon.smithy.rust.codegen.smithy.SymbolVisitor
import software.amazon.smithy.rust.codegen.smithy.SymbolVisitorConfig
import software.amazon.smithy.rust.codegen.smithy.customize.CombinedCodegenDecorator
import software.amazon.smithy.rust.codegen.smithy.generators.CodegenTarget
import java.util.logging.Level
import java.util.logging.Logger

Expand Down Expand Up @@ -62,7 +63,7 @@ class RustCodegenServerPlugin : SmithyBuildPlugin {
SymbolVisitor(model, serviceShape = serviceShape, config = symbolVisitorConfig)
// Generate different types for EventStream shapes (e.g. transcribe streaming)
.let {
EventStreamSymbolProvider(symbolVisitorConfig.runtimeConfig, it, model)
EventStreamSymbolProvider(symbolVisitorConfig.runtimeConfig, it, model, CodegenTarget.SERVER)
}
// Generate [ByteStream] instead of `Blob` for streaming binary shapes (e.g. S3 GetObject)
.let { StreamingShapeSymbolProvider(it, model) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,10 @@ private class ServerHttpBoundProtocolTraitImplGenerator(
// Fallback to the default code of `http::response::Builder`, 200.

operationShape.outputShape(model).findStreamingMember(model)?.let {
val memberName = symbolProvider.toMemberName(it)
rustTemplate(
"""
let body = #{SmithyHttpServer}::body::to_boxed(#{SmithyHttpServer}::body::Body::wrap_stream(output.$memberName));
""",
*codegenScope,
)
val payloadGenerator = HttpBoundProtocolPayloadGenerator(codegenContext, protocol, httpMessageType = HttpMessageType.RESPONSE)
withBlockTemplate("let body = #{SmithyHttpServer}::body::boxed(#{SmithyHttpServer}::body::Body::wrap_stream(", "));", *codegenScope) {
payloadGenerator.generatePayload(this, "output", operationShape)
}
} ?: run {
val payloadGenerator = HttpBoundProtocolPayloadGenerator(codegenContext, protocol, httpMessageType = HttpMessageType.RESPONSE)
withBlockTemplate("let body = #{SmithyHttpServer}::body::to_boxed(", ");", *codegenScope) {
Expand Down Expand Up @@ -682,29 +679,29 @@ private class ServerHttpBoundProtocolTraitImplGenerator(
HttpLocation.HEADER -> writable { serverRenderHeaderParser(this, binding, operationShape) }
HttpLocation.PREFIX_HEADERS -> writable { serverRenderPrefixHeadersParser(this, binding, operationShape) }
HttpLocation.PAYLOAD -> {
return if (binding.member.isStreaming(model)) {
writable {
val structureShapeHandler: RustWriter.(String) -> Unit = { body ->
rust("#T($body)", structuredDataParser.payloadParser(binding.member))
}
val errorSymbol = getDeserializePayloadErrorSymbol(binding)
val deserializer = httpBindingGenerator.generateDeserializePayloadFn(
binding,
errorSymbol,
structuredHandler = structureShapeHandler
)
return writable {
if (binding.member.isStreaming(model)) {
rustTemplate(
"""
{
let body = request.take_body().ok_or(#{RequestRejection}::BodyAlreadyExtracted)?;
Some(body.into())
let bytes = #{Hyper}::body::to_bytes(body).await?;
Some(#{Deserializer}(&mut bytes.into())?)
}
""".trimIndent(),
""",
"Deserializer" to deserializer,
*codegenScope
)
}
} else {
val structureShapeHandler: RustWriter.(String) -> Unit = { body ->
rust("#T($body)", structuredDataParser.payloadParser(binding.member))
}
val errorSymbol = getDeserializePayloadErrorSymbol(binding)
val deserializer = httpBindingGenerator.generateDeserializePayloadFn(
binding,
errorSymbol,
structuredHandler = structureShapeHandler
)
writable {
} else {
rustTemplate(
"""
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@ import software.amazon.smithy.rust.codegen.rustlang.CargoDependency
import software.amazon.smithy.rust.codegen.rustlang.RustType
import software.amazon.smithy.rust.codegen.rustlang.render
import software.amazon.smithy.rust.codegen.rustlang.stripOuter
import software.amazon.smithy.rust.codegen.smithy.generators.CodegenTarget
import software.amazon.smithy.rust.codegen.smithy.generators.error.errorSymbol
import software.amazon.smithy.rust.codegen.smithy.traits.SyntheticInputTrait
import software.amazon.smithy.rust.codegen.smithy.traits.SyntheticOutputTrait
import software.amazon.smithy.rust.codegen.util.getTrait
import software.amazon.smithy.rust.codegen.util.isEventStream
import software.amazon.smithy.rust.codegen.util.isInputEventStream
import software.amazon.smithy.rust.codegen.util.isOutputEventStream

/**
* Wrapping symbol provider to wrap modeled types with the aws-smithy-http Event Stream send/receive types.
*/
class EventStreamSymbolProvider(
private val runtimeConfig: RuntimeConfig,
base: RustSymbolProvider,
private val model: Model
private val model: Model,
private val target: CodegenTarget,
) : WrappingSymbolProvider(base) {
private val smithyEventStream = CargoDependency.SmithyEventStream(runtimeConfig)
override fun toSymbol(shape: Shape): Symbol {
val initial = super.toSymbol(shape)

Expand All @@ -42,21 +46,30 @@ class EventStreamSymbolProvider(
}
// If we find an operation shape, then we can wrap the type
if (operationShape != null) {
val error = operationShape.errorSymbol(this).toSymbol()
val error = if (operationShape.errors.isNotEmpty()) {
operationShape.errorSymbol(this).toSymbol()
} else {
RuntimeType("MessageStreamError", smithyEventStream, "aws_smithy_http::event_stream")
.toSymbol()
}
val errorFmt = error.rustType().render(fullyQualified = true)
val innerFmt = initial.rustType().stripOuter<RustType.Option>().render(fullyQualified = true)
val outer = when (shape.isInputEventStream(model)) {
true -> "EventStreamInput<$innerFmt>"
val isSender = (shape.isInputEventStream(model) && target == CodegenTarget.CLIENT) ||
(shape.isOutputEventStream(model) && target == CodegenTarget.SERVER)
val outer = when (isSender) {
true -> "EventStreamSender<$innerFmt>"
else -> "Receiver<$innerFmt, $errorFmt>"
}
val rustType = RustType.Opaque(outer, "aws_smithy_http::event_stream")
return initial.toBuilder()
val symbol = initial.toBuilder()
.name(rustType.name)
.rustType(rustType)
.addReference(error)
.addReference(initial)
.addDependency(CargoDependency.SmithyHttp(runtimeConfig).withFeature("event-stream"))
.build()
if (operationShape.errors.isNotEmpty()) {
symbol.addReference(error)
}
return symbol.build()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import software.amazon.smithy.rust.codegen.rustlang.Attribute.Companion.NonExhau
import software.amazon.smithy.rust.codegen.rustlang.RustReservedWordSymbolProvider
import software.amazon.smithy.rust.codegen.smithy.customizations.ClientCustomizations
import software.amazon.smithy.rust.codegen.smithy.customize.CombinedCodegenDecorator
import software.amazon.smithy.rust.codegen.smithy.generators.CodegenTarget
import java.util.logging.Level
import java.util.logging.Logger

Expand Down Expand Up @@ -49,7 +50,7 @@ class RustCodegenPlugin : SmithyBuildPlugin {
fun baseSymbolProvider(model: Model, serviceShape: ServiceShape, symbolVisitorConfig: SymbolVisitorConfig = DefaultConfig) =
SymbolVisitor(model, serviceShape = serviceShape, config = symbolVisitorConfig)
// Generate different types for EventStream shapes (e.g. transcribe streaming)
.let { EventStreamSymbolProvider(symbolVisitorConfig.runtimeConfig, it, model) }
.let { EventStreamSymbolProvider(symbolVisitorConfig.runtimeConfig, it, model, CodegenTarget.CLIENT) }
// Generate `ByteStream` instead of `Blob` for streaming binary shapes (e.g. S3 GetObject)
.let { StreamingShapeSymbolProvider(it, model) }
// Add Rust attributes (like `#[derive(PartialEq)]`) to generated shapes
Expand Down
Loading

0 comments on commit 92e95f4

Please sign in to comment.