Skip to content

Commit 601690b

Browse files
committed
Fix review comments Iter 1 - add account normalization
1 parent 1994b9a commit 601690b

File tree

4 files changed

+135
-126
lines changed

4 files changed

+135
-126
lines changed

src/adapter/graphql/tests/tests/test_gql_data.rs

+3-67
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async fn create_test_dataset(
110110
#[test_log::test(tokio::test)]
111111
async fn test_dataset_schema_local_fs() {
112112
let tempdir = tempfile::tempdir().unwrap();
113-
let catalog = create_catalog_with_local_workspace(tempdir.path(), false);
113+
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
114114
create_test_dataset(&catalog, tempdir.path(), None).await;
115115

116116
let schema = kamu_adapter_graphql::schema_quiet();
@@ -165,75 +165,11 @@ async fn test_dataset_schema_local_fs() {
165165

166166
/////////////////////////////////////////////////////////////////////////////////////////
167167

168-
// #[test_group::group(engine, datafusion)]
169-
#[test_log::test(tokio::test)]
170-
async fn test_dataset_case_insensetive_schema_local_fs() {
171-
let tempdir = tempfile::tempdir().unwrap();
172-
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
173-
create_test_dataset(
174-
&catalog,
175-
tempdir.path(),
176-
Some(AccountName::new_unchecked("KaMu")),
177-
)
178-
.await;
179-
180-
let schema = kamu_adapter_graphql::schema_quiet();
181-
let res = schema
182-
.execute(
183-
async_graphql::Request::new(indoc::indoc!(
184-
r#"
185-
{
186-
datasets {
187-
byOwnerAndName(accountName: "kAmU", datasetName: "FOo") {
188-
name
189-
data {
190-
tail(limit: 1, schemaFormat: PARQUET_JSON, dataFormat: JSON) {
191-
... on DataQueryResultSuccess {
192-
schema { content }
193-
}
194-
}
195-
}
196-
}
197-
}
198-
}
199-
"#
200-
))
201-
.data(catalog),
202-
)
203-
.await;
204-
assert!(res.is_ok(), "{res:?}");
205-
let json = serde_json::to_string(&res.data).unwrap();
206-
let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
207-
let data_schema = &json["datasets"]["byOwnerAndName"]["data"]["tail"]["schema"]["content"];
208-
let data_schema =
209-
serde_json::from_str::<serde_json::Value>(data_schema.as_str().unwrap()).unwrap();
210-
assert_eq!(
211-
data_schema,
212-
serde_json::json!({
213-
"name": "arrow_schema",
214-
"type": "struct",
215-
"fields": [{
216-
"name": "offset",
217-
"repetition": "REQUIRED",
218-
"type": "INT64",
219-
"logicalType": "INTEGER(64,false)"
220-
}, {
221-
"name": "blah",
222-
"repetition": "REQUIRED",
223-
"type": "BYTE_ARRAY",
224-
"logicalType": "STRING"
225-
}]
226-
})
227-
);
228-
}
229-
230-
/////////////////////////////////////////////////////////////////////////////////////////
231-
232168
#[test_group::group(engine, datafusion)]
233169
#[test_log::test(tokio::test)]
234170
async fn test_dataset_tail_local_fs() {
235171
let tempdir = tempfile::tempdir().unwrap();
236-
let catalog = create_catalog_with_local_workspace(tempdir.path(), false);
172+
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
237173
create_test_dataset(&catalog, tempdir.path(), None).await;
238174

239175
let schema = kamu_adapter_graphql::schema_quiet();
@@ -274,7 +210,7 @@ async fn test_dataset_tail_local_fs() {
274210
#[test_log::test(tokio::test)]
275211
async fn test_dataset_tail_empty_local_fs() {
276212
let tempdir = tempfile::tempdir().unwrap();
277-
let catalog = create_catalog_with_local_workspace(tempdir.path(), false);
213+
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
278214
create_test_dataset(&catalog, tempdir.path(), None).await;
279215

280216
let schema = kamu_adapter_graphql::schema_quiet();

src/infra/core/src/remote_repository_registry_impl.rs

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl RemoteRepositoryRegistryImpl {
4141
let file_path = self.repos_dir.join(repo_name);
4242

4343
if !file_path.exists() {
44+
// run full scan to support case-insensetive matches
4445
let all_repositories_stream = self.get_all_repositories();
4546
for repository_name in all_repositories_stream {
4647
if &repository_name == repo_name {

src/infra/core/src/repos/dataset_repository_local_fs.rs

+102-57
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10-
use std::path::PathBuf;
10+
use std::path::{Path, PathBuf};
1111
use std::sync::Arc;
1212

1313
use async_trait::async_trait;
@@ -114,6 +114,19 @@ impl DatasetRepositoryLocalFs {
114114
self.storage_strategy.get_dataset_path(&dataset_handle),
115115
))
116116
}
117+
118+
fn get_canonical_path_param(dataset_path: &Path) -> Result<(PathBuf, String), InternalError> {
119+
let canonical_dataset_path = std::fs::canonicalize(dataset_path).int_err()?;
120+
let dataset_name_str = canonical_dataset_path
121+
.to_str()
122+
.unwrap()
123+
.split('/')
124+
.last()
125+
.unwrap()
126+
.to_owned();
127+
128+
Ok((canonical_dataset_path, dataset_name_str))
129+
}
117130
}
118131

119132
/////////////////////////////////////////////////////////////////////////////////////////
@@ -241,7 +254,21 @@ impl DatasetRepository for DatasetRepositoryLocalFs {
241254

242255
// It's okay to create a new dataset by this point
243256
let dataset_id = seed_block.event.dataset_id.clone();
244-
let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone());
257+
let dataset_handle = if let Some(account_name) = &dataset_alias.account_name {
258+
let (_, canonical_account_name) = self
259+
.storage_strategy
260+
.resolve_account_dir(account_name)
261+
.int_err()?;
262+
let canonical_dataset_alias = DatasetAlias::new(
263+
Some(canonical_account_name),
264+
dataset_alias.dataset_name.clone(),
265+
);
266+
267+
DatasetHandle::new(dataset_id, canonical_dataset_alias)
268+
} else {
269+
DatasetHandle::new(dataset_id, dataset_alias.clone())
270+
};
271+
245272
let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle);
246273
let layout = DatasetLayout::create(&dataset_path).int_err()?;
247274
let dataset = Self::build_dataset(layout, self.event_bus.clone());
@@ -430,6 +457,11 @@ trait DatasetStorageStrategy: Sync + Send {
430457
dataset_handle: &DatasetHandle,
431458
new_name: &DatasetName,
432459
) -> Result<(), InternalError>;
460+
461+
fn resolve_account_dir(
462+
&self,
463+
account_name: &AccountName,
464+
) -> Result<(PathBuf, AccountName), ResolveDatasetError>;
433465
}
434466

435467
#[derive(thiserror::Error, Debug)]
@@ -476,10 +508,11 @@ impl DatasetSingleTenantStorageStrategy {
476508
&self,
477509
dataset_path: &PathBuf,
478510
dataset_alias: &DatasetAlias,
479-
) -> Result<DatasetSummary, ResolveDatasetError> {
511+
) -> Result<(DatasetSummary, DatasetAlias), ResolveDatasetError> {
480512
let layout = DatasetLayout::new(dataset_path);
481513
let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone());
482-
dataset
514+
515+
let dataset_summary = dataset
483516
.get_summary(GetSummaryOpts::default())
484517
.await
485518
.map_err(|e| {
@@ -490,40 +523,31 @@ impl DatasetSingleTenantStorageStrategy {
490523
} else {
491524
ResolveDatasetError::Internal(e.int_err())
492525
}
493-
})
494-
}
526+
})?;
495527

496-
async fn attempt_resolve_dataset_alias(
497-
&self,
498-
dataset_alias: &DatasetAlias,
499-
) -> Result<DatasetHandle, ResolveDatasetError> {
500-
assert!(
501-
!dataset_alias.is_multi_tenant()
502-
|| dataset_alias.account_name.as_ref().unwrap() == DEFAULT_ACCOUNT_NAME,
503-
"Multi-tenant refs shouldn't have reached down to here with earlier validations"
504-
);
505-
506-
let dataset_path = self.dataset_path_impl(dataset_alias);
507-
if !dataset_path.exists() {
508-
return Err(ResolveDatasetError::NotFound(DatasetNotFoundError {
509-
dataset_ref: dataset_alias.as_local_ref(),
510-
}));
511-
}
528+
let (_, canonical_dataset_name) =
529+
DatasetRepositoryLocalFs::get_canonical_path_param(dataset_path)?;
530+
let canonical_dataset_alias = DatasetAlias {
531+
dataset_name: DatasetName::new_unchecked(canonical_dataset_name.as_str()),
532+
account_name: None,
533+
};
512534

513-
self.resolve_dataset_handle(&dataset_path, dataset_alias)
514-
.await
535+
Ok((dataset_summary, canonical_dataset_alias))
515536
}
516537

517538
async fn resolve_dataset_handle(
518539
&self,
519540
dataset_path: &PathBuf,
520541
dataset_alias: &DatasetAlias,
521542
) -> Result<DatasetHandle, ResolveDatasetError> {
522-
let summary = self
543+
let (summary, canonical_dataset_alias) = self
523544
.attempt_resolving_summary_via_path(dataset_path, dataset_alias)
524545
.await?;
525546

526-
Ok(DatasetHandle::new(summary.id, dataset_alias.clone()))
547+
Ok(DatasetHandle::new(
548+
summary.id,
549+
canonical_dataset_alias.clone(),
550+
))
527551
}
528552
}
529553

@@ -550,7 +574,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
550574
}
551575
let dataset_name = DatasetName::try_from(&dataset_dir_entry.file_name()).int_err()?;
552576
let dataset_alias = DatasetAlias::new(None, dataset_name);
553-
match self.attempt_resolve_dataset_alias(&dataset_alias).await {
577+
match self.resolve_dataset_handle(&dataset_dir_entry.path(), &dataset_alias).await {
554578
Ok(hdl) => { yield hdl; Ok(()) }
555579
Err(ResolveDatasetError::NotFound(_)) => Ok(()),
556580
Err(e) => Err(e.int_err())
@@ -623,12 +647,12 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
623647

624648
let dataset_path = self.dataset_path_impl(&alias);
625649

626-
let summary = self
650+
let (summary, canonical_dataset_alias) = self
627651
.attempt_resolving_summary_via_path(&dataset_path, &alias)
628652
.await?;
629653

630654
if summary.id == *dataset_id {
631-
return Ok(DatasetHandle::new(summary.id, alias));
655+
return Ok(DatasetHandle::new(summary.id, canonical_dataset_alias));
632656
}
633657
}
634658

@@ -656,6 +680,16 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
656680
std::fs::rename(old_dataset_path, new_dataset_path).int_err()?;
657681
Ok(())
658682
}
683+
684+
fn resolve_account_dir(
685+
&self,
686+
_account_name: &AccountName,
687+
) -> Result<(PathBuf, AccountName), ResolveDatasetError> {
688+
Ok((
689+
self.root.join(DEFAULT_ACCOUNT_NAME),
690+
AccountName::new_unchecked(DEFAULT_ACCOUNT_NAME),
691+
))
692+
}
659693
}
660694

661695
/////////////////////////////////////////////////////////////////////////////////////////
@@ -772,32 +806,6 @@ impl DatasetMultiTenantStorageStrategy {
772806
}
773807
})
774808
}
775-
776-
fn resolve_account_dir(
777-
&self,
778-
account_name: &AccountName,
779-
) -> Result<PathBuf, ResolveDatasetError> {
780-
let account_dataset_dir_path = self.root.join(account_name);
781-
782-
if !account_dataset_dir_path.is_dir() {
783-
let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?;
784-
785-
for read_accout_dir in read_accout_dirs {
786-
let account_dir_name = AccountName::new_unchecked(
787-
read_accout_dir
788-
.int_err()?
789-
.file_name()
790-
.to_str()
791-
.unwrap_or(""),
792-
);
793-
if account_name == &account_dir_name {
794-
return Ok(self.root.join(account_dir_name));
795-
}
796-
}
797-
}
798-
799-
Ok(account_dataset_dir_path)
800-
}
801809
}
802810

803811
#[async_trait]
@@ -862,10 +870,14 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy {
862870
dataset_alias: &DatasetAlias,
863871
) -> Result<DatasetHandle, ResolveDatasetError> {
864872
let effective_account_name = self.effective_account_name(dataset_alias);
865-
let account_dataset_dir_path = self.resolve_account_dir(effective_account_name)?;
873+
let (account_dataset_dir_path, _) = self.resolve_account_dir(effective_account_name)?;
866874

867875
if account_dataset_dir_path.is_dir() {
868-
let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).int_err()?;
876+
let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).map_err(|_| {
877+
ResolveDatasetError::NotFound(DatasetNotFoundError {
878+
dataset_ref: dataset_alias.as_local_ref(),
879+
})
880+
})?;
869881

870882
for r_dataset_dir in read_dataset_dir {
871883
let dataset_dir_entry = r_dataset_dir.int_err()?;
@@ -962,6 +974,39 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy {
962974

963975
Ok(())
964976
}
977+
978+
fn resolve_account_dir(
979+
&self,
980+
account_name: &AccountName,
981+
) -> Result<(PathBuf, AccountName), ResolveDatasetError> {
982+
let account_dataset_dir_path = self.root.join(account_name);
983+
984+
if !account_dataset_dir_path.is_dir() {
985+
let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?;
986+
987+
for read_accout_dir in read_accout_dirs {
988+
let account_dir_name = AccountName::new_unchecked(
989+
read_accout_dir
990+
.int_err()?
991+
.file_name()
992+
.to_str()
993+
.unwrap_or(""),
994+
);
995+
if account_name == &account_dir_name {
996+
return Ok((account_dataset_dir_path, account_dir_name));
997+
}
998+
}
999+
return Ok((account_dataset_dir_path, account_name.clone()));
1000+
}
1001+
1002+
let (canonical_account_dataset_dir_path, canonical_account_name) =
1003+
DatasetRepositoryLocalFs::get_canonical_path_param(&account_dataset_dir_path)?;
1004+
1005+
Ok((
1006+
canonical_account_dataset_dir_path,
1007+
AccountName::new_unchecked(canonical_account_name.as_str()),
1008+
))
1009+
}
9651010
}
9661011

9671012
/////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)