
Feature: Convert RunQuery to use duckdb #1801

Closed
jonthegeek opened this issue Aug 27, 2024 · 5 comments · Fixed by #1943
Labels: data pipeline (Data Pipeline functionality), enhancement (New feature or request)
Milestone: v2.2.0

@jonthegeek

Feature Details

Add an engine argument to RunQuery(). Default to an option, falling back to "sqldf" if the option isn't set; the other supported value is "duckdb" for now. Check that the engine's package is installed for the selected option (and move sqldf to Suggests, along with duckdb).

Example Code

Possible Implementation
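A rough sketch of one possible shape (the option name "gsm.query.engine" and the argument name strEngine are assumptions, not the final design):

RunQuery <- function(strQuery, df, strEngine = getOption("gsm.query.engine", "sqldf")) {
  strEngine <- match.arg(strEngine, c("sqldf", "duckdb"))
  # Both engine packages would sit in Suggests, so verify at run time
  rlang::check_installed(strEngine)
  if (strEngine == "duckdb") {
    con <- DBI::dbConnect(duckdb::duckdb())
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    DBI::dbWriteTable(con, "df", df)
    DBI::dbGetQuery(con, strQuery)
  } else {
    # sqldf finds `df` in this function's environment
    sqldf::sqldf(strQuery, envir = environment())
  }
}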

Additional Comments

Be sure to test that both engines produce the same output.
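For example, a testthat check along these lines (assuming the strEngine argument sketched above):

library(testthat)

test_that("sqldf and duckdb engines return the same result", {
  df <- data.frame(ID = 1:3, Score = c(10, 20, 30))
  strQuery <- "SELECT * FROM df WHERE Score >= 20"
  expect_equal(
    RunQuery(strQuery, df, strEngine = "sqldf"),
    RunQuery(strQuery, df, strEngine = "duckdb")
  )
})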

@lauramaxwell lauramaxwell self-assigned this Aug 27, 2024
@lauramaxwell

@jonthegeek: I pushed up a start to this on fix-1801. It can switch between engines when run on its own, like below, but I haven't figured out how to make it work appropriately in a YAML workflow.

df2 <- data.frame(
  Name = c("John", "Jane", "Bob"),
  Age = c(25, 30, 35),
  Salary = c(50000, 60000, 70000)
)
query <- "SELECT * FROM df WHERE AGE >= 30"

result <- RunQuery(query, df2, engine = "duckdb")

Feel free to take a look!

@lauramaxwell lauramaxwell added enhancement New feature or request data pipeline Data Pipeline functionality labels Sep 4, 2024
@jwildfire

Let's go ahead and implement this. I think it might be of use in gsm.template.

@jwildfire jwildfire added this to the v2.2.0 milestone Oct 9, 2024
@lauramaxwell commented Nov 7, 2024

Collaborate with @dpastoor and team to get this implemented. Moving toward having only duckdb and removing sqldf from RunQuery.

@lauramaxwell lauramaxwell removed their assignment Nov 11, 2024
@lauramaxwell lauramaxwell changed the title Feature: engine arg in RunQuery Feature: Convert RunQuery to use duckdb Nov 11, 2024
@annezheng2

In the POC I tried, I was passing the lazy table through, and if the step was not RunQuery, it would be collected into memory. Some of the discussions with Devin revolved around what is best to pass along as the argument for duckdb.
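For context, the lazy-table pattern under discussion looks like this (a minimal standalone sketch):

con <- DBI::dbConnect(duckdb::duckdb())
DBI::dbWriteTable(con, "subj", data.frame(subjid = 1:3))
lazy_tbl <- dplyr::tbl(con, "subj")   # lazy reference; no rows pulled yet
in_memory <- dplyr::collect(lazy_tbl) # materializes the rows into a data frame
DBI::dbDisconnect(con, shutdown = TRUE)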

RunWorkflow:

RunWorkflow <- function(
    lWorkflow,
    lData = NULL,
    bReturnResult = TRUE,
    bKeepInputData = TRUE,
    svc,
    s3_bucket = "gsm") {

  uid <- paste0(lWorkflow$meta$Type, "_", lWorkflow$meta$ID)
  log4r::info(.le$logger, msg = "Initializing workflow", workflow_id = uid)

  # Check that the workflow has steps
  if (length(lWorkflow$steps) == 0) {
    log4r::error(.le$logger, msg = "Workflow has no steps", workflow_id = uid)
    stop(glue::glue("Workflow `{uid}` has no `steps` property."))
  }

  lWorkflow$lData <- lData

  # Load missing data from s3
  tryCatch({
    if ("spec" %in% names(lWorkflow)) {
      lWorkflow$lData <- LoadData(lWorkflow, s3_bucket)
    }
  }, error = function(e) {
    log4r::error(.le$logger, msg = "Error in workflow setup", workflow_id = uid, error_message = e$message)
    stop(glue::glue("Data loading failed for workflow `{uid}`: {e$message}"))
  })

  tryCatch({
    lWorkflow <- ProcessSteps(lWorkflow, svc, s3_bucket)
  }, error = function(e) {
    log4r::error(.le$logger, msg = "Error loading data from S3", workflow_id = uid, s3_bucket = s3_bucket, error_message = e$message)
    stop(glue::glue("Step processing failed in workflow `{uid}`: {e$message}"))
  })
[...]

LoadData:

LoadData <- function(lWorkflow, s3_bucket) {
  log4r::info(.le$logger, "Checking data against spec")
  required_data <- names(lWorkflow$spec)

  tryCatch({
    con <- DBI::dbConnect(duckdb::duckdb(), tempfile(fileext = ".duckdb"))
    # Create the S3 filesystem handle once rather than per table
    s3_fs <- CreateS3FileSystem()

    for (table in required_data) {
      # Table names encode "<directory>_<table_name>"
      split_name <- strsplit(table, "_", fixed = TRUE)[[1]]
      directory <- split_name[1]
      table_name <- split_name[2]

      parquet <- ReadS3ParquetFile(
        s3_fs = s3_fs,
        bucket = if (directory == "raw") "raw-data" else s3_bucket,
        directory = file.path("study-abc-123", directory),
        table_name = table_name
      )

      DBI::dbWriteTable(con, table, parquet, overwrite = TRUE)
      lWorkflow$lData[[table]] <- dplyr::tbl(con, table)
    }
    rm(parquet)
    gc()
  }, error = function(e) {
    log4r::error(.le$logger, msg = "Error reading Parquet file", table_name = table, s3_bucket = s3_bucket, error_message = e$message)
    stop(glue::glue("Error loading table: {e$message}"))
  })
  return(lWorkflow$lData)
}
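Note that the duckdb connection opened in LoadData has to stay open for the returned lazy tables to remain usable, since each dplyr::tbl() reference is evaluated against that connection later in the workflow.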

ProcessSteps:

ProcessSteps <- function(lWorkflow, svc, s3_bucket) {
  stepCount <- 1
  for (step in lWorkflow$steps) {
    log4r::info(.le$logger, glue::glue("Workflow Step {stepCount} of {length(lWorkflow$steps)}: `{step$name}`"))
    # If the step is a RunQuery, duckdb runs the query against the relevant
    # lazy tables; otherwise we need the data in memory so we can invoke
    # the function specified in the step.
    tryCatch({
      if (step$name != "RunQuery") {
        lWorkflow <- LoadDataToMemory(lWorkflow)
      }
    }, error = function(e) {
      log4r::error(.le$logger, msg = "Error loading data into memory", step_name = step$name, error_message = e$message)
      stop(glue::glue("Failed to load data into memory for step `{step$name}`: {e$message}"))
    })

    result <- tryCatch({
      RunStep(
        lStep = step,
        lData = lWorkflow$lData,
        lMeta = lWorkflow$meta,
        lSpec = lWorkflow$spec
      )
    }, error = function(e) {
      log4r::error(.le$logger, msg = "Error executing step", step_name = step$name, error_message = e$message)
      stop(glue::glue("Execution failed for step `{step$name}`: {e$message}"))
    })
 [...]
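ProcessSteps calls a LoadDataToMemory() helper that isn't shown here; a plausible sketch, assuming it simply collects any lazy tables in lWorkflow$lData:

LoadDataToMemory <- function(lWorkflow) {
  # Materialize duckdb lazy tables so ordinary step functions can use them
  lWorkflow$lData <- lapply(lWorkflow$lData, function(x) {
    if (inherits(x, "tbl_dbi")) dplyr::collect(x) else x
  })
  lWorkflow
}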

RunQuery:

RunQuery <- function(strQuery, df, df2 = NULL) {
  stopifnot(is.character(strQuery))

  # Check that strQuery contains "FROM df"
  if (!stringr::str_detect(strQuery, "FROM df")) {
    log4r::error(.le$logger, "strQuery must contain 'FROM df'")
    stop("strQuery must contain 'FROM df'")
  }

  # Set up the connection and table name if passing in a duckdb lazy table
  if (inherits(df, "tbl_dbi")) {
    con <- dbplyr::remote_con(df)
    table_name <- dbplyr::remote_name(df)
  } else {
    # For regular data frames, create a new connection and temporary table;
    # close the connection we opened once the query has run
    con <- DBI::dbConnect(duckdb::duckdb())
    on.exit(DBI::dbDisconnect(con, shutdown = TRUE))
    temp_table_name <- paste0("temp_table_", format(Sys.time(), "%Y%m%d_%H%M%S"))
    DBI::dbWriteTable(con, temp_table_name, df)
    table_name <- temp_table_name
  }

  # Handle df2 if provided
  if (!is.null(df2)) {
    if (inherits(df2, "tbl_dbi")) {
      # Assumes a lazy df2 lives on the same connection as df
      table_name2 <- dbplyr::remote_name(df2)
    } else {
      temp_table_name2 <- paste0("temp_table2_", format(Sys.time(), "%Y%m%d_%H%M%S"))
      DBI::dbWriteTable(con, temp_table_name2, df2)
      table_name2 <- temp_table_name2
    }
    strQuery <- stringr::str_replace_all(strQuery, "FROM df2\\b", paste0("FROM ", table_name2))
  }

  # Word boundary keeps "FROM df" from also matching "FROM df2"
  strQuery <- stringr::str_replace_all(strQuery, "FROM df\\b", paste0("FROM ", table_name))

  result <- tryCatch({
    result <- DBI::dbGetQuery(con, strQuery)
    log4r::info(.le$logger, glue::glue("SQL Query executed successfully: {nrow(result)} rows returned."))
    result
  }, error = function(e) {
    log4r::error(.le$logger, glue::glue("Error executing query: {e$message}"))
    stop(glue::glue("Error executing query: {e$message}"))
  })

  return(result)
}
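A quick usage sketch of this version with a lazy table (table and column names made up):

con <- DBI::dbConnect(duckdb::duckdb())
DBI::dbWriteTable(con, "subj", data.frame(subjid = 1:3, subject_nsv = c("a", "b", "c")))
lazy_subj <- dplyr::tbl(con, "subj")
result <- RunQuery("SELECT * FROM df WHERE subjid >= 2", lazy_subj)
DBI::dbDisconnect(con, shutdown = TRUE)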

@annezheng2

The df2 argument in RunQuery came from converting the DATACHG, DATAENT, and QUERY workflows to use SQL instead:

from:

steps:
  # Merge [ subjid ] onto EDC domains.
  - output: Temp_SubjectLookup
    name: select
    params:
      .data: Mapped_SUBJ
      subjid: subjid
      subject_nsv: subject_nsv
  - output: Mapped_DATACHG
    name: left_join
    params:
      x: Raw_DATACHG
      "y": Temp_SubjectLookup
      by: subject_nsv

to:

steps:
  # Merge [ subjid ] onto EDC domains
  - output: Mapped_DATACHG
    name: RunQuery
    params:
      df: Raw_DATACHG
      df2: Mapped_SUBJ
      strQuery: "SELECT x.*, y.subjid
FROM df x
LEFT JOIN (
  SELECT subjid, subject_nsv
  FROM df2
) y
ON x.subject_nsv = y.subject_nsv;"
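For reference, that SQL is equivalent to the original two dplyr steps:

Temp_SubjectLookup <- dplyr::select(Mapped_SUBJ, subjid, subject_nsv)
Mapped_DATACHG <- dplyr::left_join(Raw_DATACHG, Temp_SubjectLookup, by = "subject_nsv")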

@annezheng2 annezheng2 linked a pull request Nov 12, 2024 that will close this issue
@annezheng2 annezheng2 mentioned this issue Nov 12, 2024