diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index 0da4486e41..b6b95d6962 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -61,6 +61,18 @@ pub enum LoadError { MissingTimeError, } +#[cfg(feature = "proto")] +#[derive(thiserror::Error, Debug)] +pub enum WriteError { + #[cfg(feature = "proto")] + #[error("Unrecoverable disk error: {0}, resetting file size failed: {1}")] + FatalWriteError(io::Error, io::Error), + + #[cfg(feature = "proto")] + #[error("Failed to write delta to cache: {0}")] + WriteError(io::Error), +} + #[derive(thiserror::Error, Debug)] pub enum GraphError { #[error("You cannot set ‘{0}’ and ‘{1}’ at the same time. Please pick one or the other.")] @@ -151,7 +163,7 @@ pub enum GraphError { #[error("IO operation failed")] IOError { #[from] - source: std::io::Error, + source: io::Error, }, #[cfg(feature = "arrow")] @@ -200,10 +212,14 @@ pub enum GraphError { "Cannot recover from write failure {write_err}, new updates are invalid: {decode_err}" )] FatalDecodeError { - write_err: io::Error, + write_err: WriteError, decode_err: prost::DecodeError, }, + #[cfg(feature = "proto")] + #[error("Cache write error: {0}")] + CacheWriteError(#[from] WriteError), + #[cfg(feature = "proto")] #[error("Protobuf decode error{0}")] EncodeError(#[from] prost::EncodeError), diff --git a/raphtory/src/serialise/incremental.rs b/raphtory/src/serialise/incremental.rs index 5269ed37d6..c7a6a08302 100644 --- a/raphtory/src/serialise/incremental.rs +++ b/raphtory/src/serialise/incremental.rs @@ -1,5 +1,8 @@ use crate::{ - core::{utils::errors::GraphError, Prop, PropType}, + core::{ + utils::errors::{GraphError, WriteError}, + Prop, PropType, + }, db::{ api::{storage::storage::Storage, view::MaterializedGraph}, graph::views::deletion_graph::PersistentGraph, @@ -19,7 +22,7 @@ use raphtory_api::core::{ use std::{ fmt::Debug, fs::{File, OpenOptions}, - io::Write, + io::{Seek, SeekFrom, Write}, mem, ops::DerefMut, path::Path, @@ -31,6 +34,18 @@ pub struct GraphWriter { proto_delta: Mutex, } +fn try_write(writer: &mut File, bytes: &[u8]) -> Result<(), WriteError> { + let pos = writer + .seek(SeekFrom::End(0)) + .map_err(WriteError::WriteError)?; + writer + .write_all(bytes) + .map_err(|write_err| match writer.set_len(pos) { + Ok(_) => WriteError::WriteError(write_err), + Err(reset_err) => WriteError::FatalWriteError(write_err, reset_err), + }) +} + impl GraphWriter { pub fn new(file: File) -> Self { Self { @@ -38,11 +53,14 @@ impl GraphWriter { proto_delta: Default::default(), } } + pub fn write(&self) -> Result<(), GraphError> { let mut proto = mem::take(self.proto_delta.lock().deref_mut()); let bytes = proto.encode_to_vec(); if !bytes.is_empty() { - if let Err(write_err) = self.writer.lock().write_all(&bytes) { + let mut writer = self.writer.lock(); + + if let Err(write_err) = try_write(&mut writer, &bytes) { // If the write fails, try to put the updates back let mut new_delta = self.proto_delta.lock(); let bytes = new_delta.encode_to_vec(); @@ -58,6 +76,7 @@ impl GraphWriter { } return Err(write_err.into()); } + // should we flush the file? } Ok(()) } @@ -276,3 +295,27 @@ impl CacheOps for G { Ok(graph) } } + +#[cfg(test)] +mod test { + use crate::serialise::incremental::GraphWriter; + use raphtory_api::core::{ + entities::{GidRef, VID}, + storage::dict_mapper::MaybeNew, + }; + use std::fs::File; + use tempfile::NamedTempFile; + + #[test] + fn test_write_failure() { + let tmp_file = NamedTempFile::new().unwrap(); + let read_only = File::open(tmp_file.path()).unwrap(); + + let cache = GraphWriter::new(read_only); + cache.resolve_node(MaybeNew::New(VID(0)), GidRef::Str("0")); + let res = cache.write(); + println!("{res:?}"); + assert!(res.is_err()); + assert_eq!(cache.proto_delta.lock().nodes.len(), 1); + } +}