Skip to content

Commit

Permalink
feat: add Attachment to API (#189)
Browse files Browse the repository at this point in the history
* feat: add Attachment to API

* fix: make attachment a CLI arg in z_put example

* Update zenoh/session.py

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>

* Apply suggestions from code review

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>

* test: add test for attachment

---------

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>
  • Loading branch information
wyfo and Mallets authored Apr 22, 2024
1 parent 0812267 commit 9e6bd83
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 36 deletions.
16 changes: 14 additions & 2 deletions examples/z_put.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
default='Put from Python!',
type=str,
help='The value to write.')
parser.add_argument('--attach', '-a', dest='attach',
type=str,
help='The key-values to attach')
parser.add_argument('--config', '-c', dest='config',
metavar='FILE',
type=str,
Expand All @@ -60,6 +63,14 @@
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key
value = args.value
attachment = args.attach
if attachment is not None:
attachment = {
k: v
for pair in attachment.split("&")
for (k, v) in [pair.split("=")]
}


# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def main():
Expand All @@ -70,7 +81,7 @@ def main():
session = zenoh.open(conf)

print("Putting Data ('{}': '{}')...".format(key, value))
session.put(key, value)
session.put(key, value, attachment=attachment)

# --- Examples of put with other types:

Expand Down Expand Up @@ -103,4 +114,5 @@ def main():

session.close()

main()

main()
7 changes: 4 additions & 3 deletions examples/z_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key


# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def main():
# initiate logging
Expand All @@ -68,10 +69,8 @@ def main():

print("Declaring Subscriber on '{}'...".format(key))


def listener(sample: Sample):
print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')")


# WARNING, you MUST store the return value in order for the subscription to work!!
# This is because if you don't, the reference counter will reach 0 and the subscription
Expand All @@ -86,4 +85,6 @@ def listener(sample: Sample):
# the reference counter reaches 0
# sub.undeclare()
# session.close()
main()


main()
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ fn zenoh(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<queryable::_Query>()?;
m.add_class::<queryable::_Queryable>()?;
m.add_class::<value::_Value>()?;
m.add_class::<value::_Attachment>()?;
m.add_class::<value::_Sample>()?;
m.add_class::<value::_QoS>()?;
m.add_class::<value::_Reply>()?;
Expand Down
25 changes: 22 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::enums::{
};
use crate::keyexpr::{_KeyExpr, _Selector};
use crate::queryable::{_Query, _Queryable};
use crate::value::{_Hello, _Reply, _Sample, _Value, _ZenohId};
use crate::value::{_Attachment, _Hello, _Reply, _Sample, _Value, _ZenohId};
use crate::{PyAnyToValue, PyExtract, ToPyErr};

#[pyclass(subclass)]
Expand Down Expand Up @@ -88,6 +88,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -115,6 +120,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -144,6 +154,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -275,8 +290,12 @@ impl _Publisher {
pub fn key_expr(&self) -> _KeyExpr {
_KeyExpr(self.0.key_expr().clone())
}
pub fn put(&self, value: _Value) -> PyResult<()> {
self.0.put(value).res_sync().map_err(|e| e.to_pyerr())
pub fn put(&self, value: _Value, attachment: Option<_Attachment>) -> PyResult<()> {
let mut builder = self.0.put(value);
if let Some(attachment) = attachment {
builder = builder.with_attachment(attachment.0);
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
pub fn delete(&self) -> PyResult<()> {
self.0.delete().res_sync().map_err(|e| e.to_pyerr())
Expand Down
72 changes: 71 additions & 1 deletion src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
// Contributors:
// ZettaScale Zenoh team, <zenoh@zettascale.tech>

use pyo3::{prelude::*, types::PyBytes};
use pyo3::{
prelude::*,
types::{PyBytes, PyDict},
};
use std::collections::HashMap;
use uhlc::Timestamp;
use zenoh::sample::{Attachment, AttachmentBuilder};
use zenoh::{
prelude::{Encoding, KeyExpr, Sample, Value, ZenohId},
query::Reply,
Expand Down Expand Up @@ -188,6 +193,60 @@ impl _QoS {
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Attachment(pub Attachment);

#[pymethods]
impl _Attachment {
#[new]
pub fn pynew(this: Self) -> Self {
this
}

#[staticmethod]
fn new(attachment: HashMap<Vec<u8>, Vec<u8>>) -> Self {
Self(attachment.iter().map(|(k, v)| (&k[..], &v[..])).collect())
}

fn is_empty(&self) -> bool {
self.0.is_empty()
}

fn len(&self) -> usize {
self.0.len()
}

fn items<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
let items = PyDict::new_bound(py);
for (k, v) in self.0.iter() {
items.set_item(PyBytes::new_bound(py, &k), PyBytes::new_bound(py, &v))?;
}
Ok(items)
}

fn get(&self, key: Vec<u8>) -> Option<Vec<u8>> {
self.0.get(&key).map(|v| v.to_vec())
}

fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
self.0.insert(&key, &value)
}

fn extend(mut this: PyRefMut<Self>, attachment: HashMap<Vec<u8>, Vec<u8>>) -> PyRefMut<Self> {
this.0.extend(
attachment
.iter()
.map(|(k, v)| (&k[..], &v[..]))
.collect::<AttachmentBuilder>(),
);
this
}
fn as_str(&self) -> String {
format!("{:?}", self.0)
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Sample {
Expand All @@ -196,6 +255,7 @@ pub struct _Sample {
kind: _SampleKind,
timestamp: Option<_Timestamp>,
qos: _QoS,
attachment: Option<_Attachment>,
}
impl From<Sample> for _Sample {
fn from(sample: Sample) -> Self {
Expand All @@ -205,6 +265,7 @@ impl From<Sample> for _Sample {
kind,
timestamp,
qos,
attachment,
..
} = sample;
_Sample {
Expand All @@ -213,6 +274,7 @@ impl From<Sample> for _Sample {
kind: _SampleKind(kind),
timestamp: timestamp.map(_Timestamp),
qos: _QoS(qos),
attachment: attachment.map(_Attachment),
}
}
}
Expand Down Expand Up @@ -310,20 +372,26 @@ impl _Sample {
pub fn timestamp(&self) -> Option<_Timestamp> {
self.timestamp
}
#[getter]
pub fn attachment(&self) -> Option<_Attachment> {
self.attachment.clone()
}
#[staticmethod]
pub fn new(
key_expr: _KeyExpr,
value: _Value,
qos: _QoS,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
attachment: Option<_Attachment>,
) -> Self {
_Sample {
key_expr: key_expr.0,
value,
qos,
kind,
timestamp,
attachment,
}
}
fn __str__(&self) -> String {
Expand All @@ -339,11 +407,13 @@ impl From<_Sample> for Sample {
kind,
timestamp,
qos,
attachment,
} = sample;
let mut sample = Sample::new(key_expr, value);
sample.kind = kind.0;
sample.timestamp = timestamp.map(|t| t.0);
sample.qos = qos.0;
sample.attachment = attachment.map(|a| a.0);
sample
}
}
Expand Down
44 changes: 39 additions & 5 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import zenoh
import json
from zenoh import Session, Query, Sample, Priority, CongestionControl
from typing import List, Tuple
from typing import List, Tuple, Optional
import time

SLEEP = 1
Expand Down Expand Up @@ -92,9 +92,9 @@ def sub_callback(sample: Sample):
nonlocal num_received
nonlocal num_errors
if sample.key_expr != keyexpr \
or sample.qos.priority != Priority.DATA_HIGH() \
or sample.qos.congestion_control != CongestionControl.BLOCK() \
or sample.payload != msg:
or sample.qos.priority != Priority.DATA_HIGH() \
or sample.qos.congestion_control != CongestionControl.BLOCK() \
or sample.payload != msg:
num_errors += 1
num_received += 1

Expand Down Expand Up @@ -124,9 +124,43 @@ def sub_callback(sample: Sample):
subscriber.undeclare()


def run_session_attachment(peer01, peer02):
keyexpr = "test_attachment/session"

last_sample: Optional[Sample] = None

def callback(sample: Sample):
nonlocal last_sample
last_sample = sample

print("[A][01d] Publisher on peer01 session");
publisher = peer01.declare_publisher(keyexpr)
time.sleep(SLEEP)

print("[A][02d] Publisher on peer01 session");
subscriber = peer02.declare_subscriber(keyexpr, callback)
time.sleep(SLEEP)

publisher.put("no attachment")
time.sleep(SLEEP)
assert last_sample is not None
assert last_sample.attachment is None

publisher.put("attachment", attachment={"key1": "value1", b"key2": b"value2"})
time.sleep(SLEEP)
assert last_sample.attachment is not None
assert last_sample.attachment.items() == {b"key1": b"value1", b"key2": b"value2"}

print("[A][03d] Undeclare publisher on peer01 session");
publisher.undeclare()
print("[A][04d] Undeclare subscriber on peer02 session");
subscriber.undeclare()


def test_session():
zenoh.init_logger()
(peer01, peer02) = open_session(["tcp/127.0.0.1:17447"])
run_session_qryrep(peer01, peer02)
run_session_pubsub(peer01, peer02)
close_session(peer01, peer02)
run_session_attachment(peer01, peer02)
close_session(peer01, peer02)
Loading

0 comments on commit 9e6bd83

Please sign in to comment.