Skip to content

Commit

Permalink
refactor: bump fusio to 0.35
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Feb 6, 2025
1 parent 88bcd76 commit 7274831
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 28 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42.2.0", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio", rev = "92fd436b75f7cc496ec16d1e2be62481fce52e45", version = "0.3.4", package = "fusio", features = [
fusio = { version = "0.3.5", package = "fusio", features = [
"dyn",
"fs",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "92fd436b75f7cc496ec16d1e2be62481fce52e45", version = "0.3.4", package = "fusio-dispatch" }
fusio-log = { git = "https://github.com/tonbo-io/fusio", rev = "92fd436b75f7cc496ec16d1e2be62481fce52e45", version = "0.3.4", package = "fusio-log" , default-features = false, features = ["bytes"] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio", rev = "92fd436b75f7cc496ec16d1e2be62481fce52e45", version = "0.3.4", package = "fusio-parquet" }
fusio-dispatch = { version = "0.3.5", package = "fusio-dispatch" }
fusio-log = { version = "0.3.5", package = "fusio-log" , default-features = false, features = ["bytes"] }
fusio-parquet = { version = "0.3.5", package = "fusio-parquet" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { git = "https://github.com/tonbo-io/fusio", rev = "92fd436b75f7cc496ec16d1e2be62481fce52e45", version = "0.3.4", package = "fusio", features = [
fusio = { version = "0.3.5", package = "fusio", features = [
"aws",
"tokio",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "92fd436b75f7cc496ec16d1e2be62481fce52e45", version = "0.3.4", package = "fusio-dispatch", features = [
fusio-dispatch = { version = "0.3.5", package = "fusio-dispatch", features = [
"aws",
"tokio",
] }
Expand Down
35 changes: 19 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,25 @@ where
record_schema: Arc<R::Schema>,
manager: &StoreManager,
) -> Result<Self, DbError<R>> {
let base_fs = manager.base_fs();
let wal_dir_path = option.wal_dir_path();
let mut transaction_map = HashMap::new();
let mut wal_ids = Vec::new();

let wal_metas = {
let mut wal_metas = Vec::new();
let mut wal_stream = base_fs.list(&wal_dir_path).await?;

while let Some(file_meta) = wal_stream.next().await {
let file_meta = file_meta?;
if file_meta.path.as_ref().ends_with("wal") {
wal_metas.push(file_meta);
}
}
wal_metas.sort_by(|meta_a, meta_b| meta_a.path.cmp(&meta_b.path));
wal_metas
};

let trigger = TriggerFactory::create(option.trigger_type);
let mut schema = DbStorage {
mutable: Mutable::new(
Expand All @@ -500,22 +519,6 @@ where
record_schema,
};

let base_fs = manager.base_fs();
let wal_dir_path = option.wal_dir_path();
let mut transaction_map = HashMap::new();
let mut wal_ids = Vec::new();

let wal_metas = {
let mut wal_metas = Vec::new();
let mut wal_stream = base_fs.list(&wal_dir_path).await?;

while let Some(file_meta) = wal_stream.next().await {
wal_metas.push(file_meta?);
}
wal_metas.sort_by(|meta_a, meta_b| meta_a.path.cmp(&meta_b.path));
wal_metas
};

for wal_meta in wal_metas {
let wal_path = wal_meta.path;

Expand Down
33 changes: 27 additions & 6 deletions src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ pub(crate) struct WalFile<R>
where
R: Record,
{
file: Logger<Log<R>>,
file: Option<Logger<Log<R>>>,
file_id: FileId,
path: Path,
wal_buffer_size: usize,
fs: Arc<dyn DynFs>,
}

impl<R> WalFile<R>
Expand All @@ -29,13 +32,19 @@ where
wal_buffer_size: usize,
file_id: FileId,
) -> Self {
let file = Options::new(path)
let file = Options::new(path.clone())
.buf_size(wal_buffer_size)
.build_with_fs::<Log<R>>(fs)
.build_with_fs::<Log<R>>(fs.clone())
.await
.unwrap();

Self { file, file_id }
Self {
file: Some(file),
file_id,
path,
wal_buffer_size,
fs,
}
}

pub(crate) fn file_id(&self) -> FileId {
Expand All @@ -48,11 +57,23 @@ where
R: Record,
{
pub(crate) async fn write<'r>(&mut self, data: &Log<R>) -> Result<(), LogError> {
self.file.write(data).await
if self.file.is_none() {
self.file = Some(
Options::new(self.path.clone())
.buf_size(self.wal_buffer_size)
.build_with_fs::<Log<R>>(self.fs.clone())
.await?,
);
}

self.file.as_mut().unwrap().write(data).await
}

pub(crate) async fn flush(&mut self) -> Result<(), LogError> {
self.file.flush().await
match self.file.take() {
Some(mut file) => file.close().await,
None => Ok(()),
}
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ mod tests {
}
}
}
db.flush_wal().await.unwrap();
drop(db);
remove("opfs_dir").await;
}
Expand Down

0 comments on commit 7274831

Please sign in to comment.