From 9c95b4eeef06eb0955a722cfef4b49d04888f34e Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 23 May 2023 17:35:39 +0100 Subject: [PATCH] pass uuid and distinct_id --- Cargo.lock | 2 +- Cargo.toml | 3 ++- src/api.rs | 1 - src/capture.rs | 38 +++++++++++++++++++++++++++----------- src/event.rs | 3 ++- src/lib.rs | 1 + src/main.rs | 1 + src/utils.rs | 38 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 72 insertions(+), 15 deletions(-) create mode 100644 src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index bf8e80f..a73eb07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,6 +185,7 @@ dependencies = [ "flate2", "governor", "mockall", + "rand", "rdkafka", "serde", "serde_json", @@ -1621,7 +1622,6 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ - "getrandom", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 6d505b8..3cfaf5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,9 +20,10 @@ bytes = "1" anyhow = "1.0" flate2 = "1.0" base64 = "0.21.1" -uuid = { version = "1.3.3", features = ["serde", "v4"] } +uuid = { version = "1.3.3", features = ["serde"] } async-trait = "0.1.68" serde_urlencoded = "0.7.1" +rand = "0.8.5" rdkafka = { version = "0.34", features = ["cmake-build"] } [dev-dependencies] diff --git a/src/api.rs b/src/api.rs index 09e79ae..6207083 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; - use serde::{Deserialize, Serialize}; use serde_json::Value; diff --git a/src/capture.rs b/src/capture.rs index 541ef31..577a710 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use std::ops::Deref; use std::sync::Arc; -use anyhow::Result; +use anyhow::{anyhow, Result}; use bytes::Bytes; use axum::{http::StatusCode, Json}; @@ -10,15 +10,12 @@ use axum::{http::StatusCode, Json}; use axum::extract::{Query, State}; use axum::http::HeaderMap; use base64::Engine; -use uuid::Uuid; - -use crate::api::CaptureResponseCode; -use crate::event::ProcessedEvent; use crate::{ - api::CaptureResponse, - event::{Event, EventFormData, EventQuery}, + api::{CaptureResponse,CaptureResponseCode}, + event::{Event, EventFormData, EventQuery, ProcessedEvent}, router, sink, token, + utils::uuid_v7 }; pub async fn event( @@ -71,11 +68,22 @@ pub async fn event( })) } -pub fn process_single_event(_event: &Event) -> Result { - // TODO: Put actual data in here and transform it properly +pub fn process_single_event(event: &Event) -> Result { + let distinct_id = match &event.distinct_id { + Some(id) => id, + None => { + match event.properties.get("distinct_id").map(|v| v.as_str()) { + Some(Some(id)) => id, + _ => { + return Err(anyhow!("missing distinct_id")) + }, + } + } + }; + Ok(ProcessedEvent { - uuid: Uuid::new_v4(), - distinct_id: Uuid::new_v4().simple().to_string(), + uuid: event.uuid.unwrap_or_else(uuid_v7), + distinct_id: distinct_id.to_string(), ip: String::new(), site_url: String::new(), data: String::from("hallo I am some data 😊"), @@ -159,11 +167,15 @@ mod tests { let events = vec![ Event { token: Some(String::from("hello")), + distinct_id: Some("testing".to_string()), + uuid: None, event: String::new(), properties: HashMap::new(), }, Event { token: None, + distinct_id: Some("testing".to_string()), + uuid: None, event: String::new(), properties: HashMap::from([(String::from("token"), json!("hello"))]), }, @@ -183,11 +195,15 @@ mod tests { let events = vec![ Event { token: Some(String::from("hello")), + distinct_id: Some("testing".to_string()), + uuid: None, event: String::new(), properties: HashMap::new(), }, Event { token: None, + distinct_id: Some("testing".to_string()), + uuid: None, event: String::new(), properties: HashMap::from([(String::from("token"), json!("goodbye"))]), }, diff --git a/src/event.rs b/src/event.rs index 8cac0ec..db15019 100644 --- a/src/event.rs +++ b/src/event.rs @@ -37,7 +37,8 @@ pub struct EventFormData { pub struct Event { #[serde(alias = "$token", alias = "api_key")] pub token: Option, - + pub distinct_id: Option, + pub uuid: Option, pub event: String, pub properties: HashMap, } diff --git a/src/lib.rs b/src/lib.rs index 641385f..96e9875 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,3 +5,4 @@ pub mod router; pub mod sink; pub mod time; mod token; +mod utils; diff --git a/src/main.rs b/src/main.rs index 77d1e8d..ded32c1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ mod router; mod sink; mod time; mod token; +mod utils; #[tokio::main] async fn main() { diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..6eedc3f --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,38 @@ +use rand::RngCore; +use uuid::Uuid; + +pub fn random_bytes() -> [u8; N] { + let mut ret = [0u8; N]; + rand::thread_rng().fill_bytes(&mut ret); + ret +} + +// basically just ripped from the uuid crate. they have it as unstable, but we can use it fine. +const fn encode_unix_timestamp_millis(millis: u64, random_bytes: &[u8; 10]) -> Uuid { + let millis_high = ((millis >> 16) & 0xFFFF_FFFF) as u32; + let millis_low = (millis & 0xFFFF) as u16; + + let random_and_version = + (random_bytes[0] as u16 | ((random_bytes[1] as u16) << 8) & 0x0FFF) | (0x7 << 12); + + let mut d4 = [0; 8]; + + d4[0] = (random_bytes[2] & 0x3F) | 0x80; + d4[1] = random_bytes[3]; + d4[2] = random_bytes[4]; + d4[3] = random_bytes[5]; + d4[4] = random_bytes[6]; + d4[5] = random_bytes[7]; + d4[6] = random_bytes[8]; + d4[7] = random_bytes[9]; + + Uuid::from_fields(millis_high, millis_low, random_and_version, &d4) +} + +pub fn uuid_v7() -> Uuid { + let bytes = random_bytes(); + let now = time::OffsetDateTime::now_utc(); + let now_millis: u64 = now.unix_timestamp() as u64 * 1_000 + now.millisecond() as u64; + + encode_unix_timestamp_millis(now_millis, &bytes) +} \ No newline at end of file