diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 58873be42..84b3a121d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,14 @@ jobs: runs-on: ${{ matrix.platform }} timeout-minutes: 60 steps: + - name: Print available space (Windows only) + run: Get-PSDrive + if: runner.os == 'Windows' + - name: Override cargo target dir (Windows only) + run: echo "CARGO_TARGET_DIR=C:\cargo-target" >> "$GITHUB_ENV" + shell: bash + if: runner.os == 'Windows' + - uses: actions/checkout@v3 - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -50,6 +58,7 @@ jobs: cache-on-failure: true # only save caches for `main` branch save-if: ${{ github.ref == 'refs/heads/main' }} + cache-directories: ${{ env.CARGO_TARGET_DIR }} - name: "Check" run: cargo check --all diff --git a/Cargo.lock b/Cargo.lock index 404a7a2cc..885ac2d1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aa90d7ce82d4be67b64039a3d588d38dbcc6736577de4a847025ce5b0c468d1" +dependencies = [ + "serde", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -136,9 +145,9 @@ checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" [[package]] name = "arrow" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7104b9e9761613ae92fe770c741d6bbf1dbc791a0fe204400aebdd429875741" +checksum = "edb738d83750ec705808f6d44046d165e6bb8623f64e29a4d53fcb136ab22dfb" dependencies = [ "ahash", "arrow-arith", @@ -159,9 +168,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38e597a8e8efb8ff52c50eaf8f4d85124ce3c1bf20fab82f476d73739d9ab1c2" +checksum = "c5c3d17fc5b006e7beeaebfb1d2edfc92398b981f82d9744130437909b72a468" dependencies = [ "arrow-array", "arrow-buffer", @@ -174,9 +183,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a86d9c1473db72896bd2345ebb6b8ad75b8553ba390875c76708e8dc5c5492d" +checksum = "55705ada5cdde4cb0f202ffa6aa756637e33fea30e13d8d0d0fd6a24ffcee1e3" dependencies = [ "ahash", "arrow-buffer", @@ -190,19 +199,20 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234b3b1c8ed00c874bf95972030ac4def6f58e02ea5a7884314388307fb3669b" +checksum = "a722f90a09b94f295ab7102542e97199d3500128843446ef63e410ad546c5333" dependencies = [ + "bytes", "half", "num", ] [[package]] name = "arrow-cast" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22f61168b853c7faea8cea23a2169fdff9c82fb10ae5e2c07ad1cab8f6884931" +checksum = "af01fc1a06f6f2baf31a04776156d47f9f31ca5939fe6d00cd7a059f95a46ff1" dependencies = [ "arrow-array", "arrow-buffer", @@ -217,9 +227,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b545c114d9bf8569c84d2fbe2020ac4eea8db462c0a37d0b65f41a90d066fe" +checksum = "83cbbfde86f9ecd3f875c42a73d8aeab3d95149cd80129b18d09e039ecf5391b" dependencies = [ "arrow-array", "arrow-buffer", @@ -236,9 +246,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6b6852635e7c43e5b242841c7470606ff0ee70eef323004cacc3ecedd33dd8f" +checksum = "d0a547195e607e625e7fafa1a7269b8df1a4a612c919efd9b26bd86e74538f3a" dependencies = [ "arrow-buffer", "arrow-schema", @@ -248,9 +258,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66da9e16aecd9250af0ae9717ae8dd7ea0d8ca5a3e788fe3de9f4ee508da751" +checksum = "e36bf091502ab7e37775ff448413ef1ffff28ff93789acb669fffdd51b394d51" dependencies = [ "arrow-array", "arrow-buffer", @@ -262,9 +272,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60ee0f9d8997f4be44a60ee5807443e396e025c23cf14d2b74ce56135cb04474" +checksum = "7ac346bc84846ab425ab3c8c7b6721db90643bc218939677ed7e071ccbfb919d" dependencies = [ "arrow-array", "arrow-buffer", @@ -282,9 +292,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcab05410e6b241442abdab6e1035177dc082bdb6f17049a4db49faed986d63" +checksum = "4502123d2397319f3a13688432bc678c61cb1582f2daa01253186da650bf5841" dependencies = [ "arrow-array", "arrow-buffer", @@ -297,9 +307,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91a847dd9eb0bacd7836ac63b3475c68b2210c2c96d0ec1b808237b973bd5d73" +checksum = "249fc5a07906ab3f3536a6e9f118ec2883fbcde398a97a5ba70053f0276abda4" dependencies = [ "ahash", "arrow-array", @@ -312,9 +322,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54df8c47918eb634c20e29286e69494fdc20cafa5173eb6dad49c7f6acece733" +checksum = "9d7a8c3f97f5ef6abd862155a6f39aaba36b029322462d72bbcfa69782a50614" dependencies = [ "bitflags 2.4.0", "serde", @@ -322,10 +332,11 @@ dependencies = [ [[package]] name = "arrow-select" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "941dbe481da043c4bd40c805a19ec2fc008846080c4953171b62bcad5ee5f7fb" +checksum = "f868f4a5001429e20f7c1994b5cd1aa68b82e3db8cf96c559cdb56dc8be21410" dependencies = [ + "ahash", "arrow-array", "arrow-buffer", "arrow-data", @@ -335,9 +346,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "45.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "359b2cd9e071d5a3bcf44679f9d85830afebc5b9c98a08019a570a65ae933e0f" +checksum = "a27fdf8fc70040a2dee78af2e217479cb5b263bd7ab8711c7999e74056eb688a" dependencies = [ "arrow-array", "arrow-buffer", @@ -346,7 +357,7 @@ dependencies = [ "arrow-select", "num", "regex", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -1330,6 +1341,14 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dora-arrow-convert" +version = "0.2.6" +dependencies = [ + "arrow", + "eyre", +] + [[package]] name = "dora-cli" version = "0.2.6" @@ -1383,6 +1402,7 @@ dependencies = [ name = "dora-core" version = "0.2.6" dependencies = [ + "aligned-vec", "dora-message", "eyre", "once_cell", @@ -1400,6 +1420,7 @@ dependencies = [ name = "dora-daemon" version = "0.2.6" dependencies = [ + "aligned-vec", "arrow-schema", "async-trait", "bincode", @@ -1479,10 +1500,12 @@ dependencies = [ name = "dora-node-api" version = "0.2.6" dependencies = [ + "aligned-vec", "arrow", "arrow-schema", "bincode", "capnp", + "dora-arrow-convert", "dora-core", "dora-tracing", "eyre", @@ -1505,6 +1528,7 @@ dependencies = [ name = "dora-node-api-c" version = "0.2.6" dependencies = [ + "arrow-array", "dora-node-api", "eyre", "flume", @@ -1542,6 +1566,7 @@ dependencies = [ name = "dora-operator-api" version = "0.2.6" dependencies = [ + "dora-arrow-convert", "dora-operator-api-macros", "dora-operator-api-types", ] @@ -1575,6 +1600,7 @@ dependencies = [ name = "dora-operator-api-python" version = "0.2.6" dependencies = [ + "aligned-vec", "arrow", "arrow-schema", "dora-node-api", @@ -1588,6 +1614,8 @@ dependencies = [ name = "dora-operator-api-types" version = "0.2.6" dependencies = [ + "arrow", + "dora-arrow-convert", "safer-ffi", ] @@ -1658,6 +1686,7 @@ dependencies = [ name = "dora-runtime" version = "0.2.6" dependencies = [ + "aligned-vec", "arrow", "arrow-schema", "clap 4.4.6", @@ -2495,9 +2524,9 @@ dependencies = [ [[package]] name = "indoc" -version = "1.0.9" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa799dd5ed20a7e349f3b4639aa80d74549c81716d9ec4f994c9b5815598306" +checksum = "1e186cfbae8084e513daff4240b4797e342f988cecda4fb6c939150f96315fd8" [[package]] name = "inotify" @@ -4057,9 +4086,9 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.19.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38" +checksum = "04e8453b658fe480c3e70c8ed4e3d3ec33eb74988bd186561b0cc66b85c3bc4b" dependencies = [ "cfg-if 1.0.0", "eyre", @@ -4076,9 +4105,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.19.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5" +checksum = "a96fe70b176a89cff78f2fa7b3c930081e163d5379b4dcdf993e3ae29ca662e5" dependencies = [ "once_cell", "target-lexicon", @@ -4086,9 +4115,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.19.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" +checksum = "214929900fd25e6604661ed9cf349727c8920d47deff196c4e28165a6ef2a96b" dependencies = [ "libc", "pyo3-build-config", @@ -4096,32 +4125,33 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.19.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1" +checksum = "dac53072f717aa1bfa4db832b39de8c875b7c7af4f4a6fe93cdbf9264cf8383b" dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] name = "pyo3-macros-backend" -version = "0.19.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536" +checksum = "7774b5a8282bd4f25f803b1f0d945120be959a36c72e08e7cd031c792fdfd424" dependencies = [ + "heck 0.4.1", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] name = "pythonize" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e35b716d430ace57e2d1b4afb51c9e5b7c46d2bce72926e07f9be6a98ced03e" +checksum = "ffd1c3ef39c725d63db5f9bc455461bafd80540cb7824c61afb823501921a850" dependencies = [ "pyo3", "serde", @@ -4343,6 +4373,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "reqwest" version = "0.11.22" @@ -5655,9 +5691,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "unindent" -version = "0.1.11" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c" +checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" [[package]] name = "uninit" diff --git a/Cargo.toml b/Cargo.toml index 4069d98b2..cd8a6fa59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "examples/rust-ros2-dataflow/*", "examples/benchmark/*", "examples/multiple-daemons/*", + "libraries/arrow-convert", "libraries/communication-layer/*", "libraries/core", "libraries/message", @@ -45,6 +46,7 @@ dora-operator-api-python = { version = "0.2.6", path = "apis/python/operator" } dora-operator-api-c = { version = "0.2.6", path = "apis/c/operator" } dora-node-api-c = { version = "0.2.6", path = "apis/c/node" } dora-core = { version = "0.2.6", path = "libraries/core" } +dora-arrow-convert = { version = "0.2.6", path = "libraries/arrow-convert" } dora-tracing = { version = "0.2.6", path = "libraries/extensions/telemetry/tracing" } dora-metrics = { version = "0.2.6", path = "libraries/extensions/telemetry/metrics" } dora-download = { version = "0.2.6", path = "libraries/extensions/download" } @@ -56,6 +58,12 @@ dora-daemon = { version = "0.2.6", path = "binaries/daemon" } dora-coordinator = { version = "0.2.6", path = "binaries/coordinator" } dora-ros2-bridge = { path = "libraries/extensions/ros2-bridge" } dora-ros2-bridge-python = { path = "libraries/extensions/ros2-bridge/python" } +arrow = "48.0.0" +arrow-schema = "48.0.0" +arrow-data = "48.0.0" +arrow-array = "48.0.0" +pyo3 = "0.20.0" +pythonize = "0.20.0" [package] name = "dora-examples" diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index e953961a0..77fd31970 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -1,4 +1,8 @@ -use dora_node_api::{self, Event, EventStream}; +use dora_node_api::{ + self, + arrow::array::{AsArray, BinaryArray}, + Event, EventStream, +}; use eyre::bail; #[cxx::bridge] @@ -81,9 +85,10 @@ fn event_as_input(event: Box) -> eyre::Result { let Some(Event::Input { id, metadata: _, data }) = event.0 else { bail!("not an input event"); }; + let data: Option<&BinaryArray> = data.as_binary_opt(); Ok(ffi::DoraInput { id: id.into(), - data: data.map(|d| d.to_owned()).unwrap_or_default(), + data: data.map(|d| d.value(0).to_owned()).unwrap_or_default(), }) } @@ -92,7 +97,7 @@ pub struct OutputSender(dora_node_api::DoraNode); fn send_output(sender: &mut Box, id: String, data: &[u8]) -> ffi::DoraResult { let result = sender .0 - .send_output(id.into(), Default::default(), data.len(), |out| { + .send_output_raw(id.into(), Default::default(), data.len(), |out| { out.copy_from_slice(data) }); let error = match result { diff --git a/apis/c++/operator/src/lib.rs b/apis/c++/operator/src/lib.rs index 511b44706..4ea7bab37 100644 --- a/apis/c++/operator/src/lib.rs +++ b/apis/c++/operator/src/lib.rs @@ -2,7 +2,7 @@ #![warn(unsafe_op_in_unsafe_fn)] use dora_operator_api::{ - self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, + self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, }; use ffi::DoraSendOutputResult; @@ -45,7 +45,7 @@ pub struct OutputSender<'a, 'b>(&'a mut DoraOutputSender<'b>); fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult { let error = sender .0 - .send(id.into(), data.to_owned()) + .send(id.into(), data.to_owned().into_arrow()) .err() .unwrap_or_default(); DoraSendOutputResult { error } @@ -75,6 +75,9 @@ impl DoraOperator for OperatorWrapper { Event::Input { id, data } => { let operator = self.operator.as_mut().unwrap(); let mut output_sender = OutputSender(output_sender); + let data: &[u8] = data + .try_into() + .map_err(|err| format!("expected byte array: {err}"))?; let result = ffi::on_input(operator, id, data, &mut output_sender); if result.error.is_empty() { diff --git a/apis/c/node/Cargo.toml b/apis/c/node/Cargo.toml index ec8235435..f5346ddad 100644 --- a/apis/c/node/Cargo.toml +++ b/apis/c/node/Cargo.toml @@ -21,6 +21,7 @@ tracing = ["dora-node-api/tracing"] eyre = "0.6.8" flume = "0.10.14" tracing = "0.1.33" +arrow-array = { workspace = true } [dependencies.dora-node-api] workspace = true diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index 01f36ba69..88a776753 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -1,6 +1,7 @@ #![deny(unsafe_op_in_unsafe_fn)] -use dora_node_api::{DoraNode, Event, EventStream}; +use arrow_array::BinaryArray; +use dora_node_api::{arrow::array::AsArray, DoraNode, Event, EventStream}; use eyre::Context; use std::{ffi::c_void, ptr, slice}; @@ -169,14 +170,20 @@ pub unsafe extern "C" fn read_dora_input_data( ) { let event: &Event = unsafe { &*event.cast() }; match event { - Event::Input { - data: Some(data), .. - } => { - let ptr = data.as_ptr(); - let len = data.len(); - unsafe { - *out_ptr = ptr; - *out_len = len; + Event::Input { data, .. } => { + let data: Option<&BinaryArray> = data.as_binary_opt(); + if let Some(data) = data { + let ptr = data.value(0).as_ptr(); + let len = data.value(0).len(); + unsafe { + *out_ptr = ptr; + *out_len = len; + } + } else { + unsafe { + *out_ptr = ptr::null(); + *out_len = 0; + } } } _ => unsafe { @@ -245,7 +252,7 @@ unsafe fn try_send_output( let data = unsafe { slice::from_raw_parts(data_ptr, data_len) }; context .node - .send_output(output_id, Default::default(), data.len(), |out| { + .send_output_raw(output_id, Default::default(), data.len(), |out| { out.copy_from_slice(data); }) } diff --git a/apis/c/operator/Cargo.toml b/apis/c/operator/Cargo.toml index 0df6eac03..e17469643 100644 --- a/apis/c/operator/Cargo.toml +++ b/apis/c/operator/Cargo.toml @@ -6,5 +6,11 @@ description = "C API implemetation for Dora Operator" documentation.workspace = true license.workspace = true +[lib] +crate-type = ["staticlib", "lib"] + +[dependencies] +dora-operator-api-types = { workspace = true } + [build-dependencies] dora-operator-api-types = { workspace = true } diff --git a/apis/c/operator/operator_api.h b/apis/c/operator/operator_api.h index 846fd3cb3..24f57ab27 100644 --- a/apis/c/operator/operator_api.h +++ b/apis/c/operator/operator_api.h @@ -19,7 +19,7 @@ extern "C" EXPORT DoraResult_t dora_drop_operator(void *operator_context); EXPORT OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context); diff --git a/apis/c/operator/operator_types.h b/apis/c/operator/operator_types.h index 30585ca4f..c6e3c6f8c 100644 --- a/apis/c/operator/operator_types.h +++ b/apis/c/operator/operator_types.h @@ -86,22 +86,7 @@ typedef struct OnEventResult { } OnEventResult_t; /** */ -typedef struct Metadata { - /** */ - Vec_uint8_t open_telemetry_context; -} Metadata_t; - -/** */ -typedef struct Input { - /** */ - Vec_uint8_t id; - - /** */ - Vec_uint8_t data; - - /** */ - Metadata_t metadata; -} Input_t; +typedef struct Input Input_t; #include @@ -122,16 +107,7 @@ typedef struct RawEvent { } RawEvent_t; /** */ -typedef struct Output { - /** */ - Vec_uint8_t id; - - /** */ - Vec_uint8_t data; - - /** */ - Metadata_t metadata; -} Output_t; +typedef struct Output Output_t; /** \brief * `Arc Ret>` @@ -159,9 +135,43 @@ typedef struct SendOutput { /** */ typedef struct DoraOnEvent { /** */ - OnEventResult_t (*on_event)(RawEvent_t const *, SendOutput_t const *, void *); + OnEventResult_t (*on_event)(RawEvent_t *, SendOutput_t const *, void *); } DoraOnEvent_t; +/** */ +typedef struct Metadata { + /** */ + Vec_uint8_t open_telemetry_context; +} Metadata_t; + +/** */ +void +dora_free_data ( + Vec_uint8_t _data); + +/** */ +void +dora_free_input_id ( + char * _input_id); + +/** */ +Vec_uint8_t +dora_read_data ( + Input_t * input); + +/** */ +char * +dora_read_input_id ( + Input_t const * input); + +/** */ +DoraResult_t +dora_send_operator_output ( + SendOutput_t const * send_output, + char const * id, + uint8_t const * data_ptr, + size_t data_len); + #ifdef __cplusplus } /* extern \"C\" */ diff --git a/apis/c/operator/src/lib.rs b/apis/c/operator/src/lib.rs index 262e70b32..9b6f8ea5a 100644 --- a/apis/c/operator/src/lib.rs +++ b/apis/c/operator/src/lib.rs @@ -1,2 +1,4 @@ pub const HEADER_OPERATOR_API: &str = include_str!("../operator_api.h"); pub const HEADER_OPERATOR_TYPES: &str = include_str!("../operator_types.h"); + +pub use dora_operator_api_types; diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 229d8d9ac..71bc9a10d 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -16,13 +16,13 @@ telemetry = ["dora-runtime/telemetry"] [dependencies] dora-node-api = { workspace = true } dora-operator-api-python = { workspace = true } -pyo3 = { version = "0.19", features = ["eyre", "abi3-py37"] } +pyo3 = { workspace = true, features = ["eyre", "abi3-py37"] } eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "python"] } -arrow = { version = "45.0.0", features = ["pyarrow"] } -pythonize = "0.19.0" +arrow = { workspace = true, features = ["pyarrow"] } +pythonize = { workspace = true } futures = "0.3.28" dora-ros2-bridge-python = { workspace = true } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 2918527ae..19f61264f 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -3,9 +3,7 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DoraNode, EventStream}; -use dora_operator_api_python::{ - copy_array_into_sample, pydict_to_metadata, required_data_size, PyEvent, -}; +use dora_operator_api_python::{pydict_to_metadata, PyEvent}; use dora_ros2_bridge_python::Ros2Subscription; use eyre::Context; use futures::{Stream, StreamExt}; @@ -100,14 +98,11 @@ impl Node { .send_output_bytes(output_id.into(), parameters, data.len(), data) .wrap_err("failed to send output")?; } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow(data.as_ref(py)) { - let total_len = required_data_size(&arrow_array); - - let mut sample = self.node.allocate_data_sample(total_len)?; - let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; - - self.node - .send_output_sample(output_id.into(), type_info, parameters, Some(sample)) - .wrap_err("failed to send output")?; + self.node.send_output( + output_id.into(), + parameters, + arrow::array::make_array(arrow_array), + )?; } else { eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array") } diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index 897e8693d..07cef1f07 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -11,9 +11,10 @@ license.workspace = true [dependencies] dora-node-api = { workspace = true } -pyo3 = { version = "0.19", features = ["eyre", "abi3-py37"] } +pyo3 = { workspace = true, features = ["eyre", "abi3-py37"] } eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" -arrow = { version = "45.0.0", features = ["pyarrow"] } -arrow-schema = "45.0.0" +arrow = { workspace = true, features = ["pyarrow"] } +arrow-schema = { workspace = true } +aligned-vec = "0.5.0" diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index b6d89b10b..9e9ed68ec 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,22 +1,12 @@ -use std::sync::Arc; - -use arrow::{array::ArrayData, pyarrow::ToPyArrow}; -use dora_node_api::{ - dora_core::message::{ArrowTypeInfo, BufferOffset}, - merged::MergedEvent, - Data, Event, Metadata, MetadataParameters, -}; +use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{ - exceptions::PyLookupError, - prelude::*, - types::{PyBytes, PyDict}, -}; +use pyo3::{exceptions::PyLookupError, prelude::*, types::PyDict}; #[pyclass] pub struct PyEvent { event: MergedEvent, - data: Option>, + data: Option, } #[pymethods] @@ -34,7 +24,6 @@ impl PyEvent { let value = match key { "type" => Some(Self::ty(event).to_object(py)), "id" => Self::id(event).map(|v| v.to_object(py)), - "data" => self.data(py), "value" => self.value(py)?, "metadata" => Self::metadata(event, py), "error" => Self::error(event).map(|v| v.to_object(py)), @@ -77,21 +66,12 @@ impl PyEvent { } } - /// Returns the payload of an input event as a `PyBytes` object (if any). - fn data(&self, py: Python<'_>) -> Option { - self.data.as_ref().map(|data| PyBytes::new(py, data).into()) - } - /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { metadata, .. }), Some(data)) => { - let array = data - .clone() - .into_arrow_array(&metadata.type_info) - .context("Could not create arrow array")?; + (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { // TODO: Does this call leak data? - let array_data = array.to_pyarrow(py)?; + let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } _ => Ok(None), @@ -122,7 +102,7 @@ impl From for PyEvent { impl From> for PyEvent { fn from(mut event: MergedEvent) -> Self { let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { - data.take().map(Arc::new) + Some(data.clone()) } else { None }; @@ -167,68 +147,11 @@ pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyD dict } -pub fn copy_array_into_sample( - target_buffer: &mut [u8], - arrow_array: &ArrayData, -) -> eyre::Result { - let mut next_offset = 0; - copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) -} - -fn copy_array_into_sample_inner( - target_buffer: &mut [u8], - next_offset: &mut usize, - arrow_array: &ArrayData, -) -> eyre::Result { - let mut buffer_offsets = Vec::new(); - for buffer in arrow_array.buffers().iter() { - let len = buffer.len(); - assert!( - target_buffer[*next_offset..].len() >= len, - "target buffer too small (total_len: {}, offset: {}, required_len: {len})", - target_buffer.len(), - *next_offset, - ); - target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice()); - buffer_offsets.push(BufferOffset { - offset: *next_offset, - len, - }); - *next_offset += len; - } - - let mut child_data = Vec::new(); - for child in arrow_array.child_data() { - let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?; - child_data.push(child_type_info); - } - - Ok(ArrowTypeInfo { - data_type: arrow_array.data_type().clone(), - len: arrow_array.len(), - null_count: arrow_array.null_count(), - validity: arrow_array.nulls().map(|b| b.validity().to_owned()), - offset: arrow_array.offset(), - buffer_offsets, - child_data, - }) -} - -pub fn required_data_size(array: &ArrayData) -> usize { - let mut size = 0; - for buffer in array.buffers() { - size += buffer.len(); - } - for child in array.child_data() { - size += required_data_size(child); - } - size -} - #[cfg(test)] mod tests { use std::sync::Arc; + use aligned_vec::avec; use arrow::{ array::{ ArrayData, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, Int8Array, @@ -238,18 +161,19 @@ mod tests { }; use arrow_schema::{DataType, Field}; - use dora_node_api::Data; + use dora_node_api::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + RawData, + }; use eyre::{Context, Result}; - use crate::{copy_array_into_sample, required_data_size}; - fn assert_roundtrip(arrow_array: &ArrayData) -> Result<()> { - let size = required_data_size(&arrow_array); - let mut sample: Vec = vec![0; size]; + let size = required_data_size(arrow_array); + let mut sample = avec![0; size]; - let info = copy_array_into_sample(&mut sample, &arrow_array)?; + let info = copy_array_into_sample(&mut sample, arrow_array)?; - let serialized_deserialized_arrow_array = Arc::new(Data::Vec(sample)) + let serialized_deserialized_arrow_array = RawData::Vec(sample) .into_arrow_array(&info) .context("Could not create arrow array")?; assert_eq!(arrow_array, &serialized_deserialized_arrow_array); @@ -261,15 +185,15 @@ mod tests { fn serialize_deserialize_arrow() -> Result<()> { // Int8 let arrow_array = Int8Array::from(vec![1, -2, 3, 4]).into(); - assert_roundtrip(&arrow_array)?; + assert_roundtrip(&arrow_array).context("Int8Array roundtrip failed")?; // Int64 let arrow_array = Int64Array::from(vec![1, -2, 3, 4]).into(); - assert_roundtrip(&arrow_array)?; + assert_roundtrip(&arrow_array).context("Int64Array roundtrip failed")?; // Float64 let arrow_array = Float64Array::from(vec![1., -2., 3., 4.]).into(); - assert_roundtrip(&arrow_array)?; + assert_roundtrip(&arrow_array).context("Float64Array roundtrip failed")?; // Struct let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true])); @@ -278,15 +202,15 @@ mod tests { let struct_array = StructArray::from(vec![ ( Arc::new(Field::new("b", DataType::Boolean, false)), - boolean.clone() as ArrayRef, + boolean as ArrayRef, ), ( Arc::new(Field::new("c", DataType::Int32, false)), - int.clone() as ArrayRef, + int as ArrayRef, ), ]) .into(); - assert_roundtrip(&struct_array)?; + assert_roundtrip(&struct_array).context("StructArray roundtrip failed")?; // List let value_data = ArrayData::builder(DataType::Int32) @@ -301,14 +225,14 @@ mod tests { // Construct a list array from the above two let list_data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, false))); - let list_data = ArrayData::builder(list_data_type.clone()) + let list_data = ArrayData::builder(list_data_type) .len(3) - .add_buffer(value_offsets.clone()) - .add_child_data(value_data.clone()) + .add_buffer(value_offsets) + .add_child_data(value_data) .build() .unwrap(); let list_array = ListArray::from(list_data).into(); - assert_roundtrip(&list_array)?; + assert_roundtrip(&list_array).context("ListArray roundtrip failed")?; Ok(()) } diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index d15ebc1e6..d13f4583f 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -26,10 +26,12 @@ capnp = "0.14.11" bincode = "1.3.3" shared_memory_extended = "0.13.0" dora-tracing = { workspace = true, optional = true } -arrow = "45.0.0" -arrow-schema = "45.0.0" +arrow = { workspace = true } +arrow-schema = { workspace = true } futures = "0.3.28" futures-concurrency = "7.3.0" +dora-arrow-convert = { workspace = true } +aligned-vec = "0.5.0" [dev-dependencies] tokio = { version = "1.24.2", features = ["rt"] } diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 919894515..75b3c595b 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -1,5 +1,7 @@ use std::{ptr::NonNull, sync::Arc}; +use aligned_vec::{AVec, ConstAlign}; +use dora_arrow_convert::{ArrowData, IntoArrow}; use dora_core::{ config::{DataId, OperatorId}, message::{ArrowTypeInfo, BufferOffset, Metadata}, @@ -17,7 +19,7 @@ pub enum Event { Input { id: DataId, metadata: Metadata, - data: Option, + data: ArrowData, }, InputClosed { id: DataId, @@ -25,31 +27,47 @@ pub enum Event { Error(String), } -pub enum Data { - Vec(Vec), - SharedMemory { - data: MappedInputData, - _drop: flume::Sender<()>, - }, +pub enum RawData { + Empty, + Vec(AVec>), + SharedMemory(SharedMemoryData), } -impl Data { - pub fn into_arrow_array( - self: Arc, - type_info: &ArrowTypeInfo, - ) -> Result { - let ptr = NonNull::new(self.as_ptr() as *mut _).unwrap(); - let len = self.len(); - let raw_buffer = unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, self) }; +impl RawData { + pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result { + let raw_buffer = match self { + RawData::Empty => return Ok(().into_arrow().into()), + RawData::Vec(data) => { + let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap(); + let len = data.len(); + + unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) } + } + RawData::SharedMemory(data) => { + let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap(); + let len = data.data.len(); + + unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) } + } + }; buffer_into_arrow_array(&raw_buffer, type_info) } } +pub struct SharedMemoryData { + pub data: MappedInputData, + pub _drop: flume::Sender<()>, +} + fn buffer_into_arrow_array( raw_buffer: &arrow::buffer::Buffer, type_info: &ArrowTypeInfo, -) -> std::result::Result { +) -> eyre::Result { + if raw_buffer.is_empty() { + return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type)); + } + let mut buffers = Vec::new(); for BufferOffset { offset, len } in &type_info.buffer_offsets { buffers.push(raw_buffer.slice_with_length(*offset, *len)); @@ -71,21 +89,10 @@ fn buffer_into_arrow_array( buffers, child_data, ) - .context("Error creating Arrow Array") -} - -impl std::ops::Deref for Data { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - match self { - Data::SharedMemory { data, .. } => data, - Data::Vec(data) => data, - } - } + .context("Error creating Arrow array") } -impl std::fmt::Debug for Data { +impl std::fmt::Debug for RawData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Data").finish_non_exhaustive() } diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index f052c123c..48e7ecaab 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,9 +1,12 @@ use std::sync::Arc; -pub use event::{Data, Event, MappedInputData}; +pub use event::{Event, MappedInputData, RawData}; use futures::{Stream, StreamExt}; -use self::thread::{EventItem, EventStreamThreadHandle}; +use self::{ + event::SharedMemoryData, + thread::{EventItem, EventStreamThreadHandle}, +}; use crate::daemon_connection::DaemonChannel; use dora_core::{ config::NodeId, @@ -117,22 +120,32 @@ impl EventStream { NodeEvent::Input { id, metadata, data } => { let data = match data { None => Ok(None), - Some(daemon_messages::Data::Vec(v)) => Ok(Some(Data::Vec(v))), - Some(daemon_messages::Data::SharedMemory { + Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), + Some(daemon_messages::DataMessage::SharedMemory { shared_memory_id, len, drop_token: _, // handled in `event_stream_loop` }) => unsafe { MappedInputData::map(&shared_memory_id, len).map(|data| { - Some(Data::SharedMemory { + Some(RawData::SharedMemory(SharedMemoryData { data, _drop: ack_channel, - }) + })) }) }, }; + let data = data.and_then(|data| { + let raw_data = data.unwrap_or(RawData::Empty); + raw_data + .into_arrow_array(&metadata.type_info) + .map(arrow::array::make_array) + }); match data { - Ok(data) => Event::Input { id, metadata, data }, + Ok(data) => Event::Input { + id, + metadata, + data: data.into(), + }, Err(err) => Event::Error(format!("{err:?}")), } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index b183a94d8..d2ce63dd2 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -13,11 +13,13 @@ //! dora new project_xyz --kind dataflow //! ``` //! +pub use arrow; +pub use dora_arrow_convert::*; pub use dora_core; pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; -pub use event_stream::{merged, Data, Event, EventStream, MappedInputData}; +pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData}; pub use flume::Receiver; -pub use node::{DataSample, DoraNode, ZERO_COPY_THRESHOLD}; +pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; mod daemon_connection; mod event_stream; diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs new file mode 100644 index 000000000..da14489af --- /dev/null +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -0,0 +1,74 @@ +use arrow::array::{ArrayData, BufferSpec}; +use dora_core::message::{ArrowTypeInfo, BufferOffset}; + +pub fn required_data_size(array: &ArrayData) -> usize { + let mut next_offset = 0; + required_data_size_inner(array, &mut next_offset); + next_offset +} +fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) { + let layout = arrow::array::layout(array.data_type()); + for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) { + // consider alignment padding + if let BufferSpec::FixedWidth { alignment, .. } = spec { + *next_offset = (*next_offset + alignment - 1) / alignment * alignment; + } + *next_offset += buffer.len(); + } + for child in array.child_data() { + required_data_size_inner(child, next_offset); + } +} + +pub fn copy_array_into_sample( + target_buffer: &mut [u8], + arrow_array: &ArrayData, +) -> eyre::Result { + let mut next_offset = 0; + copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) +} + +fn copy_array_into_sample_inner( + target_buffer: &mut [u8], + next_offset: &mut usize, + arrow_array: &ArrayData, +) -> eyre::Result { + let mut buffer_offsets = Vec::new(); + let layout = arrow::array::layout(arrow_array.data_type()); + for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) { + let len = buffer.len(); + assert!( + target_buffer[*next_offset..].len() >= len, + "target buffer too small (total_len: {}, offset: {}, required_len: {len})", + target_buffer.len(), + *next_offset, + ); + // add alignment padding + if let BufferSpec::FixedWidth { alignment, .. } = spec { + *next_offset = (*next_offset + alignment - 1) / alignment * alignment; + } + + target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice()); + buffer_offsets.push(BufferOffset { + offset: *next_offset, + len, + }); + *next_offset += len; + } + + let mut child_data = Vec::new(); + for child in arrow_array.child_data() { + let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?; + child_data.push(child_type_info); + } + + Ok(ArrowTypeInfo { + data_type: arrow_array.data_type().clone(), + len: arrow_array.len(), + null_count: arrow_array.null_count(), + validity: arrow_array.nulls().map(|b| b.validity().to_owned()), + offset: arrow_array.offset(), + buffer_offsets, + child_data, + }) +} diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index f0d74a788..693295789 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::daemon_connection::DaemonChannel; use dora_core::{ config::{DataId, NodeId}, - daemon_messages::{DaemonCommunication, DaemonRequest, Data, DataflowId, Timestamped}, + daemon_messages::{DaemonCommunication, DaemonRequest, DataMessage, DataflowId, Timestamped}, message::{uhlc::HLC, Metadata}, }; use eyre::{bail, eyre, Context}; @@ -84,7 +84,7 @@ impl ControlChannel { &mut self, output_id: DataId, metadata: Metadata, - data: Option, + data: Option, ) -> eyre::Result<()> { let request = DaemonRequest::SendMessage { output_id, diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index bc91e581f..39bd8af3b 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -1,9 +1,15 @@ use crate::EventStream; -use self::{control_channel::ControlChannel, drop_stream::DropStream}; +use self::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + control_channel::ControlChannel, + drop_stream::DropStream, +}; +use aligned_vec::{avec, AVec, ConstAlign}; +use arrow::array::Array; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, - daemon_messages::{Data, DropToken, NodeConfig}, + daemon_messages::{DataMessage, DropToken, NodeConfig}, descriptor::Descriptor, message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, }; @@ -19,6 +25,7 @@ use std::{ #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; +pub mod arrow_utils; mod control_channel; mod drop_stream; @@ -108,7 +115,7 @@ impl DoraNode { /// let data: &[u8] = &[0, 1, 2, 3]; /// let parameters = MetadataParameters::default(); /// - /// node.send_output( + /// node.send_output_raw( /// output, /// parameters, /// data.len(), @@ -117,7 +124,7 @@ impl DoraNode { /// }).expect("Could not send output"); /// ``` /// - pub fn send_output( + pub fn send_output_raw( &mut self, output_id: DataId, parameters: MetadataParameters, @@ -135,6 +142,25 @@ impl DoraNode { self.send_output_sample(output_id, type_info, parameters, Some(sample)) } + pub fn send_output( + &mut self, + output_id: DataId, + parameters: MetadataParameters, + data: impl Array, + ) -> eyre::Result<()> { + let arrow_array = data.to_data(); + + let total_len = required_data_size(&arrow_array); + + let mut sample = self.allocate_data_sample(total_len)?; + let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; + + self.send_output_sample(output_id, type_info, parameters, Some(sample)) + .wrap_err("failed to send output")?; + + Ok(()) + } + pub fn send_output_bytes( &mut self, output_id: DataId, @@ -142,7 +168,7 @@ impl DoraNode { data_len: usize, data: &[u8], ) -> eyre::Result<()> { - self.send_output(output_id, parameters, data_len, |sample| { + self.send_output_raw(output_id, parameters, data_len, |sample| { sample.copy_from_slice(data) }) } @@ -231,7 +257,7 @@ impl DoraNode { len: data_len, } } else { - vec![0; data_len].into() + avec![0; data_len].into() }; Ok(data) @@ -356,18 +382,18 @@ pub struct DataSample { } impl DataSample { - fn finalize(self) -> (Option, Option<(ShmemHandle, DropToken)>) { + fn finalize(self) -> (Option, Option<(ShmemHandle, DropToken)>) { match self.inner { DataSampleInner::Shmem(shared_memory) => { let drop_token = DropToken::generate(); - let data = Data::SharedMemory { + let data = DataMessage::SharedMemory { shared_memory_id: shared_memory.get_os_id().to_owned(), len: self.len, drop_token, }; (Some(data), Some((shared_memory, drop_token))) } - DataSampleInner::Vec(buffer) => (Some(Data::Vec(buffer)), None), + DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None), } } } @@ -394,8 +420,8 @@ impl DerefMut for DataSample { } } -impl From> for DataSample { - fn from(value: Vec) -> Self { +impl From>> for DataSample { + fn from(value: AVec>) -> Self { Self { len: value.len(), inner: DataSampleInner::Vec(value), @@ -418,7 +444,7 @@ impl std::fmt::Debug for DataSample { enum DataSampleInner { Shmem(ShmemHandle), - Vec(Vec), + Vec(AVec>), } struct ShmemHandle(Box); diff --git a/apis/rust/operator/Cargo.toml b/apis/rust/operator/Cargo.toml index 21a26d8bf..6391df393 100644 --- a/apis/rust/operator/Cargo.toml +++ b/apis/rust/operator/Cargo.toml @@ -11,3 +11,4 @@ license.workspace = true [dependencies] dora-operator-api-macros = { workspace = true } dora-operator-api-types = { workspace = true } +dora-arrow-convert = { workspace = true } diff --git a/apis/rust/operator/macros/src/lib.rs b/apis/rust/operator/macros/src/lib.rs index 2af1687dd..8b5384f67 100644 --- a/apis/rust/operator/macros/src/lib.rs +++ b/apis/rust/operator/macros/src/lib.rs @@ -52,7 +52,7 @@ fn register_operator_impl(item: &TokenStream2) -> syn::Result { let on_event = quote! { #[no_mangle] pub unsafe extern "C" fn dora_on_event( - event: &dora_operator_api::types::RawEvent, + event: &mut dora_operator_api::types::RawEvent, send_output: &dora_operator_api::types::SendOutput, operator_context: *mut std::ffi::c_void, ) -> dora_operator_api::types::OnEventResult { diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index a8707a5c2..fd770c78c 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -18,17 +18,22 @@ #![warn(unsafe_op_in_unsafe_fn)] #![allow(clippy::missing_safety_doc)] +pub use dora_arrow_convert::*; pub use dora_operator_api_macros::register_operator; pub use dora_operator_api_types as types; pub use types::DoraStatus; -use types::{Metadata, Output, SendOutput}; +use types::{ + arrow::{self, array::Array}, + Metadata, Output, SendOutput, +}; pub mod raw; #[derive(Debug)] #[non_exhaustive] pub enum Event<'a> { - Input { id: &'a str, data: &'a [u8] }, + Input { id: &'a str, data: ArrowData }, + InputParseError { id: &'a str, error: String }, InputClosed { id: &'a str }, Stop, } @@ -48,10 +53,13 @@ impl DoraOutputSender<'_> { /// Send an output from the operator: /// - `id` is the `output_id` as defined in your dataflow. /// - `data` is the data that should be sent - pub fn send(&mut self, id: String, data: Vec) -> Result<(), String> { + pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> { + let (data_array, schema) = + arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?; let result = self.0.send_output.call(Output { id: id.into(), - data: data.into(), + data_array, + schema, metadata: Metadata { open_telemetry_context: String::new().into(), // TODO }, diff --git a/apis/rust/operator/src/raw.rs b/apis/rust/operator/src/raw.rs index c3db60e43..9a05a5e3f 100644 --- a/apis/rust/operator/src/raw.rs +++ b/apis/rust/operator/src/raw.rs @@ -1,5 +1,7 @@ use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event}; -use dora_operator_api_types::{DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput}; +use dora_operator_api_types::{ + arrow, DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, +}; use std::ffi::c_void; pub type OutputFnRaw = unsafe extern "C" fn( @@ -27,7 +29,7 @@ pub unsafe fn dora_drop_operator(operator_context: *mut c_void) -> DoraResult } pub unsafe fn dora_on_event( - event: &RawEvent, + event: &mut RawEvent, send_output: &SendOutput, operator_context: *mut std::ffi::c_void, ) -> OnEventResult { @@ -35,11 +37,24 @@ pub unsafe fn dora_on_event( let operator: &mut O = unsafe { &mut *operator_context.cast() }; - let event_variant = if let Some(input) = &event.input { - let data = input.data.as_ref().as_slice(); - Event::Input { - id: &input.id, - data, + let event_variant = if let Some(input) = &mut event.input { + let Some(data_array) = input.data_array.take() else { + return OnEventResult { + result: DoraResult { error: Some("data already taken".to_string().into()) }, + status: DoraStatus::Continue, + }; + }; + let data = arrow::ffi::from_ffi(data_array, &input.schema); + + match data { + Ok(data) => Event::Input { + id: &input.id, + data: arrow::array::make_array(data).into(), + }, + Err(err) => Event::InputParseError { + id: &input.id, + error: format!("{err}"), + }, } } else if let Some(input_id) = &event.input_closed { Event::InputClosed { id: input_id } diff --git a/apis/rust/operator/types/Cargo.toml b/apis/rust/operator/types/Cargo.toml index 517173782..499a02ed8 100644 --- a/apis/rust/operator/types/Cargo.toml +++ b/apis/rust/operator/types/Cargo.toml @@ -8,6 +8,10 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +arrow = { workspace = true, features = ["ffi"] } +dora-arrow-convert = { workspace = true } + [dependencies.safer-ffi] version = "0.1.3" features = ["headers", "inventory-0-3-1"] diff --git a/apis/rust/operator/types/src/lib.rs b/apis/rust/operator/types/src/lib.rs index fd2715b5d..af44c5bec 100644 --- a/apis/rust/operator/types/src/lib.rs +++ b/apis/rust/operator/types/src/lib.rs @@ -1,7 +1,19 @@ #![deny(elided_lifetimes_in_paths)] // required for safer-ffi +pub use arrow; +use dora_arrow_convert::{ArrowData, IntoArrow}; pub use safer_ffi; -use safer_ffi::{closure::ArcDynFn1, derive_ReprC, ffi_export}; + +use arrow::{ + array::Array, + ffi::{FFI_ArrowArray, FFI_ArrowSchema}, +}; +use core::slice; +use safer_ffi::{ + char_p::{self, char_p_boxed}, + closure::ArcDynFn1, + derive_ReprC, ffi_export, +}; use std::path::Path; #[derive_ReprC] @@ -46,7 +58,7 @@ pub struct DoraOnEvent { #[repr(transparent)] pub struct OnEventFn( pub unsafe extern "C" fn( - event: &RawEvent, + event: &mut RawEvent, send_output: &SendOutput, operator_context: *mut std::ffi::c_void, ) -> OnEventResult, @@ -64,12 +76,12 @@ pub struct RawEvent { } #[derive_ReprC] -#[ffi_export] -#[repr(C)] +#[repr(opaque)] #[derive(Debug)] pub struct Input { pub id: safer_ffi::String, - pub data: safer_ffi::Vec, + pub data_array: Option, + pub schema: FFI_ArrowSchema, pub metadata: Metadata, } @@ -89,12 +101,12 @@ pub struct SendOutput { } #[derive_ReprC] -#[ffi_export] -#[repr(C)] +#[repr(opaque)] #[derive(Debug)] pub struct Output { pub id: safer_ffi::String, - pub data: safer_ffi::Vec, + pub data_array: FFI_ArrowArray, + pub schema: FFI_ArrowSchema, pub metadata: Metadata, } @@ -117,6 +129,56 @@ pub enum DoraStatus { StopAll = 2, } +#[ffi_export] +pub fn dora_read_input_id(input: &Input) -> char_p_boxed { + char_p::new(&*input.id) +} + +#[ffi_export] +pub fn dora_free_input_id(_input_id: char_p_boxed) {} + +#[ffi_export] +pub fn dora_read_data(input: &mut Input) -> Option> { + let data_array = input.data_array.take()?; + let data = arrow::ffi::from_ffi(data_array, &input.schema).ok()?; + let array = ArrowData(arrow::array::make_array(data)); + let bytes: &[u8] = TryFrom::try_from(&array).ok()?; + Some(bytes.to_owned().into()) +} + +#[ffi_export] +pub fn dora_free_data(_data: safer_ffi::Vec) {} + +#[ffi_export] +pub unsafe fn dora_send_operator_output( + send_output: &SendOutput, + id: safer_ffi::char_p::char_p_ref<'_>, + data_ptr: *const u8, + data_len: usize, +) -> DoraResult { + let result = || { + let data = unsafe { slice::from_raw_parts(data_ptr, data_len) }; + let arrow_data = data.to_owned().into_arrow(); + let (data_array, schema) = + arrow::ffi::to_ffi(&arrow_data.into_data()).map_err(|err| err.to_string())?; + let output = Output { + id: id.to_str().to_owned().into(), + data_array, + schema, + metadata: Metadata { + open_telemetry_context: String::new().into(), // TODO + }, + }; + Result::<_, String>::Ok(output) + }; + match result() { + Ok(output) => send_output.send_output.call(output), + Err(error) => DoraResult { + error: Some(error.into()), + }, + } +} + pub fn generate_headers(target_file: &Path) -> ::std::io::Result<()> { ::safer_ffi::headers::builder() .to_file(target_file)? diff --git a/binaries/cli/src/template/c/operator/operator-template.c b/binaries/cli/src/template/c/operator/operator-template.c index 3ba7fd5d6..7639cfc66 100644 --- a/binaries/cli/src/template/c/operator/operator-template.c +++ b/binaries/cli/src/template/c/operator/operator-template.c @@ -23,7 +23,7 @@ DoraResult_t dora_drop_operator(void *operator_context) } OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { diff --git a/binaries/cli/src/template/python/node/node-template.py b/binaries/cli/src/template/python/node/node-template.py index 45d1cdf41..f29c0a998 100644 --- a/binaries/cli/src/template/python/node/node-template.py +++ b/binaries/cli/src/template/python/node/node-template.py @@ -10,6 +10,6 @@ print( f"""Node received: id: {event["id"]}, - value: {event["data"]}, + value: {event["value"]}, metadata: {event["metadata"]}""" ) diff --git a/binaries/cli/src/template/python/operator/operator-template.py b/binaries/cli/src/template/python/operator/operator-template.py index 4efb4457e..884522217 100644 --- a/binaries/cli/src/template/python/operator/operator-template.py +++ b/binaries/cli/src/template/python/operator/operator-template.py @@ -45,7 +45,7 @@ def on_input( """ print( - f"Received input {dora_input['id']}, with data: {dora_input['data']}" + f"Received input {dora_input['id']}, with data: {dora_input['value']}" ) return DoraStatus.CONTINUE diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 222de6566..f580ca308 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -37,4 +37,5 @@ shared-memory-server = { workspace = true } ctrlc = "3.2.5" bincode = "1.3.3" async-trait = "0.1.64" -arrow-schema = "45.0.0" +arrow-schema = { workspace = true } +aligned-vec = "0.5.0" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d8201f398..bb806859a 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,7 +1,8 @@ +use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; -use dora_core::daemon_messages::{Data, InterDaemonEvent, Timestamped}; +use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; use dora_core::message::uhlc::{self, HLC}; use dora_core::message::{ArrowTypeInfo, MetadataParameters}; use dora_core::{ @@ -461,7 +462,7 @@ impl Daemon { output_id.clone(), dataflow, &metadata, - data.map(Data::Vec), + data.map(DataMessage::Vec), &self.clock, ) .await?; @@ -759,7 +760,7 @@ impl Daemon { node_id: NodeId, output_id: DataId, metadata: dora_core::message::Metadata, - data: Option, + data: Option, ) -> Result<(), eyre::ErrReport> { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!("send out failed: no running dataflow with ID `{dataflow_id}`") @@ -1070,9 +1071,9 @@ async fn send_output_to_local_receivers( output_id: DataId, dataflow: &mut RunningDataflow, metadata: &dora_core::message::Metadata, - data: Option, + data: Option, clock: &HLC, -) -> Result>, eyre::ErrReport> { +) -> Result>>, eyre::ErrReport> { let timestamp = metadata.timestamp(); let empty_set = BTreeSet::new(); let output_id = OutputId(node_id, output_id); @@ -1114,7 +1115,7 @@ async fn send_output_to_local_receivers( } let (data_bytes, drop_token) = match data { None => (None, None), - Some(Data::SharedMemory { + Some(DataMessage::SharedMemory { shared_memory_id, len, drop_token, @@ -1123,10 +1124,10 @@ async fn send_output_to_local_receivers( .os_id(shared_memory_id) .open() .wrap_err("failed to map shared memory output")?; - let data = Some(unsafe { memory.as_slice() }[..len].to_owned()); + let data = Some(AVec::from_slice(1, &unsafe { memory.as_slice() }[..len])); (data, Some(drop_token)) } - Some(Data::Vec(v)) => (Some(v), None), + Some(DataMessage::Vec(v)) => (Some(v), None), }; if let Some(token) = drop_token { // insert token into `pending_drop_tokens` even if there are no local subscribers @@ -1445,7 +1446,7 @@ pub enum DaemonNodeEvent { SendOut { output_id: DataId, metadata: dora_core::message::Metadata, - data: Option, + data: Option, }, ReportDrop { tokens: Vec, diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 04d950466..90f9ff017 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -28,16 +28,17 @@ serde_yaml = "0.8.23" tokio = { version = "1.24.2", features = ["full"] } tokio-stream = "0.1.8" # pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html -pyo3 = { version = "0.19", features = ["eyre", "abi3-py37"], optional = true } +pyo3 = { workspace = true, features = ["eyre", "abi3-py37"], optional = true } tracing = "0.1.36" tracing-subscriber = "0.3.15" dora-download = { workspace = true } flume = "0.10.14" clap = { version = "4.0.3", features = ["derive"] } tracing-opentelemetry = { version = "0.18.0", optional = true } -pythonize = { version = "0.19.0", optional = true } -arrow-schema = "45.0.0" -arrow = { version = "45.0.0" } +pythonize = { workspace = true, optional = true } +arrow-schema = { workspace = true } +arrow = { workspace = true, features = ["ffi"] } +aligned-vec = "0.5.0" [features] default = ["tracing"] diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 7d9ee39d3..e9a2f9360 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -269,12 +269,14 @@ mod callback_impl { use crate::operator::OperatorEvent; use super::SendOutputCallback; + use aligned_vec::avec; use arrow::{array::ArrayData, pyarrow::FromPyArrow}; use dora_core::message::ArrowTypeInfo; - use dora_node_api::ZERO_COPY_THRESHOLD; - use dora_operator_api_python::{ - copy_array_into_sample, pydict_to_metadata, required_data_size, + use dora_node_api::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + ZERO_COPY_THRESHOLD, }; + use dora_operator_api_python::pydict_to_metadata; use eyre::{eyre, Context, Result}; use pyo3::{ pymethods, @@ -310,7 +312,7 @@ mod callback_impl { .wrap_err("failed to request output sample")? .wrap_err("failed to allocate output sample") } else { - Ok(vec![0; data_len].into()) + Ok(avec![0; data_len].into()) } }; diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index c1371def4..391370770 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,12 +1,15 @@ use super::{OperatorEvent, StopReason}; +use aligned_vec::avec; use dora_core::{ adjust_shared_library_path, config::{DataId, NodeId, OperatorId}, descriptor::source_is_url, - message::ArrowTypeInfo, }; use dora_download::download_file; -use dora_node_api::{Event, MetadataParameters}; +use dora_node_api::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + Event, MetadataParameters, +}; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, DoraResult, DoraStatus, Metadata, OnEventResult, Output, SendOutput, @@ -110,7 +113,8 @@ impl<'lib> SharedLibraryOperator<'lib> { let send_output_closure = Arc::new(move |output: Output| { let Output { id: output_id, - data, + data_array, + schema, metadata: Metadata { open_telemetry_context, }, @@ -120,11 +124,31 @@ impl<'lib> SharedLibraryOperator<'lib> { ..Default::default() }; + let arrow_array = match arrow::ffi::from_ffi(data_array, &schema) { + Ok(a) => a, + Err(err) => { + return DoraResult { + error: Some(err.to_string().into()), + } + } + }; + + let total_len = required_data_size(&arrow_array); + let mut sample = avec![0; total_len]; + let type_info = match copy_array_into_sample(&mut sample, &arrow_array) { + Ok(t) => t, + Err(err) => { + return DoraResult { + error: Some(err.to_string().into()), + } + } + }; + let event = OperatorEvent::Output { output_id: DataId::from(String::from(output_id)), - type_info: ArrowTypeInfo::byte_array(data.len()), + type_info, parameters, - data: Some(Vec::from(data).into()), + data: Some(sample.into()), }; let result = self @@ -168,7 +192,7 @@ impl<'lib> SharedLibraryOperator<'lib> { metadata.parameters.open_telemetry_context = string_cx; } - let operator_event = match event { + let mut operator_event = match event { Event::Stop => dora_operator_api_types::RawEvent { input: None, input_closed: None, @@ -180,9 +204,12 @@ impl<'lib> SharedLibraryOperator<'lib> { metadata, data, } => { + let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; + let operator_input = dora_operator_api_types::Input { id: String::from(input_id).into(), - data: data.map(|d| d.to_vec()).unwrap_or_default().into(), + data_array: Some(data_array), + schema, metadata: Metadata { open_telemetry_context: metadata .parameters @@ -227,7 +254,7 @@ impl<'lib> SharedLibraryOperator<'lib> { status, } = unsafe { (self.bindings.on_event.on_event)( - &operator_event, + &mut operator_event, &send_output, operator_context.raw, ) diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs index 2ec10cad5..9080bbc55 100644 --- a/examples/benchmark/node/src/main.rs +++ b/examples/benchmark/node/src/main.rs @@ -31,7 +31,7 @@ fn main() -> eyre::Result<()> { .sample_iter(rand::distributions::Standard) .take(size) .collect(); - node.send_output(latency.clone(), Default::default(), data.len(), |out| { + node.send_output_raw(latency.clone(), Default::default(), data.len(), |out| { out.copy_from_slice(&data); })?; @@ -50,7 +50,7 @@ fn main() -> eyre::Result<()> { .sample_iter(rand::distributions::Standard) .take(size) .collect(); - node.send_output(throughput.clone(), Default::default(), data.len(), |out| { + node.send_output_raw(throughput.clone(), Default::default(), data.len(), |out| { out.copy_from_slice(&data); })?; } diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs index 7ce064b8c..154b47d24 100644 --- a/examples/benchmark/sink/src/main.rs +++ b/examples/benchmark/sink/src/main.rs @@ -22,7 +22,7 @@ fn main() -> eyre::Result<()> { match event { Event::Input { id, metadata, data } => { // check if new size bracket - let data_len = data.map(|d| d.len()).unwrap_or_default(); + let data_len = data.len(); if data_len != current_size { if n > 0 { record_results(start, current_size, n, latencies, latency); diff --git a/examples/c++-dataflow/operator-c-api/operator.cc b/examples/c++-dataflow/operator-c-api/operator.cc index 7c9fd299d..f15492dda 100644 --- a/examples/c++-dataflow/operator-c-api/operator.cc +++ b/examples/c++-dataflow/operator-c-api/operator.cc @@ -7,6 +7,7 @@ extern "C" #include #include #include +#include class Operator { @@ -31,7 +32,7 @@ extern "C" DoraResult_t dora_drop_operator(void *operator_context) } extern "C" OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { @@ -39,19 +40,16 @@ extern "C" OnEventResult_t dora_on_event( { // input event Input_t *input = event->input; - std::string id((char *)input->id.ptr, input->id.len); + char *id = dora_read_input_id(input); - std::vector data; - for (size_t i = 0; i < input->data.len; i++) - { - data.push_back(*(input->data.ptr + i)); - } + Vec_uint8_t data = dora_read_data(input); + assert(data.ptr != NULL); std::cout << "C++ Operator (C-API) received input `" << id << "` with data: ["; - for (unsigned char &v : data) + for (int i = 0; i < data.len; i++) { - std::cout << (unsigned int)v << ", "; + std::cout << (unsigned int)data.ptr[i] << ", "; } std::cout << "]" << std::endl; @@ -60,18 +58,15 @@ extern "C" OnEventResult_t dora_on_event( size_t out_data_len = 1; uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len); - *out_data_heap = data[0] / 2; - - Output_t output = {.id = { - .ptr = (uint8_t *)out_id_heap, - .len = strlen(out_id_heap), - .cap = strlen(out_id_heap) + 1, - }, - .data = {.ptr = out_data_heap, .len = out_data_len, .cap = out_data_len}}; + *out_data_heap = *data.ptr / 2; - DoraResult_t send_result = (send_output->send_output.call)(send_output->send_output.env_ptr, output); + DoraResult_t send_result = dora_send_operator_output(send_output, out_id_heap, out_data_heap, out_data_len); OnEventResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE}; + + dora_free_data(data); + dora_free_input_id(id); + return result; } if (event->stop) diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index ef34861b9..27914ac38 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -113,7 +113,12 @@ async fn main() -> eyre::Result<()> { Path::new("operator-c-api").join("operator.cc"), )?], "operator_c_api", - &[], + &[ + "-l", + "dora_operator_api_c", + "-L", + root.join("target").join("debug").to_str().unwrap(), + ], ) .await?; @@ -268,6 +273,16 @@ async fn build_cxx_operator( link.arg("-lmsvcrt"); link.arg("-fms-runtime-lib=static"); } + #[cfg(target_os = "macos")] + { + link.arg("-framework").arg("CoreServices"); + link.arg("-framework").arg("Security"); + link.arg("-l").arg("System"); + link.arg("-l").arg("resolv"); + link.arg("-l").arg("pthread"); + link.arg("-l").arg("c"); + link.arg("-l").arg("m"); + } link.arg("-o") .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); if let Some(parent) = paths[0].parent() { diff --git a/examples/c-dataflow/operator.c b/examples/c-dataflow/operator.c index ba64e8e3f..cc5ee83bd 100644 --- a/examples/c-dataflow/operator.c +++ b/examples/c-dataflow/operator.c @@ -23,10 +23,12 @@ DoraResult_t dora_drop_operator(void *operator_context) } OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { + OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; + char *counter = (char *)operator_context; if (event->input != NULL) @@ -34,18 +36,17 @@ OnEventResult_t dora_on_event( // input event Input_t *input = event->input; - char id[input->id.len + 1]; - memcpy(id, input->id.ptr, input->id.len); - id[input->id.len] = 0; + char *id = dora_read_input_id(input); if (strcmp(id, "message") == 0) { - char data[input->data.len + 1]; - memcpy(data, input->data.ptr, input->data.len); - data[input->data.len] = 0; + printf("message event\n"); + + Vec_uint8_t data = dora_read_data(input); + assert(data.ptr != NULL); *counter += 1; - printf("C operator received message `%s`, counter: %i\n", data, *counter); + printf("C operator received message `%.*s`, counter: %i\n", (int)data.len, data.ptr, *counter); char *out_id = "counter"; char *out_id_heap = strdup(out_id); @@ -55,27 +56,22 @@ OnEventResult_t dora_on_event( int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter); assert(count >= 0 && count < 100); - Output_t output = {.id = { - .ptr = (uint8_t *)out_id_heap, - .len = strlen(out_id_heap), - .cap = strlen(out_id_heap) + 1, - }, - .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}}; - DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output); + DoraResult_t res = dora_send_operator_output(send_output, out_id_heap, (uint8_t *)out_data, strlen(out_data)); + result.result = res; - OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE}; - return result; + dora_free_data(data); } else { printf("C operator received unexpected input %s, context: %i\n", id, *counter); } + + dora_free_input_id(id); } if (event->stop) { printf("C operator received stop event\n"); } - OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; return result; } diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index 2143dddf2..7db5a7f7b 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -33,7 +33,7 @@ async fn main() -> eyre::Result<()> { build_c_node(root, "sink.c", "c_sink").await?; build_package("dora-operator-api-c").await?; - build_c_operator().await?; + build_c_operator(root).await?; let dataflow = Path::new("dataflow.yml").to_owned(); dora_daemon::Daemon::run_dataflow(&dataflow).await?; @@ -116,7 +116,7 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<( Ok(()) } -async fn build_c_operator() -> eyre::Result<()> { +async fn build_c_operator(root: &Path) -> eyre::Result<()> { let mut compile = tokio::process::Command::new("clang"); compile.arg("-c").arg("operator.c"); compile.arg("-o").arg("build/operator.o"); @@ -129,6 +129,51 @@ async fn build_c_operator() -> eyre::Result<()> { let mut link = tokio::process::Command::new("clang"); link.arg("-shared").arg("build/operator.o"); + link.arg("-L").arg(root.join("target").join("debug")); + link.arg("-l").arg("dora_operator_api_c"); + #[cfg(target_os = "windows")] + { + link.arg("-ladvapi32"); + link.arg("-luserenv"); + link.arg("-lkernel32"); + link.arg("-lws2_32"); + link.arg("-lbcrypt"); + link.arg("-lncrypt"); + link.arg("-lschannel"); + link.arg("-lntdll"); + link.arg("-liphlpapi"); + + link.arg("-lcfgmgr32"); + link.arg("-lcredui"); + link.arg("-lcrypt32"); + link.arg("-lcryptnet"); + link.arg("-lfwpuclnt"); + link.arg("-lgdi32"); + link.arg("-lmsimg32"); + link.arg("-lmswsock"); + link.arg("-lole32"); + link.arg("-loleaut32"); + link.arg("-lopengl32"); + link.arg("-lsecur32"); + link.arg("-lshell32"); + link.arg("-lsynchronization"); + link.arg("-luser32"); + link.arg("-lwinspool"); + + link.arg("-Wl,-nodefaultlib:libcmt"); + link.arg("-D_DLL"); + link.arg("-lmsvcrt"); + } + #[cfg(target_os = "macos")] + { + link.arg("-framework").arg("CoreServices"); + link.arg("-framework").arg("Security"); + link.arg("-l").arg("System"); + link.arg("-l").arg("resolv"); + link.arg("-l").arg("pthread"); + link.arg("-l").arg("c"); + link.arg("-l").arg("m"); + } link.arg("-o") .arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}"))); if !link.status().await?.success() { diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index c52e46181..36f42d578 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event}; +use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; fn main() -> eyre::Result<()> { println!("hello"); @@ -22,10 +22,7 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); - let data: &[u8] = &random.to_le_bytes(); - node.send_output(output.clone(), metadata.parameters, data.len(), |out| { - out.copy_from_slice(data); - })?; + node.send_output(output.clone(), metadata.parameters, random.into_arrow())?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, diff --git a/examples/multiple-daemons/operator/src/lib.rs b/examples/multiple-daemons/operator/src/lib.rs index 81a1a1f0d..d18138c53 100644 --- a/examples/multiple-daemons/operator/src/lib.rs +++ b/examples/multiple-daemons/operator/src/lib.rs @@ -1,6 +1,8 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; +use dora_operator_api::{ + register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, +}; register_operator!(ExampleOperator); @@ -21,16 +23,14 @@ impl DoraOperator for ExampleOperator { self.ticks += 1; } "random" => { - let parsed = { - let data: [u8; 8] = - (*data).try_into().map_err(|_| "unexpected random data")?; - u64::from_le_bytes(data) - }; + let data = u64::try_from(data) + .map_err(|err| format!("expected u64 message: {err}"))?; + let output = format!( - "operator received random value {parsed:#x} after {} ticks", + "operator received random value {data:#x} after {} ticks", self.ticks ); - output_sender.send("status".into(), output.into_bytes())?; + output_sender.send("status".into(), output.into_arrow())?; } other => eprintln!("ignoring unexpected input {other}"), }, diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index 18632c3f3..e180af081 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, DoraNode, Event}; -use eyre::{bail, Context, ContextCompat}; +use eyre::{bail, Context}; fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; @@ -12,9 +12,8 @@ fn main() -> eyre::Result<()> { data, } => match id.as_str() { "message" => { - let data = data.wrap_err("no data")?; - let received_string = std::str::from_utf8(&data) - .wrap_err("received message was not utf8-encoded")?; + let received_string: &str = + TryFrom::try_from(&data).context("expected string message")?; println!("sink received message: {}", received_string); if !received_string.starts_with("operator received random value ") { bail!("unexpected message format (should start with 'operator received random value')") diff --git a/examples/python-dataflow/object_detection.py b/examples/python-dataflow/object_detection.py index 7209c2ea5..e74feadd7 100755 --- a/examples/python-dataflow/object_detection.py +++ b/examples/python-dataflow/object_detection.py @@ -18,7 +18,7 @@ match event["id"]: case "image": print("[object detection] received image input") - frame = np.frombuffer(event["data"], dtype="uint8") + frame = event["value"].to_numpy() frame = cv2.imdecode(frame, -1) frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) results = model(frame) # includes NMS diff --git a/examples/python-dataflow/plot.py b/examples/python-dataflow/plot.py index 5d0c6c013..7284a6d7d 100755 --- a/examples/python-dataflow/plot.py +++ b/examples/python-dataflow/plot.py @@ -33,15 +33,15 @@ def on_input( Args: dora_input["id"] (str): Id of the dora_input declared in the yaml configuration - dora_input["data"] (bytes): Bytes message of the dora_input + dora_input["value"] (arrow array): message of the dora_input """ if dora_input["id"] == "image": - frame = np.frombuffer(dora_input["data"], dtype="uint8") + frame = dora_input["value"].to_numpy() frame = cv2.imdecode(frame, -1) self.image = frame elif dora_input["id"] == "bbox" and len(self.image) != 0: - bboxs = np.frombuffer(dora_input["data"], dtype="float32") + bboxs = dora_input["value"].to_numpy() self.bboxs = np.reshape(bboxs, (-1, 6)) for bbox in self.bboxs: [ diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index e531b8b27..e33100b83 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -20,9 +20,7 @@ async fn main() -> eyre::Result<()> { let venv = &root.join("examples").join(".env"); std::env::set_var( "VIRTUAL_ENV", - venv.to_str() - .context("venv path not valid unicode")? - .to_owned(), + venv.to_str().context("venv path not valid unicode")?, ); let orig_path = std::env::var("PATH")?; let venv_bin = venv.join("bin"); diff --git a/examples/python-operator-dataflow/plot.py b/examples/python-operator-dataflow/plot.py index 948b80867..e7c32f294 100755 --- a/examples/python-operator-dataflow/plot.py +++ b/examples/python-operator-dataflow/plot.py @@ -50,7 +50,7 @@ def on_input( Args: dora_input["id"] (str): Id of the dora_input declared in the yaml configuration - dora_input["data"] (bytes): Bytes message of the dora_input + dora_input["value"] (arrow array): message of the dora_input send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]: Function for sending output to the dataflow: - First argument is the `output_id` diff --git a/examples/python-operator-dataflow/run.rs b/examples/python-operator-dataflow/run.rs index e531b8b27..e33100b83 100644 --- a/examples/python-operator-dataflow/run.rs +++ b/examples/python-operator-dataflow/run.rs @@ -20,9 +20,7 @@ async fn main() -> eyre::Result<()> { let venv = &root.join("examples").join(".env"); std::env::set_var( "VIRTUAL_ENV", - venv.to_str() - .context("venv path not valid unicode")? - .to_owned(), + venv.to_str().context("venv path not valid unicode")?, ); let orig_path = std::env::var("PATH")?; let venv_bin = venv.join("bin"); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index c52e46181..36f42d578 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event}; +use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; fn main() -> eyre::Result<()> { println!("hello"); @@ -22,10 +22,7 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); - let data: &[u8] = &random.to_le_bytes(); - node.send_output(output.clone(), metadata.parameters, data.len(), |out| { - out.copy_from_slice(data); - })?; + node.send_output(output.clone(), metadata.parameters, random.into_arrow())?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, diff --git a/examples/rust-dataflow/operator/src/lib.rs b/examples/rust-dataflow/operator/src/lib.rs index 81a1a1f0d..5a131a157 100644 --- a/examples/rust-dataflow/operator/src/lib.rs +++ b/examples/rust-dataflow/operator/src/lib.rs @@ -1,6 +1,8 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; +use dora_operator_api::{ + register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, +}; register_operator!(ExampleOperator); @@ -21,16 +23,14 @@ impl DoraOperator for ExampleOperator { self.ticks += 1; } "random" => { - let parsed = { - let data: [u8; 8] = - (*data).try_into().map_err(|_| "unexpected random data")?; - u64::from_le_bytes(data) - }; + let value = u64::try_from(data) + .map_err(|err| format!("unexpected data type: {err}"))?; + let output = format!( - "operator received random value {parsed:#x} after {} ticks", + "operator received random value {value:#x} after {} ticks", self.ticks ); - output_sender.send("status".into(), output.into_bytes())?; + output_sender.send("status".into(), output.into_arrow())?; } other => eprintln!("ignoring unexpected input {other}"), }, diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 18632c3f3..e180af081 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, DoraNode, Event}; -use eyre::{bail, Context, ContextCompat}; +use eyre::{bail, Context}; fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; @@ -12,9 +12,8 @@ fn main() -> eyre::Result<()> { data, } => match id.as_str() { "message" => { - let data = data.wrap_err("no data")?; - let received_string = std::str::from_utf8(&data) - .wrap_err("received message was not utf8-encoded")?; + let received_string: &str = + TryFrom::try_from(&data).context("expected string message")?; println!("sink received message: {}", received_string); if !received_string.starts_with("operator received random value ") { bail!("unexpected message format (should start with 'operator received random value')") diff --git a/libraries/arrow-convert/Cargo.toml b/libraries/arrow-convert/Cargo.toml new file mode 100644 index 000000000..8307d72f9 --- /dev/null +++ b/libraries/arrow-convert/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dora-arrow-convert" +version.workspace = true +edition = "2021" +documentation.workspace = true +description.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = { workspace = true } +eyre = "0.6.8" diff --git a/libraries/arrow-convert/src/from_impls.rs b/libraries/arrow-convert/src/from_impls.rs new file mode 100644 index 000000000..5b0b687d1 --- /dev/null +++ b/libraries/arrow-convert/src/from_impls.rs @@ -0,0 +1,153 @@ +use arrow::{ + array::{Array, AsArray, PrimitiveArray, StringArray}, + datatypes::ArrowPrimitiveType, +}; +use eyre::ContextCompat; + +use crate::ArrowData; + +impl From for arrow::array::ArrayRef { + fn from(value: ArrowData) -> Self { + value.0 + } +} + +impl From for ArrowData { + fn from(value: arrow::array::ArrayRef) -> Self { + Self(value) + } +} + +impl TryFrom<&ArrowData> for bool { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let bool_array = value.as_boolean_opt().context("not a bool array")?; + if bool_array.is_empty() { + eyre::bail!("empty array"); + } + if bool_array.len() != 1 { + eyre::bail!("expected length 1"); + } + if bool_array.null_count() != 0 { + eyre::bail!("bool array has nulls"); + } + Ok(bool_array.value(0)) + } +} +impl TryFrom<&ArrowData> for u8 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for u16 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for u32 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for u64 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for i8 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for i16 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for i32 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} +impl TryFrom<&ArrowData> for i64 { + type Error = eyre::Report; + fn try_from(value: &ArrowData) -> Result { + let array = value + .as_primitive_opt::() + .context("not a primitive array")?; + extract_single_primitive(array) + } +} + +impl<'a> TryFrom<&'a ArrowData> for &'a str { + type Error = eyre::Report; + fn try_from(value: &'a ArrowData) -> Result { + let array: &StringArray = value.as_string_opt().wrap_err("not a string array")?; + if array.is_empty() { + eyre::bail!("empty array"); + } + if array.len() != 1 { + eyre::bail!("expected length 1"); + } + if array.null_count() != 0 { + eyre::bail!("array has nulls"); + } + Ok(array.value(0)) + } +} + +impl<'a> TryFrom<&'a ArrowData> for &'a [u8] { + type Error = eyre::Report; + fn try_from(value: &'a ArrowData) -> Result { + let array: &PrimitiveArray = + value.as_primitive_opt().wrap_err("not a primitive array")?; + if array.null_count() != 0 { + eyre::bail!("array has nulls"); + } + Ok(array.values()) + } +} + +fn extract_single_primitive(array: &PrimitiveArray) -> Result +where + T: ArrowPrimitiveType, +{ + if array.is_empty() { + eyre::bail!("empty array"); + } + if array.len() != 1 { + eyre::bail!("expected length 1"); + } + if array.null_count() != 0 { + eyre::bail!("array has nulls"); + } + Ok(array.value(0)) +} diff --git a/libraries/arrow-convert/src/into_impls.rs b/libraries/arrow-convert/src/into_impls.rs new file mode 100644 index 000000000..030d1b477 --- /dev/null +++ b/libraries/arrow-convert/src/into_impls.rs @@ -0,0 +1,147 @@ +use arrow::array::{PrimitiveArray, StringArray}; + +use crate::IntoArrow; + +impl IntoArrow for bool { + type A = arrow::array::BooleanArray; + fn into_arrow(self) -> Self::A { + std::iter::once(Some(self)).collect() + } +} + +impl IntoArrow for u8 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for u16 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for u32 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for u64 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i8 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i16 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i32 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i64 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for f32 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for f64 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} + +impl IntoArrow for &str { + type A = StringArray; + fn into_arrow(self) -> Self::A { + std::iter::once(Some(self)).collect() + } +} + +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} +impl IntoArrow for Vec { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + self.into() + } +} + +impl IntoArrow for () { + type A = arrow::array::NullArray; + + fn into_arrow(self) -> Self::A { + arrow::array::NullArray::new(0) + } +} diff --git a/libraries/arrow-convert/src/lib.rs b/libraries/arrow-convert/src/lib.rs new file mode 100644 index 000000000..826d45634 --- /dev/null +++ b/libraries/arrow-convert/src/lib.rs @@ -0,0 +1,29 @@ +use std::ops::{Deref, DerefMut}; + +use arrow::array::Array; + +mod from_impls; +mod into_impls; + +pub trait IntoArrow { + type A: Array; + + fn into_arrow(self) -> Self::A; +} + +#[derive(Debug)] +pub struct ArrowData(pub arrow::array::ArrayRef); + +impl Deref for ArrowData { + type Target = arrow::array::ArrayRef; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ArrowData { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index a408d90d1..3be48e236 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -20,3 +20,4 @@ dora-message = { workspace = true } tracing = "0.1" serde-with-expand-env = "1.1.0" tokio = { version = "1.24.1", features = ["fs"] } +aligned-vec = { version = "0.5.0", features = ["serde"] } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 21fe93323..a9a6bef84 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -9,6 +9,7 @@ use crate::{ config::{DataId, NodeId, NodeRunConfig, OperatorId}, descriptor::{Descriptor, OperatorDefinition, ResolvedNode}, }; +use aligned_vec::{AVec, ConstAlign}; use dora_message::{uhlc, Metadata}; use uuid::Uuid; @@ -51,7 +52,7 @@ pub enum DaemonRequest { SendMessage { output_id: DataId, metadata: Metadata, - data: Option, + data: Option, }, CloseOutputs(Vec), /// Signals that the node is finished sending outputs and that it received all @@ -86,9 +87,8 @@ impl DaemonRequest { } #[derive(serde::Serialize, serde::Deserialize, Clone)] -pub enum Data { - #[serde(with = "serde_bytes")] - Vec(Vec), +pub enum DataMessage { + Vec(AVec>), SharedMemory { shared_memory_id: String, len: usize, @@ -96,16 +96,16 @@ pub enum Data { }, } -impl Data { +impl DataMessage { pub fn drop_token(&self) -> Option { match self { - Data::Vec(_) => None, - Data::SharedMemory { drop_token, .. } => Some(*drop_token), + DataMessage::Vec(_) => None, + DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token), } } } -impl fmt::Debug for Data { +impl fmt::Debug for DataMessage { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Vec(v) => f @@ -153,7 +153,7 @@ pub enum NodeEvent { Input { id: DataId, metadata: Metadata, - data: Option, + data: Option, }, InputClosed { id: DataId, @@ -234,7 +234,7 @@ pub enum InterDaemonEvent { node_id: NodeId, output_id: DataId, metadata: Metadata, - data: Option>, + data: Option>>, }, InputsClosed { dataflow_id: DataflowId, diff --git a/libraries/extensions/ros2-bridge/python/Cargo.toml b/libraries/extensions/ros2-bridge/python/Cargo.toml index 2b06a593b..8c07908f9 100644 --- a/libraries/extensions/ros2-bridge/python/Cargo.toml +++ b/libraries/extensions/ros2-bridge/python/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [dependencies] dora-ros2-bridge = { path = "..", default-features = false } dora-ros2-bridge-msg-gen = { path = "../msg-gen" } -pyo3 = { version = "0.19", features = ["eyre", "abi3-py37", "serde"] } +pyo3 = { workspace = true, features = ["eyre", "abi3-py37", "serde"] } eyre = "0.6" serde = "1.0.166" flume = "0.10.14" -arrow = { version = "45.0.0", features = ["pyarrow"] } +arrow = { workspace = true, features = ["pyarrow"] } futures = "0.3.28" diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 02793e7a3..a9dd3c04b 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -10,8 +10,8 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow-data = { version = "45.0.0" } +arrow-data = { workspace = true } uhlc = "0.5.1" serde = { version = "1.0.136", features = ["derive"] } eyre = "0.6.8" -arrow-schema = { version = "45.0.0", features = ["serde"] } +arrow-schema = { workspace = true, features = ["serde"] }