SLURM Integration (#11)
This PR builds on #5, with two main objectives:
- add SLURM, config, and supporting services to the stack
- integrate async job processing into the backend via SLURM

In addition to the main objectives, this PR includes a few tweaks to API
response handling and the testing framework. A new field, `reason`, has
been added to the `analyses` table to capture why a status was set.
Currently it gets set to an exception traceback if the analysis throws
an error.

The PR includes a skeleton of how analyses would be processed in
`backend/api/dispatch/submit.R` (called via `dispatchAnalysis()` in
`backend/api/endpoints/analyses.R`), but it doesn't actually do any real
work yet.

Things to try:
- bringing up the full stack and seeing if it builds and runs without
issue
- submitting analyses and seeing if the status eventually gets updated
- trying the tests: after launching the stack, execute `./run_stack.sh
shell` in a separate window, then run `./run_tests.sh`, which will
perform a few integration tests
falquaddoomi authored Oct 3, 2024
1 parent 7af1f3c commit 185f1e8
Showing 47 changed files with 2,127 additions and 141 deletions.
20 changes: 19 additions & 1 deletion .env.TEMPLATE
@@ -11,11 +11,29 @@
# specified environment
DEFAULT_ENV=dev

# registry in which to store the images (uses dockerhub if unspecified)
REGISTRY_PREFIX="us-central1-docker.pkg.dev/cuhealthai-foundations/jravilab-public"

# if 0, doesn't open a browser to the frontend webapp on a normal stack launch
DO_OPEN_BROWSER=1

# database (postgres)
POSTGRES_USER=molevolvr
POSTGRES_PASSWORD=
POSTGRES_DB=molevolvr
POSTGRES_HOST=db-${DEFAULT_ENV}

# slurm accounting database (mariadb)
MARIADB_ROOT_PASSWORD=
MARIADB_USER=slurmdbd
MARIADB_PASSWORD=
MARIADB_DATABASE=slurm_acct_db
MARIADB_HOST=accounting-${DEFAULT_ENV}
MARIADB_PORT=3306

# slurm-specific vars
CLUSTER_NAME=molevolvr-${DEFAULT_ENV}
SLURM_MASTER=master-${DEFAULT_ENV} # who's running slurmctld
SLURM_DBD_HOST=master-${DEFAULT_ENV} # who's running slurmdbd
SLURM_WORKER=worker-${DEFAULT_ENV} # who's running slurmd
SLURM_CPUS=10 # how many cpus to allocate on the worker node
134 changes: 112 additions & 22 deletions backend/README.md
@@ -1,29 +1,119 @@
# MolEvolvR Backend

The backend is implemented as a RESTful API. It currently provides endpoints for
just the `analysis` entity, but will be expanded to include other entities as
well.

## Usage

Run the `launch_api.sh` script to start the API server in a hot-reloading development mode.
The server will run on port 9050, unless the env var `API_PORT` is set to another
value. Once it's running, you can access it at http://localhost:9050.

If the env var `USE_SLURM` is equal to 1, the script will create a basic SLURM
configuration and then launch `munge`, the authentication service used to
connect to the SLURM cluster. The template that configures the backend's
connection to SLURM can be found at `./cluster_config/slurm.conf.template`.

The script then applies any outstanding database migrations via
[atlas](https://github.com/ariga/atlas). Finally the API server is started by
executing the `entrypoint.R` script via
[drip](https://github.com/siegerts/drip), which restarts the server whenever
there are changes to the code.

*(Side note: the entrypoint contains a bit of custom logic to
defer actually launching the server until the port it listens on is free, since
drip doesn't cleanly shut down the old instance of the server.)*

## Implementation

The backend is implemented in [Plumber](https://www.rplumber.io/index.html), a
package for R that allows for the creation of RESTful APIs. The API is defined
in the `api/plumber.R` file, which sets up the router and some shared metadata
routes; the rest of the routes are brought in from the `endpoints/` directory.

Currently implemented endpoints:
- `POST /analyses`: Create a new analysis
- `GET /analyses`: Get all analyses
- `GET /analyses/:id`: Get a specific analysis by its ID
- `GET /analyses/:id/status`: Get just the status field for an analysis by its ID

*(TBC: more comprehensive docs; see the [Swagger docs](http://localhost:9050/__docs__/) for now)*
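
For a sense of how these routes are written, here is a minimal sketch of a
Plumber endpoint in the same annotation style; the route and handler below are
illustrative only and are not part of the actual `endpoints/` code:

```r
# illustrative sketch only; the real endpoints live in api/endpoints/ and use
# the project's models and helpers.

#* Echo back an analysis ID, demonstrating the annotation-based routing
#* @tag Analyses
#* @serializer json
#* @get /<id:str>/echo
analysis_echo <- function(id) {
  list(id = id)
}
```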

## Database Schema

The backend uses a PostgreSQL database to store analyses. The database's schema
is managed by [atlas](https://github.com/ariga/atlas); you can find the current
schema definition at `./schema/schema.pg.hcl`. After changing the schema, you
can create a "migration", i.e. a set of SQL statements that will bring the
database up to date with the new schema, by running `./schema/makemigration.sh
<reason>`; if all is well with the schema, the new migration will be put in
`./schema/migrations/`.

Any pending migrations are applied automatically when the backend starts up, but
you can manually apply new migrations by running `./schema/apply.sh`.

## Testing

You can run the tests for the backend by running the `run_tests.sh` script. The
script will recursively search for all files with the pattern `test_*.R` in the
`tests/` directory and run them. Tests are written using the
[testthat](https://testthat.r-lib.org/) package.

Note that the tests currently depend on the stack's services being available, so
you should run the tests from within the backend container after having started
the stack normally. An easy way to do that is to execute `./run_stack.sh shell`
in the repo root, which will give you an interactive shell in the backend
container. Eventually, we'll have them run in their own environment, which the
`run_tests.sh` script will likely orchestrate.
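
As a rough sketch of the testthat style (assuming the stack is up and the API
is reachable on `localhost:9050` from inside the backend container), a test
file might look like the following; the use of `httr` here is just for
illustration, not necessarily what the existing tests do:

```r
# hypothetical example; real tests live under tests/ and match test_*.R
library(testthat)
library(httr)

test_that("listing analyses returns HTTP 200", {
  resp <- GET("http://localhost:9050/analyses")  # assumes the stack is running
  expect_equal(status_code(resp), 200)
})
```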

## Implementation Details

### Domain Entities

*NOTE: the backend is currently a work in progress, so expect this to change.*

The backend includes, or will include, the following entities:

- `User`: Represents a user of the system. At the moment logins aren't required,
so all regular users are the special "Anonymous" user. Admins have individual
accounts.
- `Analysis`: Represents an analysis submitted by a user. Each analysis has a
  unique ID and is associated with a user. Analyses contain the following
  sub-entities:
  - `AnalysisSubmission`: Represents the submission of an Analysis, e.g. the
    data itself as well as the submission's parameters (both selected by the
    user and supplied by the system).
  - `AnalysisStatus`: Represents the status of an Analysis. Each Analysis has a
    status associated with it, which is updated as the Analysis proceeds through
    its processing stages.
  - `AnalysisResult`: Represents the result of an Analysis.
- `Cluster`: Represents the status of the overall cluster, including
how many analyses have been completed, how many are in the queue,
and other statistics related to the processing of analyses.
- `Queue`: Represents the status of processing analyses, including how many
analyses have been completed, how many are in the queue, and other statistics.
- `System`: Represents the system as a whole, including the version of the
backend, the version of the frontend, and other metadata about the system.
Includes runtime statistics about the execution environment as well, such as RAM
and CPU usage. Includes cluster information, too, such as node uptime and
health.

### Job Processing

*NOTE: we use the term "job" here to indicate any asynchronous task that the
backend needs to perform outside of the request-response cycle. It's not related
to the app domain's terminology of a "job" (i.e. an analysis).*

The backend makes use of
[future.batchtools](https://future.batchtools.futureverse.org/), an extension
that adds [futures](https://future.futureverse.org/) support to
[batchtools](https://mllg.github.io/batchtools/index.html), a package for
processing asynchronous jobs. The package provides support for many
job-processing systems, including
[SLURM](https://slurm.schedmd.com/documentation.html); more details on
alternative systems can be found in the [`batchtools` package
documentation](https://mllg.github.io/batchtools/articles/batchtools.html).

In our case, we use SLURM; `batchtools` basically wraps SLURM's `sbatch` command
and handles producing a job script for an R callable, submitting the script to
the cluster for execution, and collecting the results to be returned to R. The
template for the job submission script can be found at
`./cluster_config/slurm.tmpl`.
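
Stripped of the project's specifics, the `future.batchtools` pattern looks
roughly like the sketch below; the template path and resource values are
placeholders rather than the stack's actual configuration (see `api/cluster.R`
for the real setup):

```r
# generic sketch of the future.batchtools pattern; paths and resources are
# placeholders, not the values used by the stack.
library(future)
library(future.batchtools)

plan(
  batchtools_slurm,
  template  = "slurm.tmpl",  # placeholder; the stack uses ./cluster_config/slurm.tmpl
  resources = list(ncpus = 1, memory = 1000, walltime = 600)
)

f <- future({
  Sys.sleep(5)   # stand-in for real work; this block runs inside a SLURM job
  "done"
})
value(f)         # blocks until the job completes and returns its result
```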
33 changes: 33 additions & 0 deletions backend/api/cluster.R
@@ -0,0 +1,33 @@
# contains shared state for interacting with the job dispatch system

box::use(
batchtools[makeRegistry],
future.batchtools[...],
future[plan, future, value]
)

.on_load <- function(ns) {
options(future.cache.path = "/opt/shared-jobs/.future", future.delete = TRUE)

# create a registry
dir.create("/opt/shared-jobs/jobs-scratch", recursive = TRUE, showWarnings = FALSE)
# reg <- makeRegistry(file.dir = NA, work.dir = "/opt/shared-jobs/jobs-scratch")
# call plan()
plan(
batchtools_slurm,
template = "/app/cluster_config/slurm.tmpl",
resources = list(nodes = 1, cpus = 1, walltime=2700, ncpus=1, memory=1000)
)
}

#' Takes in a block of code and runs it asynchronously, returning the future
#' @param callable a function that will be run asynchronously in a slurm job
#' @param work.dir the directory to run the code in, which should be visible to worker nodes
#' @return a future object representing the asynchronous job
dispatch <- function(callable, work.dir="/opt/shared-jobs/jobs-scratch") {
# ensure we run jobs in a place where slurm nodes can access them, too
setwd(work.dir)
future(callable())
}

box::export(dispatch)
56 changes: 56 additions & 0 deletions backend/api/dispatch/submit.R
@@ -0,0 +1,56 @@
box::use(
analyses = api/models/analyses,
api/cluster[dispatch]
)

#' Dispatch an analysis, i.e. create a record for it in the database and submit
#' it to the cluster for processing
#' @param name the name of the analysis
#' @param type the type of the analysis
#' @return the id of the new analysis
dispatchAnalysis <- function(name, type) {
# create the analysis record
analysis_id <- analyses$db_submit_analysis(name, type)

# print to the error log that we're dispatching this
cat("Dispatching analysis", analysis_id, "\n")

# dispatch the analysis async (to slurm, or wherever)
promise <- dispatch(function() {

tryCatch({
# do the analysis
analyses$db_update_analysis_status(analysis_id, "analyzing")

# FIXME: implement calls to the molevolvr package to perform the
# analysis. we may fire off additional 'dispatch()' calls if we
# need to parallelize things.

# --- begin testing section which should be removed ---

# for now, just do a "task"
Sys.sleep(1) # pretend we're doing something

# if type is "break", raise an error to test the handler
if (type == "break") {
stop("test error")
}

# --- end testing section ---

# finalize when we're done
analyses$db_update_analysis_status(analysis_id, "complete")
}, error = function(e) {
# on error, log the error and update the status
analyses$db_update_analysis_status(analysis_id, "error", reason=e$message)
cat("Error in analysis ", analysis_id, ": ", e$message, "\n")
flush()
})

cat("Analysis", analysis_id, " completed\n")
})

return(analysis_id)
}

box::export(dispatchAnalysis)
54 changes: 39 additions & 15 deletions backend/api/endpoints/analyses.R
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# endpoints for submitting and checking information about analyses.
# included by the router aggregator in ./plumber.R; all these endpoints are
# prefixed with /analysis/ by the aggregator.
# prefixed with /analyses/ by the aggregator.

box::use(
analyses = api/models/analyses,
api/dispatch/submit[dispatchAnalysis],
api/helpers/responses[api_404_if_empty],
tibble[tibble],
dplyr[select, any_of, mutate],
dplyr[select, any_of, mutate, pull],
dbplyr[`%>%`]
)

@@ -18,6 +20,10 @@ box::use(
analysis_list <- function() {
result <- analyses$db_get_analyses()

# NOTE: this 'postprocessing' is required when jsonlite's force param is
# FALSE, because it can't figure out how to serialize the types otherwise.
# while we just set force=TRUE now, i don't know all the implications of that
# choice, so i'll leave this code here in case we need it.
# postprocess types in the result
# result <- result %>%
# mutate(
@@ -32,30 +38,48 @@ analysis_list <- function() {
#* @tag Analyses
#* @serializer jsonExt
#* @get /<id:str>/status
analysis_status <- function(id) {
result <- analyses$db_get_analysis_by_id(id)
result$status
#* @response 404 error_message="Analysis with id '...' not found"
analysis_status <- function(id, res) {
api_404_if_empty(
analyses$db_get_analysis_by_id(id) %>% pull(status), res,
error_message=paste0("Analysis with id '", id, "' not found")
)
}


#* Query the database for an analysis's complete information.
#* @tag Analyses
#* @serializer jsonExt
#* @serializer jsonExt list(auto_unbox=TRUE)
#* @get /<id:str>
analysis_by_id <- function(id){
result <- analyses$db_get_analysis_by_id(id)
# result is a tibble with one row, so just
# return that row rather than the entire tibble
result
#* @response 200 schema=analysis
#* @response 404 error_message="Analysis with id '...' not found"
analysis_by_id <- function(id, res) {
# below we return the analysis object; we have to unbox it again
# because auto_unbox only unboxes length-1 lists and vectors, not
# dataframes
api_404_if_empty(
jsonlite::unbox(analyses$db_get_analysis_by_id(id)),
res, error_message=paste0("Analysis with id '", id, "' not found")
)
}

#* Submit a new MolEvolvR analysis, returning the analysis ID
#* @tag Analyses
#* @serializer jsonExt
#* @post /
#* @param name:str A friendly name for the analysis chosen by the user
#* @param type:str Type of the analysis (e.g., "FASTA")
analysis_submit <- function(name, type) {
# submit the analysis
result <- analyses$db_submit_analysis(name, type)
# the result is a scalar in a vector, so just return the scalar
# result[[1]]
# submits the analysis, which handles:
# - inserting the analysis into the database
# - dispatching the analysis to the cluster
# - returning the analysis ID
analysis_id <- dispatchAnalysis(name, type)

# NOTE: unboxing (again?) gets it to return a single string rather than a list
# with a string in it. while it works, it's a hack, and i should figure out
# how to make the serializer do this for me.
return(
jsonlite::unbox(analyses$db_get_analysis_by_id(analysis_id))
)
}
@@ -4,7 +4,7 @@

box::use(
plumber[register_serializer, serializer_content_type],
api/support/string_helpers[inline_str_list]
api/helpers/string_helpers[inline_str_list]
)

#' Register custom serializers, e.g. for JSON with specific defaults
13 changes: 13 additions & 0 deletions backend/api/helpers/responses.R
@@ -0,0 +1,13 @@
#' Helpers for returning error responses from the API

api_404_if_empty <- function(result, res, error_message="Not found") {
if (isTRUE(nrow(result) == 0 || is.null(result) || length(result) == 0)) {
cat("Returning 404\n")
res$status <- 404
return(error_message)
}

return(result)
}

box::export(api_404_if_empty)
File renamed without changes.