Skip to content

Commit

Permalink
cache loader init
Browse files Browse the repository at this point in the history
Signed-off-by: Shenoy Pratik <sgguruda@amazon.com>
  • Loading branch information
ps48 committed Mar 6, 2024
1 parent a1423ff commit 6d24d9b
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 5 deletions.
11 changes: 11 additions & 0 deletions common/types/data_connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ export enum CachedDataSourceStatus {
Loading = 'Loading',
}

export enum CachedDataSourceLoadingProgress {
LoadingScheduled = 'Loading Scheduled',
LoadingDatabases = 'Loading Databases',
LoadingTables = 'Loading Tables',
LoadingAccelerations = 'Loading Accelerations',
LoadingError = 'Loading cache ran into error',
LoadingCompleted = 'Loading Completed',
LoadingStopped = 'Loading Stopped',
}

export interface CachedColumn {
name: string;
dataType: string;
Expand Down Expand Up @@ -81,6 +91,7 @@ export interface CachedDataSource {
name: string;
lastUpdated: string; // Assuming date string in UTC format
status: CachedDataSourceStatus;
loadingProgress: string;
databases: CachedDatabase[];
}

Expand Down
10 changes: 10 additions & 0 deletions common/utils/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,13 @@
export function get<T = unknown>(obj: Record<string, any>, path: string, defaultValue?: T): T {
return path.split('.').reduce((acc: any, part: string) => acc && acc[part], obj) || defaultValue;
}

export function addBackticksIfNeeded(input: string): string {
// Check if the string already has backticks
if (input.startsWith('`') && input.endsWith('`')) {
return input; // Return the string as it is
} else {
// Add backticks to the string
return '`' + input + '`';
}
}
4 changes: 0 additions & 4 deletions public/framework/catalog_cache/cache_loader.ts

This file was deleted.

179 changes: 179 additions & 0 deletions public/framework/catalog_cache/cache_loader.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { useEffect, useState } from 'react';
import { ASYNC_POLLING_INTERVAL } from '../../../common/constants/data_sources';
import { CachedDataSourceLoadingProgress } from '../../../common/types/data_connections';
import { DirectQueryLoadingStatus, DirectQueryRequest } from '../../../common/types/explorer';
import { getAsyncSessionId, setAsyncSessionId } from '../../../common/utils/query_session_utils';
import { addBackticksIfNeeded, get as getObjValue } from '../../../common/utils/shared';
import { formatError } from '../../components/event_analytics/utils';
import { usePolling } from '../../components/hooks';
import { SQLService } from '../../services/requests/sql';
import { coreRefs } from '../core_refs';

enum cacheLoadingType {
Databases = 'Load Databases',
Tables = 'Load Tables',
Accelerations = 'Load Accelerations',
}

const runCacheLoadQuery = (
loadingType: cacheLoadingType,
dataSourceName: string,
databaseName?: string
) => {
const [loadQueryStatus, setLoadQueryStatus] = useState<'error' | 'success' | 'loading'>(
'loading'
);
const sqlService = new SQLService(coreRefs.http!);

const {
data: pollingResult,
loading: _pollingLoading,
error: pollingError,
startPolling,
stopPolling,
} = usePolling<any, any>((params) => {
return sqlService.fetchWithJobId(params);
}, ASYNC_POLLING_INTERVAL);

let requestPayload = {} as DirectQueryRequest;
const sessionId = getAsyncSessionId();

switch (loadingType) {
case cacheLoadingType.Databases:
requestPayload = {
lang: 'sql',
query: `SHOW SCHEMAS IN ${addBackticksIfNeeded(dataSourceName)}`,
datasource: dataSourceName,
} as DirectQueryRequest;
break;
case cacheLoadingType.Tables:
requestPayload = {
lang: 'sql',
query: `SHOW TABLES IN ${addBackticksIfNeeded(dataSourceName)}.${addBackticksIfNeeded(
databaseName!
)}`,
datasource: dataSourceName,
} as DirectQueryRequest;
break;
case cacheLoadingType.Accelerations:
requestPayload = {
lang: 'sql',
query: `SHOW FLINT INDEXES`,
datasource: dataSourceName,
} as DirectQueryRequest;
break;

default:
setLoadQueryStatus('error');
const formattedError = formatError(
'',
'Recieved unknown cache query type: ' + `loadingType`,
''
);
coreRefs.core?.notifications.toasts.addError(formattedError, {
title: 'unknown cache query type',
});
console.error(formattedError);
break;
}

if (sessionId) {
requestPayload.sessionId = sessionId;
}

sqlService
.fetch(requestPayload)
.then((result) => {
setAsyncSessionId(getObjValue(result, 'sessionId', null));
if (result.queryId) {
startPolling({
queryId: result.queryId,
});
} else {
console.error('No query id found in response');
}
})
.catch((e) => {
// stopPollingWithStatus(DirectQueryLoadingStatus.FAILED);
const formattedError = formatError(
'',
'The query failed to execute and the operation could not be complete.',
e.body.message
);
coreRefs.core?.notifications.toasts.addError(formattedError, {
title: 'Query Failed',
});
console.error(e);
});

useEffect(() => {
// cancel direct query
if (!pollingResult) return;
const { status: anyCaseStatus, datarows, error } = pollingResult;
const status = anyCaseStatus?.toLowerCase();

if (status === DirectQueryLoadingStatus.SUCCESS || datarows) {
setLoadQueryStatus('success');
stopPolling();
// TODO: Stop polling, update cache
} else if (status === DirectQueryLoadingStatus.FAILED) {
setLoadQueryStatus('error');
stopPolling();
// TODO: Stop polling, update cache with error

// send in a toast with error message
const formattedError = formatError(
'',
'The query failed to execute and the operation could not be complete.',
error
);
coreRefs.core?.notifications.toasts.addError(formattedError, {
title: 'Query Failed',
});
} else {
setLoadQueryStatus('loading');
}
}, [pollingResult, pollingError]);

return { loadQueryStatus, stopPolling };
};

export const CacheLoader = ({
loadingType,
dataSourceName,
databaseName,
}: {
loadingType: cacheLoadingType;
dataSourceName: string;
databaseName?: string;
}) => {
const [loadingStatus, setloadingStatus] = useState<CachedDataSourceLoadingProgress>(
CachedDataSourceLoadingProgress.LoadingScheduled
);
const [stopCurrentPolling, setStopCurrentPolling] = useState(() => () => {});

const stopLoadingCache = () => {
setloadingStatus(CachedDataSourceLoadingProgress.LoadingStopped);
stopCurrentPolling();
};

switch (loadingType) {
case cacheLoadingType.Databases:
break;
case cacheLoadingType.Tables:
break;
case cacheLoadingType.Accelerations:
break;

default:
// TODO: raise error toast
break;
}

return { loadingStatus, stopLoadingCache };
};
41 changes: 40 additions & 1 deletion public/framework/catalog_cache/cache_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { ASYNC_QUERY_CATALOG_CACHE } from '../../../common/constants/shared';
import {
CachedDataSource,
CachedDataSourceLoadingProgress,
CachedDataSourceStatus,
CatalogCacheData,
} from '../../../common/types/data_connections';
Expand Down Expand Up @@ -90,9 +91,46 @@ export class CatalogCacheManager {
localStorage.setItem(this.localStorageKey, JSON.stringify(cacheData));
}

/**
* Loads/Refreshes a datasource in the catalog cache.
* @param dataSource The data source name to be loaded/refreshed
*/
static loadDataSource(dataSourceName: string): boolean {
try {
// let ds = this.getDataSource(dataSourceName);
// // TODO: Initiate loader for datasource
// ds.status = CachedDataSourceStatus.Loading;
// ds.loadingProgress = CachedDataSourceLoadingProgress.LoadingDatabases;
// this.addOrUpdateDataSource(ds);
return true;
} catch (err) {
console.error(err);
return false;
}
}

/**
* Loads/Refreshes a tables of a database in the catalog cache.
* @param dataSource The data source name to be loaded/refreshed
* @param databaseName The database name to be loaded/refreshed
*/
static loadTables(dataSourceName: string, databaseName: string): boolean {
try {
let ds = this.getDataSource(dataSourceName);
// TODO: Initiate loader for Table
ds.status = CachedDataSourceStatus.Loading;
ds.loadingProgress = CachedDataSourceLoadingProgress.LoadingDatabases;
this.addOrUpdateDataSource(ds);
return true;
} catch (err) {
console.error(err);
return false;
}
}

/**
* Adds or updates a data source in the catalog cache.
* @param dataSource The data source to add or update.
* @param dataSource The data source to be added or updated.
*/
static addOrUpdateDataSource(dataSource: CachedDataSource): void {
const cacheData = this.getCatalogCacheData();
Expand Down Expand Up @@ -124,6 +162,7 @@ export class CatalogCacheManager {
name: dataSourceName,
lastUpdated: '',
status: CachedDataSourceStatus.Empty,
loadingProgress: '',
databases: [],
};
this.addOrUpdateDataSource(defaultDataSourceObject);
Expand Down

0 comments on commit 6d24d9b

Please sign in to comment.