fix: pop logs in log segment after cleaning up logs #3064

Closed
42 changes: 41 additions & 1 deletion crates/core/src/kernel/snapshot/mod.rs
@@ -15,7 +15,7 @@
//!
//!

use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader, Cursor};
use std::sync::Arc;

@@ -34,10 +34,12 @@ use super::{
Action, Add, AddCDCFile, CommitInfo, DataType, Metadata, Protocol, Remove, StructField,
Transaction,
};
use crate::checkpoints::cleanup_expired_logs_for;
use crate::kernel::parse::read_cdf_adds;
use crate::kernel::{ActionType, StructType};
use crate::logstore::LogStore;
use crate::operations::transaction::CommitData;
use crate::protocol::ProtocolError;
use crate::table::config::TableConfig;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

@@ -567,6 +569,44 @@ impl EagerSnapshot {
))
}

pub async fn clean_up_logs(
&mut self,
until_version: i64,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
) -> Result<usize, ProtocolError> {
let mut deleted =
cleanup_expired_logs_for(until_version, log_store, cutoff_timestamp).await?;
let mut survived_files = VecDeque::new();

while !deleted.is_empty() {
if deleted.is_empty() {
break;
}
if self.snapshot.log_segment.commit_files.is_empty() {
break;
}

match self.snapshot.log_segment.commit_files.pop_back() {
Some(end) => {
if let Ok(idx) = deleted.binary_search(&end.location) {
deleted.remove(idx);
} else {
survived_files.push_front(end.clone());
}
}
None => (),
}
}

self.snapshot
.log_segment
.commit_files
.append(&mut survived_files);

Ok(deleted.len())
}

/// Advance the snapshot based on the given commit actions
pub fn advance<'a>(
&mut self,
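For readers following the new `clean_up_logs` method above: the core idea is to pop commit files off the back of the log segment, drop the ones whose paths were just deleted from the log store, and splice the survivors back in their original order. Below is a minimal, self-contained sketch of that pruning loop using plain `String` paths and a made-up `prune_commit_files` helper instead of the crate's `ObjectMeta`/`Path` types — an illustration of the technique under those assumptions, not the crate's API.

```rust
use std::collections::VecDeque;

/// Drop every commit file whose path appears in `deleted` (which must be
/// sorted so `binary_search` is valid), preserving the survivors' order.
fn prune_commit_files(commit_files: &mut VecDeque<String>, deleted: &mut Vec<String>) {
    let mut survived = VecDeque::new();

    // Walk from the newest commit file backwards until every deleted path
    // has been accounted for or the segment is exhausted.
    while !deleted.is_empty() {
        match commit_files.pop_back() {
            Some(end) => {
                if let Ok(idx) = deleted.binary_search(&end) {
                    // The file was removed from the log store; forget it.
                    deleted.remove(idx);
                } else {
                    // Still live; keep it for re-attachment below.
                    survived.push_front(end);
                }
            }
            None => break,
        }
    }

    // Re-attach the surviving files in their original order.
    commit_files.append(&mut survived);
}

fn main() {
    let mut commits: VecDeque<String> =
        (0..4).map(|v| format!("_delta_log/{v:020}.json")).collect();
    let mut deleted = vec![
        "_delta_log/00000000000000000000.json".to_string(),
        "_delta_log/00000000000000000001.json".to_string(),
    ];
    prune_commit_files(&mut commits, &mut deleted);
    assert_eq!(commits.len(), 2); // only versions 2 and 3 remain
}
```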
20 changes: 11 additions & 9 deletions crates/core/src/operations/transaction/mod.rs
@@ -85,7 +85,7 @@ use serde_json::Value;
use tracing::warn;

use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
use crate::checkpoints::create_checkpoint_for;
use crate::errors::DeltaTableError;
use crate::kernel::{
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction,
@@ -664,7 +664,7 @@ impl PostCommit<'_> {
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState { snapshot };
let mut state = DeltaTableState { snapshot };
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
@@ -677,13 +677,15 @@
};

if cleanup_logs {
cleanup_expired_logs_for(
self.version,
self.log_store.as_ref(),
Utc::now().timestamp_millis()
- state.table_config().log_retention_duration().as_millis() as i64,
)
.await?;
state
.snapshot
.clean_up_logs(
self.version,
self.log_store.as_ref(),
Utc::now().timestamp_millis()
- state.table_config().log_retention_duration().as_millis() as i64,
)
.await?;
}
Ok(state)
} else {
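The `cutoff_timestamp` argument in the hook above is simply "now minus the table's log retention window", expressed in epoch milliseconds. A tiny sketch of that arithmetic with `chrono`, assuming the documented 30-day default for `logRetentionDuration` (the real code reads the duration from the table config):

```rust
use chrono::Utc;

fn main() {
    // Assumed default: 30 days of log retention, in milliseconds.
    let retention_ms: i64 = 30 * 24 * 60 * 60 * 1000;

    // Log entries older than this epoch-millisecond timestamp are
    // candidates for cleanup.
    let cutoff_timestamp = Utc::now().timestamp_millis() - retention_ms;
    println!("cleanup cutoff: {cutoff_timestamp}");
}
```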
1 change: 1 addition & 0 deletions crates/core/src/operations/vacuum.rs
@@ -202,6 +202,7 @@ impl VacuumBuilder {
self.log_store.object_store().clone(),
)
.await?;

let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();

let mut files_to_delete = vec![];
41 changes: 25 additions & 16 deletions crates/core/src/protocol/checkpoints.rs
@@ -10,6 +10,7 @@ use chrono::{Datelike, NaiveDate, NaiveDateTime, Utc};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use lazy_static::lazy_static;
use object_store::path::Path;
use object_store::{Error, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
@@ -27,7 +28,7 @@ use crate::kernel::{
use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
use crate::{open_table_with_version, DeltaTable};
use crate::{open_table_with_version, DeltaTable, DeltaTableError};

type SchemaPath = Vec<String>;

@@ -96,22 +97,28 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError>
Ok(())
}

/// Delete expires log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError> {
/// Delete expires log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(
table: &DeltaTable,
) -> Result<(Option<DeltaTableState>, usize), ProtocolError> {
let log_retention_timestamp = Utc::now().timestamp_millis()
- table
.snapshot()
.map_err(|_| ProtocolError::NoMetaData)?
.table_config()
.log_retention_duration()
.as_millis() as i64;
cleanup_expired_logs_for(
table.version(),
table.log_store.as_ref(),
log_retention_timestamp,
)
.await
let version = table.version();

let mut state: Option<DeltaTableState> = table.state.clone();
let size = state
.as_mut()
.ok_or(ProtocolError::NoMetaData)?
.snapshot
.clean_up_logs(version, table.log_store.as_ref(), log_retention_timestamp)
.await?;
Ok((state, size))
}

/// Loads table from given `table_uri` at given `version` and creates checkpoint for it.
@@ -132,7 +139,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());

if table.version() >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
let (_, deleted_log_num) = cleanup_metadata(&table).await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

@@ -198,7 +205,7 @@ pub async fn cleanup_expired_logs_for(
until_version: i64,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
) -> Result<usize, ProtocolError> {
) -> Result<Vec<Path>, ProtocolError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap();
@@ -210,7 +217,7 @@
.await;

if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
return Ok(0);
return Ok(vec![]);
}

let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
@@ -255,7 +262,7 @@
.await?;

debug!("Deleted {} expired logs", deleted.len());
Ok(deleted.len())
Ok(deleted)
}

fn parquet_bytes_from_state(
@@ -889,7 +896,8 @@ mod tests {
log_retention_timestamp,
)
.await
.unwrap();
.unwrap()
.len();
assert_eq!(count, 0);
println!("{:?}", count);

@@ -917,7 +925,8 @@
log_retention_timestamp,
)
.await
.unwrap();
.unwrap()
.len();
assert_eq!(count, 1);

let log_store = table.log_store();
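Two things in this file are worth calling out. First, `cleanup_expired_logs_for` now returns the deleted paths themselves (`Vec<Path>`) rather than a count, which is why the tests above append `.len()` and why `EagerSnapshot::clean_up_logs` can use the paths to prune its log segment. Second, log files are recognized by `DELTA_LOG_REGEX`. Below is a small sketch of how a pattern of that shape extracts the commit version from a path — `log_version` is a made-up helper name for illustration, not part of the crate:

```rust
use regex::Regex;

/// Return the commit version encoded in a `_delta_log` file name, if the
/// path looks like a log entry (JSON commit, checkpoint, or temp file).
fn log_version(path: &str) -> Option<i64> {
    // Same shape as DELTA_LOG_REGEX in checkpoints.rs.
    let re = Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap();
    let caps = re.captures(path)?;
    caps.get(1)?.as_str().parse::<i64>().ok()
}

fn main() {
    assert_eq!(
        log_version("_delta_log/00000000000000000007.json"),
        Some(7)
    );
    assert_eq!(log_version("part-00001-data.snappy.parquet"), None);
}
```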
16 changes: 15 additions & 1 deletion python/deltalake/_internal.pyi
@@ -222,6 +222,21 @@ class RawDeltaTable:
ending_timestamp: Optional[str] = None,
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader: ...
def write(
self,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
commit_properties: Optional[CommitProperties],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def transaction_versions(self) -> Dict[str, Transaction]: ...
def __datafusion_table_provider__(self) -> Any: ...

@@ -243,7 +258,6 @@ def write_to_deltalake(
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
table: Optional[RawDeltaTable],
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
49 changes: 31 additions & 18 deletions python/deltalake/writer.py
@@ -320,25 +320,38 @@ def write_deltalake(
conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
)
data = RecordBatchReader.from_batches(schema, (batch for batch in data))
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
table=table._table if table is not None else None,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
if table:
table.update_incremental()
table._table.write(
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
else:
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
elif engine == "pyarrow":
warnings.warn(
"pyarrow engine is deprecated and will be removed in v1.0",