From 41ae7edc4068c32a0bc1fd199fcb2ff81731cd2c Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 6 Sep 2023 17:37:57 +0200 Subject: [PATCH] capture token --- src/capture.rs | 67 ++++++++++++++++++++++++++++++++------------------ src/event.rs | 4 +++ 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/src/capture.rs b/src/capture.rs index a52f588..e1d6ca8 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -6,11 +6,13 @@ use anyhow::{anyhow, Result}; use bytes::Bytes; use axum::{http::StatusCode, Json}; +use axum::body::HttpBody; // TODO: stream this instead use axum::extract::{Query, State}; use axum::http::HeaderMap; use axum_client_ip::InsecureClientIp; use base64::Engine; +use serde_json::Value; use crate::{ api::{CaptureResponse,CaptureResponseCode}, @@ -61,7 +63,10 @@ pub async fn event( if events.is_empty() { return Err((StatusCode::BAD_REQUEST, String::from("No events in batch"))); } - + match extract_and_verify_token(&events) { + Ok(token) => meta.token = Some(token), + Err(msg) => return Err((StatusCode::UNAUTHORIZED, msg)), + } let processed = process_events(state.sink.clone(), &events, &meta).await; if let Err(msg) = processed { @@ -94,10 +99,41 @@ pub fn process_single_event(event: &RawEvent, query: &EventQuery) -> Result Result { + let mut request_token: Option = None; + + // Collect the token from the batch, detect multiples to reject request + for event in events { + let event_token = match &event.token { + Some(value) => value.clone(), + None => match event.properties.get("token").map(Value::as_str) { + Some(Some(value)) => value.to_string(), + _ => { + return Err("event with no token".into()); + } + } + }; + if let Some(token) = &request_token { + if !token.eq(&event_token) { + return Err("mismatched tokens in batch".into()); + } + } else { + if let Err(invalid) = token::validate_token(event_token.as_str()) { + return Err(invalid.reason().to_string()); + } + request_token = Some(event_token); + } + } + request_token.ok_or("no token found in request".into()) +} + + pub async fn process_events( sink: Arc, events: &[RawEvent], @@ -155,24 +191,13 @@ pub async fn process_events( #[cfg(test)] mod tests { - use crate::sink; use std::collections::HashMap; - use std::sync::Arc; - use serde_json::json; - - use super::process_events; - use crate::event::{EventQuery, RawEvent}; - use crate::router::State; + use crate::capture::extract_and_verify_token; + use crate::event::RawEvent; #[tokio::test] async fn all_events_have_same_token() { - let state = State { - sink: Arc::new(sink::PrintSink {}), - timesource: Arc::new(crate::time::SystemTime {}), - }; - let meta = EventQuery::default(); - let events = vec![ RawEvent { token: Some(String::from("hello")), @@ -190,18 +215,12 @@ mod tests { }, ]; - let processed = process_events(state.sink, &events, &meta).await; - assert_eq!(processed.is_ok(), true); + let processed = extract_and_verify_token(&events); + assert_eq!(processed.is_ok(), true, "{:?}", processed); } #[tokio::test] async fn all_events_have_different_token() { - let state = State { - sink: Arc::new(sink::PrintSink {}), - timesource: Arc::new(crate::time::SystemTime {}), - }; - let meta = EventQuery::default(); - let events = vec![ RawEvent { token: Some(String::from("hello")), @@ -219,7 +238,7 @@ mod tests { }, ]; - let processed = process_events(state.sink, &events, &meta).await; + let processed = extract_and_verify_token(&events); assert_eq!(processed.is_err(), true); } } diff --git a/src/event.rs b/src/event.rs index 6957bb2..2246ea0 100644 --- a/src/event.rs +++ b/src/event.rs @@ -27,6 +27,9 @@ pub struct EventQuery { #[serde(alias = "_")] pub sent_at: Option, + #[serde(skip_serializing)] + pub token: Option, // Filled by handler + #[serde(skip_serializing)] pub now: Option, // Filled by handler from timesource @@ -116,6 +119,7 @@ mod tests { compression: Some(Compression::GzipJs), lib_version: None, sent_at: None, + token: None, now: None, client_ip: None, },