Skip to content

Commit 7a2eb00

Browse files
committed
Fix review comments Iter 1 - add account normalization
1 parent 1994b9a commit 7a2eb00

File tree

4 files changed

+134
-126
lines changed

4 files changed

+134
-126
lines changed

src/adapter/graphql/tests/tests/test_gql_data.rs

+3-67
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async fn create_test_dataset(
110110
#[test_log::test(tokio::test)]
111111
async fn test_dataset_schema_local_fs() {
112112
let tempdir = tempfile::tempdir().unwrap();
113-
let catalog = create_catalog_with_local_workspace(tempdir.path(), false);
113+
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
114114
create_test_dataset(&catalog, tempdir.path(), None).await;
115115

116116
let schema = kamu_adapter_graphql::schema_quiet();
@@ -165,75 +165,11 @@ async fn test_dataset_schema_local_fs() {
165165

166166
/////////////////////////////////////////////////////////////////////////////////////////
167167

168-
// #[test_group::group(engine, datafusion)]
169-
#[test_log::test(tokio::test)]
170-
async fn test_dataset_case_insensetive_schema_local_fs() {
171-
let tempdir = tempfile::tempdir().unwrap();
172-
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
173-
create_test_dataset(
174-
&catalog,
175-
tempdir.path(),
176-
Some(AccountName::new_unchecked("KaMu")),
177-
)
178-
.await;
179-
180-
let schema = kamu_adapter_graphql::schema_quiet();
181-
let res = schema
182-
.execute(
183-
async_graphql::Request::new(indoc::indoc!(
184-
r#"
185-
{
186-
datasets {
187-
byOwnerAndName(accountName: "kAmU", datasetName: "FOo") {
188-
name
189-
data {
190-
tail(limit: 1, schemaFormat: PARQUET_JSON, dataFormat: JSON) {
191-
... on DataQueryResultSuccess {
192-
schema { content }
193-
}
194-
}
195-
}
196-
}
197-
}
198-
}
199-
"#
200-
))
201-
.data(catalog),
202-
)
203-
.await;
204-
assert!(res.is_ok(), "{res:?}");
205-
let json = serde_json::to_string(&res.data).unwrap();
206-
let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
207-
let data_schema = &json["datasets"]["byOwnerAndName"]["data"]["tail"]["schema"]["content"];
208-
let data_schema =
209-
serde_json::from_str::<serde_json::Value>(data_schema.as_str().unwrap()).unwrap();
210-
assert_eq!(
211-
data_schema,
212-
serde_json::json!({
213-
"name": "arrow_schema",
214-
"type": "struct",
215-
"fields": [{
216-
"name": "offset",
217-
"repetition": "REQUIRED",
218-
"type": "INT64",
219-
"logicalType": "INTEGER(64,false)"
220-
}, {
221-
"name": "blah",
222-
"repetition": "REQUIRED",
223-
"type": "BYTE_ARRAY",
224-
"logicalType": "STRING"
225-
}]
226-
})
227-
);
228-
}
229-
230-
/////////////////////////////////////////////////////////////////////////////////////////
231-
232168
#[test_group::group(engine, datafusion)]
233169
#[test_log::test(tokio::test)]
234170
async fn test_dataset_tail_local_fs() {
235171
let tempdir = tempfile::tempdir().unwrap();
236-
let catalog = create_catalog_with_local_workspace(tempdir.path(), false);
172+
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
237173
create_test_dataset(&catalog, tempdir.path(), None).await;
238174

239175
let schema = kamu_adapter_graphql::schema_quiet();
@@ -274,7 +210,7 @@ async fn test_dataset_tail_local_fs() {
274210
#[test_log::test(tokio::test)]
275211
async fn test_dataset_tail_empty_local_fs() {
276212
let tempdir = tempfile::tempdir().unwrap();
277-
let catalog = create_catalog_with_local_workspace(tempdir.path(), false);
213+
let catalog = create_catalog_with_local_workspace(tempdir.path(), true);
278214
create_test_dataset(&catalog, tempdir.path(), None).await;
279215

280216
let schema = kamu_adapter_graphql::schema_quiet();

src/infra/core/src/remote_repository_registry_impl.rs

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl RemoteRepositoryRegistryImpl {
4141
let file_path = self.repos_dir.join(repo_name);
4242

4343
if !file_path.exists() {
44+
// run full scan to support case-insensetive matches
4445
let all_repositories_stream = self.get_all_repositories();
4546
for repository_name in all_repositories_stream {
4647
if &repository_name == repo_name {

src/infra/core/src/repos/dataset_repository_local_fs.rs

+101-57
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10-
use std::path::PathBuf;
10+
use std::path::{Path, PathBuf};
1111
use std::sync::Arc;
1212

1313
use async_trait::async_trait;
@@ -114,6 +114,18 @@ impl DatasetRepositoryLocalFs {
114114
self.storage_strategy.get_dataset_path(&dataset_handle),
115115
))
116116
}
117+
118+
fn get_canonical_path_param(dataset_path: &Path) -> Result<(PathBuf, String), InternalError> {
119+
let canonical_dataset_path = std::fs::canonicalize(dataset_path).int_err()?;
120+
let dataset_name_str = canonical_dataset_path
121+
.file_name()
122+
.unwrap()
123+
.to_str()
124+
.unwrap()
125+
.to_string();
126+
127+
Ok((canonical_dataset_path, dataset_name_str))
128+
}
117129
}
118130

119131
/////////////////////////////////////////////////////////////////////////////////////////
@@ -241,7 +253,21 @@ impl DatasetRepository for DatasetRepositoryLocalFs {
241253

242254
// It's okay to create a new dataset by this point
243255
let dataset_id = seed_block.event.dataset_id.clone();
244-
let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone());
256+
let dataset_handle = if let Some(account_name) = &dataset_alias.account_name {
257+
let (_, canonical_account_name) = self
258+
.storage_strategy
259+
.resolve_account_dir(account_name)
260+
.int_err()?;
261+
let canonical_dataset_alias = DatasetAlias::new(
262+
Some(canonical_account_name),
263+
dataset_alias.dataset_name.clone(),
264+
);
265+
266+
DatasetHandle::new(dataset_id, canonical_dataset_alias)
267+
} else {
268+
DatasetHandle::new(dataset_id, dataset_alias.clone())
269+
};
270+
245271
let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle);
246272
let layout = DatasetLayout::create(&dataset_path).int_err()?;
247273
let dataset = Self::build_dataset(layout, self.event_bus.clone());
@@ -430,6 +456,11 @@ trait DatasetStorageStrategy: Sync + Send {
430456
dataset_handle: &DatasetHandle,
431457
new_name: &DatasetName,
432458
) -> Result<(), InternalError>;
459+
460+
fn resolve_account_dir(
461+
&self,
462+
account_name: &AccountName,
463+
) -> Result<(PathBuf, AccountName), ResolveDatasetError>;
433464
}
434465

435466
#[derive(thiserror::Error, Debug)]
@@ -476,10 +507,11 @@ impl DatasetSingleTenantStorageStrategy {
476507
&self,
477508
dataset_path: &PathBuf,
478509
dataset_alias: &DatasetAlias,
479-
) -> Result<DatasetSummary, ResolveDatasetError> {
510+
) -> Result<(DatasetSummary, DatasetAlias), ResolveDatasetError> {
480511
let layout = DatasetLayout::new(dataset_path);
481512
let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone());
482-
dataset
513+
514+
let dataset_summary = dataset
483515
.get_summary(GetSummaryOpts::default())
484516
.await
485517
.map_err(|e| {
@@ -490,40 +522,31 @@ impl DatasetSingleTenantStorageStrategy {
490522
} else {
491523
ResolveDatasetError::Internal(e.int_err())
492524
}
493-
})
494-
}
525+
})?;
495526

496-
async fn attempt_resolve_dataset_alias(
497-
&self,
498-
dataset_alias: &DatasetAlias,
499-
) -> Result<DatasetHandle, ResolveDatasetError> {
500-
assert!(
501-
!dataset_alias.is_multi_tenant()
502-
|| dataset_alias.account_name.as_ref().unwrap() == DEFAULT_ACCOUNT_NAME,
503-
"Multi-tenant refs shouldn't have reached down to here with earlier validations"
504-
);
505-
506-
let dataset_path = self.dataset_path_impl(dataset_alias);
507-
if !dataset_path.exists() {
508-
return Err(ResolveDatasetError::NotFound(DatasetNotFoundError {
509-
dataset_ref: dataset_alias.as_local_ref(),
510-
}));
511-
}
527+
let (_, canonical_dataset_name) =
528+
DatasetRepositoryLocalFs::get_canonical_path_param(dataset_path)?;
529+
let canonical_dataset_alias = DatasetAlias {
530+
dataset_name: DatasetName::new_unchecked(canonical_dataset_name.as_str()),
531+
account_name: None,
532+
};
512533

513-
self.resolve_dataset_handle(&dataset_path, dataset_alias)
514-
.await
534+
Ok((dataset_summary, canonical_dataset_alias))
515535
}
516536

517537
async fn resolve_dataset_handle(
518538
&self,
519539
dataset_path: &PathBuf,
520540
dataset_alias: &DatasetAlias,
521541
) -> Result<DatasetHandle, ResolveDatasetError> {
522-
let summary = self
542+
let (summary, canonical_dataset_alias) = self
523543
.attempt_resolving_summary_via_path(dataset_path, dataset_alias)
524544
.await?;
525545

526-
Ok(DatasetHandle::new(summary.id, dataset_alias.clone()))
546+
Ok(DatasetHandle::new(
547+
summary.id,
548+
canonical_dataset_alias.clone(),
549+
))
527550
}
528551
}
529552

@@ -550,7 +573,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
550573
}
551574
let dataset_name = DatasetName::try_from(&dataset_dir_entry.file_name()).int_err()?;
552575
let dataset_alias = DatasetAlias::new(None, dataset_name);
553-
match self.attempt_resolve_dataset_alias(&dataset_alias).await {
576+
match self.resolve_dataset_handle(&dataset_dir_entry.path(), &dataset_alias).await {
554577
Ok(hdl) => { yield hdl; Ok(()) }
555578
Err(ResolveDatasetError::NotFound(_)) => Ok(()),
556579
Err(e) => Err(e.int_err())
@@ -623,12 +646,12 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
623646

624647
let dataset_path = self.dataset_path_impl(&alias);
625648

626-
let summary = self
649+
let (summary, canonical_dataset_alias) = self
627650
.attempt_resolving_summary_via_path(&dataset_path, &alias)
628651
.await?;
629652

630653
if summary.id == *dataset_id {
631-
return Ok(DatasetHandle::new(summary.id, alias));
654+
return Ok(DatasetHandle::new(summary.id, canonical_dataset_alias));
632655
}
633656
}
634657

@@ -656,6 +679,16 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
656679
std::fs::rename(old_dataset_path, new_dataset_path).int_err()?;
657680
Ok(())
658681
}
682+
683+
fn resolve_account_dir(
684+
&self,
685+
_account_name: &AccountName,
686+
) -> Result<(PathBuf, AccountName), ResolveDatasetError> {
687+
Ok((
688+
self.root.join(DEFAULT_ACCOUNT_NAME),
689+
AccountName::new_unchecked(DEFAULT_ACCOUNT_NAME),
690+
))
691+
}
659692
}
660693

661694
/////////////////////////////////////////////////////////////////////////////////////////
@@ -772,32 +805,6 @@ impl DatasetMultiTenantStorageStrategy {
772805
}
773806
})
774807
}
775-
776-
fn resolve_account_dir(
777-
&self,
778-
account_name: &AccountName,
779-
) -> Result<PathBuf, ResolveDatasetError> {
780-
let account_dataset_dir_path = self.root.join(account_name);
781-
782-
if !account_dataset_dir_path.is_dir() {
783-
let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?;
784-
785-
for read_accout_dir in read_accout_dirs {
786-
let account_dir_name = AccountName::new_unchecked(
787-
read_accout_dir
788-
.int_err()?
789-
.file_name()
790-
.to_str()
791-
.unwrap_or(""),
792-
);
793-
if account_name == &account_dir_name {
794-
return Ok(self.root.join(account_dir_name));
795-
}
796-
}
797-
}
798-
799-
Ok(account_dataset_dir_path)
800-
}
801808
}
802809

803810
#[async_trait]
@@ -862,10 +869,14 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy {
862869
dataset_alias: &DatasetAlias,
863870
) -> Result<DatasetHandle, ResolveDatasetError> {
864871
let effective_account_name = self.effective_account_name(dataset_alias);
865-
let account_dataset_dir_path = self.resolve_account_dir(effective_account_name)?;
872+
let (account_dataset_dir_path, _) = self.resolve_account_dir(effective_account_name)?;
866873

867874
if account_dataset_dir_path.is_dir() {
868-
let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).int_err()?;
875+
let read_dataset_dir = std::fs::read_dir(account_dataset_dir_path).map_err(|_| {
876+
ResolveDatasetError::NotFound(DatasetNotFoundError {
877+
dataset_ref: dataset_alias.as_local_ref(),
878+
})
879+
})?;
869880

870881
for r_dataset_dir in read_dataset_dir {
871882
let dataset_dir_entry = r_dataset_dir.int_err()?;
@@ -962,6 +973,39 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy {
962973

963974
Ok(())
964975
}
976+
977+
fn resolve_account_dir(
978+
&self,
979+
account_name: &AccountName,
980+
) -> Result<(PathBuf, AccountName), ResolveDatasetError> {
981+
let account_dataset_dir_path = self.root.join(account_name);
982+
983+
if !account_dataset_dir_path.is_dir() {
984+
let read_accout_dirs = std::fs::read_dir(self.root.as_path()).int_err()?;
985+
986+
for read_accout_dir in read_accout_dirs {
987+
let account_dir_name = AccountName::new_unchecked(
988+
read_accout_dir
989+
.int_err()?
990+
.file_name()
991+
.to_str()
992+
.unwrap_or(""),
993+
);
994+
if account_name == &account_dir_name {
995+
return Ok((self.root.join(&account_dir_name), account_dir_name));
996+
}
997+
}
998+
return Ok((account_dataset_dir_path, account_name.clone()));
999+
}
1000+
1001+
let (canonical_account_dataset_dir_path, canonical_account_name) =
1002+
DatasetRepositoryLocalFs::get_canonical_path_param(&account_dataset_dir_path)?;
1003+
1004+
Ok((
1005+
canonical_account_dataset_dir_path,
1006+
AccountName::new_unchecked(canonical_account_name.as_str()),
1007+
))
1008+
}
9651009
}
9661010

9671011
/////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)