Skip to content

Commit

Permalink
Merge pull request #450 from divviup/timg/protocol-message-decoder
Browse files Browse the repository at this point in the history
`janus_cli`: add DAP message decoder
  • Loading branch information
tgeoghegan authored Sep 1, 2022
2 parents 54c9643 + d2f0e96 commit 03c32a4
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 62 deletions.
157 changes: 127 additions & 30 deletions janus_server/src/bin/janus_cli.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,144 @@
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use base64::STANDARD_NO_PAD;
use deadpool_postgres::Pool;
use janus_core::time::{Clock, RealClock};
use janus_core::{
message::{HpkeConfig, Report},
time::{Clock, RealClock},
};
use janus_server::{
binary_utils::{database_pool, datastore, read_config, BinaryOptions, CommonBinaryOptions},
binary_utils::{database_pool, datastore, read_config, CommonBinaryOptions},
config::{BinaryConfig, CommonConfig},
datastore::{self, Datastore},
message::{
AggregateContinueReq, AggregateContinueResp, AggregateInitializeReq,
AggregateInitializeResp, AggregateShareReq, AggregateShareResp, CollectReq, CollectResp,
},
metrics::install_metrics_exporter,
task::Task,
trace::install_trace_subscriber,
trace::{install_trace_subscriber, TraceConfiguration},
};
use k8s_openapi::api::core::v1::Secret;
use kube::api::{ObjectMeta, PostParams};
use prio::codec::Decode;
use rand::{thread_rng, Rng};
use ring::aead::AES_128_GCM;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
fmt::Debug,
fs::File,
io::{stdin, Cursor, Read},
path::{Path, PathBuf},
sync::Arc,
};
use structopt::StructOpt;
use tokio::fs;
use tracing::info;
use tracing::{debug, info};

static SCHEMA: &str = include_str!("../../../db/schema.sql");

#[tokio::main]
async fn main() -> Result<()> {
// Parse options, then read & parse config.
let options = Options::from_args();
let config: Config = read_config(&options)?;

// Install tracing/metrics handlers.
install_trace_subscriber(&config.common_config.logging_config)
.context("couldn't install tracing subscriber")?;
let _metrics_exporter = install_metrics_exporter(&config.common_config.metrics_config)
.context("failed to install metrics exporter")?;

info!(common_options = ?options.common_options(), ?config, "Starting up");
debug!(?options, "Starting up");

options.cmd.execute(&options, &config).await
options.cmd.execute().await
}

#[derive(Debug, StructOpt)]
enum Command {
/// Write the Janus database schema to the database.
WriteSchema,
WriteSchema {
#[structopt(flatten)]
common_options: CommonBinaryOptions,
},

/// Write a set of tasks identified in a file to the datastore.
ProvisionTasks {
#[structopt(flatten)]
common_options: CommonBinaryOptions,

/// A YAML file containing a list of tasks to be written. Existing tasks (matching by task
/// ID) will be overwritten.
tasks_file: PathBuf,
},

/// Create a datastore key and write it to a Kubernetes secret.
CreateDatastoreKey {
#[structopt(flatten)]
common_options: CommonBinaryOptions,

/// The Kubernetes namespace to create the datastore key secret in.
k8s_namespace: String,

/// The name of the Kubernetes secret to place the datastore key in.
k8s_secret_name: String,
},

/// Decode a single Distributed Aggregation Protocol message.
DecodeDapMessage {
/// Path to file containing message to debug. Pass "-" to read from stdin.
message_file: String,

/// Media type of the message to decode.
#[structopt(long, short = "t", required = true, possible_values(&[
"hpke-config",
"report",
"aggregate-initialize-req",
"aggregate-initialize-resp",
"aggregate-continue-req",
"aggregate-continue-resp",
"aggregate-share-req",
"aggregate-share-resp",
"collect-req",
"collect-resp",
]))]
media_type: String,
},
}

impl Command {
async fn execute(&self, options: &Options, config: &Config) -> Result<()> {
async fn execute(&self) -> Result<()> {
// Note: to keep this function reasonably-readable, individual command handlers should
// generally create the command's dependencies based on options/config, then call another
// function with the main command logic.
match self {
Command::WriteSchema => {
Command::WriteSchema { common_options } => {
let config: Config = read_config(common_options)?;
install_tracing_and_metrics_handlers(config.common_config())?;
let pool = database_pool(
&config.common_config.database,
options.common.database_password.as_deref(),
common_options.database_password.as_deref(),
)
.await?;
write_schema(&pool).await
}

Command::ProvisionTasks { tasks_file } => {
Command::ProvisionTasks {
common_options,
tasks_file,
} => {
let config: Config = read_config(common_options)?;
install_tracing_and_metrics_handlers(config.common_config())?;
let pool = database_pool(
&config.common_config.database,
options.common.database_password.as_deref(),
common_options.database_password.as_deref(),
)
.await?;
let datastore =
datastore(pool, RealClock::default(), &options.common.datastore_keys)?;
datastore(pool, RealClock::default(), &common_options.datastore_keys)?;
provision_tasks(&datastore, tasks_file).await
}

Command::CreateDatastoreKey {
common_options,
k8s_namespace,
k8s_secret_name,
} => {
let config: Config = read_config(common_options)?;
install_tracing_and_metrics_handlers(config.common_config())?;
create_datastore_key(
kube::Client::try_default()
.await
Expand All @@ -104,10 +148,29 @@ impl Command {
)
.await
}

Command::DecodeDapMessage {
message_file,
media_type,
} => {
install_trace_subscriber(&TraceConfiguration::default())?;
let decoded = decode_dap_message(message_file, media_type)?;
println!("{decoded:#?}");
Ok(())
}
}
}
}

fn install_tracing_and_metrics_handlers(config: &CommonConfig) -> Result<()> {
install_trace_subscriber(&config.logging_config)
.context("couldn't install tracing subscriber")?;
let _metrics_exporter = install_metrics_exporter(&config.metrics_config)
.context("failed to install metrics exporter")?;

Ok(())
}

async fn write_schema(pool: &Pool) -> Result<()> {
info!("Writing database schema");
let db_client = pool.get().await.context("couldn't get database client")?;
Expand Down Expand Up @@ -189,6 +252,49 @@ async fn create_datastore_key(
Ok(())
}

/// Decode the contents of `message_file` as a DAP message with `media_type`, returning the decoded
/// object.
fn decode_dap_message(message_file: &str, media_type: &str) -> Result<Box<dyn Debug>> {
let mut reader = if message_file.eq("-") {
Box::new(stdin()) as Box<dyn Read>
} else {
Box::new(File::open(message_file)?) as Box<dyn Read>
};

let mut message_buf = vec![];
reader.read_to_end(&mut message_buf)?;

let mut binary_message = Cursor::new(message_buf.as_slice());

let decoded = match media_type {
"hpke-config" => Box::new(HpkeConfig::decode(&mut binary_message)?) as Box<dyn Debug>,
"report" => Box::new(Report::decode(&mut binary_message)?) as Box<dyn Debug>,
"aggregate-initialize-req" => {
Box::new(AggregateInitializeReq::decode(&mut binary_message)?) as Box<dyn Debug>
}
"aggregate-initialize-resp" => {
Box::new(AggregateInitializeResp::decode(&mut binary_message)?) as Box<dyn Debug>
}
"aggregate-continue-req" => {
Box::new(AggregateContinueReq::decode(&mut binary_message)?) as Box<dyn Debug>
}
"aggregate-continue-resp" => {
Box::new(AggregateContinueResp::decode(&mut binary_message)?) as Box<dyn Debug>
}
"aggregate-share-req" => {
Box::new(AggregateShareReq::decode(&mut binary_message)?) as Box<dyn Debug>
}
"aggregate-share-resp" => {
Box::new(AggregateShareResp::decode(&mut binary_message)?) as Box<dyn Debug>
}
"collect-req" => Box::new(CollectReq::decode(&mut binary_message)?) as Box<dyn Debug>,
"collect-resp" => Box::new(CollectResp::decode(&mut binary_message)?) as Box<dyn Debug>,
_ => return Err(anyhow!("unknown media type")),
};

Ok(decoded)
}

#[derive(Debug, StructOpt)]
#[structopt(
name = "janus_cli",
Expand All @@ -197,19 +303,10 @@ async fn create_datastore_key(
version = env!("CARGO_PKG_VERSION"),
)]
struct Options {
#[structopt(flatten)]
common: CommonBinaryOptions,

#[structopt(subcommand)]
cmd: Command,
}

impl BinaryOptions for Options {
fn common_options(&self) -> &CommonBinaryOptions {
&self.common
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Config {
#[serde(flatten)]
Expand Down
46 changes: 14 additions & 32 deletions janus_server/src/binary_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,47 +31,29 @@ use tokio_postgres::NoTls;
use tracing::info;
use warp::Filter;

/// Reads, parses, and returns the config referenced by the given options.
pub fn read_config<Options: BinaryOptions, Config: BinaryConfig>(
options: &Options,
) -> Result<Config> {
let config_content =
fs::read_to_string(&options.common_options().config_file).with_context(|| {
format!(
"couldn't read config file {:?}",
options.common_options().config_file
)
})?;
let mut config: Config = serde_yaml::from_str(&config_content).with_context(|| {
format!(
"couldn't parse config file {:?}",
options.common_options().config_file
)
})?;
/// Reads, parses, and returns the config referenced by the given options, or None if no config file
/// path was set.
pub fn read_config<Config: BinaryConfig>(options: &CommonBinaryOptions) -> Result<Config> {
let config_content = fs::read_to_string(&options.config_file)
.with_context(|| format!("couldn't read config file {:?}", options.config_file))?;
let mut config: Config = serde_yaml::from_str(&config_content)
.with_context(|| format!("couldn't parse config file {:?}", options.config_file))?;

if let Some(OpenTelemetryTraceConfiguration::Otlp(otlp_config)) = &mut config
.common_config_mut()
.logging_config
.open_telemetry_config
{
otlp_config.metadata.extend(
options
.common_options()
.otlp_tracing_metadata
.iter()
.cloned(),
);
otlp_config
.metadata
.extend(options.otlp_tracing_metadata.iter().cloned());
}
if let Some(MetricsExporterConfiguration::Otlp(otlp_config)) =
&mut config.common_config_mut().metrics_config.exporter
{
otlp_config.metadata.extend(
options
.common_options()
.otlp_metrics_metadata
.iter()
.cloned(),
);
otlp_config
.metadata
.extend(options.otlp_metrics_metadata.iter().cloned());
}

Ok(config)
Expand Down Expand Up @@ -266,7 +248,7 @@ where
{
// Parse arguments, then read & parse config.
let options = Options::from_args();
let config: Config = read_config(&options)?;
let config: Config = read_config(options.common_options())?;

// Install tracing/metrics handlers.
install_trace_subscriber(&config.common_config().logging_config)
Expand Down

0 comments on commit 03c32a4

Please sign in to comment.