diff --git a/Cargo.lock b/Cargo.lock index 57fe116..35a1224 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstyle" version = "1.0.8" @@ -46,9 +61,15 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + [[package]] name = "backtrace" version = "0.3.74" @@ -64,12 +85,30 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -82,12 +121,78 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +[[package]] +name = "cc" +version = "1.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-targets", +] + +[[package]] +name = "cloudevents-sdk" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "801713078518ab05d7c78508c14cf55173a14a1a6659421d3352c2576a6167bf" +dependencies = [ + "base64", + "bitflags 1.3.2", + "chrono", + "delegate-attr", + "hostname", + "serde", + "serde_json", + "snafu", + "url", + "uuid", + "web-sys", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "delegate-attr" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee7e7ea0dba407429d816e8e38dda1a467cd74737722f2ccc8eae60429a1a3ab" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "downcast" version = "0.11.0" @@ -128,6 +233,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fragile" version = "2.0.0" @@ -166,6 +280,50 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "2.5.0" @@ -176,6 +334,21 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "js-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -200,6 +373,12 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "mediatype" version = "0.19.18" @@ -244,7 +423,16 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn", + "syn 2.0.77", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", ] [[package]] @@ -268,6 +456,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -500,13 +694,89 @@ version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ - "bitflags", + "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", "windows-sys 0.52.0", ] +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.210" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.210" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "serde_json" +version = "1.0.128" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "snafu" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.77" @@ -555,7 +825,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -566,7 +836,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", "test-case-core", ] @@ -587,9 +857,24 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] +[[package]] +name = "tinyvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.40.0" @@ -609,7 +894,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -632,24 +917,41 @@ dependencies = [ "once_cell", ] +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + [[package]] name = "up-rust" version = "0.2.0" dependencies = [ "async-trait", "bytes", + "cloudevents-sdk", "mediatype", "mockall", "protobuf", "protobuf-codegen", "protoc-bin-vendored", "rand", + "serde_json", "test-case", "thiserror", "tokio", @@ -668,6 +970,27 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "url" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "uuid-simd" version = "0.8.0" @@ -690,6 +1013,71 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.77", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" + +[[package]] +name = "web-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "which" version = "4.4.2" @@ -702,6 +1090,37 @@ dependencies = [ "rustix", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -802,5 +1221,5 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] diff --git a/Cargo.toml b/Cargo.toml index 331c72d..4b32131 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ version = "0.2.0" [features] default = ["communication"] +cloudevents = ["dep:cloudevents-sdk", "dep:serde_json"] communication = ["usubscription", "dep:thiserror", "tokio/sync", "tokio/time"] udiscovery = [] usubscription = [] @@ -42,9 +43,11 @@ util = ["tokio/sync"] [dependencies] async-trait = { version = "0.1" } bytes = { version = "1.7" } +cloudevents-sdk = { version = "0.7.0", optional = true } mediatype = "0.19" protobuf = { version = "3.5", features = ["with-bytes"] } rand = { version = "0.8" } +serde_json = { version = "1.0", optional = true } thiserror = { version = "1.0", optional = true } tokio = { version = "1.40", default-features = false, optional = true } tracing = { version = "0.1", default-features = false, features = [ diff --git a/src/cloudevents.rs b/src/cloudevents.rs new file mode 100644 index 0000000..532772a --- /dev/null +++ b/src/cloudevents.rs @@ -0,0 +1,627 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use crate::{ + UAttributes, UAttributesValidators, UCode, UMessage, UMessageType, UPayloadFormat, UPriority, + UStatus, UUri, UUID, +}; +use bytes::Bytes; +use cloudevents::{ + event::ExtensionValue, AttributesReader, Data, Event, EventBuilder, EventBuilderV10, +}; +use protobuf::EnumOrUnknown; + +/// This module contains functions for mapping uProtocol `UMessage`s to CloudEvents +/// as defined in the uProtocol specification. + +pub const CONTENT_TYPE_CLOUDEVENTS_JSON: &str = "application/cloudevents+json"; +pub const CONTENT_TYPE_CLOUDEVENTS_PROTOBUF: &str = "application/cloudevents+protobuf"; + +const EXTENSION_NAME_COMMSTATUS: &str = "commstatus"; +const EXTENSION_NAME_PERMISSION_LEVEL: &str = "plevel"; +const EXTENSION_NAME_PRIORITY: &str = "priority"; +const EXTENSION_NAME_REQUEST_ID: &str = "reqid"; +const EXTENSION_NAME_SINK: &str = "sink"; +const EXTENSION_NAME_TTL: &str = "ttl"; +const EXTENSION_NAME_TOKEN: &str = "token"; +const EXTENSION_NAME_TRACEPARENT: &str = "traceparent"; + +// Creates a CloudEvent from a uProtocol message. +// +// # Arguments +// +// * `message` - The message to create the event from. The message is being consumed by this function. +// * `skip_validation` - `true` if the given message known to be a valid uProtocol message. +// +// # Errors +// +// Returns an error if the given message does not contain the necessary information for creating a CloudEvent. +// Also returns an error if `skip_validation` is `false` and the message is not a valid uProtocol message. +// +// [impl->dsn~cloudevents-umessage-mapping~1] +pub fn get_cloudevent(message: UMessage, skip_validation: bool) -> Result { + let Some(attributes) = message.attributes.as_ref() else { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "message has no attributes", + )); + }; + if !skip_validation { + UAttributesValidators::get_validator_for_attributes(attributes) + .validate(attributes) + .map_err(|e| UStatus::fail_with_code(UCode::INVALID_ARGUMENT, e.to_string()))?; + } + let mut event = EventBuilderV10::new() + .id(attributes + .id + .as_ref() + .unwrap_or(&UUID::build()) + .to_hyphenated_string()) + .ty(attributes + .type_ + .enum_value_or_default() + .to_cloudevent_type()) + .source(attributes.source.get_or_default().to_uri(false)) + .build() + .map_err(|_e| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "cannot map UMessage to Cloudevent") + })?; + if let Some(sink) = attributes.sink.as_ref() { + event.set_extension(EXTENSION_NAME_SINK, sink.to_uri(false)); + } + if let Ok(priority) = attributes.priority.enum_value() { + if priority != UPriority::UPRIORITY_UNSPECIFIED { + event.set_extension(EXTENSION_NAME_PRIORITY, priority.to_priority_code()); + } + } + if let Some(ttl) = attributes.ttl { + event.set_extension(EXTENSION_NAME_TTL, ttl as i64); + } + if let Some(token) = attributes.token.as_ref() { + event.set_extension(EXTENSION_NAME_TOKEN, token.to_owned()); + } + if let Some(plevel) = attributes.permission_level { + event.set_extension(EXTENSION_NAME_PERMISSION_LEVEL, plevel as i64); + } + if let Some(reqid) = attributes.reqid.as_ref() { + event.set_extension(EXTENSION_NAME_REQUEST_ID, reqid.to_hyphenated_string()); + } + if let Some(commstatus) = attributes.commstatus.as_ref() { + event.set_extension(EXTENSION_NAME_COMMSTATUS, commstatus.value() as i64); + } + if let Some(traceparent) = attributes.traceparent.as_ref() { + event.set_extension(EXTENSION_NAME_TRACEPARENT, traceparent.to_owned()); + } + if let Some(payload) = message.payload { + let payload_format = attributes.payload_format.enum_value_or_default(); + let data = match payload_format { + UPayloadFormat::UPAYLOAD_FORMAT_JSON | UPayloadFormat::UPAYLOAD_FORMAT_TEXT => { + Data::String(String::from_utf8(payload.to_vec()).unwrap()) + } + _ => Data::Binary(payload.to_vec()), + }; + event.set_data( + payload_format.to_media_type().unwrap_or("".to_string()), + data, + ); + } + Ok(event) +} + +// Creates a uProtocol message from a CloudEvent. +// +// # Arguments +// +// * `event` - The CloudEvent to create the message from. +// +// # Errors +// +// Returns an error if the given event does not contain the necessary information for creating a uProtocol message. +// Also returns an error if `skip_validation` is `false` and the resulting message is not a valid uProtocol message. +// +// [impl->dsn~cloudevents-umessage-mapping~1] +pub fn get_umessage( + event: Event, + skip_validation: bool, +) -> Result> { + let message_type = UMessageType::try_from_cloudevent_type(event.ty())?; + let id = event.id().parse::()?; + let source_uri = event.source().to_string().parse()?; + + let sink_uri = match event.extension(EXTENSION_NAME_SINK) { + Some(ExtensionValue::String(sink)) => Some(sink.parse::()?), + _ => None, + }; + + let priority = match event.extension(EXTENSION_NAME_PRIORITY) { + Some(ExtensionValue::String(code)) => Some(UPriority::try_from_priority_code(code)?), + _ => None, + }; + + let ttl = match event.extension(EXTENSION_NAME_TTL) { + Some(ExtensionValue::Integer(ttl)) => Some(u32::try_from(*ttl)?), + _ => None, + }; + + let token = match event.extension(EXTENSION_NAME_TOKEN) { + Some(ExtensionValue::String(token)) => Some(token.to_string()), + _ => None, + }; + + let permission_level = match event.extension(EXTENSION_NAME_PERMISSION_LEVEL) { + Some(ExtensionValue::Integer(level)) => Some(u32::try_from(*level)?), + _ => None, + }; + + let reqid = match event.extension(EXTENSION_NAME_REQUEST_ID) { + Some(ExtensionValue::String(uuid)) => Some(uuid.parse::()?), + _ => None, + }; + + let commstatus = match event.extension(EXTENSION_NAME_COMMSTATUS) { + Some(ExtensionValue::Integer(code)) => { + i32::try_from(*code).map(|v| Some(EnumOrUnknown::::from_i32(v)))? + } + _ => None, + }; + + let traceparent = match event.extension(EXTENSION_NAME_TRACEPARENT) { + Some(ExtensionValue::String(traceparent)) => Some(traceparent.to_string()), + _ => None, + }; + + let payload = match event.data() { + Some(Data::Binary(buf)) => Some(Bytes::copy_from_slice(buf.as_slice())), + Some(Data::String(text)) => Some(Bytes::copy_from_slice(text.as_bytes())), + Some(Data::Json(json)) => { + Some(serde_json::to_vec(json).map(|v| Bytes::copy_from_slice(v.as_slice()))?) + } + _ => None, + }; + + let payload_format = match event.datacontenttype() { + Some(media_type) => Some(UPayloadFormat::from_media_type(media_type)?), + _ => None, + }; + + let attributes = UAttributes { + commstatus, + id: Some(id).into(), + type_: message_type.into(), + source: Some(source_uri).into(), + sink: sink_uri.into(), + priority: priority.unwrap_or(UPriority::default()).into(), + ttl, + permission_level, + reqid: reqid.into(), + token, + traceparent, + payload_format: payload_format.unwrap_or(UPayloadFormat::default()).into(), + ..Default::default() + }; + if !skip_validation { + UAttributesValidators::get_validator_for_attributes(&attributes).validate(&attributes)?; + } + Ok(UMessage { + attributes: Some(attributes).into(), + payload, + ..Default::default() + }) +} + +// [utest->dsn~cloudevents-umessage-mapping~1] +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use cloudevents::event::SpecVersion; + use protobuf::Enum; + + use crate::UMessageBuilder; + + use super::*; + + const MESSAGE_ID: &str = "00000000-0001-7000-8010-101010101a1a"; + const TOPIC: &str = "//my-vehicle/A81B/1/A9BA"; + const METHOD: &str = "//my-vehicle/A000/2/1"; + const REPLY_TO: &str = "//my-vehicle/A81B/1/0"; + const DESTINATION: &str = "//my-vehicle/A000/2/0"; + const PERMISSION_LEVEL: u32 = 5; + const PRIORITY: UPriority = UPriority::UPRIORITY_CS4; + const TTL: u32 = 15_000; + const TRACEPARENT: &str = "traceparent"; + const DATA: [u8; 4] = [0x00, 0x01, 0x02, 0x03]; + + // + // tests asserting conversion of UMessage -> CloudEvent + // + + fn assert_standard_cloudevent_attributes( + event: &Event, + message_type: &str, + source: &str, + sink: Option, + ) { + assert_eq!(event.specversion(), SpecVersion::V10); + assert_eq!(event.ty(), message_type); + assert_eq!(event.id(), MESSAGE_ID); + assert_eq!(event.source().as_str(), source); + assert_eq!( + event.extension(EXTENSION_NAME_SINK).map(|v| v.to_string()), + sink + ); + assert_eq!( + event.extension(EXTENSION_NAME_PRIORITY), + Some(&ExtensionValue::String(PRIORITY.to_priority_code())) + ); + assert_eq!( + event.extension(EXTENSION_NAME_TTL), + Some(&ExtensionValue::Integer(TTL as i64)) + ); + assert_eq!( + event.extension(EXTENSION_NAME_TRACEPARENT), + Some(&ExtensionValue::String(TRACEPARENT.to_string())) + ); + } + + #[test] + fn test_get_cloudevent_fails_for_invalid_message() { + let invalid_attributes = UAttributes { + type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(), + id: Some(UUID::build()).into(), + source: Some(TOPIC.parse::().unwrap()).into(), + ..Default::default() + }; + let invalid_message = UMessage { + attributes: Some(invalid_attributes).into(), + ..Default::default() + }; + assert!(get_cloudevent(invalid_message.clone(), false) + .is_err_and(|e| e.get_code() == UCode::INVALID_ARGUMENT)); + assert!(get_cloudevent(invalid_message, true).is_ok()); + } + + #[test] + fn test_get_cloudevent_for_publish_message_succeeds() { + let message_id = MESSAGE_ID + .parse::() + .expect("failed to parse message ID"); + let message = + UMessageBuilder::publish(UUri::from_str(TOPIC).expect("failed to create topic URI")) + .with_message_id(message_id) + .with_priority(PRIORITY) + .with_ttl(TTL) + .with_traceparent(TRACEPARENT) + .build_with_payload("test".as_bytes(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .expect("failed to create message"); + + let event = + get_cloudevent(message, false).expect("failed to create CloudEvent from UMessage"); + assert_standard_cloudevent_attributes(&event, "pub.v1", TOPIC, None); + assert_eq!( + event.datacontenttype().map(|v| v.to_string()), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT.to_media_type() + ); + match event.data() { + Some(Data::String(payload)) => { + assert_eq!(payload, "test") + } + _ => panic!("unexpected payload format"), + } + } + + #[test] + fn test_get_cloudevent_for_notification_message_succeeds() { + let message_id = MESSAGE_ID + .parse::() + .expect("failed to parse message ID"); + let message = UMessageBuilder::notification( + UUri::from_str(TOPIC).expect("failed to create source URI"), + UUri::from_str(DESTINATION).expect("failed to create sink URI"), + ) + .with_message_id(message_id) + .with_priority(PRIORITY) + .with_ttl(TTL) + .with_traceparent(TRACEPARENT) + .build_with_payload( + "{\"count\": 5}".as_bytes(), + UPayloadFormat::UPAYLOAD_FORMAT_JSON, + ) + .expect("failed to create message"); + + let event = + get_cloudevent(message, false).expect("failed to create CloudEvent from UMessage"); + assert_standard_cloudevent_attributes( + &event, + "not.v1", + TOPIC, + Some(DESTINATION.to_string()), + ); + assert_eq!( + event.datacontenttype().map(|v| v.to_string()), + UPayloadFormat::UPAYLOAD_FORMAT_JSON.to_media_type() + ); + match event.data() { + Some(Data::String(payload)) => { + assert_eq!(payload, "{\"count\": 5}") + } + _ => panic!("unexpected payload format"), + } + } + + #[test] + fn test_get_cloudevent_for_request_message_succeeds() { + let message_id = MESSAGE_ID + .parse::() + .expect("failed to parse message ID"); + let token = "my-token"; + let message = UMessageBuilder::request( + UUri::from_str(METHOD).expect("failed to create sink URI"), + UUri::from_str(REPLY_TO).expect("failed to create source URI"), + TTL, + ) + .with_message_id(message_id) + .with_priority(PRIORITY) + .with_permission_level(PERMISSION_LEVEL) + .with_traceparent(TRACEPARENT) + .with_token(token) + .build() + .expect("failed to create message"); + + let event = + get_cloudevent(message, false).expect("failed to create CloudEvent from UMessage"); + assert_standard_cloudevent_attributes(&event, "req.v1", REPLY_TO, Some(METHOD.to_string())); + assert_eq!( + event.extension(EXTENSION_NAME_TOKEN), + Some(&ExtensionValue::String(token.to_string())) + ); + assert_eq!( + event.extension(EXTENSION_NAME_PERMISSION_LEVEL), + Some(&ExtensionValue::Integer(PERMISSION_LEVEL as i64)) + ); + assert!(event.datacontenttype().is_none()); + assert!(event.data().is_none()); + } + + #[test] + fn test_get_cloudevent_for_response_message_succeeds() { + let message_id = MESSAGE_ID + .parse::() + .expect("failed to parse message ID"); + let request_id = UUID::build(); + + let message = UMessageBuilder::response( + UUri::from_str(REPLY_TO).expect("failed to create sink URI"), + request_id.clone(), + UUri::from_str(METHOD).expect("failed to create source URI"), + ) + .with_message_id(message_id) + .with_ttl(TTL) + .with_priority(PRIORITY) + .with_comm_status(UCode::OK) + .with_traceparent(TRACEPARENT) + .build_with_payload(DATA.to_vec(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF) + .expect("failed to create message"); + + let event = + get_cloudevent(message, false).expect("failed to create CloudEvent from UMessage"); + assert_standard_cloudevent_attributes(&event, "res.v1", METHOD, Some(REPLY_TO.to_string())); + assert_eq!( + event.extension(EXTENSION_NAME_COMMSTATUS), + Some(&ExtensionValue::Integer(UCode::OK.value() as i64)) + ); + assert_eq!( + event.datacontenttype().map(|v| v.to_string()), + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF.to_media_type() + ); + match event.data() { + Some(Data::Binary(payload)) => { + assert_eq!(payload, &DATA.to_vec()) + } + _ => panic!("unexpected payload format"), + } + } + + // + // tests asserting conversion of CloudEvent -> UMessage + // + + fn assert_standard_umessage_attributes( + attribs: &UAttributes, + message_type: UMessageType, + source: &str, + sink: Option, + ) { + assert_eq!(attribs.type_.enum_value_or_default(), message_type); + assert_eq!( + attribs.id.get_or_default().to_hyphenated_string(), + MESSAGE_ID + ); + assert_eq!(attribs.source.get_or_default().to_uri(false), source); + assert_eq!(attribs.sink.as_ref().map(|uuri| uuri.to_uri(false)), sink); + assert_eq!( + attribs.priority.enum_value_or_default(), + UPriority::UPRIORITY_CS4 + ); + assert_eq!(attribs.ttl, Some(TTL)); + assert_eq!(attribs.traceparent, Some(TRACEPARENT.to_string())); + } + + #[test] + fn test_get_umessage_fails_for_cloudevent_with_missing_sink() { + let event = cloudevents::EventBuilderV10::new() + .ty(UMessageType::UMESSAGE_TYPE_NOTIFICATION.to_cloudevent_type()) + .id(MESSAGE_ID) + .source(TOPIC) + .build() + .expect("failed to create CloudEvent"); + + assert!(get_umessage(event.clone(), false).is_err()); + assert!( + get_umessage(event, true).is_ok(), + "skipping validation should have ignored the missing destination" + ); + } + + #[test] + fn test_get_umessage_for_publish_cloudevent_succeeds() { + let event = cloudevents::EventBuilderV10::new() + .ty(UMessageType::UMESSAGE_TYPE_PUBLISH.to_cloudevent_type()) + .id(MESSAGE_ID) + .source(TOPIC) + .extension( + EXTENSION_NAME_PRIORITY, + UPriority::UPRIORITY_CS4.to_priority_code(), + ) + .extension(EXTENSION_NAME_TTL, TTL as i64) + .extension(EXTENSION_NAME_TRACEPARENT, TRACEPARENT) + .data( + UPayloadFormat::UPAYLOAD_FORMAT_TEXT + .to_media_type() + .unwrap(), + Data::String("test".to_string()), + ) + .build() + .expect("failed to create CloudEvent"); + + let umessage = + get_umessage(event, false).expect("failed to create UMessage from CloudEvent"); + let attribs = umessage.attributes.get_or_default(); + assert_standard_umessage_attributes( + attribs, + UMessageType::UMESSAGE_TYPE_PUBLISH, + TOPIC, + None, + ); + assert_eq!( + attribs.payload_format.enum_value_or_default(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT + ); + assert_eq!(umessage.payload, Some("test".as_bytes().to_vec().into())) + } + + #[test] + fn test_get_umessage_for_notification_cloudevent_succeeds() { + let event = cloudevents::EventBuilderV10::new() + .ty(UMessageType::UMESSAGE_TYPE_NOTIFICATION.to_cloudevent_type()) + .id(MESSAGE_ID) + .source(TOPIC) + .extension(EXTENSION_NAME_SINK, DESTINATION) + .extension( + EXTENSION_NAME_PRIORITY, + UPriority::UPRIORITY_CS4.to_priority_code(), + ) + .extension(EXTENSION_NAME_TTL, TTL as i64) + .extension(EXTENSION_NAME_TRACEPARENT, TRACEPARENT) + .data( + UPayloadFormat::UPAYLOAD_FORMAT_JSON + .to_media_type() + .unwrap(), + Data::String("{\"count\": 5}".to_string()), + ) + .build() + .expect("failed to create CloudEvent"); + + let umessage = + get_umessage(event, false).expect("failed to create UMessage from CloudEvent"); + let attribs = umessage.attributes.get_or_default(); + assert_standard_umessage_attributes( + attribs, + UMessageType::UMESSAGE_TYPE_NOTIFICATION, + TOPIC, + Some(DESTINATION.to_string()), + ); + assert_eq!( + attribs.payload_format.enum_value_or_default(), + UPayloadFormat::UPAYLOAD_FORMAT_JSON + ); + assert_eq!( + umessage.payload, + Some("{\"count\": 5}".as_bytes().to_vec().into()) + ) + } + + #[test] + fn test_get_umessage_for_request_cloudevent_succeeds() { + let event = cloudevents::EventBuilderV10::new() + .ty(UMessageType::UMESSAGE_TYPE_REQUEST.to_cloudevent_type()) + .id(MESSAGE_ID) + .source(REPLY_TO) + .extension(EXTENSION_NAME_SINK, METHOD) + .extension( + EXTENSION_NAME_PRIORITY, + UPriority::UPRIORITY_CS4.to_priority_code(), + ) + .extension(EXTENSION_NAME_PERMISSION_LEVEL, PERMISSION_LEVEL as i64) + .extension(EXTENSION_NAME_TOKEN, "my-token") + .extension(EXTENSION_NAME_TTL, TTL as i64) + .extension(EXTENSION_NAME_TRACEPARENT, TRACEPARENT) + .build() + .expect("failed to create CloudEvent"); + + let umessage = + get_umessage(event, false).expect("failed to create UMessage from CloudEvent"); + let attribs = umessage.attributes.get_or_default(); + assert_standard_umessage_attributes( + attribs, + UMessageType::UMESSAGE_TYPE_REQUEST, + REPLY_TO, + Some(METHOD.to_string()), + ); + assert_eq!(attribs.permission_level, Some(PERMISSION_LEVEL)); + assert_eq!(attribs.token, Some("my-token".to_string())); + } + + #[test] + fn test_get_umessage_for_response_cloudevent_succeeds() { + let request_id = UUID::build(); + let event = cloudevents::EventBuilderV10::new() + .ty(UMessageType::UMESSAGE_TYPE_RESPONSE.to_cloudevent_type()) + .id(MESSAGE_ID) + .source(METHOD) + .extension(EXTENSION_NAME_SINK, REPLY_TO) + .extension( + EXTENSION_NAME_PRIORITY, + UPriority::UPRIORITY_CS4.to_priority_code(), + ) + .extension(EXTENSION_NAME_COMMSTATUS, UCode::OK.value() as i64) + .extension(EXTENSION_NAME_TTL, TTL as i64) + .extension(EXTENSION_NAME_TRACEPARENT, TRACEPARENT) + .extension(EXTENSION_NAME_REQUEST_ID, request_id.to_hyphenated_string()) + .data( + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF + .to_media_type() + .unwrap(), + Data::Binary(DATA.to_vec()), + ) + .build() + .expect("failed to create CloudEvent"); + + let umessage = + get_umessage(event, false).expect("failed to create UMessage from CloudEvent"); + let attribs = umessage.attributes.get_or_default(); + assert_standard_umessage_attributes( + attribs, + UMessageType::UMESSAGE_TYPE_RESPONSE, + METHOD, + Some(REPLY_TO.to_string()), + ); + assert_eq!(attribs.commstatus, Some(UCode::OK.into())); + assert_eq!(attribs.reqid, Some(request_id).into()); + assert_eq!( + attribs.payload_format.enum_value_or_default(), + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF + ); + assert_eq!(umessage.payload, Some(DATA.to_vec().into())) + } +} diff --git a/src/lib.rs b/src/lib.rs index 36cf60f..1ad1192 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,9 @@ For user convenience, all of these modules export their types on up_rust top-lev */ // up_core_api types used and augmented by up_rust - symbols re-exported to toplevel, errors are module-specific +#[cfg(feature = "cloudevents")] +pub mod cloudevents; + #[cfg(feature = "communication")] pub mod communication; #[cfg(feature = "util")]