This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit: capture token

xvello committed Sep 6, 2023
1 parent a2e0cba · commit 41ae7ed
Showing 2 changed files with 47 additions and 24 deletions.
67 changes: 43 additions & 24 deletions src/capture.rs
@@ -6,11 +6,13 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;

use axum::{http::StatusCode, Json};
use axum::body::HttpBody;
// TODO: stream this instead
use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum_client_ip::InsecureClientIp;
use base64::Engine;
use serde_json::Value;

use crate::{
    api::{CaptureResponse, CaptureResponseCode},
@@ -61,7 +63,10 @@ pub async fn event(
    if events.is_empty() {
        return Err((StatusCode::BAD_REQUEST, String::from("No events in batch")));
    }

    match extract_and_verify_token(&events) {
        Ok(token) => meta.token = Some(token),
        Err(msg) => return Err((StatusCode::UNAUTHORIZED, msg)),
    }
    let processed = process_events(state.sink.clone(), &events, &meta).await;

    if let Err(msg) = processed {
@@ -94,10 +99,41 @@ pub fn process_single_event(event: &RawEvent, query: &EventQuery) -> Result<Proc
        data: String::from("hallo I am some data 😊"),
        now: query.now.clone().unwrap_or_default(),
        sent_at: String::new(),
        token: String::from("tokentokentoken"),
        token: query.token.clone().unwrap_or_default(),
    })
}

pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, String> {
    let mut request_token: Option<String> = None;

    // Collect the token from the batch, detect multiples to reject request
    for event in events {
        let event_token = match &event.token {
            Some(value) => value.clone(),
            None => match event.properties.get("token").map(Value::as_str) {
                Some(Some(value)) => value.to_string(),
                _ => {
                    return Err("event with no token".into());
                }
            },
        };
        if let Some(token) = &request_token {
            if !token.eq(&event_token) {
                return Err("mismatched tokens in batch".into());
            }
        } else {
            if let Err(invalid) = token::validate_token(event_token.as_str()) {
                return Err(invalid.reason().to_string());
            }
            request_token = Some(event_token);
        }
    }
    request_token.ok_or("no token found in request".into())
}


pub async fn process_events(
    sink: Arc<dyn sink::EventSink + Send + Sync>,
    events: &[RawEvent],
@@ -155,24 +191,13 @@ pub async fn process_events(

#[cfg(test)]
mod tests {
    use crate::sink;
    use std::collections::HashMap;
    use std::sync::Arc;

    use serde_json::json;

    use super::process_events;
    use crate::event::{EventQuery, RawEvent};
    use crate::router::State;
    use crate::capture::extract_and_verify_token;
    use crate::event::RawEvent;

    #[tokio::test]
    async fn all_events_have_same_token() {
        let state = State {
            sink: Arc::new(sink::PrintSink {}),
            timesource: Arc::new(crate::time::SystemTime {}),
        };
        let meta = EventQuery::default();

        let events = vec![
            RawEvent {
                token: Some(String::from("hello")),
@@ -190,18 +215,12 @@ mod tests {
            },
        ];

        let processed = process_events(state.sink, &events, &meta).await;
        assert_eq!(processed.is_ok(), true);
        let processed = extract_and_verify_token(&events);
        assert_eq!(processed.is_ok(), true, "{:?}", processed);
    }

    #[tokio::test]
    async fn all_events_have_different_token() {
        let state = State {
            sink: Arc::new(sink::PrintSink {}),
            timesource: Arc::new(crate::time::SystemTime {}),
        };
        let meta = EventQuery::default();

        let events = vec![
            RawEvent {
                token: Some(String::from("hello")),
@@ -219,7 +238,7 @@ mod tests {
            },
        ];

        let processed = process_events(state.sink, &events, &meta).await;
        let processed = extract_and_verify_token(&events);
        assert_eq!(processed.is_err(), true);
    }
}
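
The core of this commit is the extract_and_verify_token helper above: it resolves each event's token (top-level token field first, then the "token" property), rejects batches that mix tokens, and validates the first token it sees via token::validate_token before the events are processed. Below is a minimal, standalone sketch of the same precedence and consistency rules, written against a stand-in RawEvent that only models the two fields the helper reads and with the token::validate_token call omitted; names such as extract_token and phc_hypothetical are illustrative, not part of the crate.

use std::collections::HashMap;
use serde_json::Value;

// Stand-in for the crate's RawEvent: only the two fields the helper reads.
struct RawEvent {
    token: Option<String>,
    properties: HashMap<String, Value>,
}

// Same token precedence and batch-consistency rules as extract_and_verify_token,
// minus the token::validate_token check.
fn extract_token(events: &[RawEvent]) -> Result<String, String> {
    let mut request_token: Option<String> = None;
    for event in events {
        // Prefer the top-level field, fall back to the "token" property.
        let event_token = match &event.token {
            Some(value) => value.clone(),
            None => match event.properties.get("token").and_then(Value::as_str) {
                Some(value) => value.to_string(),
                None => return Err("event with no token".into()),
            },
        };
        match &request_token {
            Some(token) if *token != event_token => {
                return Err("mismatched tokens in batch".into());
            }
            Some(_) => {}
            None => request_token = Some(event_token),
        }
    }
    request_token.ok_or_else(|| "no token found in request".into())
}

fn main() {
    // One event with the token only in its properties: the fallback path applies.
    let events = vec![RawEvent {
        token: None,
        properties: HashMap::from([("token".to_string(), Value::from("phc_hypothetical"))]),
    }];
    assert_eq!(extract_token(&events), Ok("phc_hypothetical".to_string()));
}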
4 changes: 4 additions & 0 deletions src/event.rs
@@ -27,6 +27,9 @@ pub struct EventQuery {
    #[serde(alias = "_")]
    pub sent_at: Option<i64>,

    #[serde(skip_serializing)]
    pub token: Option<String>, // Filled by handler

    #[serde(skip_serializing)]
    pub now: Option<String>, // Filled by handler from timesource

@@ -116,6 +119,7 @@ mod tests {
            compression: Some(Compression::GzipJs),
            lib_version: None,
            sent_at: None,
            token: None,
            now: None,
            client_ip: None,
        },
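
In src/event.rs, the commit adds a token field to EventQuery, marked #[serde(skip_serializing)] like the existing now field: the handler fills it once extract_and_verify_token succeeds, and process_single_event copies it into each ProcessedEvent. Below is a minimal sketch of how that serde attribute behaves, using a hypothetical QuerySketch struct (field names illustrative) rather than the real EventQuery.

use serde::{Deserialize, Serialize};

// Hypothetical stand-in: the token field is accepted on deserialization,
// but never written back out because of #[serde(skip_serializing)].
#[derive(Deserialize, Serialize)]
struct QuerySketch {
    lib_version: Option<String>,
    #[serde(skip_serializing)]
    token: Option<String>, // filled by the handler, not by the client
}

fn main() {
    let mut query: QuerySketch = serde_json::from_str(r#"{"lib_version":"1.0"}"#).unwrap();
    query.token = Some("phc_hypothetical".into()); // what the handler does after token extraction
    // The token never shows up in the serialized form:
    assert_eq!(serde_json::to_string(&query).unwrap(), r#"{"lib_version":"1.0"}"#);
}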
