
Commit 69befbc

s373r and sergiimk authored
MetadataChainVisitor: continuing integration & address review comments (#543)
* Initial changes
  - ValidateSequenceNumbersIntegrityVisitor: remove extra lifetime declaration
  - MetadataVisitorDecision: add documentation
  - Validators: use type aliases
  - MetadataChainExt: document accept_*() methods
  - MetadataChainExt::accept_by_hash_with_decisions(): assert lengths
  - SearchSingleTypedBlockVisitor: introduce
  - QueryServiceImpl::tail(): use the accept API
  - TransformServiceImpl::get_vocab(): use the accept API
  - Dataset::created_at(): use the accept API
  - SearchSeedVisitor & SearchSetPollingSourceVisitor: add
  - SearchSingleTypedBlockVisitor::into_block(): add
  - metadata_chain_visitors: move to domain scope
  - PollingIngestService::get_active_polling_source(): use the accept API
  - TransformServiceImpl::get_active_transform(): use the accept API
  - Add: From<IterBlocksError> for InternalError
  - Build fixes
  - DatasetDataHelper::get_last_set_data_schema_block(): use the accept API
  - ValidateLogicalStructureVisitor: implement & use
  - SearchSetDataSchemaVisitor: add
  - MetadataChainVisitor: rename into_found_hashed_block() -> into_hashed_block()
  - ValidateLogicalStructureVisitor: shorter names of subtypes
  - Fix a lint warning
  - MetadataChainImpl::validate_append_event_logical_structure(): remove
  - test_append_prev_block_not_found(): fix
  - ValidateLogicalStructureVisitor: fix unit-tests
  - test_append_system_time_non_monotonic(): fix
  - ValidateLogicalStructureVisitor: use type shorthands
  - SearchDataBlocksVisitor: implement
  - Visitors: use default generic error types
  - DataWriterDataFusionMetaDataStateVisitor: use type alias
  - DatasetMetadata::current_watermark(): use the accept API
  - QueryServiceImpl::get_schema_impl(): use the accept API
  - TransformServiceImpl::is_never_pulled(): use the accept API
  - DatasetChangesServiceImpl::make_increment_from_interval(): use the accept API
  - TransformServiceImpl::get_transform_input_from_query_input(): use the accept API
  - ListCommand::run(): use the accept API
  - MetadataChainExt::accept_by_interval(): introduce
  - DatasetChangesServiceImpl::make_increment_from_interval(): use the accept API
  - QueryServiceImpl::collect_data_file_hashes(): use the accept API
  - TransformServiceImpl::get_transform_input(): use the accept API
  - TransformServiceImpl::get_next_operation(): use the accept API [2]
  - SearchSingleTypedBlockVisitor::into_event(): add & use
  - DatasetImpl::compute_summary_increment(): use the accept API
  - MetadataChainExt::last_of_type(): remove
  - MetadataChainExt::accept_by_interval_with_decisions(): update decision as well
  - DatasetMetadata::get_last_block_of_type(): use the accept API
  - DatasetMetadataMut::get_last_block_of_type(): use the accept API
  - TransformServiceImpl::get_transform_input_from_query_input(): fix unit-test
  - Visual separation between Visitors
  - SearchDataBlocksVisitor: provide several visitor variants
  - SearchDataBlocksVisitor: replace callback with kind
  - SearchDataBlocksVisitor: use updated methods
  - SearchDataBlocksVisitor: use into_event()
  - SearchDataBlocksVisitor: fix a lint warning
  - Dataset::last_updated_at(): use the accept API
  - ListCommand::run(): correct data_block_visitor's kind
  - ListCommand::run(): correct num_blocks calculation
  - TransformServiceImpl::get_verification_plan(): use the accept API

* Fix PR comments, Iter. 1
  - MetadataVisitorDecision: fix type in doc examples
  - Use generated MetadataEventTypeFlags
  - Queries: use last()
  - Derive DataBlockAnalysisVisitorState with Default

* Fix PR comments, Iter. 2
  - TransformServiceImpl::get_vocab(): use accept()
  - TransformServiceImpl::is_never_pulled(): use accept()
  - QueryServiceImpl::tail(): use accept()
  - DatasetDataHelper::get_last_set_data_schema_block(): use accept()
  - Dataset: use accept()
  - QueryServiceImpl::get_schema_impl(): use accept()
  - TransformServiceImpl::get_active_transform(): use accept()
  - PollingIngestServiceImpl::get_active_polling_source(): use accept()
  - MetadataChainExt::accept_one(): add
  - DatasetMetadataMut: use accept_one()
  - DatasetMetadata: use accept_one()
  - GenericCallbackVisitor: add "initial_decision" argument
  - MetadataChainExt::accept_by_interval(): use MetadataVisitorDecision::initial_decision()
  - Visitors: do not return Decision in ctors
  - ValidateSeedBlockOrderVisitor::initial_decision(): implement
  - MetadataChainVisitor::initial_decision(): introduce

* Fix PR comments, Iter. 3
  - Pre-review... code-review
  - Refresh unit-tests
  - GenericCallbackVisitor: flatten callback args
  - SearchDataBlocksVisitor: replace by reduce API
  - TransformServiceImpl::get_transform_input(): use reduce API
  - TransformServiceImpl::is_never_pulled(): use reduce API
  - DatasetChangesServiceImpl::make_increment_from_interval(): use reduce API
  - DatasetMetadata::current_watermark(): use reduce API
  - DataWriterDataFusionMetaDataStateVisitor -> SourceEventVisitor
  - DataWriterDataFusionMetaDataStateVisitor: extract other add_data things
  - DataWriterDataFusionMetaDataStateVisitor: extract maybe_source_event
  - DataWriterDataFusionMetaDataStateVisitor: remove Seed validation
  - DataWriterDataFusionMetaDataStateVisitor: remove maybe_schema
  - DataWriterDataFusionMetaDataStateVisitor: remove maybe_set_vocab
  - MetadataChainImpl::append(): use split validators
  - DatasetChangesServiceImpl::make_increment_from_interval(): use reduce() API
  - DatasetImpl::compute_summary_increment(): use reduce() API
  - ListCommand::run(): use reduce() API
  - Dataset::last_updated_at(): use get_block_by_ref()
  - QueryServiceImpl::collect_data_file_hashes(): use reduce API
  - MetadataChainExt::reduce(): introduce
  - ValidateEventIsNotEmpty: introduce
  - ValidateExecuteTransform: introduce
  - ValidateAddData: introduce
  - ValidateSetTransform: introduce
  - ValidateAddPushSource & ValidateSetPollingSource: introduce
  - MetadataChainVisitor::initial_decision(): make infallible

* Fix PR comments, Iter. 4
  - Fix doc
  - DataWriterMetadataState::with_metadata_state_scanned(): use SearchAddDataVisitor to simplify logic
  - MetadataChainVisitor::finish(): add
  - DatasetImpl::compute_summary_increment(): revert a custom visitor
  - DatasetChangesServiceImpl::make_increment_from_interval(): revert a custom visitor
  - Remove consts visitors
  - ListCommand::run(): simplify
  - MetadataChainExt: separate reduce_*() to fallible & infallible variants
  - DatasetChangesServiceImpl::make_increment_from_interval(): use accept_one_by_hash()
  - GenericFallibleCallbackVisitor: rename from GenericCallbackVisitor
  - TransformServiceImpl::get_transform_input(): use accept_one_by_hash()
  - last_data_block_with_new_data(): add
  - Use last_data_block() instead of reduce()
  - DatasetMetadata: simplify

* Fix PR comments, Iter. 5
  - Fix doc
  - Extract SetDataSchemaVisitor
  - MetadataChainVisitorHolder: implement
  - Remove InternalError restriction
  - Drop Sync bound
  - Use int_err()
  - MetadataChainVisitorHolderImpl -> MetadataChainVisitorHolder
  - Remove extra "<", ">"
  - Fix a unit-test
  - After rebase fixes
  - KamuTable: use accept_*() API

* Update changelog
* Visitor::map_err() ergonomics
* Narrow down the validator visitor error type
* Rename visitor::create() to new() for consistency
* Rename source state visitor

---------

Co-authored-by: Sergii Mikhtoniuk <mikhtoniuk@gmail.com>
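The work above replaces ad-hoc scans over the metadata block stream with a visitor-driven accept()/accept_one() API: a visitor announces an initial decision, the chain walks blocks head-to-tail, and each visit() returns the next decision so traversal can stop early. The following is only a rough sketch of that shape with simplified stand-in types, not kamu's real signatures:

```rust
// Sketch of the visitor-driven traversal this commit converges on
// (illustrative only; kamu's API is async and richer than this).

#[derive(Clone, Copy, PartialEq, Debug)]
enum Decision {
    Stop,
    Next,
}

// Simplified stand-in for an ODF metadata event.
#[derive(Clone, Debug, PartialEq)]
enum MetadataEvent {
    Seed(String),
    SetVocab(String),
    AddData(u64),
}

trait MetadataChainVisitor {
    fn initial_decision(&self) -> Decision;
    fn visit(&mut self, event: &MetadataEvent) -> Decision;
}

// Analogue of SearchSingleTypedBlockVisitor: capture the first matching
// event (i.e. the most recent one, since we walk head-to-tail) and stop.
struct SearchFirstVisitor<F: Fn(&MetadataEvent) -> bool> {
    pred: F,
    found: Option<MetadataEvent>,
}

impl<F: Fn(&MetadataEvent) -> bool> MetadataChainVisitor for SearchFirstVisitor<F> {
    fn initial_decision(&self) -> Decision {
        Decision::Next
    }
    fn visit(&mut self, event: &MetadataEvent) -> Decision {
        if (self.pred)(event) {
            self.found = Some(event.clone());
            Decision::Stop
        } else {
            Decision::Next
        }
    }
}

// Analogue of MetadataChainExt::accept_one(): drive one visitor over the
// chain (stored newest-first here) until it asks to stop.
fn accept_one<V: MetadataChainVisitor>(chain: &[MetadataEvent], mut visitor: V) -> V {
    if visitor.initial_decision() == Decision::Stop {
        return visitor;
    }
    for event in chain {
        if visitor.visit(event) == Decision::Stop {
            break;
        }
    }
    visitor
}

fn latest_vocab(chain: &[MetadataEvent]) -> Option<MetadataEvent> {
    accept_one(
        chain,
        SearchFirstVisitor {
            pred: |e| matches!(e, MetadataEvent::SetVocab(_)),
            found: None,
        },
    )
    .found
}

fn main() {
    // Head-to-tail order: newest block first.
    let chain = vec![
        MetadataEvent::AddData(2),
        MetadataEvent::SetVocab("v2".into()),
        MetadataEvent::SetVocab("v1".into()),
        MetadataEvent::Seed("ds".into()),
    ];
    // The newest SetVocab wins, and traversal stops there.
    assert_eq!(latest_vocab(&chain), Some(MetadataEvent::SetVocab("v2".into())));
}
```

Because the visitor stops at the first match, "first found" means "most recent", and the chain never has to materialize the full block stream the way the old `iter_blocks_ref(...).try_first()` code did.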
1 parent 4b904d9 commit 69befbc

29 files changed (+2016 −1120 lines)

CHANGELOG.md

+2
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Support `ArrowJson` schema output format in QGL API and CLI commands
 - New `kamu system compact <dataset>` command that compacts dataslices for the given dataset
+### Changed
+- Next batch of optimizations of metadata chain traversal through API using Visitors
 
 ## [0.170.0] - 2024-03-29
 ### Added

Cargo.lock

+1-1
Some generated files are not rendered by default.

Makefile

+3
@@ -101,6 +101,8 @@ codegen-odf-serde:
 		| rustfmt > $(ODF_CRATE_DIR)/src/dtos/dtos_generated.rs
 	python $(ODF_SPEC_DIR)/tools/jsonschema_to_rust_traits.py $(ODF_SPEC_DIR)/schemas \
 		| rustfmt > $(ODF_CRATE_DIR)/src/dtos/dtos_dyntraits_generated.rs
+	python $(ODF_SPEC_DIR)/tools/jsonschema_to_rust_dto_enum_flags.py $(ODF_SPEC_DIR)/schemas \
+		| rustfmt > $(ODF_CRATE_DIR)/src/dtos/dtos_enum_flags_generated.rs
 	python $(ODF_SPEC_DIR)/tools/jsonschema_to_rust_serde_yaml.py $(ODF_SPEC_DIR)/schemas \
 		| rustfmt > $(ODF_CRATE_DIR)/src/serde/yaml/derivations_generated.rs
 	python $(ODF_SPEC_DIR)/tools/jsonschema_to_rust_flatbuffers.py $(ODF_SPEC_DIR)/schemas \
@@ -109,6 +111,7 @@ codegen-odf-serde:
 	$(call add_license_header, "$(ODF_CRATE_DIR)/src/serde/flatbuffers/proxies_generated.rs")
 	$(call add_license_header, "$(ODF_CRATE_DIR)/src/dtos/dtos_generated.rs")
 	$(call add_license_header, "$(ODF_CRATE_DIR)/src/dtos/dtos_dyntraits_generated.rs")
+	$(call add_license_header, "$(ODF_CRATE_DIR)/src/dtos/dtos_enum_flags_generated.rs")
 	$(call add_license_header, "$(ODF_CRATE_DIR)/src/serde/yaml/derivations_generated.rs")
 	$(call add_license_header, "$(ODF_CRATE_DIR)/src/serde/flatbuffers/convertors_generated.rs")
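The new codegen step emits MetadataEventTypeFlags from the ODF JSON schemas (which is also why the hand-maintained `bitflags` dependency disappears from src/domain/core/Cargo.toml later in this diff). As a purely hypothetical sketch of why visitors want such flags: a decision like "next block of these types" can be tested with a cheap bitwise intersection instead of matching every block. The type and constant names below are illustrative, not the generated ones:

```rust
// Toy bitflag type standing in for the generated MetadataEventTypeFlags.
#[derive(Clone, Copy, PartialEq, Debug)]
struct EventTypeFlags(u32);

impl EventTypeFlags {
    const SEED: Self = Self(1 << 0);
    const SET_VOCAB: Self = Self(1 << 1);
    const ADD_DATA: Self = Self(1 << 2);

    // Combine requested event types into one mask.
    fn union(self, other: Self) -> Self {
        Self(self.0 | other.0)
    }

    // A chain can skip a block whose type flag doesn't intersect the mask.
    fn intersects(self, other: Self) -> bool {
        self.0 & other.0 != 0
    }
}

fn main() {
    let wanted = EventTypeFlags::SET_VOCAB.union(EventTypeFlags::SEED);
    assert!(wanted.intersects(EventTypeFlags::SEED));
    assert!(!wanted.intersects(EventTypeFlags::ADD_DATA));
}
```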

src/adapter/graphql/src/mutations/dataset_metadata_mut.rs

+10-23
@@ -7,9 +7,8 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
+use kamu_core::{self as domain, MetadataChainExt, SearchSetAttachmentsVisitor};
 use opendatafabric as odf;
-use opendatafabric::{AsTypedBlock, VariantOf};
 
 use super::{CommitResultAppendError, CommitResultSuccess, NoChanges};
 use crate::mutations::MetadataChainMut;
@@ -38,22 +37,6 @@ impl DatasetMetadataMut {
         Ok(dataset)
     }
 
-    #[graphql(skip)]
-    async fn get_last_block_of_type<T: VariantOf<odf::MetadataEvent>>(
-        &self,
-        ctx: &Context<'_>,
-    ) -> Result<Option<odf::MetadataBlockTyped<T>>> {
-        let dataset = self.get_dataset(ctx).await?;
-        let block = dataset
-            .as_metadata_chain()
-            .iter_blocks_ref(&domain::BlockRef::Head)
-            .filter_map_ok(|(_, b)| b.into_typed::<T>())
-            .try_first()
-            .await
-            .int_err()?;
-        Ok(block)
-    }
-
     /// Access to the mutable metadata chain of the dataset
     async fn chain(&self) -> MetadataChainMut {
         MetadataChainMut::new(self.dataset_handle.clone())
@@ -70,11 +53,15 @@ impl DatasetMetadataMut {
 
         let dataset = self.get_dataset(ctx).await?;
 
-        let old_attachments = self
-            .get_last_block_of_type::<odf::SetAttachments>(ctx)
-            .await?
-            .map(|b| {
-                let odf::Attachments::Embedded(at) = b.event.attachments;
+        let old_attachments = dataset
+            .as_metadata_chain()
+            .accept_one(SearchSetAttachmentsVisitor::new())
+            .await
+            .int_err()?
+            .into_event()
+            .map(|e| {
+                let odf::Attachments::Embedded(at) = e.attachments;
+
                 at
             });

src/adapter/graphql/src/queries/datasets/dataset.rs

+12-16
@@ -8,8 +8,7 @@
 // by the Apache License, Version 2.0.
 
 use chrono::prelude::*;
-use futures::TryStreamExt;
-use kamu_core::{self as domain, MetadataChainExt, ServerUrlConfig, TryStreamExtExt};
+use kamu_core::{self as domain, MetadataChainExt, SearchSeedVisitor, ServerUrlConfig};
 use opendatafabric as odf;
 
 use crate::prelude::*;
@@ -108,29 +107,26 @@ impl Dataset {
     /// Creation time of the first metadata block in the chain
     async fn created_at(&self, ctx: &Context<'_>) -> Result<DateTime<Utc>> {
         let dataset = self.get_dataset(ctx).await?;
-        let seed = dataset
+
+        Ok(dataset
             .as_metadata_chain()
-            .iter_blocks_ref(&domain::BlockRef::Head)
-            .map_ok(|(_, b)| b)
-            .try_last()
+            .accept_one(SearchSeedVisitor::new())
             .await
             .int_err()?
-            .expect("Dataset without blocks");
-        Ok(seed.system_time)
+            .into_block()
+            .expect("Dataset without blocks")
+            .system_time)
     }
 
     /// Creation time of the most recent metadata block in the chain
     async fn last_updated_at(&self, ctx: &Context<'_>) -> Result<DateTime<Utc>> {
         let dataset = self.get_dataset(ctx).await?;
-        let head = dataset
+
+        Ok(dataset
             .as_metadata_chain()
-            .iter_blocks_ref(&domain::BlockRef::Head)
-            .map_ok(|(_, b)| b)
-            .try_first()
-            .await
-            .int_err()?
-            .expect("Dataset without blocks");
-        Ok(head.system_time)
+            .get_block_by_ref(&domain::BlockRef::Head)
+            .await?
+            .system_time)
     }
 
     /// Permissions of the current user

src/adapter/graphql/src/queries/datasets/dataset_metadata.rs

+56-49
@@ -8,9 +8,15 @@
 // by the Apache License, Version 2.0.
 
 use chrono::prelude::*;
-use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
+use kamu_core::{
+    self as domain,
+    MetadataChainExt,
+    SearchSetAttachmentsVisitor,
+    SearchSetInfoVisitor,
+    SearchSetLicenseVisitor,
+    SearchSetVocabVisitor,
+};
 use opendatafabric as odf;
-use opendatafabric::{AsTypedBlock, VariantOf};
 
 use crate::prelude::*;
 use crate::queries::*;
@@ -36,38 +42,22 @@ impl DatasetMetadata {
         Ok(dataset)
     }
 
-    #[graphql(skip)]
-    async fn get_last_block_of_type<T: VariantOf<odf::MetadataEvent>>(
-        &self,
-        ctx: &Context<'_>,
-    ) -> Result<Option<odf::MetadataBlockTyped<T>>> {
-        let dataset = self.get_dataset(ctx).await?;
-        let block = dataset
-            .as_metadata_chain()
-            .iter_blocks_ref(&domain::BlockRef::Head)
-            .filter_map_ok(|(_, b)| b.into_typed::<T>())
-            .try_first()
-            .await
-            .int_err()?;
-        Ok(block)
-    }
-
     /// Access to the temporal metadata chain of the dataset
     async fn chain(&self) -> MetadataChain {
         MetadataChain::new(self.dataset_handle.clone())
     }
 
     /// Last recorded watermark
     async fn current_watermark(&self, ctx: &Context<'_>) -> Result<Option<DateTime<Utc>>> {
-        let ds = self.get_dataset(ctx).await?;
-        Ok(ds
+        let dataset = self.get_dataset(ctx).await?;
+
+        Ok(dataset
            .as_metadata_chain()
-            .iter_blocks_ref(&domain::BlockRef::Head)
-            .filter_data_stream_blocks()
-            .filter_map_ok(|(_, b)| b.event.new_watermark)
-            .try_first()
+            .last_data_block()
            .await
-            .int_err()?)
+            .int_err()?
+            .into_block()
+            .and_then(|b| b.event.new_watermark))
     }
 
     /// Latest data schema
@@ -197,51 +187,68 @@ impl DatasetMetadata {
 
     /// Current descriptive information about the dataset
     async fn current_info(&self, ctx: &Context<'_>) -> Result<SetInfo> {
-        Ok(self
-            .get_last_block_of_type::<odf::SetInfo>(ctx)
-            .await?
+        let dataset = self.get_dataset(ctx).await?;
+
+        Ok(dataset
+            .as_metadata_chain()
+            .accept_one(SearchSetInfoVisitor::new())
+            .await
+            .int_err()?
+            .into_event()
             .map_or(
                 SetInfo {
                     description: None,
                     keywords: None,
                 },
-                |b| b.event.into(),
+                Into::into,
             ))
     }
 
     /// Current readme file as discovered from attachments associated with the
     /// dataset
     async fn current_readme(&self, ctx: &Context<'_>) -> Result<Option<String>> {
-        if let Some(attachments) = self
-            .get_last_block_of_type::<odf::SetAttachments>(ctx)
-            .await?
-        {
-            match attachments.event.attachments {
-                odf::Attachments::Embedded(embedded) => Ok(embedded
-                    .items
+        let dataset = self.get_dataset(ctx).await?;
+
+        Ok(dataset
+            .as_metadata_chain()
+            .accept_one(SearchSetAttachmentsVisitor::new())
+            .await
+            .int_err()?
+            .into_event()
+            .and_then(|e| {
+                let odf::Attachments::Embedded(at) = e.attachments;
+
+                at.items
                     .into_iter()
                     .filter(|i| i.path == "README.md")
                     .map(|i| i.content)
-                    .next()),
-            }
-        } else {
-            Ok(None)
-        }
+                    .next()
+            }))
     }
 
     /// Current license associated with the dataset
     async fn current_license(&self, ctx: &Context<'_>) -> Result<Option<SetLicense>> {
-        Ok(self
-            .get_last_block_of_type::<odf::SetLicense>(ctx)
-            .await?
-            .map(|b| b.event.into()))
+        let dataset = self.get_dataset(ctx).await?;
+
+        Ok(dataset
+            .as_metadata_chain()
+            .accept_one(SearchSetLicenseVisitor::new())
+            .await
+            .int_err()?
+            .into_event()
+            .map(Into::into))
     }
 
     /// Current vocabulary associated with the dataset
     async fn current_vocab(&self, ctx: &Context<'_>) -> Result<Option<SetVocab>> {
-        Ok(self
-            .get_last_block_of_type::<odf::SetVocab>(ctx)
-            .await?
-            .map(|b| b.event.into()))
+        let dataset = self.get_dataset(ctx).await?;
+
+        Ok(dataset
+            .as_metadata_chain()
+            .accept_one(SearchSetVocabVisitor::new())
+            .await
+            .int_err()?
+            .into_event()
+            .map(Into::into))
     }
 }
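The diff above settles on one idiom for every getter: `accept_one(...)?.into_event()` yields an `Option` over the domain event, which is then converted to the GraphQL type via `Into::into` (or `map_or` with a default). A minimal sketch of that conversion step, with illustrative stand-in types rather than kamu's:

```rust
// Stand-in for the domain-side SetInfo event.
struct DomainSetInfo {
    description: Option<String>,
}

// Stand-in for the GraphQL-side SetInfo DTO.
struct GqlSetInfo {
    description: Option<String>,
    keywords: Option<Vec<String>>,
}

impl From<DomainSetInfo> for GqlSetInfo {
    fn from(e: DomainSetInfo) -> Self {
        GqlSetInfo {
            description: e.description,
            keywords: None,
        }
    }
}

// With a From impl in place, `Into::into` replaces the old closure
// `|b| b.event.into()`, and `map_or` supplies the empty default.
fn current_info(found: Option<DomainSetInfo>) -> GqlSetInfo {
    found.map_or(
        GqlSetInfo {
            description: None,
            keywords: None,
        },
        Into::into,
    )
}

fn main() {
    let info = current_info(Some(DomainSetInfo {
        description: Some("demo".into()),
    }));
    assert_eq!(info.description.as_deref(), Some("demo"));
    assert!(current_info(None).description.is_none());
}
```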

src/app/cli/src/commands/list_command.rs

+15-14
@@ -258,24 +258,25 @@ impl Command for ListCommand {
             size.push(summary.data_size);
 
             if self.detail_level > 0 {
-                // TODO: Should be precomputed
-                let mut num_blocks = 0;
-                let mut last_watermark: Option<DateTime<Utc>> = None;
-                let mut blocks_stream = dataset.as_metadata_chain().iter_blocks();
-                while let Some((_, b)) = blocks_stream.try_next().await? {
-                    if num_blocks == 0 {
-                        num_blocks = b.sequence_number + 1;
-                    }
-                    if let Some(b) = b.as_data_stream_block() {
-                        last_watermark = b.event.new_watermark.copied();
-                        break;
-                    }
-                }
+                let num_blocks = dataset
+                    .as_metadata_chain()
+                    .get_block(&current_head)
+                    .await
+                    .int_err()?
+                    .sequence_number
+                    + 1;
+                let last_watermark = dataset
+                    .as_metadata_chain()
+                    .last_data_block()
+                    .await
+                    .int_err()?
+                    .into_event()
+                    .and_then(|event| event.new_watermark.map(|t| t.timestamp_micros()));
 
                 id.push(hdl.id.as_did_str().to_string());
                 head.push(current_head.as_multibase().to_string());
                 blocks.push(num_blocks);
-                watermark.push(last_watermark.map(|t| t.timestamp_micros()));
+                watermark.push(last_watermark);
             }
         }
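The ListCommand change works because block sequence numbers are zero-based and monotonic, so fetching the head block alone determines the chain length; the old code streamed blocks just to read the first one's sequence number. A toy illustration (the `Block` type is a stand-in):

```rust
// Minimal stand-in for a metadata block carrying its position in the chain.
struct Block {
    sequence_number: u64,
}

// With 0-based sequence numbers, the head block gives the count in O(1).
fn num_blocks(head: &Block) -> u64 {
    head.sequence_number + 1
}

fn main() {
    // A chain of five blocks, numbered 0..=4; the head is the last one.
    let chain: Vec<Block> = (0..5).map(|i| Block { sequence_number: i }).collect();
    let head = chain.last().unwrap();
    assert_eq!(num_blocks(head), 5);
}
```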

src/domain/core/Cargo.toml

-1
@@ -29,7 +29,6 @@ opendatafabric = { workspace = true }
 async-stream = { version = "0.3", default-features = false }
 async-trait = { version = "0.1", default-features = false }
 bytes = { version = "1", default-features = false }
-bitflags = { version = "2.4", default-features = false }
 chrono = { version = "0.4", default-features = false }
 dill = { version = "0.8", default-features = false }
 futures = { version = "0.3", default-features = false }
