diff --git a/cli/src/clients/datafusion_helpers/mod.rs b/cli/src/clients/datafusion_helpers/mod.rs index d4ba29522f..1c64ad3d18 100644 --- a/cli/src/clients/datafusion_helpers/mod.rs +++ b/cli/src/clients/datafusion_helpers/mod.rs @@ -1,10 +1,12 @@ use std::{collections::HashMap, fmt::Display, str::FromStr}; +use super::DataFusionHttpClient; use anyhow::Result; use bytes::Bytes; use chrono::{DateTime, Duration, Local}; use clap::ValueEnum; use restate_admin_rest_model::version::AdminApiVersion; +use restate_types::journal_v2::Entry; use restate_types::{ identifiers::{AwakeableIdentifier, DeploymentId, ServiceId}, invocation::ServiceType, @@ -12,8 +14,6 @@ use restate_types::{ use serde::Deserialize; use serde_with::{serde_as, DeserializeAs}; -use super::DataFusionHttpClient; - mod v1; mod v2; @@ -183,14 +183,32 @@ impl InvocationCompletion { } #[derive(Debug, Clone)] -pub struct JournalEntry { +pub enum JournalEntry { + V1(JournalEntryV1), + V2(JournalEntryV2), +} + +impl JournalEntry { + pub fn should_present(&self) -> bool { + match self { + JournalEntry::V1(v1) => v1.should_present(), + JournalEntry::V2(_) => { + // For now in V2 we show all the entries + true + } + } + } +} + +#[derive(Debug, Clone)] +pub struct JournalEntryV1 { pub seq: u32, - pub entry_type: JournalEntryType, + pub entry_type: JournalEntryTypeV1, completed: bool, pub name: Option, } -impl JournalEntry { +impl JournalEntryV1 { pub fn is_completed(&self) -> bool { if self.entry_type.is_completable() { self.completed @@ -205,7 +223,7 @@ impl JournalEntry { } #[derive(Debug, Clone)] -pub enum JournalEntryType { +pub enum JournalEntryTypeV1 { Sleep { wakeup_at: Option>, }, @@ -222,48 +240,57 @@ pub enum JournalEntryType { Other(String), } -impl JournalEntryType { +impl JournalEntryTypeV1 { fn is_completable(&self) -> bool { matches!( self, - JournalEntryType::Sleep { .. } - | JournalEntryType::Call(_) - | JournalEntryType::Awakeable(_) - | JournalEntryType::GetState - | JournalEntryType::GetPromise(_) + JournalEntryTypeV1::Sleep { .. } + | JournalEntryTypeV1::Call(_) + | JournalEntryTypeV1::Awakeable(_) + | JournalEntryTypeV1::GetState + | JournalEntryTypeV1::GetPromise(_) ) } fn should_present(&self) -> bool { matches!( self, - JournalEntryType::Sleep { .. } - | JournalEntryType::Call(_) - | JournalEntryType::OneWayCall(_) - | JournalEntryType::Awakeable(_) - | JournalEntryType::Run - | JournalEntryType::GetPromise(_) + JournalEntryTypeV1::Sleep { .. } + | JournalEntryTypeV1::Call(_) + | JournalEntryTypeV1::OneWayCall(_) + | JournalEntryTypeV1::Awakeable(_) + | JournalEntryTypeV1::Run + | JournalEntryTypeV1::GetPromise(_) ) } } -impl Display for JournalEntryType { +impl Display for JournalEntryTypeV1 { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - JournalEntryType::Sleep { .. } => write!(f, "Sleep"), - JournalEntryType::Call(_) => write!(f, "Call"), - JournalEntryType::OneWayCall(_) => write!(f, "Send"), - JournalEntryType::Awakeable(_) => write!(f, "Awakeable"), - JournalEntryType::GetState => write!(f, "GetState"), - JournalEntryType::SetState => write!(f, "SetState"), - JournalEntryType::ClearState => write!(f, "ClearState"), - JournalEntryType::Run => write!(f, "Run"), - JournalEntryType::GetPromise(_) => write!(f, "Promise"), - JournalEntryType::Other(s) => write!(f, "{s}"), + JournalEntryTypeV1::Sleep { .. } => write!(f, "Sleep"), + JournalEntryTypeV1::Call(_) => write!(f, "Call"), + JournalEntryTypeV1::OneWayCall(_) => write!(f, "Send"), + JournalEntryTypeV1::Awakeable(_) => write!(f, "Awakeable"), + JournalEntryTypeV1::GetState => write!(f, "GetState"), + JournalEntryTypeV1::SetState => write!(f, "SetState"), + JournalEntryTypeV1::ClearState => write!(f, "ClearState"), + JournalEntryTypeV1::Run => write!(f, "Run"), + JournalEntryTypeV1::GetPromise(_) => write!(f, "Promise"), + JournalEntryTypeV1::Other(s) => write!(f, "{s}"), } } } +#[derive(Debug, Clone)] +pub struct JournalEntryV2 { + pub seq: u32, + pub entry_type: String, + pub name: Option, + pub entry: Option, + pub appended_at: Option>, +} + #[derive(Debug, Clone)] pub struct OutgoingInvoke { pub invocation_id: Option, diff --git a/cli/src/clients/datafusion_helpers/v1.rs b/cli/src/clients/datafusion_helpers/v1.rs index 40c2560d05..f7ca9bb14d 100644 --- a/cli/src/clients/datafusion_helpers/v1.rs +++ b/cli/src/clients/datafusion_helpers/v1.rs @@ -28,7 +28,7 @@ use crate::clients::DataFusionHttpClient; use super::{ HandlerStateStats, Invocation, InvocationCompletion, InvocationState, JournalEntry, - JournalEntryType, LockedKeyInfo, OutgoingInvoke, ServiceHandlerLockedKeysMap, + JournalEntryTypeV1, JournalEntryV1, LockedKeyInfo, OutgoingInvoke, ServiceHandlerLockedKeysMap, ServiceHandlerUsage, ServiceStatusMap, SimpleInvocation, }; @@ -672,34 +672,34 @@ pub async fn get_invocation_journal( .map(|row| { let index = row.index.expect("index"); let entry_type = match row.entry_type.expect("entry_type").as_str() { - "Sleep" => JournalEntryType::Sleep { + "Sleep" => JournalEntryTypeV1::Sleep { wakeup_at: row.sleep_wakeup_at.map(Into::into), }, - "Call" => JournalEntryType::Call(OutgoingInvoke { + "Call" => JournalEntryTypeV1::Call(OutgoingInvoke { invocation_id: row.invoked_id, invoked_target: row.invoked_target, }), - "OneWayCall" => JournalEntryType::OneWayCall(OutgoingInvoke { + "OneWayCall" => JournalEntryTypeV1::OneWayCall(OutgoingInvoke { invocation_id: row.invoked_id, invoked_target: row.invoked_target, }), "Awakeable" => { - JournalEntryType::Awakeable(AwakeableIdentifier::new(my_invocation_id, index)) + JournalEntryTypeV1::Awakeable(AwakeableIdentifier::new(my_invocation_id, index)) } - "GetState" => JournalEntryType::GetState, - "SetState" => JournalEntryType::SetState, - "ClearState" => JournalEntryType::ClearState, - "Run" => JournalEntryType::Run, - "GetPromise" => JournalEntryType::GetPromise(row.promise_name), - t => JournalEntryType::Other(t.to_owned()), + "GetState" => JournalEntryTypeV1::GetState, + "SetState" => JournalEntryTypeV1::SetState, + "ClearState" => JournalEntryTypeV1::ClearState, + "Run" => JournalEntryTypeV1::Run, + "GetPromise" => JournalEntryTypeV1::GetPromise(row.promise_name), + t => JournalEntryTypeV1::Other(t.to_owned()), }; - JournalEntry { + JournalEntry::V1(JournalEntryV1 { seq: index, entry_type, completed: row.completed.unwrap_or_default(), name: row.name, - } + }) }) .collect(); diff --git a/cli/src/clients/datafusion_helpers/v2.rs b/cli/src/clients/datafusion_helpers/v2.rs index 737170866c..18460a9ebd 100644 --- a/cli/src/clients/datafusion_helpers/v2.rs +++ b/cli/src/clients/datafusion_helpers/v2.rs @@ -16,17 +16,16 @@ use anyhow::Result; use bytes::Bytes; use chrono::{DateTime, Local}; +use crate::clients::DataFusionHttpClient; use restate_types::identifiers::DeploymentId; use restate_types::identifiers::{AwakeableIdentifier, InvocationId, ServiceId}; use serde::Deserialize; use serde_with::serde_as; -use crate::clients::DataFusionHttpClient; - use super::{ HandlerStateStats, Invocation, InvocationCompletion, InvocationState, JournalEntry, - JournalEntryType, LockedKeyInfo, OutgoingInvoke, ServiceHandlerLockedKeysMap, - ServiceHandlerUsage, ServiceStatusMap, SimpleInvocation, + JournalEntryTypeV1, JournalEntryV1, JournalEntryV2, LockedKeyInfo, OutgoingInvoke, + ServiceHandlerLockedKeysMap, ServiceHandlerUsage, ServiceStatusMap, SimpleInvocation, }; static JOURNAL_QUERY_LIMIT: usize = 100; @@ -413,6 +412,11 @@ struct JournalQueryResult { sleep_wakeup_at: Option>, name: Option, promise_name: Option, + + // --- V2 columns + version: u32, + appended_at: Option>, + entry_json: Option, } pub async fn get_invocation_journal( @@ -427,6 +431,14 @@ pub async fn get_invocation_journal( } else { "CAST(NULL as STRING) AS promise_name" }; + let has_restate_1_2_columns = client + .check_columns_exists("sys_journal", &["version", "entry_json", "appended_at"]) + .await?; + let select_restate_1_2_columns = if has_restate_1_2_columns { + "sj.version, sj.entry_json, sj.appended_at" + } else { + "CAST(1 as INT UNSIGNED) AS version, CAST(NULL as STRING) AS entry_json, CAST(NULL as TIMESTAMP) AS appended_at" + }; // We are only looking for one... // Let's get journal details. @@ -439,7 +451,8 @@ pub async fn get_invocation_journal( sj.invoked_target, sj.sleep_wakeup_at, sj.name, - {select_promise_column} + {select_promise_column}, + {select_restate_1_2_columns} FROM sys_journal sj WHERE sj.id = '{invocation_id}' @@ -454,38 +467,53 @@ pub async fn get_invocation_journal( .await? .into_iter() .map(|row| { - let entry_type = match row.entry_type.as_str() { - "Sleep" => JournalEntryType::Sleep { - wakeup_at: row.sleep_wakeup_at.map(Into::into), - }, - "Call" => JournalEntryType::Call(OutgoingInvoke { - invocation_id: row.invoked_id, - invoked_target: row.invoked_target, - }), - "OneWayCall" => JournalEntryType::OneWayCall(OutgoingInvoke { - invocation_id: row.invoked_id, - invoked_target: row.invoked_target, - }), - "Awakeable" => JournalEntryType::Awakeable(AwakeableIdentifier::new( - my_invocation_id, - row.index, - )), - "GetState" => JournalEntryType::GetState, - "SetState" => JournalEntryType::SetState, - "ClearState" => JournalEntryType::ClearState, - "Run" => JournalEntryType::Run, - "GetPromise" => JournalEntryType::GetPromise(row.promise_name), - t => JournalEntryType::Other(t.to_owned()), - }; - - JournalEntry { - seq: row.index, - entry_type, - completed: row.completed, - name: row.name, + if row.version == 1 { + let entry_type = match row.entry_type.as_str() { + "Sleep" => JournalEntryTypeV1::Sleep { + wakeup_at: row.sleep_wakeup_at.map(Into::into), + }, + "Call" => JournalEntryTypeV1::Call(OutgoingInvoke { + invocation_id: row.invoked_id, + invoked_target: row.invoked_target, + }), + "OneWayCall" => JournalEntryTypeV1::OneWayCall(OutgoingInvoke { + invocation_id: row.invoked_id, + invoked_target: row.invoked_target, + }), + "Awakeable" => JournalEntryTypeV1::Awakeable(AwakeableIdentifier::new( + my_invocation_id, + row.index, + )), + "GetState" => JournalEntryTypeV1::GetState, + "SetState" => JournalEntryTypeV1::SetState, + "ClearState" => JournalEntryTypeV1::ClearState, + "Run" => JournalEntryTypeV1::Run, + "GetPromise" => JournalEntryTypeV1::GetPromise(row.promise_name), + t => JournalEntryTypeV1::Other(t.to_owned()), + }; + + Ok(JournalEntry::V1(JournalEntryV1 { + seq: row.index, + entry_type, + completed: row.completed, + name: row.name, + })) + } else if row.version == 2 { + Ok(JournalEntry::V2(JournalEntryV2 { + seq: row.index, + entry_type: row.entry_type, + name: row.name, + entry: row.entry_json.and_then(|j| serde_json::from_str(&j).ok()), + appended_at: row.appended_at.map(Into::into), + })) + } else { + anyhow::bail!( + "The row version is unknown, cannot parse the journal: {}", + row.version + ) } }) - .collect(); + .collect::, _>>()?; // Sort by seq. journal.reverse(); diff --git a/cli/src/ui/invocations.rs b/cli/src/ui/invocations.rs index 5f93d0705b..e220fadcdf 100644 --- a/cli/src/ui/invocations.rs +++ b/cli/src/ui/invocations.rs @@ -8,6 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::clients::datafusion_helpers::{ + Invocation, InvocationState, JournalEntry, JournalEntryV2, +}; +use crate::clients::datafusion_helpers::{InvocationCompletion, JournalEntryTypeV1}; +use crate::clients::datafusion_helpers::{JournalEntryV1, SimpleInvocation}; use chrono_humanize::Tense; use comfy_table::{Attribute, Cell, Table}; use dialoguer::console::Style as DStyle; @@ -19,10 +24,14 @@ use restate_cli_util::c_println; use restate_cli_util::ui::console::Icon; use restate_cli_util::ui::console::StyledTable; use restate_cli_util::ui::duration_to_human_precise; - -use crate::clients::datafusion_helpers::{Invocation, InvocationState}; -use crate::clients::datafusion_helpers::{InvocationCompletion, JournalEntryType}; -use crate::clients::datafusion_helpers::{JournalEntry, SimpleInvocation}; +use restate_types::invocation::InvocationQuery; +use restate_types::journal_v2::{ + AttachInvocationCommand, CallCommand, ClearStateCommand, Command, CompleteAwakeableCommand, + CompletePromiseCommand, Entry, GetEagerStateCommand, GetInvocationOutputCommand, + GetLazyStateCommand, GetPromiseCommand, OneWayCallCommand, PeekPromiseCommand, + SendSignalCommand, SetStateCommand, SleepCommand, +}; +use std::time::SystemTime; pub fn invocation_status_note(invocation: &Invocation) -> String { let mut msg = String::new(); @@ -273,9 +282,16 @@ pub fn render_invocation_compact(invocation: &Invocation) { } pub fn format_journal_entry(entry: &JournalEntry) -> String { + match entry { + JournalEntry::V1(v1) => format_journal_entry_v1(v1), + JournalEntry::V2(v2) => format_journal_entry_v2(v2), + } +} + +pub fn format_journal_entry_v1(entry: &JournalEntryV1) -> String { let state_icon = if entry.is_completed() { Icon("☑️ ", "[DONE]") - } else if matches!(entry.entry_type, JournalEntryType::Sleep { .. }) { + } else if matches!(entry.entry_type, JournalEntryTypeV1::Sleep { .. }) { Icon("⏰", "[PENDING]") } else { Icon("⏸️ ", "[PENDING]") @@ -298,13 +314,13 @@ pub fn format_journal_entry(entry: &JournalEntry) -> String { state_icon, type_style.apply_to(seq), type_style.apply_to(entry_ty_and_name), - format_entry_type_details(&entry.entry_type) + format_entry_type_v1_details(&entry.entry_type) ) } -pub fn format_entry_type_details(entry_type: &JournalEntryType) -> String { +fn format_entry_type_v1_details(entry_type: &JournalEntryTypeV1) -> String { match entry_type { - JournalEntryType::Sleep { + JournalEntryTypeV1::Sleep { wakeup_at: Some(wakeup_at), } => { let left = wakeup_at.signed_duration_since(chrono::Local::now()); @@ -315,19 +331,96 @@ pub fn format_entry_type_details(entry_type: &JournalEntryType) -> String { format!("until {}", style(wakeup_at).dim()) } } - JournalEntryType::Call(inv) | JournalEntryType::OneWayCall(inv) => { + JournalEntryTypeV1::Call(inv) | JournalEntryTypeV1::OneWayCall(inv) => { format!( "{} {}", inv.invoked_target.as_ref().unwrap(), inv.invocation_id.as_deref().unwrap_or(""), ) } - JournalEntryType::Awakeable(awakeable_id) => { + JournalEntryTypeV1::Awakeable(awakeable_id) => { format!("{}", style(awakeable_id.to_string()).cyan()) } - JournalEntryType::GetPromise(Some(promise_name)) => { + JournalEntryTypeV1::GetPromise(Some(promise_name)) => { format!("{}", style(promise_name).cyan()) } _ => String::new(), } } + +pub fn format_journal_entry_v2(entry: &JournalEntryV2) -> String { + let seq_and_time = if let Some(timestamp) = &entry.appended_at { + let date_style = DStyle::new().dim(); + format!("#{} [{}]", entry.seq, date_style.apply_to(timestamp)) + } else { + format!("#{}", entry.seq) + }; + let entry_ty_and_name = if let Some(name) = &entry.name { + if !name.is_empty() { + format!("{} [{}]", entry.entry_type, name) + } else { + entry.entry_type.clone() + } + } else { + entry.entry_type.clone() + }; + + format!( + " {} {} {}", + seq_and_time, + entry_ty_and_name, + format_entry_type_v2_details(&entry.entry) + ) +} + +fn format_entry_type_v2_details(entry: &Option) -> String { + if entry.is_none() { + return "".to_owned(); + } + match entry.as_ref().unwrap() { + Entry::Command(Command::Sleep(SleepCommand { wake_up_time, .. })) => { + let wakeup_at: chrono::DateTime = + chrono::DateTime::from(SystemTime::from(*wake_up_time)); + let left = wakeup_at.signed_duration_since(chrono::Local::now()); + if left.num_milliseconds() >= 0 { + let left = duration_to_human_precise(left, Tense::Present); + format!("until {} ({} left)", wakeup_at, style(left).cyan()) + } else { + format!("until {}", style(wakeup_at).dim()) + } + } + Entry::Command(Command::Call(CallCommand { request, .. })) + | Entry::Command(Command::OneWayCall(OneWayCallCommand { request, .. })) => { + format!("{} {}", request.invocation_target, request.invocation_id) + } + Entry::Command(Command::CompleteAwakeable(CompleteAwakeableCommand { id, .. })) => { + format!("id {}", id) + } + Entry::Command(Command::SendSignal(SendSignalCommand { + target_invocation_id, + signal_id, + .. + })) => { + format!("signal {} to {}", signal_id, target_invocation_id) + } + Entry::Command(Command::GetLazyState(GetLazyStateCommand { key, .. })) + | Entry::Command(Command::GetEagerState(GetEagerStateCommand { key, .. })) + | Entry::Command(Command::SetState(SetStateCommand { key, .. })) + | Entry::Command(Command::ClearState(ClearStateCommand { key, .. })) + | Entry::Command(Command::GetPromise(GetPromiseCommand { key, .. })) + | Entry::Command(Command::PeekPromise(PeekPromiseCommand { key, .. })) + | Entry::Command(Command::CompletePromise(CompletePromiseCommand { key, .. })) => { + format!("key {}", style(key).cyan()) + } + Entry::Command(Command::AttachInvocation(AttachInvocationCommand { target, .. })) + | Entry::Command(Command::GetInvocationOutput(GetInvocationOutputCommand { + target, .. + })) => { + format!( + "{}", + InvocationQuery::from(target.clone()).to_invocation_id() + ) + } + _ => String::new(), + } +} diff --git a/crates/storage-query-datafusion/src/journal/row.rs b/crates/storage-query-datafusion/src/journal/row.rs index b6d5d3cf4d..c97b7813da 100644 --- a/crates/storage-query-datafusion/src/journal/row.rs +++ b/crates/storage-query-datafusion/src/journal/row.rs @@ -9,19 +9,18 @@ // by the Apache License, Version 2.0. use crate::journal::schema::SysJournalBuilder; - +use crate::log_data_corruption_error; +use crate::table_util::format_using; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec; use restate_storage_api::journal_table::JournalEntry; use restate_types::identifiers::{JournalEntryId, WithInvocationId, WithPartitionKey}; use restate_types::journal::enriched::EnrichedEntryHeader; -use restate_types::journal::{CompletePromiseEntry, GetPromiseEntry, PeekPromiseEntry}; - -use crate::log_data_corruption_error; -use crate::table_util::format_using; use restate_types::journal::Entry; +use restate_types::journal::{CompletePromiseEntry, GetPromiseEntry, PeekPromiseEntry}; use restate_types::journal_v2; use restate_types::journal_v2::raw::RawEntry; +use restate_types::journal_v2::CommandMetadata; use restate_types::journal_v2::EntryMetadata; #[inline] @@ -147,16 +146,6 @@ pub(crate) fn append_journal_row_v2( journal_entry_id: JournalEntryId, raw_entry: RawEntry, ) { - let Ok(entry) = raw_entry.decode::() else { - log_data_corruption_error!( - "sys_journal", - &journal_entry_id.invocation_id(), - "entry", - "The entry should decode correctly" - ); - return; - }; - let mut row = builder.row(); row.version(2); @@ -170,11 +159,30 @@ pub(crate) fn append_journal_row_v2( row.entry_type(format_using(output, &raw_entry.ty())); } - if row.is_entry_json_defined() { - if let Ok(json) = serde_json::to_string(&entry) { - row.entry_json(json); + row.appended_at(raw_entry.header().append_time.as_u64() as i64); + + if row.is_entry_json_defined() || row.is_name_defined() { + // We need to parse the entry + let Ok(entry) = raw_entry.decode::() else { + log_data_corruption_error!( + "sys_journal", + &journal_entry_id.invocation_id(), + "entry", + "The entry should decode correctly" + ); + return; + }; + + if row.is_entry_json_defined() { + if let Ok(json) = serde_json::to_string(&entry) { + row.entry_json(json); + } } - } - row.appended_at(raw_entry.header().append_time.as_u64() as i64); + if row.is_name_defined() { + if let journal_v2::Entry::Command(cmd) = entry { + row.name(cmd.name()); + } + } + } } diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index ec4df46578..0fe4776d40 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -14,7 +14,6 @@ use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; use bytestring::ByteString; use rand::RngCore; -use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::fmt; use std::fmt::{Display, Formatter}; @@ -961,7 +960,9 @@ ulid_backed_id!(Subscription @with_resource_id); ulid_backed_id!(PartitionProcessorRpcRequest); ulid_backed_id!(Snapshot @with_resource_id); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive( + Debug, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr, +)] pub struct AwakeableIdentifier { invocation_id: InvocationId, entry_index: EntryIndex, @@ -1040,7 +1041,9 @@ impl Display for AwakeableIdentifier { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive( + Debug, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr, +)] pub struct ExternalSignalIdentifier { invocation_id: InvocationId, signal_index: u32, diff --git a/crates/types/src/journal_v2/command.rs b/crates/types/src/journal_v2/command.rs index 45d551c0fd..a2a9f05790 100644 --- a/crates/types/src/journal_v2/command.rs +++ b/crates/types/src/journal_v2/command.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::errors::IdDecodeError; use crate::identifiers::{ AwakeableIdentifier, ExternalSignalIdentifier, IdempotencyId, InvocationId, ServiceId, }; @@ -21,17 +22,19 @@ use crate::time::MillisSinceEpoch; use bytes::Bytes; use bytestring::ByteString; use enum_dispatch::enum_dispatch; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt; +use std::str::FromStr; use std::time::Duration; #[enum_dispatch] pub trait CommandMetadata { fn related_completion_ids(&self) -> Vec; + fn name(&self) -> &str; } #[enum_dispatch(EntryMetadata, CommandMetadata)] -#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, Serialize, Deserialize)] #[strum_discriminants(vis(pub))] #[strum_discriminants(name(CommandType))] #[strum_discriminants(derive(serde::Serialize, serde::Deserialize))] @@ -119,6 +122,10 @@ macro_rules! impl_command_accessors { fn related_completion_ids(&self) -> Vec { vec![] } + + fn name(&self) -> &str { + &self.name + } } impl_command_accessors!($ty -> [$($tail)*]); }; @@ -127,6 +134,10 @@ macro_rules! impl_command_accessors { fn related_completion_ids(&self) -> Vec { vec![self.completion_id] } + + fn name(&self) -> &str { + &self.name + } } impl_command_accessors!($ty -> [$($tail)*]); }; @@ -139,7 +150,7 @@ macro_rules! impl_command_accessors { // --- Actual implementation of individual commands -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct InputCommand { pub headers: Vec
, pub payload: Bytes, @@ -147,19 +158,19 @@ pub struct InputCommand { } impl_command_accessors!(Input -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct OutputCommand { pub result: OutputResult, pub name: ByteString, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum OutputResult { Success(Bytes), Failure(Failure), } impl_command_accessors!(Output -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetLazyStateCommand { pub key: ByteString, pub completion_id: CompletionId, @@ -167,7 +178,7 @@ pub struct GetLazyStateCommand { } impl_command_accessors!(GetLazyState -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SetStateCommand { pub key: ByteString, pub value: Bytes, @@ -175,27 +186,27 @@ pub struct SetStateCommand { } impl_command_accessors!(SetState -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ClearStateCommand { pub key: ByteString, pub name: ByteString, } impl_command_accessors!(ClearState -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ClearAllStateCommand { pub name: ByteString, } impl_command_accessors!(ClearAllState -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetLazyStateKeysCommand { pub completion_id: CompletionId, pub name: ByteString, } impl_command_accessors!(GetLazyStateKeys -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetEagerStateCommand { pub key: ByteString, pub result: GetStateResult, @@ -203,14 +214,14 @@ pub struct GetEagerStateCommand { } impl_command_accessors!(GetEagerState -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetEagerStateKeysCommand { pub state_keys: Vec, pub name: ByteString, } impl_command_accessors!(GetEagerStateKeys -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetPromiseCommand { pub key: ByteString, pub completion_id: CompletionId, @@ -218,7 +229,7 @@ pub struct GetPromiseCommand { } impl_command_accessors!(GetPromise -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PeekPromiseCommand { pub key: ByteString, pub completion_id: CompletionId, @@ -226,21 +237,21 @@ pub struct PeekPromiseCommand { } impl_command_accessors!(PeekPromise -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CompletePromiseCommand { pub key: ByteString, pub value: CompletePromiseValue, pub completion_id: CompletionId, pub name: ByteString, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum CompletePromiseValue { Success(Bytes), Failure(Failure), } impl_command_accessors!(CompletePromise -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SleepCommand { pub wake_up_time: MillisSinceEpoch, pub completion_id: CompletionId, @@ -248,7 +259,7 @@ pub struct SleepCommand { } impl_command_accessors!(Sleep -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CallRequest { pub invocation_id: InvocationId, pub invocation_target: InvocationTarget, @@ -259,7 +270,7 @@ pub struct CallRequest { pub completion_retention_duration: Duration, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CallCommand { pub request: CallRequest, pub invocation_id_completion_id: CompletionId, @@ -271,9 +282,13 @@ impl CommandMetadata for CallCommand { fn related_completion_ids(&self) -> Vec { vec![self.invocation_id_completion_id, self.result_completion_id] } + + fn name(&self) -> &str { + &self.name + } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct OneWayCallCommand { pub request: CallRequest, pub invoke_time: MillisSinceEpoch, @@ -285,9 +300,13 @@ impl CommandMetadata for OneWayCallCommand { fn related_completion_ids(&self) -> Vec { vec![self.invocation_id_completion_id] } + + fn name(&self) -> &str { + &self.name + } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SendSignalCommand { pub target_invocation_id: InvocationId, pub signal_id: SignalId, @@ -296,14 +315,14 @@ pub struct SendSignalCommand { } impl_command_accessors!(SendSignal -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RunCommand { pub completion_id: CompletionId, pub name: ByteString, } impl_command_accessors!(Run -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum AttachInvocationTarget { InvocationId(InvocationId), IdempotentRequest(IdempotencyId), @@ -328,7 +347,7 @@ impl From for AttachInvocationTarget { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AttachInvocationCommand { pub target: AttachInvocationTarget, pub completion_id: CompletionId, @@ -336,7 +355,7 @@ pub struct AttachInvocationCommand { } impl_command_accessors!(AttachInvocation -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetInvocationOutputCommand { pub target: AttachInvocationTarget, pub completion_id: CompletionId, @@ -344,7 +363,7 @@ pub struct GetInvocationOutputCommand { } impl_command_accessors!(GetInvocationOutput -> [@metadata @from_entry @result_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CompleteAwakeableCommand { pub id: CompleteAwakeableId, pub result: CompleteAwakeableResult, @@ -352,11 +371,14 @@ pub struct CompleteAwakeableCommand { } impl_command_accessors!(CompleteAwakeable -> [@metadata @from_entry @no_completion]); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive( + Debug, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr, +)] pub enum CompleteAwakeableId { Old(AwakeableIdentifier), New(ExternalSignalIdentifier), } + impl fmt::Display for CompleteAwakeableId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -366,7 +388,17 @@ impl fmt::Display for CompleteAwakeableId { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +impl FromStr for CompleteAwakeableId { + type Err = IdDecodeError; + + fn from_str(s: &str) -> Result { + AwakeableIdentifier::from_str(s) + .map(CompleteAwakeableId::Old) + .or_else(|_| ExternalSignalIdentifier::from_str(s).map(CompleteAwakeableId::New)) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum CompleteAwakeableResult { Success(Bytes), Failure(Failure), diff --git a/crates/types/src/journal_v2/event.rs b/crates/types/src/journal_v2/event.rs index dea1d8328b..7be05b1ed0 100644 --- a/crates/types/src/journal_v2/event.rs +++ b/crates/types/src/journal_v2/event.rs @@ -11,19 +11,18 @@ use crate::journal_v2::raw::{TryFromEntry, TryFromEntryError}; use crate::journal_v2::{Entry, EntryMetadata, EntryType}; use bytestring::ByteString; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use strum::EnumString; -#[derive( - Debug, Clone, PartialEq, Eq, EnumString, strum::Display, serde::Serialize, serde::Deserialize, -)] +#[derive(Debug, Clone, PartialEq, Eq, EnumString, strum::Display, Serialize, Deserialize)] pub enum EventType { Lifecycle, #[strum(default)] Other(String), } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Event { pub ty: EventType, pub metadata: HashMap, diff --git a/crates/types/src/journal_v2/mod.rs b/crates/types/src/journal_v2/mod.rs index 5ea3d4a289..d820961bfd 100644 --- a/crates/types/src/journal_v2/mod.rs +++ b/crates/types/src/journal_v2/mod.rs @@ -28,7 +28,7 @@ use bytestring::ByteString; use enum_dispatch::enum_dispatch; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt; pub mod command; @@ -64,7 +64,14 @@ pub enum EntryType { impl fmt::Display for EntryType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - EntryType::Command(cmd) => fmt::Display::fmt(cmd, f), + EntryType::Command(cmd) => { + write!(f, "Command: ")?; + fmt::Display::fmt(cmd, f) + } + EntryType::Notification(notif) => { + write!(f, "Notification: ")?; + fmt::Display::fmt(notif, f) + } e => fmt::Debug::fmt(e, f), } } @@ -79,7 +86,7 @@ pub trait EntryMetadata { /// Root enum representing a decoded entry. #[enum_dispatch(EntryMetadata)] -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Entry { Command(Command), Notification(Notification), diff --git a/crates/types/src/journal_v2/notification.rs b/crates/types/src/journal_v2/notification.rs index 5c849bfa1c..39de3a878b 100644 --- a/crates/types/src/journal_v2/notification.rs +++ b/crates/types/src/journal_v2/notification.rs @@ -15,11 +15,11 @@ use crate::journal_v2::{ }; use bytes::Bytes; use enum_dispatch::enum_dispatch; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt; /// See [`Notification`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum NotificationId { CompletionId(CompletionId), SignalIndex(SignalIndex), @@ -54,7 +54,7 @@ impl fmt::Display for NotificationId { } } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum NotificationType { Completion(CompletionType), Signal, @@ -81,7 +81,7 @@ pub trait NotificationMetadata { } #[enum_dispatch(NotificationMetadata, EntryMetadata)] -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Notification { Completion(Completion), Signal(Signal), @@ -102,7 +102,7 @@ impl Notification { } #[enum_dispatch(NotificationMetadata, EntryMetadata)] -#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, Serialize, Deserialize)] #[strum_discriminants(vis(pub))] #[strum_discriminants(name(CompletionType))] #[strum_discriminants(derive(serde::Serialize, serde::Deserialize))] @@ -189,43 +189,43 @@ macro_rules! impl_completion_accessors { // --- Actual implementation of individual notifications -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetLazyStateCompletion { pub completion_id: CompletionId, pub result: GetStateResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum GetStateResult { Void, Success(Bytes), } impl_completion_accessors!(GetLazyState); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetLazyStateKeysCompletion { pub completion_id: CompletionId, pub state_keys: Vec, } impl_completion_accessors!(GetLazyStateKeys); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetPromiseCompletion { pub completion_id: CompletionId, pub result: GetPromiseResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum GetPromiseResult { Success(Bytes), Failure(Failure), } impl_completion_accessors!(GetPromise); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PeekPromiseCompletion { pub completion_id: CompletionId, pub result: PeekPromiseResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum PeekPromiseResult { Void, Success(Bytes), @@ -233,73 +233,73 @@ pub enum PeekPromiseResult { } impl_completion_accessors!(PeekPromise); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CompletePromiseCompletion { pub completion_id: CompletionId, pub result: CompletePromiseResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum CompletePromiseResult { Void, Failure(Failure), } impl_completion_accessors!(CompletePromise); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SleepCompletion { pub completion_id: CompletionId, } impl_completion_accessors!(Sleep); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CallInvocationIdCompletion { pub completion_id: CompletionId, pub invocation_id: InvocationId, } impl_completion_accessors!(CallInvocationId); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct CallCompletion { pub completion_id: CompletionId, pub result: CallResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum CallResult { Success(Bytes), Failure(Failure), } impl_completion_accessors!(Call); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RunCompletion { pub completion_id: CompletionId, pub result: RunResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum RunResult { Success(Bytes), Failure(Failure), } impl_completion_accessors!(Run); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AttachInvocationCompletion { pub completion_id: CompletionId, pub result: AttachInvocationResult, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum AttachInvocationResult { Success(Bytes), Failure(Failure), } impl_completion_accessors!(AttachInvocation); -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct GetInvocationOutputCompletion { pub completion_id: CompletionId, pub result: GetInvocationOutputResult, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum GetInvocationOutputResult { Void, Success(Bytes), @@ -310,6 +310,7 @@ impl_completion_accessors!(GetInvocationOutput); // Signal #[repr(u32)] +#[derive(Debug, strum::FromRepr)] pub enum BuiltInSignal { Cancel = 1, } @@ -319,12 +320,27 @@ pub const CANCEL_SIGNAL: Notification = Notification::Signal(Signal::new( SignalResult::Void, )); -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum SignalId { Index(SignalIndex), Name(SignalName), } +impl fmt::Display for SignalId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SignalId::Index(idx) => { + if let Some(built_in_signal) = BuiltInSignal::from_repr(*idx) { + write!(f, "{:?}", built_in_signal) + } else { + write!(f, "index {}", idx) + } + } + SignalId::Name(name) => write!(f, "{}", name), + } + } +} + impl SignalId { pub const fn for_builtin_signal(signal: BuiltInSignal) -> Self { Self::for_index(signal as u32) @@ -339,7 +355,7 @@ impl SignalId { } } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Signal { pub id: SignalId, pub result: SignalResult, @@ -381,7 +397,7 @@ impl From for Entry { } } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum SignalResult { Void, Success(Bytes),