Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
pass uuid and distinct_id
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Sep 6, 2023
1 parent a7c9e89 commit 9c95b4e
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ bytes = "1"
anyhow = "1.0"
flate2 = "1.0"
base64 = "0.21.1"
uuid = { version = "1.3.3", features = ["serde", "v4"] }
uuid = { version = "1.3.3", features = ["serde"] }
async-trait = "0.1.68"
serde_urlencoded = "0.7.1"
rand = "0.8.5"
rdkafka = { version = "0.34", features = ["cmake-build"] }

[dev-dependencies]
Expand Down
1 change: 0 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down
38 changes: 27 additions & 11 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@ use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{anyhow, Result};
use bytes::Bytes;

use axum::{http::StatusCode, Json};
// TODO: stream this instead
use axum::extract::{Query, State};
use axum::http::HeaderMap;
use base64::Engine;
use uuid::Uuid;

use crate::api::CaptureResponseCode;
use crate::event::ProcessedEvent;

use crate::{
api::CaptureResponse,
event::{Event, EventFormData, EventQuery},
api::{CaptureResponse,CaptureResponseCode},
event::{Event, EventFormData, EventQuery, ProcessedEvent},
router, sink, token,
utils::uuid_v7
};

pub async fn event(
Expand Down Expand Up @@ -71,11 +68,22 @@ pub async fn event(
}))
}

pub fn process_single_event(_event: &Event) -> Result<ProcessedEvent> {
// TODO: Put actual data in here and transform it properly
pub fn process_single_event(event: &Event) -> Result<ProcessedEvent> {
let distinct_id = match &event.distinct_id {
Some(id) => id,
None => {
match event.properties.get("distinct_id").map(|v| v.as_str()) {
Some(Some(id)) => id,
_ => {
return Err(anyhow!("missing distinct_id"))
},
}
}
};

Ok(ProcessedEvent {
uuid: Uuid::new_v4(),
distinct_id: Uuid::new_v4().simple().to_string(),
uuid: event.uuid.unwrap_or_else(uuid_v7),
distinct_id: distinct_id.to_string(),
ip: String::new(),
site_url: String::new(),
data: String::from("hallo I am some data 😊"),
Expand Down Expand Up @@ -159,11 +167,15 @@ mod tests {
let events = vec![
Event {
token: Some(String::from("hello")),
distinct_id: Some("testing".to_string()),
uuid: None,
event: String::new(),
properties: HashMap::new(),
},
Event {
token: None,
distinct_id: Some("testing".to_string()),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("hello"))]),
},
Expand All @@ -183,11 +195,15 @@ mod tests {
let events = vec![
Event {
token: Some(String::from("hello")),
distinct_id: Some("testing".to_string()),
uuid: None,
event: String::new(),
properties: HashMap::new(),
},
Event {
token: None,
distinct_id: Some("testing".to_string()),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("goodbye"))]),
},
Expand Down
3 changes: 2 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub struct EventFormData {
pub struct Event {
#[serde(alias = "$token", alias = "api_key")]
pub token: Option<String>,

pub distinct_id: Option<String>,
pub uuid: Option<Uuid>,
pub event: String,
pub properties: HashMap<String, Value>,
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub mod router;
pub mod sink;
pub mod time;
mod token;
mod utils;
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod router;
mod sink;
mod time;
mod token;
mod utils;

#[tokio::main]
async fn main() {
Expand Down
38 changes: 38 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use rand::RngCore;
use uuid::Uuid;

pub fn random_bytes<const N: usize>() -> [u8; N] {
let mut ret = [0u8; N];
rand::thread_rng().fill_bytes(&mut ret);
ret
}

// basically just ripped from the uuid crate. they have it as unstable, but we can use it fine.
const fn encode_unix_timestamp_millis(millis: u64, random_bytes: &[u8; 10]) -> Uuid {
let millis_high = ((millis >> 16) & 0xFFFF_FFFF) as u32;
let millis_low = (millis & 0xFFFF) as u16;

let random_and_version =
(random_bytes[0] as u16 | ((random_bytes[1] as u16) << 8) & 0x0FFF) | (0x7 << 12);

let mut d4 = [0; 8];

d4[0] = (random_bytes[2] & 0x3F) | 0x80;
d4[1] = random_bytes[3];
d4[2] = random_bytes[4];
d4[3] = random_bytes[5];
d4[4] = random_bytes[6];
d4[5] = random_bytes[7];
d4[6] = random_bytes[8];
d4[7] = random_bytes[9];

Uuid::from_fields(millis_high, millis_low, random_and_version, &d4)
}

pub fn uuid_v7() -> Uuid {
let bytes = random_bytes();
let now = time::OffsetDateTime::now_utc();
let now_millis: u64 = now.unix_timestamp() as u64 * 1_000 + now.millisecond() as u64;

encode_unix_timestamp_millis(now_millis, &bytes)
}

0 comments on commit 9c95b4e

Please sign in to comment.