Skip to content

Commit

Permalink
Merge pull request #70 from OHDSI/develop
Browse files Browse the repository at this point in the history
Release 0.5.10
  • Loading branch information
azimov authored Aug 21, 2024
2 parents d939010 + 65ebc9b commit c734b7e
Show file tree
Hide file tree
Showing 44 changed files with 530 additions and 145 deletions.
4 changes: 3 additions & 1 deletion .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ compare_versions
LICENSE
.git
..Rcheck
errorReportSql.txt
errorReportSql.txt
^CRAN-SUBMISSION$
^cran-comments\.md$
3 changes: 3 additions & 0 deletions CRAN-SUBMISSION
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Version: 0.5.10
Date: 2024-08-21 04:10:48 UTC
SHA: bb001b6745dfbc303db44ef96b9eb8aaa2f67901
3 changes: 1 addition & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: ResultModelManager
Title: Result Model Manager
Version: 0.5.9
Version: 0.5.10
Authors@R:
person("Jamie", "Gilbert", , "gilbert@ohdsi.org", role = c("aut", "cre"))
Description: Database data model management utilities for R packages in the Observational Health Data Sciences and
Expand Down Expand Up @@ -35,7 +35,6 @@ Suggests:
testthat (>= 3.0.0),
RSQLite,
duckdb,
RPostgres,
knitr,
rmarkdown,
keyring,
Expand Down
20 changes: 20 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
# ResultModelManager 0.5.10

Changes:

1. Using readr column types to work around issues with inconsistent type conversion between DBI and JDBC drivers.

Bug fixes:

1. Resolved issue where failed queries were being aborted inside the wrong connection
in PooledConnectionHandler

2. Refactored pooled connection handler to better ensure checkout connections are returned


# ResultModelManager 0.5.9

Changes:

1. More tidy cleanup of PooledConnectionHandlers to prevent leaked connections.

# ResultModelManager 0.5.9

Changes:
Expand Down
40 changes: 22 additions & 18 deletions R/ConnectionHandler.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Limit row count is intended for web applications that may cause a denial of service
.limitRowCount <- function(sql, overrideRowLimit) {
limitRowCount <- as.integer(Sys.getenv("LIMIT_ROW_COUNT"))
if (!is.na(limitRowCount) &
limitRowCount > 0 &
!overrideRowLimit) {
sql <- SqlRender::render("SELECT TOP @limit_row_count * FROM (@query) result;",
query = gsub(";$", "", sql), # Remove last semi-colon
limit_row_count = limitRowCount
)
}
return(sql)
}

#' ConnectionHandler
#' @description
#' Class for handling DatabaseConnector:connection objects with consistent R6 interfaces for pooled and non-pooled connections.
Expand Down Expand Up @@ -111,10 +125,6 @@ ConnectionHandler <- R6::R6Class(
#' Connects automatically if it isn't yet loaded
#' @returns DatabaseConnector Connection instance
getConnection = function() {
if (is.null(self$con)) {
self$initConnection()
}

if (!self$dbIsValid()) {
self$initConnection()
}
Expand Down Expand Up @@ -168,15 +178,7 @@ ConnectionHandler <- R6::R6Class(
#' @param ... Additional query parameters
#' @returns boolean TRUE if connection is valid
queryDb = function(sql, snakeCaseToCamelCase = self$snakeCaseToCamelCase, overrideRowLimit = FALSE, ...) {
# Limit row count is intended for web applications that may cause a denial of service if they consume too many
# resources.
limitRowCount <- as.integer(Sys.getenv("LIMIT_ROW_COUNT"))
if (!is.na(limitRowCount) & limitRowCount > 0 & !overrideRowLimit) {
sql <- SqlRender::render("SELECT TOP @limit_row_count * FROM (@query) result;",
query = gsub(";$", "", sql), # Remove last semi-colon
limit_row_count = limitRowCount
)
}
sql <- .limitRowCount(sql, overrideRowLimit)
sql <- self$renderTranslateSql(sql, ...)

tryCatch(
Expand All @@ -203,7 +205,7 @@ ConnectionHandler <- R6::R6Class(

tryCatch(
{
data <- self$executeFunction(sql)
self$executeFunction(sql)
},
error = function(error) {
if (self$dbms() %in% c("postgresql", "redshift")) {
Expand All @@ -223,17 +225,19 @@ ConnectionHandler <- R6::R6Class(
#' Does not translate or render sql.
#' @param sql sql query string
#' @param snakeCaseToCamelCase (Optional) Boolean. return the results columns in camel case (default)
queryFunction = function(sql, snakeCaseToCamelCase = self$snakeCaseToCamelCase) {
DatabaseConnector::querySql(self$getConnection(), sql, snakeCaseToCamelCase = snakeCaseToCamelCase)
#' @param connection (Optional) connection object
queryFunction = function(sql, snakeCaseToCamelCase = self$snakeCaseToCamelCase, connection = self$getConnection()) {
DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = snakeCaseToCamelCase)
},

#' execute Function
#' @description
#' exec query Function that can be overriden with subclasses (e.g. use different base function or intercept query)
#' Does not translate or render sql.
#' @param sql sql query string
executeFunction = function(sql) {
DatabaseConnector::executeSql(self$getConnection(), sql)
#' @param connection connection object
executeFunction = function(sql, connection = self$getConnection()) {
DatabaseConnector::executeSql(connection, sql)
}
)
)
35 changes: 28 additions & 7 deletions R/DataModel.R
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ checkAndFixDataTypes <-
table <- dplyr::mutate_at(table, i, as.numeric)
}
} else if (expectedType == "int") {
if (observedTypes[i] != "integer") {
if (!observedTypes[i] %in% c("integer", "numeric")) {
ParallelLogger::logDebug(
sprintf(
"Column %s in table %s in results folder %s is of type %s, but was expecting %s. Attempting to convert.",
Expand All @@ -121,7 +121,7 @@ checkAndFixDataTypes <-
expectedType
)
)
table <- dplyr::mutate_at(table, i, as.integer)
table <- dplyr::mutate_at(table, i, as.numeric)
}
} else if (expectedType == "varchar") {
if (observedTypes[i] != "character") {
Expand Down Expand Up @@ -423,12 +423,14 @@ uploadTable <- function(tableName,
purgeSiteDataBeforeUploading,
warnOnMissingTable) {
csvFileName <- paste0(tableName, ".csv")
specifications <- specifications %>%
dplyr::filter(.data$tableName == !!tableName)

if (csvFileName %in% list.files(resultsFolder)) {
rlang::inform(paste0("Uploading file: ", csvFileName, " to table: ", tableName))

primaryKey <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
dplyr::filter(tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()

Expand All @@ -443,8 +445,7 @@ uploadTable <- function(tableName,
env$purgeSiteDataBeforeUploading <- purgeSiteDataBeforeUploading
if (purgeSiteDataBeforeUploading && "database_id" %in% primaryKey) {
type <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
.data$columnName == "database_id") %>%
dplyr::filter(.data$columnName == "database_id") %>%
dplyr::select("dataType") %>%
dplyr::pull()
# Remove the existing data for the databaseId
Expand Down Expand Up @@ -477,12 +478,32 @@ uploadTable <- function(tableName,
env$primaryKeyValuesInDb <- primaryKeyValuesInDb
}

# Remove data size or types
types <- sub(" ", "", sub("\\(.*\\)", "", specifications$dataType))

# Convert the types to readr's col_types format
convertType <- Vectorize(
function(type) {
switch(type,
varchar = "c",
bigint = "n",
int = "n",
date = "D",
"?"
) # default to guess if type not matched
}
)

types <- convertType(types)
# Create a named vector of column types
names(types) <- specifications$columnName
colTypes <- do.call(readr::cols, as.list(types))

readr::read_csv_chunked(
file = file.path(resultsFolder, csvFileName),
callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications),
chunk_size = 1e7,
col_types = readr::cols(),
col_types = colTypes,
guess_max = 1e6,
progress = FALSE
)
Expand Down
Loading

0 comments on commit c734b7e

Please sign in to comment.