Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Attachment to API #189

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/z_put.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
key = args.key
value = args.value


# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def main():
# initiate logging
Expand All @@ -70,7 +71,7 @@ def main():
session = zenoh.open(conf)

print("Putting Data ('{}': '{}')...".format(key, value))
session.put(key, value)
session.put(key, value, attachment={'key': b'value'})
wyfo marked this conversation as resolved.
Show resolved Hide resolved

# --- Examples of put with other types:

Expand Down Expand Up @@ -103,4 +104,5 @@ def main():

session.close()

main()

main()
7 changes: 4 additions & 3 deletions examples/z_sub.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good if we could show how to get attachments on the subscriber's side, but it seems like zenoh Rust doesn't show this either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I've tested it using z_put.py and z_sub.py. I've let the attachment in z_put.py, but I removed it from z_pub.py because it was messing with the tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust added it in protocol_changes, so eventually it will come. Not on this PR though.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key


# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def main():
# initiate logging
Expand All @@ -68,10 +69,8 @@ def main():

print("Declaring Subscriber on '{}'...".format(key))


def listener(sample: Sample):
print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')")


# WARNING, you MUST store the return value in order for the subscription to work!!
# This is because if you don't, the reference counter will reach 0 and the subscription
Expand All @@ -86,4 +85,6 @@ def listener(sample: Sample):
# the reference counter reaches 0
# sub.undeclare()
# session.close()
main()


main()
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ fn zenoh(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<queryable::_Query>()?;
m.add_class::<queryable::_Queryable>()?;
m.add_class::<value::_Value>()?;
m.add_class::<value::_Attachment>()?;
m.add_class::<value::_Sample>()?;
m.add_class::<value::_QoS>()?;
m.add_class::<value::_Reply>()?;
Expand Down
25 changes: 22 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::enums::{
};
use crate::keyexpr::{_KeyExpr, _Selector};
use crate::queryable::{_Query, _Queryable};
use crate::value::{_Hello, _Reply, _Sample, _Value, _ZenohId};
use crate::value::{_Attachment, _Hello, _Reply, _Sample, _Value, _ZenohId};
use crate::{PyAnyToValue, PyExtract, ToPyErr};

#[pyclass(subclass)]
Expand Down Expand Up @@ -88,6 +88,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -115,6 +120,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -144,6 +154,11 @@ impl _Session {
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
match kwargs.extract_item::<_Attachment>("attachment") {
Ok(attachment) => builder = builder.with_attachment(attachment.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
Expand Down Expand Up @@ -275,8 +290,12 @@ impl _Publisher {
pub fn key_expr(&self) -> _KeyExpr {
_KeyExpr(self.0.key_expr().clone())
}
pub fn put(&self, value: _Value) -> PyResult<()> {
self.0.put(value).res_sync().map_err(|e| e.to_pyerr())
pub fn put(&self, value: _Value, attachment: Option<_Attachment>) -> PyResult<()> {
let mut builder = self.0.put(value);
if let Some(attachment) = attachment {
builder = builder.with_attachment(attachment.0);
}
builder.res_sync().map_err(|e| e.to_pyerr())
}
pub fn delete(&self) -> PyResult<()> {
self.0.delete().res_sync().map_err(|e| e.to_pyerr())
Expand Down
66 changes: 66 additions & 0 deletions src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
// ZettaScale Zenoh team, <zenoh@zettascale.tech>

use pyo3::{prelude::*, types::PyBytes};
use std::collections::HashMap;
use uhlc::Timestamp;
use zenoh::sample::{Attachment, AttachmentBuilder};
use zenoh::{
prelude::{Encoding, KeyExpr, Sample, Value, ZenohId},
query::Reply,
Expand Down Expand Up @@ -188,6 +190,59 @@ impl _QoS {
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Attachment(pub Attachment);

#[pymethods]
impl _Attachment {
#[new]
pub fn pynew(this: Self) -> Self {
this
}

#[staticmethod]
fn new(attachment: HashMap<Vec<u8>, Vec<u8>>) -> Self {
Self(attachment.iter().map(|(k, v)| (&k[..], &v[..])).collect())
}

fn is_empty(&self) -> bool {
self.0.is_empty()
}

fn len(&self) -> usize {
self.0.len()
}

fn items(&self) -> HashMap<Vec<u8>, Vec<u8>> {
self.0
.iter()
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect()
}

fn get(&self, key: Vec<u8>) -> Option<Vec<u8>> {
self.0.get(&key).map(|v| v.to_vec())
}

fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
self.0.insert(&key, &value)
}

fn extend(mut this: PyRefMut<Self>, attachment: HashMap<Vec<u8>, Vec<u8>>) -> PyRefMut<Self> {
this.0.extend(
attachment
.iter()
.map(|(k, v)| (&k[..], &v[..]))
.collect::<AttachmentBuilder>(),
);
this
}
fn as_str(&self) -> String {
format!("{:?}", self.0)
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Sample {
Expand All @@ -196,6 +251,7 @@ pub struct _Sample {
kind: _SampleKind,
timestamp: Option<_Timestamp>,
qos: _QoS,
attachment: Option<_Attachment>,
}
impl From<Sample> for _Sample {
fn from(sample: Sample) -> Self {
Expand All @@ -205,6 +261,7 @@ impl From<Sample> for _Sample {
kind,
timestamp,
qos,
attachment,
..
} = sample;
_Sample {
Expand All @@ -213,6 +270,7 @@ impl From<Sample> for _Sample {
kind: _SampleKind(kind),
timestamp: timestamp.map(_Timestamp),
qos: _QoS(qos),
attachment: attachment.map(_Attachment),
}
}
}
Expand Down Expand Up @@ -310,20 +368,26 @@ impl _Sample {
pub fn timestamp(&self) -> Option<_Timestamp> {
self.timestamp
}
#[getter]
pub fn attachment(&self) -> Option<_Attachment> {
self.attachment.clone()
}
#[staticmethod]
pub fn new(
key_expr: _KeyExpr,
value: _Value,
qos: _QoS,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
attachment: Option<_Attachment>,
) -> Self {
_Sample {
key_expr: key_expr.0,
value,
qos,
kind,
timestamp,
attachment,
}
}
fn __str__(&self) -> String {
Expand All @@ -339,11 +403,13 @@ impl From<_Sample> for Sample {
kind,
timestamp,
qos,
attachment,
} = sample;
let mut sample = Sample::new(key_expr, value);
sample.kind = kind.0;
sample.timestamp = timestamp.map(|t| t.0);
sample.qos = qos.0;
sample.attachment = attachment.map(|a| a.0);
sample
}
}
Expand Down
37 changes: 27 additions & 10 deletions zenoh/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .config import Config
from .closures import IntoHandler, Handler, Receiver
from .enums import *
from .value import IntoValue, Value, Sample, Reply, ZenohId
from .value import IntoValue, Value, Sample, Reply, ZenohId, IntoAttachment, Attachment
from .queryable import Queryable, Query


Expand All @@ -29,9 +29,10 @@ class Publisher:
def __init__(self, p: _Publisher):
self._inner_ = p

def put(self, value: IntoValue, encoding: Encoding = None):
"An optimised version of ``session.put(self.key_expr, value, encoding=encoding)``"
self._inner_.put(Value(value, encoding))
def put(self, value: IntoValue, encoding: Encoding = None, attachment: IntoAttachment = None):
"An optimised version of ``session.put(self.key_expr, value, encoding=encoding, attachment=attachment)``"
Mallets marked this conversation as resolved.
Show resolved Hide resolved
attachment = Attachment(attachment) if attachment is not None else attachment
self._inner_.put(Value(value, encoding), attachment)

def delete(self):
"An optimised version of ``session.delete(self.key_expr)``"
Expand Down Expand Up @@ -100,6 +101,7 @@ class Session(_Session):

Note that most applications will only need a single instance of ``Session``. You should _never_ construct one session per publisher/subscriber, as this will significantly increase the size of your Zenoh network, while preventing potential locality-based optimizations.
"""

def __new__(cls, config: Union[Config, Any] = None):
if config is None:
return super().__new__(cls)
Expand All @@ -110,7 +112,7 @@ def __new__(cls, config: Union[Config, Any] = None):

def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None,
priority: Priority = None, congestion_control: CongestionControl = None,
sample_kind: SampleKind = None):
sample_kind: SampleKind = None, attachment: IntoAttachment = None):
"""
Sends a value over Zenoh.

Expand All @@ -122,6 +124,7 @@ def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None,
:param priority: The priority to use when routing the published data
:param congestion_control: The congestion control to use when routing the published data
:param sample_kind: The kind of sample to send
:param attachment: The attachment to attach to the value sent
wyfo marked this conversation as resolved.
Show resolved Hide resolved

:Examples:

Expand All @@ -138,6 +141,8 @@ def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None,
kwargs['congestion_control'] = congestion_control
if sample_kind is not None:
kwargs['sample_kind'] = sample_kind
if attachment is not None:
kwargs['attachment'] = Attachment(attachment)
return super().put(keyexpr, value, **kwargs)

def config(self) -> Config:
Expand All @@ -149,7 +154,8 @@ def config(self) -> Config:
return super().config()

def delete(self, keyexpr: IntoKeyExpr,
priority: Priority = None, congestion_control: CongestionControl = None):
priority: Priority = None, congestion_control: CongestionControl = None,
attachment: IntoAttachment = None):
"""
Deletes the values associated with the keys included in ``keyexpr``.

Expand All @@ -159,6 +165,7 @@ def delete(self, keyexpr: IntoKeyExpr,
:param keyexpr: The key expression to publish
:param priority: The priority to use when routing the delete
:param congestion_control: The congestion control to use when routing the delete
:param attachment: The attachment to attach to the request
wyfo marked this conversation as resolved.
Show resolved Hide resolved

:Examples:

Expand All @@ -172,9 +179,13 @@ def delete(self, keyexpr: IntoKeyExpr,
kwargs['priority'] = priority
if congestion_control is not None:
kwargs['congestion_control'] = congestion_control
if attachment is not None:
kwargs['attachment'] = Attachment(attachment)
return super().delete(keyexpr, **kwargs)

def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver], consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None) -> Receiver:
def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver],
consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None,
attachment: IntoAttachment = None) -> Receiver:
"""
Emits a query, which queryables with intersecting selectors will be able to reply to.

Expand All @@ -187,6 +198,7 @@ def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver]
:param consolidation: The consolidation to apply to replies
:param target: The queryables that should be target to this query
:param value: An optional value to attach to this query
:param attachment: An optional attachment to attach to this query
wyfo marked this conversation as resolved.
Show resolved Hide resolved
:return: The receiver of the handler
:rtype: Receiver

Expand Down Expand Up @@ -225,6 +237,8 @@ def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver]
kwargs["target"] = target
if value is not None:
kwargs["value"] = Value(value)
if attachment is not None:
kwargs["attachment"] = Attachment(attachment)
super().get(Selector(selector), handler.closure, **kwargs)
return handler.receiver

Expand Down Expand Up @@ -276,7 +290,8 @@ def declare_queryable(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Query, An
inner = super().declare_queryable(KeyExpr(keyexpr), handler.closure, **kwargs)
return Queryable(inner, handler.receiver)

def declare_publisher(self, keyexpr: IntoKeyExpr, priority: Priority = None, congestion_control: CongestionControl = None):
def declare_publisher(self, keyexpr: IntoKeyExpr, priority: Priority = None,
congestion_control: CongestionControl = None):
"""
Declares a publisher, which may be used to send values repeatedly onto a same key expression.

Expand All @@ -302,7 +317,8 @@ def declare_publisher(self, keyexpr: IntoKeyExpr, priority: Priority = None, con
kwargs['congestion_control'] = congestion_control
return Publisher(super().declare_publisher(KeyExpr(keyexpr), **kwargs))

def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], reliability: Reliability = None) -> Subscriber:
def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
reliability: Reliability = None) -> Subscriber:
"""
Declares a subscriber, which will receive any published sample with a key expression intersecting ``keyexpr``.

Expand Down Expand Up @@ -342,7 +358,8 @@ def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample,
s = super().declare_subscriber(KeyExpr(keyexpr), handler.closure, **kwargs)
return Subscriber(s, handler.receiver)

def declare_pull_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], reliability: Reliability = None) -> PullSubscriber:
def declare_pull_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
reliability: Reliability = None) -> PullSubscriber:
"""
Declares a pull-mode subscriber, which will receive a single published sample with a key expression intersecting ``keyexpr`` any time its ``pull`` method is called.

Expand Down
Loading