Skip to content

Commit

Permalink
[CLI] Basic support for journal v2
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Feb 6, 2025
1 parent 7df62da commit c1f5be8
Show file tree
Hide file tree
Showing 10 changed files with 386 additions and 173 deletions.
85 changes: 56 additions & 29 deletions cli/src/clients/datafusion_helpers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::{collections::HashMap, fmt::Display, str::FromStr};

use super::DataFusionHttpClient;
use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Duration, Local};
use clap::ValueEnum;
use restate_admin_rest_model::version::AdminApiVersion;
use restate_types::journal_v2::Entry;
use restate_types::{
identifiers::{AwakeableIdentifier, DeploymentId, ServiceId},
invocation::ServiceType,
};
use serde::Deserialize;
use serde_with::{serde_as, DeserializeAs};

use super::DataFusionHttpClient;

mod v1;
mod v2;

Expand Down Expand Up @@ -183,14 +183,32 @@ impl InvocationCompletion {
}

#[derive(Debug, Clone)]
pub struct JournalEntry {
pub enum JournalEntry {
V1(JournalEntryV1),
V2(JournalEntryV2),
}

impl JournalEntry {
pub fn should_present(&self) -> bool {
match self {
JournalEntry::V1(v1) => v1.should_present(),
JournalEntry::V2(_) => {
// For now in V2 we show all the entries
true
}
}
}
}

#[derive(Debug, Clone)]
pub struct JournalEntryV1 {
pub seq: u32,
pub entry_type: JournalEntryType,
pub entry_type: JournalEntryTypeV1,
completed: bool,
pub name: Option<String>,
}

impl JournalEntry {
impl JournalEntryV1 {
pub fn is_completed(&self) -> bool {
if self.entry_type.is_completable() {
self.completed
Expand All @@ -205,7 +223,7 @@ impl JournalEntry {
}

#[derive(Debug, Clone)]
pub enum JournalEntryType {
pub enum JournalEntryTypeV1 {
Sleep {
wakeup_at: Option<chrono::DateTime<Local>>,
},
Expand All @@ -222,48 +240,57 @@ pub enum JournalEntryType {
Other(String),
}

impl JournalEntryType {
impl JournalEntryTypeV1 {
fn is_completable(&self) -> bool {
matches!(
self,
JournalEntryType::Sleep { .. }
| JournalEntryType::Call(_)
| JournalEntryType::Awakeable(_)
| JournalEntryType::GetState
| JournalEntryType::GetPromise(_)
JournalEntryTypeV1::Sleep { .. }
| JournalEntryTypeV1::Call(_)
| JournalEntryTypeV1::Awakeable(_)
| JournalEntryTypeV1::GetState
| JournalEntryTypeV1::GetPromise(_)
)
}

fn should_present(&self) -> bool {
matches!(
self,
JournalEntryType::Sleep { .. }
| JournalEntryType::Call(_)
| JournalEntryType::OneWayCall(_)
| JournalEntryType::Awakeable(_)
| JournalEntryType::Run
| JournalEntryType::GetPromise(_)
JournalEntryTypeV1::Sleep { .. }
| JournalEntryTypeV1::Call(_)
| JournalEntryTypeV1::OneWayCall(_)
| JournalEntryTypeV1::Awakeable(_)
| JournalEntryTypeV1::Run
| JournalEntryTypeV1::GetPromise(_)
)
}
}

impl Display for JournalEntryType {
impl Display for JournalEntryTypeV1 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
JournalEntryType::Sleep { .. } => write!(f, "Sleep"),
JournalEntryType::Call(_) => write!(f, "Call"),
JournalEntryType::OneWayCall(_) => write!(f, "Send"),
JournalEntryType::Awakeable(_) => write!(f, "Awakeable"),
JournalEntryType::GetState => write!(f, "GetState"),
JournalEntryType::SetState => write!(f, "SetState"),
JournalEntryType::ClearState => write!(f, "ClearState"),
JournalEntryType::Run => write!(f, "Run"),
JournalEntryType::GetPromise(_) => write!(f, "Promise"),
JournalEntryType::Other(s) => write!(f, "{s}"),
JournalEntryTypeV1::Sleep { .. } => write!(f, "Sleep"),
JournalEntryTypeV1::Call(_) => write!(f, "Call"),
JournalEntryTypeV1::OneWayCall(_) => write!(f, "Send"),
JournalEntryTypeV1::Awakeable(_) => write!(f, "Awakeable"),
JournalEntryTypeV1::GetState => write!(f, "GetState"),
JournalEntryTypeV1::SetState => write!(f, "SetState"),
JournalEntryTypeV1::ClearState => write!(f, "ClearState"),
JournalEntryTypeV1::Run => write!(f, "Run"),
JournalEntryTypeV1::GetPromise(_) => write!(f, "Promise"),
JournalEntryTypeV1::Other(s) => write!(f, "{s}"),
}
}
}

#[derive(Debug, Clone)]
pub struct JournalEntryV2 {
pub seq: u32,
pub entry_type: String,
pub name: Option<String>,
pub entry: Option<Entry>,
pub appended_at: Option<chrono::DateTime<Local>>,
}

#[derive(Debug, Clone)]
pub struct OutgoingInvoke {
pub invocation_id: Option<String>,
Expand Down
26 changes: 13 additions & 13 deletions cli/src/clients/datafusion_helpers/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::clients::DataFusionHttpClient;

use super::{
HandlerStateStats, Invocation, InvocationCompletion, InvocationState, JournalEntry,
JournalEntryType, LockedKeyInfo, OutgoingInvoke, ServiceHandlerLockedKeysMap,
JournalEntryTypeV1, JournalEntryV1, LockedKeyInfo, OutgoingInvoke, ServiceHandlerLockedKeysMap,
ServiceHandlerUsage, ServiceStatusMap, SimpleInvocation,
};

Expand Down Expand Up @@ -672,34 +672,34 @@ pub async fn get_invocation_journal(
.map(|row| {
let index = row.index.expect("index");
let entry_type = match row.entry_type.expect("entry_type").as_str() {
"Sleep" => JournalEntryType::Sleep {
"Sleep" => JournalEntryTypeV1::Sleep {
wakeup_at: row.sleep_wakeup_at.map(Into::into),
},
"Call" => JournalEntryType::Call(OutgoingInvoke {
"Call" => JournalEntryTypeV1::Call(OutgoingInvoke {
invocation_id: row.invoked_id,
invoked_target: row.invoked_target,
}),
"OneWayCall" => JournalEntryType::OneWayCall(OutgoingInvoke {
"OneWayCall" => JournalEntryTypeV1::OneWayCall(OutgoingInvoke {
invocation_id: row.invoked_id,
invoked_target: row.invoked_target,
}),
"Awakeable" => {
JournalEntryType::Awakeable(AwakeableIdentifier::new(my_invocation_id, index))
JournalEntryTypeV1::Awakeable(AwakeableIdentifier::new(my_invocation_id, index))
}
"GetState" => JournalEntryType::GetState,
"SetState" => JournalEntryType::SetState,
"ClearState" => JournalEntryType::ClearState,
"Run" => JournalEntryType::Run,
"GetPromise" => JournalEntryType::GetPromise(row.promise_name),
t => JournalEntryType::Other(t.to_owned()),
"GetState" => JournalEntryTypeV1::GetState,
"SetState" => JournalEntryTypeV1::SetState,
"ClearState" => JournalEntryTypeV1::ClearState,
"Run" => JournalEntryTypeV1::Run,
"GetPromise" => JournalEntryTypeV1::GetPromise(row.promise_name),
t => JournalEntryTypeV1::Other(t.to_owned()),
};

JournalEntry {
JournalEntry::V1(JournalEntryV1 {
seq: index,
entry_type,
completed: row.completed.unwrap_or_default(),
name: row.name,
}
})
})
.collect();

Expand Down
98 changes: 63 additions & 35 deletions cli/src/clients/datafusion_helpers/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};

use crate::clients::DataFusionHttpClient;
use restate_types::identifiers::DeploymentId;
use restate_types::identifiers::{AwakeableIdentifier, InvocationId, ServiceId};
use serde::Deserialize;
use serde_with::serde_as;

use crate::clients::DataFusionHttpClient;

use super::{
HandlerStateStats, Invocation, InvocationCompletion, InvocationState, JournalEntry,
JournalEntryType, LockedKeyInfo, OutgoingInvoke, ServiceHandlerLockedKeysMap,
ServiceHandlerUsage, ServiceStatusMap, SimpleInvocation,
JournalEntryTypeV1, JournalEntryV1, JournalEntryV2, LockedKeyInfo, OutgoingInvoke,
ServiceHandlerLockedKeysMap, ServiceHandlerUsage, ServiceStatusMap, SimpleInvocation,
};

static JOURNAL_QUERY_LIMIT: usize = 100;
Expand Down Expand Up @@ -413,6 +412,11 @@ struct JournalQueryResult {
sleep_wakeup_at: Option<DateTime<Local>>,
name: Option<String>,
promise_name: Option<String>,

// --- V2 columns
version: u32,
appended_at: Option<DateTime<Local>>,
entry_json: Option<String>,
}

pub async fn get_invocation_journal(
Expand All @@ -427,6 +431,14 @@ pub async fn get_invocation_journal(
} else {
"CAST(NULL as STRING) AS promise_name"
};
let has_restate_1_2_columns = client
.check_columns_exists("sys_journal", &["version", "entry_json", "appended_at"])
.await?;
let select_restate_1_2_columns = if has_restate_1_2_columns {
"sj.version, sj.entry_json, sj.appended_at"
} else {
"CAST(1 as INT UNSIGNED) AS version, CAST(NULL as STRING) AS entry_json, CAST(NULL as TIMESTAMP) AS appended_at"
};

// We are only looking for one...
// Let's get journal details.
Expand All @@ -439,7 +451,8 @@ pub async fn get_invocation_journal(
sj.invoked_target,
sj.sleep_wakeup_at,
sj.name,
{select_promise_column}
{select_promise_column},
{select_restate_1_2_columns}
FROM sys_journal sj
WHERE
sj.id = '{invocation_id}'
Expand All @@ -454,38 +467,53 @@ pub async fn get_invocation_journal(
.await?
.into_iter()
.map(|row| {
let entry_type = match row.entry_type.as_str() {
"Sleep" => JournalEntryType::Sleep {
wakeup_at: row.sleep_wakeup_at.map(Into::into),
},
"Call" => JournalEntryType::Call(OutgoingInvoke {
invocation_id: row.invoked_id,
invoked_target: row.invoked_target,
}),
"OneWayCall" => JournalEntryType::OneWayCall(OutgoingInvoke {
invocation_id: row.invoked_id,
invoked_target: row.invoked_target,
}),
"Awakeable" => JournalEntryType::Awakeable(AwakeableIdentifier::new(
my_invocation_id,
row.index,
)),
"GetState" => JournalEntryType::GetState,
"SetState" => JournalEntryType::SetState,
"ClearState" => JournalEntryType::ClearState,
"Run" => JournalEntryType::Run,
"GetPromise" => JournalEntryType::GetPromise(row.promise_name),
t => JournalEntryType::Other(t.to_owned()),
};

JournalEntry {
seq: row.index,
entry_type,
completed: row.completed,
name: row.name,
if row.version == 1 {
let entry_type = match row.entry_type.as_str() {
"Sleep" => JournalEntryTypeV1::Sleep {
wakeup_at: row.sleep_wakeup_at.map(Into::into),
},
"Call" => JournalEntryTypeV1::Call(OutgoingInvoke {
invocation_id: row.invoked_id,
invoked_target: row.invoked_target,
}),
"OneWayCall" => JournalEntryTypeV1::OneWayCall(OutgoingInvoke {
invocation_id: row.invoked_id,
invoked_target: row.invoked_target,
}),
"Awakeable" => JournalEntryTypeV1::Awakeable(AwakeableIdentifier::new(
my_invocation_id,
row.index,
)),
"GetState" => JournalEntryTypeV1::GetState,
"SetState" => JournalEntryTypeV1::SetState,
"ClearState" => JournalEntryTypeV1::ClearState,
"Run" => JournalEntryTypeV1::Run,
"GetPromise" => JournalEntryTypeV1::GetPromise(row.promise_name),
t => JournalEntryTypeV1::Other(t.to_owned()),
};

Ok(JournalEntry::V1(JournalEntryV1 {
seq: row.index,
entry_type,
completed: row.completed,
name: row.name,
}))
} else if row.version == 2 {
Ok(JournalEntry::V2(JournalEntryV2 {
seq: row.index,
entry_type: row.entry_type,
name: row.name,
entry: row.entry_json.and_then(|j| serde_json::from_str(&j).ok()),
appended_at: row.appended_at.map(Into::into),
}))
} else {
anyhow::bail!(
"The row version is unknown, cannot parse the journal: {}",
row.version
)
}
})
.collect();
.collect::<Result<Vec<_>, _>>()?;

// Sort by seq.
journal.reverse();
Expand Down
Loading

0 comments on commit c1f5be8

Please sign in to comment.