Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Attachment to API #189

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions examples/z_put.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
default='Put from Python!',
type=str,
help='The value to write.')
parser.add_argument('--attach', '-a', dest='attach',
type=str,
help='The key-values to attach')
parser.add_argument('--config', '-c', dest='config',
metavar='FILE',
type=str,
Expand All @@ -60,6 +63,14 @@
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key
value = args.value
attachment = args.attach
if attachment is not None:
attachment = {
k: v
for pair in attachment.split("&")
Mallets marked this conversation as resolved.
Show resolved Hide resolved
for (k, v) in [pair.split("=")]
}


# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def main():
Expand All @@ -70,7 +81,7 @@ def main():
session = zenoh.open(conf)

print("Putting Data ('{}': '{}')...".format(key, value))
session.put(key, value)
session.put(key, value, attachment=attachment)

# --- Examples of put with other types:

Expand Down Expand Up @@ -103,4 +114,5 @@ def main():

session.close()

main()

main()
7 changes: 4 additions & 3 deletions examples/z_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key


# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def main():
# initiate logging
Expand All @@ -68,10 +69,8 @@ def main():

print("Declaring Subscriber on '{}'...".format(key))


def listener(sample: Sample):
print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')")


# WARNING, you MUST store the return value in order for the subscription to work!!
# This is because if you don't, the reference counter will reach 0 and the subscription
Expand All @@ -86,4 +85,6 @@ def listener(sample: Sample):
# the reference counter reaches 0
# sub.undeclare()
# session.close()
main()


main()
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ fn zenoh(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<queryable::_Query>()?;
m.add_class::<queryable::_Queryable>()?;
m.add_class::<value::_Value>()?;
m.add_class::<value::_Attachment>()?;
m.add_class::<value::_Sample>()?;
m.add_class::<value::_QoS>()?;
m.add_class::<value::_Reply>()?;
Expand Down
25 changes: 22 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::enums::{
};
use crate::keyexpr::{_KeyExpr, _Selector};
use crate::queryable::{_Query, _Queryable};
use crate::value::{_Hello, _Reply, _Sample, _Value, _ZenohId};
use crate::value::{_Attachment, _Hello, _Reply, _Sample, _Value, _ZenohId};
use crate::{PyAnyToValue, PyExtract, ToPyErr};

#[pyclass(subclass)]
Expand Down Expand Up @@ -88,6 +88,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -115,6 +120,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -144,6 +154,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -275,8 +290,12 @@ impl _Publisher {
pub fn key_expr(&self) -> _KeyExpr {
_KeyExpr(self.0.key_expr().clone())
}
pub fn put(&self, value: _Value) -> PyResult<()> {
self.0.put(value).res_sync().map_err(|e| e.to_pyerr())
pub fn put(&self, value: _Value, attachment: Option<_Attachment>) -> PyResult<()> {
let mut builder = self.0.put(value);
if let Some(attachment) = attachment {
builder = builder.with_attachment(attachment.0);
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
pub fn delete(&self) -> PyResult<()> {
self.0.delete().res_sync().map_err(|e| e.to_pyerr())
Expand Down
72 changes: 71 additions & 1 deletion src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
// Contributors:
// ZettaScale Zenoh team, <zenoh@zettascale.tech>

use pyo3::{prelude::*, types::PyBytes};
use pyo3::{
prelude::*,
types::{PyBytes, PyDict},
};
use std::collections::HashMap;
use uhlc::Timestamp;
use zenoh::sample::{Attachment, AttachmentBuilder};
use zenoh::{
prelude::{Encoding, KeyExpr, Sample, Value, ZenohId},
query::Reply,
Expand Down Expand Up @@ -188,6 +193,60 @@ impl _QoS {
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Attachment(pub Attachment);

#[pymethods]
impl _Attachment {
#[new]
pub fn pynew(this: Self) -> Self {
this
}

#[staticmethod]
fn new(attachment: HashMap<Vec<u8>, Vec<u8>>) -> Self {
Self(attachment.iter().map(|(k, v)| (&k[..], &v[..])).collect())
}

fn is_empty(&self) -> bool {
self.0.is_empty()
}

fn len(&self) -> usize {
self.0.len()
}

fn items<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
let items = PyDict::new_bound(py);
for (k, v) in self.0.iter() {
items.set_item(PyBytes::new_bound(py, &k), PyBytes::new_bound(py, &v))?;
}
Ok(items)
}

fn get(&self, key: Vec<u8>) -> Option<Vec<u8>> {
self.0.get(&key).map(|v| v.to_vec())
}

fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
self.0.insert(&key, &value)
}

fn extend(mut this: PyRefMut<Self>, attachment: HashMap<Vec<u8>, Vec<u8>>) -> PyRefMut<Self> {
this.0.extend(
attachment
.iter()
.map(|(k, v)| (&k[..], &v[..]))
.collect::<AttachmentBuilder>(),
);
this
}
fn as_str(&self) -> String {
format!("{:?}", self.0)
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Sample {
Expand All @@ -196,6 +255,7 @@ pub struct _Sample {
kind: _SampleKind,
timestamp: Option<_Timestamp>,
qos: _QoS,
attachment: Option<_Attachment>,
}
impl From<Sample> for _Sample {
fn from(sample: Sample) -> Self {
Expand All @@ -205,6 +265,7 @@ impl From<Sample> for _Sample {
kind,
timestamp,
qos,
attachment,
..
} = sample;
_Sample {
Expand All @@ -213,6 +274,7 @@ impl From<Sample> for _Sample {
kind: _SampleKind(kind),
timestamp: timestamp.map(_Timestamp),
qos: _QoS(qos),
attachment: attachment.map(_Attachment),
}
}
}
Expand Down Expand Up @@ -310,20 +372,26 @@ impl _Sample {
pub fn timestamp(&self) -> Option<_Timestamp> {
self.timestamp
}
#[getter]
pub fn attachment(&self) -> Option<_Attachment> {
self.attachment.clone()
}
#[staticmethod]
pub fn new(
key_expr: _KeyExpr,
value: _Value,
qos: _QoS,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
attachment: Option<_Attachment>,
) -> Self {
_Sample {
key_expr: key_expr.0,
value,
qos,
kind,
timestamp,
attachment,
}
}
fn __str__(&self) -> String {
Expand All @@ -339,11 +407,13 @@ impl From<_Sample> for Sample {
kind,
timestamp,
qos,
attachment,
} = sample;
let mut sample = Sample::new(key_expr, value);
sample.kind = kind.0;
sample.timestamp = timestamp.map(|t| t.0);
sample.qos = qos.0;
sample.attachment = attachment.map(|a| a.0);
sample
}
}
Expand Down
44 changes: 39 additions & 5 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import zenoh
import json
from zenoh import Session, Query, Sample, Priority, CongestionControl
from typing import List, Tuple
from typing import List, Tuple, Optional
import time

SLEEP = 1
Expand Down Expand Up @@ -92,9 +92,9 @@ def sub_callback(sample: Sample):
nonlocal num_received
nonlocal num_errors
if sample.key_expr != keyexpr \
or sample.qos.priority != Priority.DATA_HIGH() \
or sample.qos.congestion_control != CongestionControl.BLOCK() \
or sample.payload != msg:
or sample.qos.priority != Priority.DATA_HIGH() \
or sample.qos.congestion_control != CongestionControl.BLOCK() \
or sample.payload != msg:
num_errors += 1
num_received += 1

Expand Down Expand Up @@ -124,9 +124,43 @@ def sub_callback(sample: Sample):
subscriber.undeclare()


def run_session_attachment(peer01, peer02):
keyexpr = "test_attachment/session"

last_sample: Optional[Sample] = None

def callback(sample: Sample):
nonlocal last_sample
last_sample = sample

print("[A][01d] Publisher on peer01 session");
publisher = peer01.declare_publisher(keyexpr)
time.sleep(SLEEP)

print("[A][02d] Publisher on peer01 session");
subscriber = peer02.declare_subscriber(keyexpr, callback)
time.sleep(SLEEP)

publisher.put("no attachment")
time.sleep(SLEEP)
assert last_sample is not None
assert last_sample.attachment is None

publisher.put("attachment", attachment={"key1": "value1", b"key2": b"value2"})
time.sleep(SLEEP)
assert last_sample.attachment is not None
assert last_sample.attachment.items() == {b"key1": b"value1", b"key2": b"value2"}

print("[A][03d] Undeclare publisher on peer01 session");
publisher.undeclare()
print("[A][04d] Undeclare subscriber on peer02 session");
subscriber.undeclare()


def test_session():
zenoh.init_logger()
(peer01, peer02) = open_session(["tcp/127.0.0.1:17447"])
run_session_qryrep(peer01, peer02)
run_session_pubsub(peer01, peer02)
close_session(peer01, peer02)
run_session_attachment(peer01, peer02)
close_session(peer01, peer02)
Loading