Skip to content

Commit

Permalink
feat: add enqueue query method
Browse files Browse the repository at this point in the history
  • Loading branch information
invm committed Oct 9, 2023
1 parent 4e67b1f commit c67ce4d
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 65 deletions.
1 change: 1 addition & 0 deletions src-tauri/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ fn main() {
connections::get_connections,
connections::init_connection,
// connections::disconnect, // TODO
queries::execute_query,
queries::enqueue_query,
queries::get_columns,
queries::get_constraints,
Expand Down
55 changes: 40 additions & 15 deletions src-tauri/src/handlers/queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
queues::query::{QueryTask, QueryTaskEnqueueResult, QueryTaskStatus},
state::{AsyncState, ServiceAccess},
utils::error::{CommandResult, Error},
utils::{
crypto::md5_hash,
error::{CommandResult, Error},
},
};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
Expand All @@ -14,11 +17,11 @@ use tracing::info;
pub async fn enqueue_query(
async_state: State<'_, AsyncState>,
conn_id: String,
tab_id: String,
tab_idx: usize,
sql: &str,
_auto_limit: bool,
auto_limit: bool,
) -> CommandResult<QueryTaskEnqueueResult> {
info!(sql, conn_id, tab_id, "enqueue_query");
info!(sql, conn_id, tab_idx, "enqueue_query");
let binding = async_state.connections.lock().await;
let conn = binding.get(&conn_id);
let dialect = conn.unwrap().config.dialect.as_str();
Expand All @@ -30,26 +33,36 @@ pub async fn enqueue_query(
match statements {
Ok(statements) => {
if let Some(conn) = conn {
// let mut stmts = vec![];
// // reduce to unique statements
// for stmt in statements.iter() {
// if !stmts.contains(&stmt.to_string()) {
// stmts.push(stmt.to_string());
// }
// }
let statements: Vec<(String, String)> = statements
.into_iter()
.map(|s| {
let query_hash = md5_hash(&s);
let id = conn.config.id.to_string() + &tab_idx.to_string() + &query_hash;
return (s, id);
})
.collect();
let async_proc_input_tx = async_state.tasks.lock().await;
for (idx, statement) in statements.iter().enumerate() {
info!("Got statement {:?}", statement.to_string());
let task = QueryTask::new(conn.clone(), &tab_id, &statement, idx);
let enqueued_ids: Vec<String> = vec![];
for (idx, stmt) in statements.iter().enumerate() {
let (mut statement, id) = stmt.clone();
info!("Got statement {:?}", statement);
if enqueued_ids.contains(&id) {
continue;
}
if auto_limit {
statement = format!("{} LIMIT 1000", statement);
}
let task = QueryTask::new(conn.clone(), statement, id, tab_idx, idx);
let res = async_proc_input_tx.send(task.clone()).await;
if let Err(e) = res {
return Err(Error::from(e));
}
}
return Ok(QueryTaskEnqueueResult {
conn_id,
tab_id,
tab_idx,
status: QueryTaskStatus::Queued,
results_sets: statements.iter().map(|s| s.1.clone()).collect(),
});
}
if statements.is_empty() {
Expand All @@ -69,6 +82,18 @@ pub struct QueryResultParams {
pub page_size: usize,
}

#[command]
pub async fn execute_query(
app_handle: AppHandle,
conn_id: String,
query: String,
_auto_limit: bool,
) -> CommandResult<Value> {
let connection = app_handle.acquire_connection(conn_id);
let result = connection.execute_query(query).await?;
Ok(result)
}

#[command]
pub async fn query_results(
_app_handle: AppHandle,
Expand Down
31 changes: 18 additions & 13 deletions src-tauri/src/queues/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::database::connections::ConnectedConnection;
use crate::utils::crypto::md5_hash;
use anyhow::Result;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -25,20 +24,24 @@ impl Default for QueryTaskStatus {
pub struct QueryTask {
pub conn: ConnectedConnection,
pub query: String,
pub tab_id: String,
pub tab_idx: usize,
pub id: String,
pub status: QueryTaskStatus,
pub query_idx: usize,
}

impl QueryTask {
pub fn new(conn: ConnectedConnection, tab_id: &str, query: &str, query_idx: usize) -> Self {
let query_hash = md5_hash(query);
let id = conn.config.id.to_string() + tab_id + &query_hash;
pub fn new(
conn: ConnectedConnection,
query: String,
query_id: String,
tab_idx: usize,
query_idx: usize,
) -> Self {
QueryTask {
conn,
id,
tab_id: tab_id.to_string(),
id: query_id,
tab_idx,
query_idx,
query: query.to_string(),
status: QueryTaskStatus::Queued,
Expand All @@ -49,15 +52,16 @@ impl QueryTask {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryTaskEnqueueResult {
pub conn_id: String,
pub tab_id: String,
pub tab_idx: usize,
pub status: QueryTaskStatus,
pub results_sets: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryTaskResult {
pub success: bool,
pub id: String,
pub path: String,
pub path: Option<String>,
pub error: Option<String>,
}

Expand All @@ -69,11 +73,12 @@ pub async fn async_process_model(
let task = input;
match task.conn.execute_query(task.query).await {
Ok(_res) => {
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
output_tx
.send(QueryTaskResult {
success: false,
id: "asd".to_string(),
path: "asd".to_string(),
id: task.id,
path: Some("asd".to_string()),
error: None,
})
.await?;
Expand All @@ -82,8 +87,8 @@ pub async fn async_process_model(
output_tx
.send(QueryTaskResult {
success: false,
id: "asd".to_string(),
path: "asd".to_string(),
id: task.id,
path: None,
error: Some(e.to_string()),
})
.await?;
Expand Down
70 changes: 33 additions & 37 deletions src/components/Screens/Console/Content/QueryTab/QueryTextArea.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
VimIcon,
} from "components/UI/Icons";
import { useAppSelector } from "services/Context";
import { QueryResult } from "interfaces";
import { QueryTaskEnqueueResult } from "interfaces";
import { t } from "utils/i18n";
import { Alert } from "components/UI";
import { basicSetup } from "codemirror";
Expand All @@ -41,7 +41,7 @@ import { createStore } from "solid-js/store";
import { ActionRowButton } from "./ActionRowButton";

import { listen } from "@tauri-apps/api/event";
import { randomId } from "utils/utils";
import { log } from "utils/utils";

export const QueryTextArea = (props: {
idx: Accessor<number>;
Expand All @@ -55,6 +55,7 @@ export const QueryTextArea = (props: {
getContent,
getContentData,
getSchemaTables,
contentStore: { idx: tabIdx },
},
app: { vimModeOn, toggleVimModeOn },
} = useAppSelector();
Expand Down Expand Up @@ -101,16 +102,11 @@ export const QueryTextArea = (props: {

onMount(async () => {
await listen("rs2js", (event) => {
console.log("js: rs2js: ");
console.log({ event });
log("js: rs2js: ");
log(event);
});
});

function sendOutput(value: string) {
console.log("js: js2rs: " + value);
invoke("js2rs", { message: "asdsada" });
}

const getSelection = () => {
return editorView().state.sliceDoc(
editorView().state.selection.ranges[0].from,
Expand All @@ -119,25 +115,25 @@ export const QueryTextArea = (props: {
};

const onExecute = async () => {
sendOutput(code());
// if (loading()) return;
// setLoading(true);
// const selectedText = getSelection();
// updateContentTab("error", undefined);
// const activeConnection = getConnection();
// try {
// const { result_sets } = await invoke<QueryResult>("execute_query", {
// connId: activeConnection.id,
// query: selectedText || code(),
// autoLimit: autoLimit(),
// });
// updateContentTab("data", { query: code(), executed: true, result_sets });
// // console.log({ result_sets });
// } catch (error) {
// updateContentTab("error", String(error));
// } finally {
// setLoading(false);
// }
if (loading() || !code()) return;
setLoading(true);
const selectedText = getSelection();
updateContentTab("error", undefined);
const activeConnection = getConnection();
try {
const result = await invoke<QueryTaskEnqueueResult>("enqueue_query", {
connId: activeConnection.id,
sql: selectedText || code(),
autoLimit: autoLimit(),
tabIdx,
});
// updateContentTab("data", { query: code(), executed: true, result_sets });
log({ result });
} catch (error) {
updateContentTab("error", String(error));
} finally {
setLoading(false);
}
};

const copyQueryToClipboard = () => {
Expand All @@ -146,15 +142,15 @@ export const QueryTextArea = (props: {

createEffect(() => {
setCode(getContentData("Query").query ?? "");
// setSchema(
// getSchemaTables().reduce(
// (acc, table) => ({
// ...acc,
// [table.name]: table.columns.map(({ name }) => name),
// }),
// {}
// )
// );
setSchema(
getSchemaTables().reduce(
(acc, table) => ({
...acc,
[table.name]: table.columns.map(({ name }) => name),
}),
{}
)
);
});

createShortcut(["Control", "e"], onExecute);
Expand Down
16 changes: 16 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ export type QueryResult = {
result_sets: ResultSet[];
};

const QueryTaskStatus = {
Queued: "Queued",
Progress: "Progress",
Completed: "Completed",
Error: "Error",
} as const;

export type QueryTaskStatusType = keyof typeof QueryTaskStatus;

export type QueryTaskEnqueueResult = {
conn_id: string;
tab_id: string;
status: QueryTaskStatusType;
results_sets: string[];
};

export type RawQueryResult = {
result: Row[];
};
Expand Down
4 changes: 4 additions & 0 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import {
TableStrucureEntityType,
} from "interfaces";

export const log = (msg: any) => {
console.log(`[${new Date().toISOString()}]`, msg);
};

export const omit = (obj: any, ...keys: string[]) => {
const copy = { ...obj };
keys.forEach((key) => delete copy[key]);
Expand Down

0 comments on commit c67ce4d

Please sign in to comment.