-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathdata_stream.rs
407 lines (354 loc) · 13 KB
/
data_stream.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! Stabilizer data stream capabilities
//!
//! # Design
//! Data streamining utilizes UDP packets to send data streams at high throughput.
//! Packets are always sent in a best-effort fashion, and data may be dropped.
//!
//! Stabilizer organizes streamed data into batches within a "Frame" that will be sent as a UDP
//! packet. Each frame consits of a header followed by sequential batch serializations. The packet
//! header is constant for all streaming capabilities, but the serialization format after the header
//! is application-defined.
//!
//! ## Frame Header
//! The header consists of the following, all in little-endian.
//!
//! * **Magic word 0x057B** (u16): a constant to identify Stabilizer streaming data.
//! * **Format Code** (u8): a unique ID that indicates the serialization format of each batch of data
//! in the frame. Refer to [StreamFormat] for further information.
//! * **Batch Count** (u8): the number of batches of data.
//! * **Sequence Number** (u32): an the sequence number of the first batch in the frame.
//! This can be used to determine if and how many stream batches are lost.
//!
//! # Example
//! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
//! of streamed data.
#![allow(non_camel_case_types)] // https://github.com/rust-embedded/heapless/issues/411
use core::{fmt::Write, mem::MaybeUninit, net::SocketAddr};
use heapless::{
box_pool,
pool::boxed::{Box, BoxBlock},
spsc::{Consumer, Producer, Queue},
String,
};
use num_enum::IntoPrimitive;
use serde::Serialize;
use serde_with::DeserializeFromStr;
use smoltcp_nal::embedded_nal::{nb, UdpClientStack};
use super::NetworkReference;
// Magic first bytes indicating a UDP frame of straming data
const MAGIC: u16 = 0x057B;
// The size of the header, calculated in words.
// The header has a 16-bit magic word, an 8-bit format, 8-bit batch-size, and 32-bit sequence
// number, which corresponds to 8 bytes.
const HEADER_SIZE: usize = 8;
// The number of frames that can be buffered.
const FRAME_COUNT: usize = 4;
// The size of each frame in bytes.
// Ensure the resulting ethernet frame is within the MTU:
// 1500 MTU - 40 IP6 header - 8 UDP header - 32 VPN - 20 IP4
const FRAME_SIZE: usize = 1500 - 40 - 8 - 32 - 20;
// The size of the frame queue must be at least as large as the number of frame buffers. Every
// allocated frame buffer should fit in the queue.
const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2;
type Frame = [MaybeUninit<u8>; FRAME_SIZE];
box_pool!(FRAME_POOL: Frame);
/// Represents the destination for the UDP stream to send data to.
///
/// # Miniconf
/// `<addr>:<port>`
///
/// * `<addr>` is an IPv4 address. E.g. `192.168.0.1`
/// * `<port>` is any unsigned 16-bit value.
///
/// ## Example
/// `192.168.0.1:1234`
#[derive(Copy, Clone, Debug, DeserializeFromStr, PartialEq, Eq)]
pub struct StreamTarget(pub SocketAddr);
impl Default for StreamTarget {
fn default() -> Self {
Self("0.0.0.0:0".parse().unwrap())
}
}
impl Serialize for StreamTarget {
fn serialize<S: serde::Serializer>(
&self,
serializer: S,
) -> Result<S::Ok, S::Error> {
let mut display: String<30> = String::new();
write!(&mut display, "{}", self.0).unwrap();
serializer.serialize_str(&display)
}
}
impl core::str::FromStr for StreamTarget {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let addr = SocketAddr::from_str(s)
.map_err(|_| "Invalid socket address format")?;
Ok(Self(addr))
}
}
/// Specifies the format of streamed data
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoPrimitive)]
pub enum StreamFormat {
/// Reserved, unused format specifier.
Unknown = 0,
/// ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format.
///
/// # Example
/// With a batch size of 2, the serialization would take the following form:
/// ```
/// <ADC0[0]> <ADC0[1]> <ADC1[0]> <ADC1[1]> <DAC0[0]> <DAC0[1]> <DAC1[0]> <DAC1[1]>
/// ```
AdcDacData = 1,
/// FLS (fiber length stabilization) format. See the FLS application.
Fls = 2,
/// Thermostat-EEM data. See `thermostat-eem` repo and application.
ThermostatEem = 3,
}
/// Configure streaming on a device.
///
/// # Args
/// * `stack` - A reference to the shared network stack.
///
/// # Returns
/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
/// `stream` is the logically consumer (UDP transmitter) of the enqueued data.
pub fn setup_streaming(
stack: NetworkReference,
) -> (FrameGenerator, DataStream) {
// The queue needs to be at least as large as the frame count to ensure that every allocated
// frame can potentially be enqueued for transmission.
let queue =
cortex_m::singleton!(: Queue<StreamFrame, FRAME_QUEUE_SIZE> = Queue::new())
.unwrap();
let (producer, consumer) = queue.split();
#[allow(clippy::declare_interior_mutable_const)]
const FRAME: BoxBlock<Frame> = BoxBlock::new();
let memory =
cortex_m::singleton!(FRAME_DATA: [BoxBlock<Frame>; FRAME_COUNT] =
[FRAME; FRAME_COUNT])
.unwrap();
for block in memory.iter_mut() {
FRAME_POOL.manage(block);
}
let generator = FrameGenerator::new(producer);
let stream = DataStream::new(stack, consumer);
(generator, stream)
}
#[derive(Debug)]
struct StreamFrame {
buffer: Box<FRAME_POOL>,
offset: usize,
batches: u8,
}
impl StreamFrame {
pub fn new(
mut buffer: Box<FRAME_POOL>,
format_id: u8,
sequence_number: u32,
) -> Self {
for (byte, buf) in MAGIC
.to_le_bytes()
.iter()
.chain(&[format_id, 0])
.chain(sequence_number.to_le_bytes().iter())
.zip(buffer.iter_mut())
{
buf.write(*byte);
}
Self {
buffer,
offset: HEADER_SIZE,
batches: 0,
}
}
pub fn add_batch<F>(&mut self, mut f: F) -> usize
where
F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
{
let len = f(&mut self.buffer[self.offset..]);
self.offset += len;
self.batches += 1;
len
}
pub fn is_full(&self, len: usize) -> bool {
self.offset + len > self.buffer.len()
}
pub fn finish(&mut self) -> &[MaybeUninit<u8>] {
self.buffer[3].write(self.batches);
&self.buffer[..self.offset]
}
}
/// The data generator for a stream.
pub struct FrameGenerator {
queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
current_frame: Option<StreamFrame>,
sequence_number: u32,
format: u8,
}
impl FrameGenerator {
fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self {
Self {
queue,
format: StreamFormat::Unknown.into(),
current_frame: None,
sequence_number: 0,
}
}
/// Configure the format of the stream.
///
/// # Note:
/// This function shall only be called once upon initializing streaming
///
/// # Args
/// * `format` - The desired format of the stream.
#[doc(hidden)]
pub(crate) fn configure(&mut self, format: impl Into<u8>) {
self.format = format.into();
}
/// Add a batch to the current stream frame.
///
/// # Args
/// * `f` - A closure that will be provided the buffer to write batch data into.
/// Returns the number of bytes written.
pub fn add<F>(&mut self, func: F)
where
F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
{
let sequence_number = self.sequence_number;
self.sequence_number = self.sequence_number.wrapping_add(1);
let current_frame = match self.current_frame.as_mut() {
None => {
if let Ok(buffer) =
FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE])
{
self.current_frame.insert(StreamFrame::new(
buffer,
self.format,
sequence_number,
))
} else {
return;
}
}
Some(frame) => frame,
};
let len = current_frame.add_batch(func);
if current_frame.is_full(len) {
// Note(unwrap): The queue is designed to be at least as large as the frame buffer
// count, so this enqueue should always succeed.
if let Some(frame) = self.current_frame.take() {
self.queue.enqueue(frame).unwrap();
}
}
}
}
/// The "consumer" portion of the data stream.
///
/// # Note
/// This is responsible for consuming data and sending it over UDP.
pub struct DataStream {
stack: NetworkReference,
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
remote: StreamTarget,
}
impl DataStream {
/// Construct a new data streamer.
///
/// # Args
/// * `stack` - A reference to the shared network stack.
/// * `consumer` - The read side of the queue containing data to transmit.
/// * `frame_pool` - The Pool to return stream frame objects into.
fn new(
stack: NetworkReference,
consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
) -> Self {
Self {
stack,
socket: None,
remote: StreamTarget::default(),
queue: consumer,
}
}
fn close(&mut self) {
if let Some(socket) = self.socket.take() {
log::info!("Closing stream");
// Note(unwrap): We guarantee that the socket is available above.
self.stack.close(socket).unwrap();
}
}
// Open new socket.
fn open(&mut self) -> Result<(), ()> {
// If there is already a socket of if remote address is unspecified,
// do not open a new socket.
if self.socket.is_some() || self.remote.0.ip().is_unspecified() {
return Err(());
}
let mut socket = self.stack.socket().or(Err(()))?;
// We may fail to connect if we don't have an IP address yet.
if self.stack.connect(&mut socket, self.remote.0).is_err() {
self.stack.close(socket).unwrap();
return Err(());
}
self.socket.replace(socket);
log::info!("Opening stream");
Ok(())
}
/// Configure the remote endpoint of the stream.
///
/// # Args
/// * `remote` - The destination to send stream data to.
pub fn set_remote(&mut self, remote: StreamTarget) {
// Close socket to be reopened if the remote has changed.
if remote != self.remote {
self.close();
}
self.remote = remote;
}
/// Process any data for transmission.
pub fn process(&mut self) {
match self.socket.as_mut() {
None => {
// If there's no socket available, try to connect to our remote.
if self.open().is_ok() {
// If we just successfully opened the socket, flush old data from queue.
while let Some(frame) = self.queue.dequeue() {
drop(frame.buffer);
}
}
}
Some(handle) => {
if let Some(mut frame) = self.queue.dequeue() {
// Transmit the frame and return it to the pool.
let buf = frame.finish();
let data = unsafe {
core::slice::from_raw_parts(
buf.as_ptr() as *const u8,
size_of_val(buf),
)
};
// If we fail to send, it can only be because the socket got closed on us (i.e.
// address update due to DHCP). If this happens, reopen the socket.
match self.stack.send(handle, data) {
Ok(_) => {},
// Our IP address may have changedm so handle reopening the UDP stream.
Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::Unaddressable))) => {
log::warn!( "IP address updated during stream. Reopening socket");
let socket = self.socket.take().unwrap();
self.stack.close(socket).unwrap();
}
// The buffer should clear up once ICMP resolves the IP address, so ignore
// this error.
Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::BufferFull))) => {}
Err(other) => {
log::warn!("Unexpected UDP error during data stream: {other:?}");
}
}
drop(frame.buffer)
}
}
}
}
}