Skip to content

Commit

Permalink
Implement Roughtime client pseudo-node (#1086)
Browse files Browse the repository at this point in the history
  • Loading branch information
conradgrobler authored Jun 8, 2020
1 parent bfc3a51 commit a2f74a4
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 32 deletions.
2 changes: 1 addition & 1 deletion examples/abitest/module_0/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl FrontendNode {
}),
misconfigured_roughtime: oak::roughtime::Roughtime::new(
&RoughtimeClientConfiguration {
min_overlapping_intervals: 99,
min_overlapping_intervals: Some(99),
..Default::default()
},
),
Expand Down
1 change: 1 addition & 0 deletions oak/proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package(
proto_library(
name = "application_proto",
srcs = ["application.proto"],
deps = ["@com_google_protobuf//:wrappers_proto"],
)

cc_proto_library(
Expand Down
12 changes: 7 additions & 5 deletions oak/proto/application.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ syntax = "proto3";

package oak.application;

import "google/protobuf/wrappers.proto";

// An ApplicationConfiguration represents a unit of deployment in Oak.
//
// A running Oak Application instance is built from a collection of
Expand Down Expand Up @@ -118,11 +120,11 @@ message RoughtimeClientConfiguration {
// will be used if this is empty.
repeated RoughtimeServer servers = 1;
// Connection parameters; default values will be used if any parameter is
// zero.
int32 min_overlapping_intervals = 2;
int32 timeout_seconds = 3;
int32 server_retries = 4;
uint32 max_radius_microseconds = 5;
// unset.
google.protobuf.UInt32Value min_overlapping_intervals = 2;
google.protobuf.UInt32Value timeout_seconds = 3;
google.protobuf.UInt32Value server_retries = 4;
google.protobuf.UInt32Value max_radius_microseconds = 5;
}

// Information to identify a particular Roughtime server.
Expand Down
8 changes: 4 additions & 4 deletions oak/proto/roughtime_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ syntax = "proto3";

package oak.roughtime;

message RoughTimeRequest {}
message GetRoughtimeRequest {}

message RoughTimeResponse {
message Roughtime {
// Time is UTC and is given as microseconds since the UNIX epoch (00:00:00 UTC
// on 1 January 1970). Leap seconds are linearly smeared over a 24-hour
// period. That is, the smear extends from UTC noon to noon over 86,401 or
// 86,399 SI seconds, and all the smeared seconds are the same length.
uint64 rough_time_usec = 1;
uint64 roughtime_usec = 1;
}

// Interface exposed by the Roughtime client pseudo-Node to other nodes over a
// pair of Oak Channels.
service RoughtimeService {
rpc GetRoughTime(RoughTimeRequest) returns (RoughTimeResponse);
rpc GetRoughtime(GetRoughtimeRequest) returns (Roughtime);
}
2 changes: 1 addition & 1 deletion oak/server/rust/oak_runtime/src/node/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use oak_abi::proto::google::rpc;
pub mod client;
mod codec;
mod invocation;
pub mod invocation;
pub mod server;

/// Converts [`oak_abi::proto::google::rpc::Status`] to [`tonic::Status`].
Expand Down
6 changes: 3 additions & 3 deletions oak/server/rust/oak_runtime/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ pub fn create_node(
.clone(),
)?))
}
Some(ConfigType::RoughtimeClientConfig(_config)) => {
Ok(Box::new(roughtime::RoughtimeClientNode::new(node_name)))
}
Some(ConfigType::RoughtimeClientConfig(config)) => Ok(Box::new(
roughtime::RoughtimeClientNode::new(node_name, config),
)),
Some(ConfigType::StorageConfig(_config)) => {
Ok(Box::new(storage::StorageNode::new(node_name)))
}
Expand Down
143 changes: 137 additions & 6 deletions oak/server/rust/oak_runtime/src/node/roughtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,163 @@

//! Roughtime client pseudo-Node functionality.
use crate::runtime::RuntimeProxy;
use log::{info, warn};
use crate::{
io::Receiver,
node::grpc::invocation::Invocation,
runtime::RuntimeProxy,
time::{
get_default_servers, RoughtimeClient, RoughtimeServer, DEFAULT_MAX_RADIUS_MICROSECONDS,
DEFAULT_MIN_OVERLAPPING_INTERVALS, DEFAULT_SERVER_RETRIES, DEFAULT_TIMEOUT_SECONDS,
},
};
use log::{debug, error, info};
use oak_abi::{
proto::{
google::rpc::Code,
oak::{
application::RoughtimeClientConfiguration,
encap::GrpcResponse,
roughtime::{GetRoughtimeRequest, Roughtime},
},
},
OakStatus,
};
use prost::Message;
use tokio::sync::oneshot;

/// Roughtime client pseudo-Node.
pub struct RoughtimeClientNode {
node_name: String,
client: RoughtimeClient,
}

impl RoughtimeClientNode {
/// Creates a new [`RoughtimeClientNode`] instance, but does not start it.
pub fn new(node_name: &str) -> Self {
pub fn new(node_name: &str, config: &RoughtimeClientConfiguration) -> Self {
let timeout_seconds = config
.timeout_seconds
.map_or(DEFAULT_TIMEOUT_SECONDS, |value| value as u64);
let server_retries = config
.server_retries
.map_or(DEFAULT_SERVER_RETRIES, |value| value as usize);
let min_overlapping_intervals = config
.min_overlapping_intervals
.map_or(DEFAULT_MIN_OVERLAPPING_INTERVALS, |value| value as usize);
let max_radius_microseconds = config
.max_radius_microseconds
.unwrap_or(DEFAULT_MAX_RADIUS_MICROSECONDS);
let servers = if config.servers.is_empty() {
get_default_servers()
} else {
config
.servers
.iter()
.map(|server| {
RoughtimeServer::new(
&server.name,
&server.host,
server.port as u16,
&server.public_key_base64,
)
})
.collect()
};

Self {
node_name: node_name.to_string(),
client: RoughtimeClient::new(
servers,
min_overlapping_intervals,
max_radius_microseconds,
timeout_seconds,
server_retries,
),
}
}

/// Processes a gRPC invocation.
fn process_invocation(&self, runtime: &RuntimeProxy, invocation: &Invocation) {
match invocation.receive_request(runtime) {
Ok(request) => {
// TODO(#1113): Generate this code automatically.
if request.method_name != "/oak.roughtime.RoughtimeService/GetRoughtime" {
let message = format!("Unknown method_name: {}", request.method_name);
invocation.send_error(Code::NotFound, &message, runtime);
}
if GetRoughtimeRequest::decode(request.req_msg.as_slice()).is_err() {
invocation.send_error(
Code::InvalidArgument,
"Could not parse request.",
runtime,
);
}
}
Err(error) => {
let message = format!("Error reading request: {:?}", error);
invocation.send_error(Code::InvalidArgument, &message, runtime);
}
};

match self.client.get_roughtime() {
Ok(time) => {
let response = Roughtime {
roughtime_usec: time,
};
let mut message = Vec::new();
match response.encode(&mut message) {
Ok(_) => {
let grpc_response = GrpcResponse {
rsp_msg: message,
status: None,
last: true,
};
let _ = invocation
.send_response(grpc_response, runtime)
.map_err(|error| {
error!("Couldn't send the response: {:?}", error);
});
}
Err(error) => {
let message = format!("Error encoding response: {:?}", error);
invocation.send_error(Code::Internal, &message, runtime);
}
}
}
Err(error) => {
let message = format!("Error getting Roughtime: {:?}", error);
invocation.send_error(Code::Internal, &message, runtime);
}
};
}
}

impl super::Node for RoughtimeClientNode {
/// Runs the node.
fn run(
self: Box<Self>,
_runtime: RuntimeProxy,
_handle: oak_abi::Handle,
runtime: RuntimeProxy,
handle: oak_abi::Handle,
_notify_receiver: oneshot::Receiver<()>,
) {
info!("{}: Starting Roughtime pseudo-Node", self.node_name);
warn!("No Roughtime support implemented!");
// Create a [`Receiver`] used for reading gRPC invocations.
let receiver = Receiver::<Invocation>::new(handle);
loop {
debug!("Waiting for gRPC invocation");
// Read a gRPC invocation from the [`Receiver`].
match receiver.receive(&runtime) {
Ok(invocation) => {
self.process_invocation(&runtime, &invocation);
invocation.close(&runtime);
}
Err(OakStatus::ErrTerminated) => {
break;
}
Err(error) => {
error!("Couldn't receive the invocation: {:?}", error);
break;
}
}
}
}
}
10 changes: 10 additions & 0 deletions oak/server/rust/oak_runtime/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,16 @@ impl RoughtimeClient {
}

impl RoughtimeServer {
/// Creates a new instance.
pub fn new(name: &str, host: &str, port: u16, public_key_base64: &str) -> Self {
RoughtimeServer {
name: name.to_owned(),
host: host.to_owned(),
port,
public_key_base64: public_key_base64.to_owned(),
}
}

/// Sends a request to a Roughtime server using UDP.
async fn send_roughtime_request(
&self,
Expand Down
5 changes: 4 additions & 1 deletion oak_abi/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
//

fn main() {
oak_utils::compile_protos(
// Exclude generation of service code, as it would require a reference to the Oak SDK to
// compile.
oak_utils::compile_protos_without_services(
&[
"../oak/proto/application.proto",
"../oak/proto/grpc_encap.proto",
"../oak/proto/label.proto",
"../oak/proto/log.proto",
"../oak/proto/oak_abi.proto",
"../oak/proto/roughtime_service.proto",
"../third_party/google/rpc/code.proto",
"../third_party/google/rpc/status.proto",
],
Expand Down
4 changes: 4 additions & 0 deletions oak_abi/src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ pub mod oak {
pub mod log {
include!(concat!(env!("OUT_DIR"), "/oak.log.rs"));
}

pub mod roughtime {
include!(concat!(env!("OUT_DIR"), "/oak.roughtime.rs"));
}
}
20 changes: 17 additions & 3 deletions oak_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ where
{
compile_protos_with(
prost_build::Config::new().out_dir(out_dir),
true,
inputs,
includes,
);
Expand All @@ -178,20 +179,33 @@ pub fn compile_protos<P>(inputs: &[P], includes: &[P])
where
P: AsRef<std::path::Path>,
{
compile_protos_with(&mut prost_build::Config::new(), inputs, includes);
compile_protos_with(&mut prost_build::Config::new(), true, inputs, includes);
}

fn compile_protos_with<P>(prost_config: &mut prost_build::Config, inputs: &[P], includes: &[P])
pub fn compile_protos_without_services<P>(inputs: &[P], includes: &[P])
where
P: AsRef<std::path::Path>,
{
compile_protos_with(&mut prost_build::Config::new(), false, inputs, includes);
}

fn compile_protos_with<P>(
prost_config: &mut prost_build::Config,
generate_services: bool,
inputs: &[P],
includes: &[P],
) where
P: AsRef<std::path::Path>,
{
for input in inputs {
// Tell cargo to rerun this build script if the proto file has changed.
// https://doc.rust-lang.org/cargo/reference/build-scripts.html#cargorerun-if-changedpath
println!("cargo:rerun-if-changed={}", input.as_ref().display());
}
if generate_services {
prost_config.service_generator(Box::new(OakServiceGenerator));
}
prost_config
.service_generator(Box::new(OakServiceGenerator))
// We require label-related types to be comparable and hashable so that they can be used in
// hash-based collections.
.type_attribute(".oak.label", "#[derive(Eq, Hash)]")
Expand Down
4 changes: 1 addition & 3 deletions runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ fn run_examples(opt: &RunExamples) -> Step {
Example {
name: "abitest".to_string(),
rust_module_names: vec!["module_0".to_string(), "module_1".to_string()],
// TODO(#730): reinstate Roughtime tests when Rust runtime supports them.
// TODO(#1040): reinstate storage tests when Rust runtime supports them.
// TODO(#953): reinstate gRPC server server-streaming tests when Rust runtime
// supports them.
Expand All @@ -94,8 +93,7 @@ fn run_examples(opt: &RunExamples) -> Step {
.to_string(),
"--private_key=../../../../../../../../examples/certs/local/local.key"
.to_string(),
"--test_exclude=(Roughtime|Storage|GrpcServerServerStreamingMethod)"
.to_string(),
"--test_exclude=(Storage|GrpcServerServerStreamingMethod)".to_string(),
],
},
Example {
Expand Down
3 changes: 1 addition & 2 deletions scripts/run_example
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ if [[ "${server}" != "none" ]]; then
fi

if [[ "${EXAMPLE}" == 'abitest' ]]; then
# TODO(#730): reinstate Roughtime tests when Rust runtime supports them.
# TODO(#1040): reinstate storage tests when Rust runtime supports them.
# TODO(#953): reinstate gRPC server server-streaming tests when Rust runtime supports them.
readonly ADDITIONAL_ARGS=('--test_exclude=(Roughtime|Storage|GrpcServerServerStreamingMethod)')
readonly ADDITIONAL_ARGS=('--test_exclude=(Storage|GrpcServerServerStreamingMethod)')
elif [[ "${EXAMPLE}" == 'aggregator' ]]; then
readonly ADDITIONAL_ARGS=(
'--bucket=test'
Expand Down
6 changes: 3 additions & 3 deletions sdk/rust/oak/src/roughtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::{
grpc,
proto::oak::roughtime::{RoughTimeRequest, RoughtimeServiceClient},
proto::oak::roughtime::{GetRoughtimeRequest, RoughtimeServiceClient},
};
use oak_abi::proto::oak::application::{
node_configuration::ConfigType, NodeConfiguration, RoughtimeClientConfiguration,
Expand All @@ -44,7 +44,7 @@ impl Roughtime {
/// Get the current Roughtime value as a Duration since UNIX epoch.
/// Note that leap seconds are linearly smeared over 24h.
pub fn get_roughtime(&self) -> grpc::Result<std::time::Duration> {
let rsp = self.client.get_rough_time(RoughTimeRequest {})?;
Ok(std::time::Duration::from_micros(rsp.rough_time_usec))
let rsp = self.client.get_roughtime(GetRoughtimeRequest {})?;
Ok(std::time::Duration::from_micros(rsp.roughtime_usec))
}
}

0 comments on commit a2f74a4

Please sign in to comment.