Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add id method #39

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co



## [master] · unreleased
[master]: /../../tree/v0.5.0%2B0.16.11

[Diff](/../../compare/v0.5.0%2B0.16.11...master) | [Milestone](/../../milestone/2)

### Added

- `id()` function generating [TigerBeetle Time-Based Identifiers](https://docs.tigerbeetle.com/coding/data-modeling#tigerbeetle-time-based-identifiers-recommended). ([#39])

[#39]: /../../pull/39




## [0.5.0+0.16.11] · 2024-12-02
[0.5.0+0.16.11]: /../../tree/v0.5.0%2B0.16.11

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tokio-rt-multi-thread = ["core/tokio-rt-multi-thread"]
[dependencies]
bytemuck = { version = "1.16", features = ["extern_crate_alloc"] }
core = { version = "=0.5.0+0.16.11", package = "tigerbeetle-unofficial-core", path = "core" }
fastrand = "2.3"
tokio = { version = "1.28.1", features = ["sync"] }

[dev-dependencies]
Expand Down
158 changes: 158 additions & 0 deletions src/id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//! [TigerBeetle Time-Based Identifier][0] implementation.
//!
//! [0]: https://docs.tigerbeetle.com/coding/data-modeling#tigerbeetle-time-based-identifiers-recommended

use std::{sync::Mutex, time::UNIX_EPOCH};

/// Returns the current timestamp in milliseconds since [`UNIX_EPOCH`].
///
/// # Panics
///
/// - If the [`SystemTime`] clock went backwards beyond [`UNIX_EPOCH`].
/// - If milliseconds since [`UNIX_EPOCH`] overflow [`u64`].
fn get_current_timestamp() -> u64 {
UNIX_EPOCH
.elapsed()
.unwrap_or_else(|e| panic!("`SystemTime` went backwards beyond `UNIX_EPOCH`: {e}"))
.as_millis()
.try_into()
.unwrap_or_else(|e| panic!("milliseconds since `UNIX_EPOCH` overflow `u64`: {e}"))
}

/// Generates and returns 10 random bytes.
fn generate_random_bytes() -> [u8; 10] {
let mut bytes = [0u8; 10];
fastrand::fill(&mut bytes);
bytes
}

/// Generates a new [TigerBeetle Time-Based Identifier][0].
///
/// [TigerBeetle Time-Based Identifier][0] consists of:
/// - 48 bits of (millisecond) timestamp (high-order bits)
/// - 80 bits of randomness (low-order bits)
///
/// [0]: https://docs.tigerbeetle.com/coding/data-modeling#tigerbeetle-time-based-identifiers-recommended
#[must_use]
pub fn id() -> u128 {
static LAST: Mutex<(u64, [u8; 10])> = Mutex::new((0, [0; 10]));

let (timestamp, random) = {
let timestamp = get_current_timestamp();

// Lock the `Mutex` to ensure that `last_timestamp` is monotonically increasing and
// `last_random` changes each millisecond.
let (last_timestamp, last_random) = &mut *LAST.lock().unwrap();
if timestamp > *last_timestamp {
*last_timestamp = timestamp;
*last_random = generate_random_bytes();
}

// Read out a `u80` from the `last_random` as a `u64` and `u16`.
// PANIC: Unwrapping is OK here, since `mem::size_of<u64>() == 8` and
// `mem::size_of<u16>() == 2`.
let mut random_lo = u64::from_le_bytes(last_random[..8].try_into().unwrap());
let mut random_hi = u16::from_le_bytes(last_random[8..].try_into().unwrap());

// Increment the random bits as a `u80` together, checking for overflow.
random_lo = random_lo.wrapping_add(1);
if random_lo == 0 {
random_hi = random_hi.wrapping_add(1);
if random_hi == 0 {
*last_timestamp = last_timestamp.wrapping_add(1);
}
}

// Write incremented `u80` back to the `last_random`.
last_random[..8].copy_from_slice(&random_lo.to_le_bytes());
last_random[8..].copy_from_slice(&random_hi.to_le_bytes());

(*last_timestamp, *last_random)
};

// Create `u128` from new `timestamp` and `random`.
let mut id = [0u8; 16];
id[0..10].copy_from_slice(&random);
id[10..16].copy_from_slice(&timestamp.to_le_bytes()[..6]);
u128::from_le_bytes(id)
}

#[cfg(test)]
mod id_spec {
use std::{sync::Barrier, thread, time::Duration};

use super::id;

#[test]
fn unique() {
let id1 = id();
let id2 = id();
assert_ne!(id1, id2, "expected: {id1} != {id2}");
}

#[test]
fn monotonic_between_millis() {
let id1 = id();
thread::sleep(Duration::from_millis(2));
let id2 = id();
assert!(id1 < id2, "expected: {id1} < {id2}");
}

#[test]
fn monotonic_within_millis() {
let id1 = id();
thread::sleep(Duration::from_micros(1));
let id2 = id();
assert!(id1 < id2, "expected: {id1} < {id2}");
}

#[test]
fn monotonic_immediately() {
let id1 = id();
let id2 = id();
assert!(id1 < id2, "expected: {id1} < {id2}");
}

// Port of upstream test:
// https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/src/clients/go/pkg/types/main_test.go#L75-L115
#[test]
fn monotonic_fuzz() {
fn verifier() {
let mut id1 = id();
for i in 0..1_000_000 {
if i % 1_000 == 0 {
thread::sleep(Duration::from_millis(1));
}
let id2 = id();

assert!(id1 < id2, "expected: {id1} < {id2}");

id1 = id2;
}
}

// Verify monotonic IDs locally.
verifier();

// Verify monotonic IDs across multiple threads.
let n = 10;
let barrier = Barrier::new(n);
thread::scope(|s| {
let threads = (0..n)
.map(|i| {
thread::Builder::new()
.name(i.to_string())
.spawn_scoped(s, || {
// Sync up all threads before `verifier()` to maximize contention.
barrier.wait();
verifier();
})
.unwrap()
})
.collect::<Vec<_>>();
for t in threads {
t.join().unwrap();
}
});
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![forbid(unsafe_code)]

mod id;
mod reply;

use error::NewClientError;
Expand All @@ -13,6 +14,8 @@ use core::{

pub use core::{self, account, error, transfer, Account, Packet, QueryFilter, Transfer};

pub use self::id::id;

pub struct Client {
inner: core::Client<&'static Callbacks>,
}
Expand Down
Loading