From 35bfe4948ef3a44253048127b0406b44d6fc9c95 Mon Sep 17 00:00:00 2001
From: michael_b
Date: Sun, 17 Dec 2023 20:18:46 +0100
Subject: [PATCH 1/8] first draft for ml process

---
 R/api.R       |   1 +
 R/processes.R | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+)

diff --git a/R/api.R b/R/api.R
index c46acfb..ccbdbb9 100644
--- a/R/api.R
+++ b/R/api.R
@@ -314,5 +314,6 @@ addEndpoint = function() {
   Session$assignProcess(subtract)
   Session$assignProcess(multiply)
   Session$assignProcess(divide)
+  Session$assignProcess(naive_ml)
 }

diff --git a/R/processes.R b/R/processes.R
index a8028c1..1f3122c 100644
--- a/R/processes.R
+++ b/R/processes.R
@@ -1140,3 +1140,109 @@ save_result <- Process$new(
    return(data)
  }
)

naive_ml = Process$new(
  id = "naive_ml",
  description = "Simple implementation of the Random Forest algorithm.",
  categories = as.array("cubes"),
  summary = "Perform RF on a data cube.",
  parameters = list(
    Parameter$new(
      name = "data",
      description = "A data cube with bands.",
      schema = list(
        type = "object",
        subtype = "raster-cube"
      )),
    Parameter$new(
      name = "spatial_extent",
      description = "Limits the data to load from the collection to the specified bounding box",
      schema = list(
        list(
          title = "Bounding box",
          type = "object",
          subtype = "bounding-box",
          properties = list(
            east = list(
              description = "East (upper right corner, coordinate axis 1).",
              type = "number"
            ),
            west = list(
              description = "West (lower left corner, coordinate axis 1).",
              type = "number"
            ),
            north = list(
              description = "North (upper right corner, coordinate axis 2).",
              type = "number"
            ),
            south = list(
              description = "South (lower left corner, coordinate axis 2).",
              type = "number"
            )
          ),
          required = c("east", "west", "south", "north")
        ),
        list(
          title = "GeoJSON",
          type = "object",
          subtype = "geojson"
        ),
        list(
          title = "No filter",
          description = "Don't filter spatially. All data is included in the data cube.",
          type = "null"
        )
      )
    )
  ),
  returns = eo_datacube,
  operation = function(data, spatial_extent, job)
  {
    message("naive ML")
    # # get spatial extent to build polygon
    # xmin <- as.numeric(spatial_extent$west)
    # ymin <- as.numeric(spatial_extent$south)
    # xmax <- as.numeric(spatial_extent$east)
    # ymax <- as.numeric(spatial_extent$north)
    #
    # # get CRS from Datacube (via Int parsing)
    # cube_crs = as.numeric(gsub("\\D", "", gdalcubes::srs(cube)))
    #
    #
    # # create polygon for later extraction of the cube
    # polygon_coord_df = data.frame(x = c(xmin,ymin), y = c(xmax ,ymax))
    #
    # message("Create Polygon from AOI...")
    #
    # polygon <- polygon_coord_df |>
    #   # create sf_point object, convert to cube crs
    #   st_as_sf(coords = c("x", "y"), crs = 4326) %>% st_transform(crs = cube_crs) |>
    #   st_bbox() |>
    #   st_as_sfc() |>
    #   # create sf_polygon object
    #   st_as_sf()
    #
    # message(polygon)
    #
    # # create grid with same spatial resolution to later join the geometry with each pixel of the satellite image
    # grid = st_as_stars(st_bbox(polygon), dx = 30, dy = 30)
    #
    # message("Rasterize Polygon...")
    #
    # # each point represents the center of a raster cell in the satellite image
    # aoi_points = polygon |> st_rasterize(grid) |> st_as_sf(as_points = TRUE)
    #
    # message(aoi_points)


    # message("Extract Features...")
    #
    # # extract geometry from the datacube
    # features = extract_geom(data, aoi_points)
    #
    # message("Head of Features: ")
    # message(head(features))
    return(data)
  }

)

From 473221c87e73080eb184ce8be9e9818e70f79cd4 Mon Sep 17 00:00:00 2001
From: michael_b
Date: Mon, 18 Dec 2023 01:43:05 +0100
Subject: [PATCH 2/8] add draft for process

---
 R/processes.R | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/R/processes.R b/R/processes.R
index 1f3122c..04dbc53 100644
--- a/R/processes.R
+++ b/R/processes.R
@@ -1246,3 +1246,40 @@ naive_ml = Process$new(
   }

 )

# Train ML Model
train_model <- Process$new(
  id = "train_model",
  description = "Train a machine learning algorithm based on the provided training data on satellite imagery gathered from a data cube.",
  categories = as.array("machine-learning", "cubes"),
  summary = "Train a machine learning model.",
  parameters = list(
    Parameter$new(
      name = "data",
      description = "A data cube with bands.",
      schema = list(
        type = "object",
        subtype = "raster-cube"
      )
    ),
    Parameter$new(
      name = "bands",
      description = "A list of band names.",
      schema = list(
        type = "array"
      ),
      optional = TRUE
    )
  ),
  returns = list(
    description = "The trained model.",
    schema = list(type = "object", subtype = "caret-ml-model")
  ),
  operation = function(data, bands, job)
  {
    message("Test Train model")
    x = 1
    return(x)
  }
)
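Note on the stub above: the operation only returns a placeholder (`x = 1`). For orientation, here is a minimal, self-contained sketch of the caret-based training step the stub is meant to grow into. The band columns and class labels below are made up for illustration, and the `caret` and `randomForest` packages are assumed to be installed; this is not the patch's final implementation.

```
library(caret)

# Hypothetical training table: one row per extracted pixel, band values
# as predictors, and a factor column "class" with the land-cover labels.
set.seed(42)
training_data <- data.frame(
  B02 = runif(100), B03 = runif(100), B04 = runif(100),
  class = factor(sample(c("forest", "water", "urban"), 100, replace = TRUE))
)

# Random forest via caret; cross-validation lets caret tune mtry itself
# instead of relying on fixed, user-supplied hyperparameters.
model <- caret::train(
  class ~ .,
  data = training_data,
  method = "rf",
  trControl = caret::trainControl(method = "cv", number = 5)
)

# predict() then works on new pixels with the same band columns
head(predict(model, newdata = training_data))
```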
= "RDS" + ) + ) + ) ), inputFormats = list( diff --git a/R/api.R b/R/api.R index ccbdbb9..963f042 100644 --- a/R/api.R +++ b/R/api.R @@ -172,6 +172,22 @@ NULL else if (format$title == "GeoTiff") { file = gdalcubes::write_tif(job$results) } + else if (format$title == "R Data Set") + { + # THINK ABOUT PRETTIER SOLUTION + + temp = base::tempfile() + + base::saveRDS(job$results, temp) + + res$status = 200 + res$body = readBin(temp, "raw", n = file.info(temp)$size) + + content_type = plumber:::getContentType(tools::file_ext(temp)) + res$setHeader("Content-Type", content_type) + + return(res) + } else { throwError("FormatUnsupported") } @@ -183,18 +199,38 @@ NULL else if (format == "GTiff") { file = gdalcubes::write_tif(job$results) } + else if (format == "RDS") { + + # THINK ABOUT PRETTIER SOLUTION + + temp = base::tempfile() + + base::saveRDS(job$results, temp) + + res$status = 200 + res$body = readBin(temp, "raw", n = file.info(temp)$size) + + content_type = plumber:::getContentType(tools::file_ext(temp)) + res$setHeader("Content-Type", content_type) + + return(res) + } else { throwError("FormatUnsupported") } } + # path to where the data is stored first = file[1] + res$status = 200 res$body = readBin(first, "raw", n = file.info(first)$size) + content_type = plumber:::getContentType(tools::file_ext(first)) res$setHeader("Content-Type", content_type) return(res) + },error=handleError) } @@ -315,5 +351,6 @@ addEndpoint = function() { Session$assignProcess(multiply) Session$assignProcess(divide) Session$assignProcess(naive_ml) + Session$assignProcess(train_model) } From 93091ee32b70a10542f3f0e46882f9360bd3aa53 Mon Sep 17 00:00:00 2001 From: michael_b Date: Mon, 18 Dec 2023 02:05:31 +0100 Subject: [PATCH 4/8] improved export error handling --- R/api.R | 77 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/R/api.R b/R/api.R index 963f042..608fd68 100644 --- a/R/api.R +++ b/R/api.R @@ -175,18 +175,33 @@ NULL else if (format$title == "R Data Set") { # THINK ABOUT PRETTIER SOLUTION - - temp = base::tempfile() - - base::saveRDS(job$results, temp) - - res$status = 200 - res$body = readBin(temp, "raw", n = file.info(temp)$size) - - content_type = plumber:::getContentType(tools::file_ext(temp)) - res$setHeader("Content-Type", content_type) - - return(res) + tryCatch( + { + job$results = gdalcubes::as_json(job$results) + message(class(job$results)) + }, + error = function(error) + { + message('An Error Occurred') + message(error) + }, + warning = function(warning) { + message('A Warning Occurred') + message(warning) + }, + finally = { + + # perform rest of the result + temp = base::tempfile() + base::saveRDS(job$results, temp) + res$status = 200 + res$body = readBin(temp, "raw", n = file.info(temp)$size) + + content_type = plumber:::getContentType(tools::file_ext(temp)) + res$setHeader("Content-Type", content_type) + + return(res) + }) } else { throwError("FormatUnsupported") @@ -202,18 +217,34 @@ NULL else if (format == "RDS") { # THINK ABOUT PRETTIER SOLUTION + tryCatch( + { + job$results = gdalcubes::as_json(job$results) + message(class(job$results)) + }, + error = function(error) + { + message('An Error Occurred') + message(error) + }, + warning = function(warning) { + message('A Warning Occurred') + message(warning) + }, + finally = { + + # perform rest of the result + temp = base::tempfile() + base::saveRDS(job$results, temp) + res$status = 200 + res$body = readBin(temp, "raw", n = file.info(temp)$size) + + content_type = 
From 93091ee32b70a10542f3f0e46882f9360bd3aa53 Mon Sep 17 00:00:00 2001
From: michael_b
Date: Mon, 18 Dec 2023 02:05:31 +0100
Subject: [PATCH 4/8] improved export error handling

---
 R/api.R | 77 ++++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 54 insertions(+), 23 deletions(-)

diff --git a/R/api.R b/R/api.R
index 963f042..608fd68 100644
--- a/R/api.R
+++ b/R/api.R
@@ -175,18 +175,33 @@ NULL
       else if (format$title == "R Data Set")
       {
         # THINK ABOUT PRETTIER SOLUTION
-        temp = base::tempfile()
-
-        base::saveRDS(job$results, temp)
-
-        res$status = 200
-        res$body = readBin(temp, "raw", n = file.info(temp)$size)
-
-        content_type = plumber:::getContentType(tools::file_ext(temp))
-        res$setHeader("Content-Type", content_type)
-
-        return(res)
+        tryCatch(
+          {
+            job$results = gdalcubes::as_json(job$results)
+            message(class(job$results))
+          },
+          error = function(error)
+          {
+            message('An Error Occurred')
+            message(error)
+          },
+          warning = function(warning) {
+            message('A Warning Occurred')
+            message(warning)
+          },
+          finally = {
+
+            # perform the rest of the result handling
+            temp = base::tempfile()
+            base::saveRDS(job$results, temp)
+            res$status = 200
+            res$body = readBin(temp, "raw", n = file.info(temp)$size)
+
+            content_type = plumber:::getContentType(tools::file_ext(temp))
+            res$setHeader("Content-Type", content_type)
+
+            return(res)
+          })
       }
       else {
         throwError("FormatUnsupported")
       }
@@ -202,18 +217,34 @@ NULL
       else if (format == "RDS") {

         # THINK ABOUT PRETTIER SOLUTION
+        tryCatch(
+          {
+            job$results = gdalcubes::as_json(job$results)
+            message(class(job$results))
+          },
+          error = function(error)
+          {
+            message('An Error Occurred')
+            message(error)
+          },
+          warning = function(warning) {
+            message('A Warning Occurred')
+            message(warning)
+          },
+          finally = {
+
+            # perform the rest of the result handling
+            temp = base::tempfile()
+            base::saveRDS(job$results, temp)
+            res$status = 200
+            res$body = readBin(temp, "raw", n = file.info(temp)$size)
+
+            content_type = plumber:::getContentType(tools::file_ext(temp))
+            res$setHeader("Content-Type", content_type)
+
+            return(res)
+          })

-        temp = base::tempfile()
-
-        base::saveRDS(job$results, temp)
-
-        res$status = 200
-        res$body = readBin(temp, "raw", n = file.info(temp)$size)
-
-        content_type = plumber:::getContentType(tools::file_ext(temp))
-        res$setHeader("Content-Type", content_type)
-
-        return(res)
       }
       else {
         throwError("FormatUnsupported")
       }

From e8507b004645465db03d80fa4f2ed65ca26257cc Mon Sep 17 00:00:00 2001
From: michael_b
Date: Wed, 20 Dec 2023 17:04:52 +0100
Subject: [PATCH 5/8] Revert commits 35bfe4948ef3a44253048127b0406b44d6fc9c95
 473221c87e73080eb184ce8be9e9818e70f79cd4

---
 R/api.R       | 3 +++
 R/processes.R | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/R/api.R b/R/api.R
index 608fd68..819dca4 100644
--- a/R/api.R
+++ b/R/api.R
@@ -381,7 +381,10 @@ addEndpoint = function() {
   Session$assignProcess(subtract)
   Session$assignProcess(multiply)
   Session$assignProcess(divide)
+<<<<<<< HEAD
   Session$assignProcess(naive_ml)
   Session$assignProcess(train_model)
+=======
+>>>>>>> parent of 35bfe49 (first draft for ml process)
 }

diff --git a/R/processes.R b/R/processes.R
index 04dbc53..3cfe7ee 100644
--- a/R/processes.R
+++ b/R/processes.R
@@ -1140,6 +1140,7 @@ save_result <- Process$new(
     return(data)
   }
 )
+<<<<<<< HEAD

 naive_ml = Process$new(
   id = "naive_ml",
@@ -1283,3 +1284,5 @@
 )

+=======
+>>>>>>> parent of 35bfe49 (first draft for ml process)

From bfc981dcfad467012087684dadb04cc4d2c78498 Mon Sep 17 00:00:00 2001
From: michael_b
Date: Wed, 20 Dec 2023 17:06:55 +0100
Subject: [PATCH 6/8] Revert commits 473221c87e73080eb184ce8be9e9818e70f79cd4
 35bfe4948ef3a44253048127b0406b44d6fc9c95

---
 R/api.R       |   6 ---
 R/processes.R | 146 --------------------------------------------------
 2 files changed, 152 deletions(-)

diff --git a/R/api.R b/R/api.R
index 819dca4..32a7edd 100644
--- a/R/api.R
+++ b/R/api.R
@@ -381,10 +381,4 @@ addEndpoint = function() {
   Session$assignProcess(subtract)
   Session$assignProcess(multiply)
   Session$assignProcess(divide)
-<<<<<<< HEAD
-  Session$assignProcess(naive_ml)
-  Session$assignProcess(train_model)
-=======
->>>>>>> parent of 35bfe49 (first draft for ml process)
-
 }

diff --git a/R/processes.R b/R/processes.R
index 3cfe7ee..a8028c1 100644
--- a/R/processes.R
+++ b/R/processes.R
@@ -1140,149 +1140,3 @@ save_result <- Process$new(
     return(data)
   }
 )
-<<<<<<< HEAD
-
-naive_ml = Process$new(
-  id = "naive_ml",
-  description = "Simple implementation of the Random Forest algorithm.",
-  categories = as.array("cubes"),
-  summary = "Perform RF on a data cube.",
-  parameters = list(
-    Parameter$new(
-      name = "data",
-      description = "A data cube with bands.",
-      schema = list(
-        type = "object",
-        subtype = "raster-cube"
-      )),
-    Parameter$new(
-      name = "spatial_extent",
-      description = "Limits the data to load from the collection to the specified bounding box",
-      schema = list(
-        list(
-          title = "Bounding box",
-          type = "object",
-          subtype = "bounding-box",
-          properties = list(
-            east = list(
-              description = "East (upper right corner, coordinate axis 1).",
-              type = "number"
-            ),
-            west = list(
-              description = "West (lower left corner, coordinate axis 1).",
-              type = "number"
-            ),
-            north = list(
-              description = "North (upper right corner, coordinate axis 2).",
-              type = "number"
-            ),
-            south = list(
-              description = "South (lower left corner, coordinate axis 2).",
-              type = "number"
-            )
-          ),
-          required = c("east", "west", "south", "north")
-        ),
-        list(
-          title = "GeoJSON",
"object", - subtype = "geojson" - ), - list( - title = "No filter", - description = "Don't filter spatially. All data is included in the data cube.", - type = "null" - ) - ) - ) - ), - returns = eo_datacube, - operation = function(data, spatial_extend, job) - { - message("naive ML") - # # get spatial extend to build polygon - # xmin <- as.numeric(spatial_extent$west) - # ymin <- as.numeric(spatial_extent$south) - # xmax <- as.numeric(spatial_extent$east) - # ymax <- as.numeric(spatial_extent$north) - # - # # get CRS from Datacube (via Int parsing) - # cube_crs = as.numeric(gsub("\\D", "", gdalcubes::srs(cube))) - # - # - # # create polygon for later extraction of the cube - # polygon_coord_df = data.frame(x = c(xmin,ymin), y = c(xmax ,ymax)) - # - # message("Create Polygon from AOI...") - # - # polygon <- polygon_coord_df |> - # # create sf_point object, convert to cube crs - # st_as_sf(coords = c("x", "y"), crs = 4326) %>% st_transform(crs = cube_crs) |> - # st_bbox() |> - # st_as_sfc() |> - # # create sf_polygon object - # st_as_sf() - # - # message(polygon) - # - # # create grid with same spatial resolution to later join the geometry with each pixel of the satellite image - # grid = st_as_stars(st_bbox(polygon), dx = 30, dy = 30) - # - # message("Rasterize Polygon...") - # - # # each point represents the center of a raster cell in the satellite image - # aoi_points = polygon |> st_rasterize(grid) |> st_as_sf(as_points = TRUE) - # - # message(aoi_points) - - - # message("Extract Features...") - # - # # extract geometry from the datacube - # features = extract_geom(data, aoi_points) - # - # message("Head of Features: ") - # message(head(features)) - return(data) - } - -) - -# Train ML Model -train_model <- Process$new( - id = "train_model", - description = "Train a machine learning algorithm based on the provided training data on satellite imagery gathered from a datacube.", - categories = as.array("machine-learning", "cubes"), - summary = "train machine learning model.", - parameters = list( - Parameter$new( - name = "data", - description = "A data cube with bands.", - schema = list( - type = "object", - subtype = "raster-cube" - ) - ), - Parameter$new( - name = "bands", - description = "A list of band names.", - schema = list( - type = "array" - ), - optional = TRUE - ) - ), - returns = list( - description = "The trained model.", - schema = list(type = "object", subtype = "caret-ml-model") - ), - operation = function(data, bands, job) - { - message("Test Train model") - x = 1 - return(x) - } -) - -======= ->>>>>>> parent of 35bfe49 (first draft for ml process) From 8a3f1cef5735e0c8244e40670fd0e09883524319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Br=C3=BCggemann?= <102391796+MichaelBrueggemann@users.noreply.github.com> Date: Tue, 26 Dec 2023 12:05:03 +0100 Subject: [PATCH 7/8] Ml process v2 (#1) * better error message * added debugging output * change host for better local testing * added feature extraction and data splitting * updated docu and startup script * delete old draft process * code restructure and feature size based reducing * added model training with fixed hyperparams * fixed training model * added /jobs * add 'save model to workspace' functionality * added model_id for model retrival * added test workspace * added predict model process * better hyperparameter debug print * added export "data.frame as NetCDF" * prediction results as spatial data.frame --- .gitignore | 7 +- R/api.R | 107 +++++++++- R/processes.R | 578 
 R/processes.R | 578 ++++++++++++++++++++++++++++++++++++++++++++++--------
 README.md     |   5 +-
 startLocal.R  |  14 +-
 5 files changed, 589 insertions(+), 122 deletions(-)

diff --git a/.gitignore b/.gitignore
index 97b79e4..1258a2b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,5 +41,8 @@ vignettes/*.pdf
 #mac
 .DS_Store

-#results from backend jobs
-/results
\ No newline at end of file
+# test workspace
+/test_workspace
+
+# jobInfos from backend jobs
+/jobs
\ No newline at end of file

diff --git a/R/api.R b/R/api.R
index 608fd68..34e55d9 100644
--- a/R/api.R
+++ b/R/api.R
@@ -165,9 +165,60 @@ NULL
     job = newJob$run()

     format = job$output
+
+    #TODO: IMPLEMENT CHECK, IF job$results == datacube
+    # if datacube: write in the desired format as is
+    # if data.frame (NetCDF): write the whole data.frame as NetCDF
+
+    # TODO: harmonize the whole file-save process to reduce redundancy
+
     if (class(format) == "list") {
       if (format$title == "Network Common Data Form") {
-        file = gdalcubes::write_ncdf(job$results)
+
+        # if no error occurs, job$results == datacube
+        tryCatch(
+          {
+            file = gdalcubes::write_ncdf(job$results)
+            break
+          },
+          error = function(error)
+          {
+            message('An Error Occurred.')
+            message(toString(error))
+          },
+          warning = function(warning) {
+            message('A Warning Occurred')
+            message(toString(warning))
+          })
+        # on error, proceed with the "data.frame" variant
+        tryCatch(
+          {
+
+            # convert data.frame into a "spatial dataframe"
+            data = sf::st_as_sf(job$results)
+
+            file = base::tempfile()
+
+            # save whole file
+            sf::st_write(data, file, driver = "netCDF")
+
+            res$status = 200
+            res$body = readBin(file, "raw", n = file.info(file)$size)
+
+            content_type = plumber:::getContentType(tools::file_ext(file))
+            res$setHeader("Content-Type", content_type)
+
+            return(res)
+          },
+          error = function(error)
+          {
+            message('An Error Occurred.')
+            message(toString(error))
+          },
+          warning = function(warning) {
+            message('A Warning Occurred')
+            message(toString(warning))
+          })
       }
       else if (format$title == "GeoTiff") {
         file = gdalcubes::write_tif(job$results)
       }
@@ -182,7 +233,7 @@ NULL
         },
         error = function(error)
         {
-          message('An Error Occurred')
+          message('An Error Occurred. Passed data was not of type "datacube".')
           message(error)
         },
         warning = function(warning) {
@@ -209,7 +260,53 @@ NULL
     }
     else {
       if (format == "NetCDF") {
-        file = gdalcubes::write_ncdf(job$results)
+
+        # if no error occurs, job$results == datacube
+        tryCatch(
+          {
+            file = gdalcubes::write_ncdf(job$results)
+            break
+          },
+          error = function(error)
+          {
+            message('An Error Occurred.')
+            message(toString(error))
+          },
+          warning = function(warning) {
+            message('A Warning Occurred')
+            message(toString(warning))
+          })
+        # on error, proceed with the "data.frame" variant
+        tryCatch(
+          {
+
+            # convert data.frame into a "spatial dataframe"
+            data = sf::st_as_sf(job$results)
+
+            file = base::tempfile()
+
+            # save whole file
+            sf::st_write(data, file, driver = "netCDF")
+
+            res$status = 200
+            res$body = readBin(file, "raw", n = file.info(file)$size)
+
+            content_type = plumber:::getContentType(tools::file_ext(file))
+            res$setHeader("Content-Type", content_type)
+
+            return(res)
+          },
+          error = function(error)
+          {
+            message('An Error Occurred.')
+            message(toString(error))
+          },
+          warning = function(warning) {
+            message('A Warning Occurred')
+            message(toString(warning))
+          })
+
+
       }
       else if (format == "GTiff") {
         file = gdalcubes::write_tif(job$results)
       }
@@ -224,7 +321,7 @@ NULL
         },
         error = function(error)
         {
-          message('An Error Occurred')
+          message('An Error Occurred. Passed data was not of type "datacube".')
           message(error)
         },
         warning = function(warning) {
@@ -381,7 +478,7 @@ addEndpoint = function() {
   Session$assignProcess(subtract)
   Session$assignProcess(multiply)
   Session$assignProcess(divide)
-  Session$assignProcess(naive_ml)
   Session$assignProcess(train_model)
+  Session$assignProcess(predict_model)
 }
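The fallback branches above handle results that are not data cubes (e.g. the prediction data frame) by converting them to an sf object and letting GDAL's netCDF driver serialize them. A self-contained sketch of that path, with made-up point data standing in for `job$results` — driver availability depends on the local GDAL build:

```
library(sf)

# A plain data.frame with a geometry column, as predict_model returns it.
df = data.frame(
  class = c("forest", "water"),
  geometry = sf::st_sfc(
    sf::st_point(c(7.6, 51.9)),
    sf::st_point(c(7.7, 52.0)),
    crs = 4326
  )
)

# convert the data.frame into a "spatial dataframe" and write it via GDAL;
# the driver name "netCDF" matches the api.R code above
data = sf::st_as_sf(df)
file = base::tempfile(fileext = ".nc")
sf::st_write(data, file, driver = "netCDF")
```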
Defaults to NULL", + schema = list( + type = "string" + ), + optional = TRUE + ) + + ), + returns = list( + description = "The trained model.", + schema = list(type = "object", subtype = "caret-ml-model") + ), + operation = function(data, model_type, labeled_polygons, hyperparameters = NULL, save_model = FALSE, model_id = NULL, job) + { + # show call stack for debugging + message("train_model called...") + + message("\nCall parameters: ") + message("\ndata: ") + message(gdalcubes::as_json(data)) + message("\nmodel_type: ") + message(model_type) + + tryCatch({ + message("\nlabeled_polygons: ") + + # read GeoJSON data as sf + labeled_polygons = sf::st_read(labeled_polygons, quiet = TRUE) + + # change CRS to cube CRS + labeled_polygons = sf::st_transform(labeled_polygons, crs = gdalcubes::srs(data)) + + message("Training Polygons sucessfully loaded!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + message("\nhyperparameters: ") + if (is.null(hyperparameters)) + { + message("No Hyperparameters passed!") + } + else + { + for (name in names(hyperparameters)) + { + message(paste0(name, ": ", hyperparameters[name])) + } + } + + message("\nsave_model:") + message(save_model) + + message("\nmodel_id:") + print(model_id) # to also show "NULL" + + + # obvios boolean check for mor readibility + if (save_model == TRUE && is.null(model_id)) + { + message("If the model should be safed, a model_id needs to be given!") + stop("") + } + + tryCatch({ + message("\nExtract features...") + + # extract features from cube + features = gdalcubes::extract_geom(data, labeled_polygons) + + message("all features extracted!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + # add FID for merge with 'features' + labeled_polygons$FID = rownames(labeled_polygons) + + tryCatch({ + message("\nMerge features with training data...") + + # this df contains all information from the datacube and the labeled_polgons + training_df = merge(labeled_polygons, features, by = "FID") + + message("Merging complete!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + # make copy to filter out values not needed for training + training_df_filtered = training_df + training_df_filtered$time = NULL + training_df_filtered$geometry = NULL + + + #TODO: find reasonable threshold + if (nrow(training_df) > 10000) + { + tryCatch({ + message("\nReduce number of features...") + # data frame for later storage + training_df_reduced = data.frame() + + # from all data with the same FID (same polygon) take only 50% of the + # features for each training polygon as they are assumed to carry similar information + for (i in as.numeric(unique(training_df_filtered$FID))) + { + #TODO: find better "reducing" function + sub_df = training_df_filtered[training_df_filtered$FID == i,] + + # take 50% of sub_df rows + sub_df = sub_df[1:(nrow(sub_df)/2),] + + # append new rows + training_df_reduced = rbind(training_df_reduced, sub_df) + } + + # overwrite filtered df + training_df_filtered = training_df_reduced + + message("Reducing completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + } + + # remove FID to not train model on FID + training_df_filtered$FID = NULL + + + tryCatch({ + message("\nSplit training Data...") + + train_row_numbers = caret::createDataPartition( + training_df_filtered$class, p=0.8, list=FALSE + ) + training_data = 
       testing_data = training_df_filtered[-train_row_numbers,]

       message("Data splitting completed!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
     })

     # build the specific model given by "model_type"
     if (model_type == "RF")
     {
       # set seed for reproducibility while model training
       #TODO possibly remove
       set.seed(100)

       message("\nChecking hyperparameters for Random Forest...")

       if (!all(c("mtry", "ntree") %in% names(hyperparameters)))
       {
         message("'hyperparameters' has to contain 'mtry' and 'ntree'!")
         stop("")
       }

       message("Hyperparameters for Random Forest checked!")

       # use the fixed hyperparameters given by the user
       # (this may result in a lack of accuracy for the model)
       if (!is.null(hyperparameters))
       {

         tryCatch({
           message("\nTrain model with fixed hyperparameters...")

           # no parameters are tuned
           trainCtrl <- caret::trainControl(method = "none", classProbs = TRUE)

           model <- caret::train(
             class ~ .,
             data = training_data,
             method = "rf",
             trControl = trainCtrl,
             # only one model is passed (fixed hyperparameters are given)
             tuneGrid = expand.grid(mtry = hyperparameters$mtry),
             ntree = hyperparameters$ntree)

           message("Model training finished!")
         },
         error = function(err)
         {
           message("An Error occurred!")
           message(toString(err))
         })

       }
       # else tune the model hyperparameters

     }

     # save model to user workspace
     if (save_model)
     {
       tryCatch({
         message("\nSaving model to user workspace...")

         saveRDS(model, paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds"))

         message("Saving complete!")
       },
       error = function(err)
       {
         message("An Error occurred!")
         message(toString(err))
       })
     }

     return(model)
   }
 )


 predict_model <- Process$new(
   id = "predict_model",
   description = "Perform a prediction on a data cube based on the given model.",
   categories = as.array("cubes", "machine-learning"),
   summary = "Predict data on a data cube.",
   parameters = list(
     Parameter$new(
       name = "data",
       description = "The data to work with.",
       schema = list(
         type = "object",
         subtype = "raster-cube"
       )
     ),
     Parameter$new(
       name = "model_id",
       description = "Id of the model that should be used for prediction. The model will be searched for in the user workspace.",
       schema = list(
         type = "string"
       )
     ),
     Parameter$new(
       name = "aoi_extend",
       description = "Spatial extent of the area of interest (AOI) to be used for classification. Has to be in EPSG:3857.",
       schema = list(
         list(
           title = "Bounding box",
           type = "object",
           subtype = "bounding-box",
           properties = list(
             east = list(
               description = "East (upper right corner, coordinate axis 1).",
               type = "number"
             ),
             west = list(
               description = "West (lower left corner, coordinate axis 1).",
               type = "number"
             ),
             north = list(
               description = "North (upper right corner, coordinate axis 2).",
               type = "number"
             ),
             south = list(
               description = "South (lower left corner, coordinate axis 2).",
               type = "number"
             )
           ),
           required = c("east", "west", "south", "north")
         ),
-        list(
-          title = "GeoJSON",
-          type = "object",
-          subtype = "geojson"
-        ),
-        list(
-          title = "No filter",
-          description = "Don't filter spatially. All data is included in the data cube.",
-          type = "null"
-        )
-      )
-    )
-  ),
-  returns = eo_datacube,
-  operation = function(data, spatial_extent, job)
-  {
-    message("naive ML")
-    # # get spatial extent to build polygon
-    # xmin <- as.numeric(spatial_extent$west)
-    # ymin <- as.numeric(spatial_extent$south)
-    # xmax <- as.numeric(spatial_extent$east)
-    # ymax <- as.numeric(spatial_extent$north)
-    #
-    # # get CRS from Datacube (via Int parsing)
-    # cube_crs = as.numeric(gsub("\\D", "", gdalcubes::srs(cube)))
-    #
-    #
-    # # create polygon for later extraction of the cube
-    # polygon_coord_df = data.frame(x = c(xmin,ymin), y = c(xmax ,ymax))
-    #
-    # message("Create Polygon from AOI...")
-    #
-    # polygon <- polygon_coord_df |>
-    #   # create sf_point object, convert to cube crs
-    #   st_as_sf(coords = c("x", "y"), crs = 4326) %>% st_transform(crs = cube_crs) |>
-    #   st_bbox() |>
-    #   st_as_sfc() |>
-    #   # create sf_polygon object
-    #   st_as_sf()
-    #
-    # message(polygon)
-    #
-    # # create grid with same spatial resolution to later join the geometry with each pixel of the satellite image
-    # grid = st_as_stars(st_bbox(polygon), dx = 30, dy = 30)
-    #
-    # message("Rasterize Polygon...")
-    #
-    # # each point represents the center of a raster cell in the satellite image
-    # aoi_points = polygon |> st_rasterize(grid) |> st_as_sf(as_points = TRUE)
-    #
-    # message(aoi_points)
-
-
-    # message("Extract Features...")
-    #
-    # # extract geometry from the datacube
-    # features = extract_geom(data, aoi_points)
-    #
-    # message("Head of Features: ")
-    # message(head(features))
-    return(data)
-  }
-
-)
-
-# Train ML Model
-train_model <- Process$new(
-  id = "train_model",
-  description = "Train a machine learning algorithm based on the provided training data on satellite imagery gathered from a data cube.",
-  categories = as.array("machine-learning", "cubes"),
-  summary = "Train a machine learning model.",
-  parameters = list(
-    Parameter$new(
-      name = "data",
-      description = "A data cube with bands.",
-      schema = list(
-        type = "object",
-        subtype = "raster-cube"
-      )
-    ),
-    Parameter$new(
-      name = "bands",
-      description = "A list of band names.",
-      schema = list(
-        type = "array"
-      ),
-      optional = TRUE
-    )
-  ),
-  returns = list(
-    description = "The trained model.",
-    schema = list(type = "object", subtype = "caret-ml-model")
-  ),
-  operation = function(data, bands, job)
-  {
-    message("Test Train model")
-    x = 1
-    return(x)
-  }
-)
       )
     )
   ),
   returns = list(
     description = "Spatial data frame containing the geometry, class, and class probability for each pixel.",
     schema = list(type = "data.frame")
   ),
   operation = function(data, model_id, aoi_extend, job) {
     # show call stack for debugging
     message("predict_model called...")

     message("\nCall parameters: ")
     message("\ndata: ")
     message(gdalcubes::as_json(data))

     message("\nmodel_id:")
     message(model_id)

     message("\naoi_extend:")
     for (name in names(aoi_extend))
     {
       message(paste0(name, ": ", aoi_extend[name]))
     }

     xmin = aoi_extend$west
     ymin = aoi_extend$south
     xmax = aoi_extend$east
     ymax = aoi_extend$north


     tryCatch({
       message("\nCreate AOI Polygons...")

       aoi_polygon_df = data.frame(x = c(xmin,xmax), y = c(ymin ,ymax))

       poly <- aoi_polygon_df |>
         # create sf_point object
         sf::st_as_sf(coords = c("x", "y"), crs = 3857) |> sf::st_transform(gdalcubes::srs(data)) |>
         sf::st_bbox() |>
         sf::st_as_sfc() |>
         # create sf_polygon object
         sf::st_as_sf()


       # get cube resolution
       cube_resolution = gdalcubes::dimensions(data)$x$pixel_size

       # grid to rasterize the AOI polygon
       grid = stars::st_as_stars(sf::st_bbox(poly), dx = cube_resolution, dy = cube_resolution)
       # rasterize the AOI polygon (one polygon per pixel)
       aoi_points = poly |> stars::st_rasterize(grid) |> sf::st_as_sf()

       message("AOI Polygons created!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })

     tryCatch({
       message("\nExtract features...")

       # extract features from cube
       features = gdalcubes::extract_geom(data, aoi_points)

       # reset FID to prevent mismatch after extraction
       features$FID = NULL
       features$FID = rownames(features)

       message("All features extracted!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })

     tryCatch({
       message("\nAdd spatial information to data.frame...")

       # FID for later merge
       aoi_points$FID = rownames(aoi_points)

       # remove old ID
       aoi_points$ID = NULL

       message("Spatial information added!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })


     tryCatch({
       message("\nMerge data.frame and aoi_points...")

       features = base::merge(features, aoi_points, by = "FID")

       # reset FID to prevent mismatch after merge
       features$FID = rownames(features)

       message("Merge completed!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })

     tryCatch({
       message("\nPreparing prediction dataset...")

       # copy features to filter out unwanted data
       features_filtered = features
       features_filtered$time = NULL
       features_filtered$FID = NULL
       features_filtered$geometry = NULL

       message("Data preparation finished!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })

     tryCatch({
       message("\nPerform prediction...")

       # get model from user workspace
       model = readRDS(paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds"))

       # predict classes
       predicted_classes = stats::predict(model, newdata = features_filtered)

       # get class probabilities
       prediction_accuracys = stats::predict(model, newdata = features_filtered, type = "prob")

       # keep only the highest class probability per pixel
       max_accuracy_per_pixel = apply(prediction_accuracys, 1, base::max)

       message("Prediction completed!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })


     tryCatch({
       message("\nCreate output dataframe...")

       # create data.frame of the same length as features
       output_dataframe = base::as.data.frame(base::matrix(NA,
                                                           nrow = nrow(features),
                                                           ncol = 1,
                                                           dimnames = list(c(), "FID")))

       output_dataframe$FID = features$FID
       output_dataframe$class = predicted_classes
       output_dataframe$class_accuracys = max_accuracy_per_pixel
       output_dataframe$geometry = features$geometry

       message("Output dataframe created!")
     },
     error = function(err)
     {
       message("An Error occurred!")
       message(toString(err))
       stop()
     })


     return(output_dataframe)
   }
 )
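Taken together, train_model and predict_model form a train-then-predict workflow against this backend. A rough sketch of how a client might chain them with the openeo R client — the endpoint URL, collection id, extents, file name, and hyperparameter values are placeholders, and error handling is omitted:

```
library(openeo)

con = connect("http://127.0.0.1:8000")
p = processes()

# hypothetical GeoJSON FeatureCollection with a "class" property per polygon,
# read into the string that train_model's labeled_polygons parameter expects
training_geojson = paste(readLines("training_polygons.geojson"), collapse = "\n")

cube = p$load_collection(
  id = "sentinel-2-l2a",
  spatial_extent = list(west = 7.5, south = 51.8, east = 7.7, north = 52.0),
  temporal_extent = list("2022-06-01", "2022-06-30")
)

model = p$train_model(
  data = cube,
  model_type = "RF",
  labeled_polygons = training_geojson,
  hyperparameters = list(mtry = 2, ntree = 500),
  save_model = TRUE,
  model_id = "rf_test"
)

# the trained model is stored server-side; prediction references it by id,
# with the AOI given in EPSG:3857 as the process requires
result = p$predict_model(
  data = cube,
  model_id = "rf_test",
  aoi_extend = list(west = 835000, south = 6730000,
                    east = 860000, north = 6760000)
)
```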
diff --git a/README.md b/README.md
index de2a1cb..635cc50 100644
--- a/README.md
+++ b/README.md
@@ -89,11 +89,12 @@ docker-compose build --no-cache && docker-compose up
 ```

 ## Development Notes:
-While developing, you can skip rebuilding the docker container every time. Instead you can run the server locally.
-Just run "Rscript startLocal.R" inside this directory.
+While developing, you can skip rebuilding the docker container every time. Instead you can run the server locally.
+To run this server locally, you need RTools 4.0. For easier setup, please open "openeocubes.Rproj". Here every build tool is already set up and you can just run "Rscript startLocal.R" inside this directory.
 This will compile this repository as an R package and start the server.

+The script "startLocal.R" is not intended to be used on an AWS instance.
+
 ## Getting Started:
 ### Example 1: NDVI Script in R-Studio using OpenEO R-Client

diff --git a/startLocal.R b/startLocal.R
index 755f05f..24f22ef 100644
--- a/startLocal.R
+++ b/startLocal.R
@@ -4,16 +4,8 @@ remotes::install_local("./", dependencies = TRUE, force = TRUE)
 # Start service
 library(openeocubes)

-aws.host <- Sys.getenv("AWSHOST")
-
-if (aws.host == "") {
-  aws.host <- NULL
-} else {
-  message("AWS host port id is: ", aws.host)
-}
-
-
-config <- SessionConfig(api.port = 8000, host = "0.0.0.0", aws.ipv4 = aws.host)
-config$workspace.path <- "/var/openeo/workspace"
+config <- SessionConfig(api.port = 8000, host = "127.0.0.1")
+# set workspace for testing
+config$workspace.path = paste0(getwd(), "/test_workspace")
 createSessionInstance(config)
 Session$startSession()

From 85a63f2efe0d73e0720cde48b9c2d620b5b30114 Mon Sep 17 00:00:00 2001
From: Michael Brüggemann <102391796+MichaelBrueggemann@users.noreply.github.com>
Date: Thu, 4 Jan 2024 15:39:21 +0100
Subject: [PATCH 8/8] Changeable resolution in "load_collection" (#5)

* revert commits
* add changeable spatial resolution
---
 R/SessionConfig-Class.R | 11 -----------
 R/api.R                 |  7 +++----
 R/processes.R           | 17 +++++++++++++--
 3 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/R/SessionConfig-Class.R b/R/SessionConfig-Class.R
index 3b00ac3..6616c0d 100644
--- a/R/SessionConfig-Class.R
+++ b/R/SessionConfig-Class.R
@@ -67,17 +67,6 @@ SessionConfig = function(api.port = NULL, host = NULL, aws.ipv4 = NULL) {
         )
       )
-    ),
-    RDS = list(
-      title = "R Data Set",
-      description = "Export to RDS",
-      parameters = list(
-        format = list(
-          type = "string",
-          description = "RDS"
-        )
-      )
-    )
   ),

   inputFormats = list(

diff --git a/R/api.R b/R/api.R
index 34e55d9..dbed33f 100644
--- a/R/api.R
+++ b/R/api.R
@@ -254,6 +254,7 @@ NULL
         return(res)
       })
     }
+
     else {
       throwError("FormatUnsupported")
     }
@@ -311,6 +312,7 @@ NULL
     else if (format == "GTiff") {
       file = gdalcubes::write_tif(job$results)
     }
+
     else if (format == "RDS") {

       # THINK ABOUT PRETTIER SOLUTION
@@ -343,22 +345,19 @@ NULL
       })
     }
+
     else {
       throwError("FormatUnsupported")
     }
   }
-
   # path to where the data is stored
   first = file[1]
-
   res$status = 200
   res$body = readBin(first, "raw", n = file.info(first)$size)
-
   content_type = plumber:::getContentType(tools::file_ext(first))
   res$setHeader("Content-Type", content_type)
   return(res)
-
 },error=handleError)
 }

diff --git a/R/processes.R b/R/processes.R
index d76dc90..08f3d54 100644
--- a/R/processes.R
+++ b/R/processes.R
@@ -124,10 +124,18 @@ load_collection <- Process$new(
         type = "array"
       ),
       optional = TRUE
+    ),
+    Parameter$new(
+      name = "resolution",
+      description = "Specify the resolution for spatial resampling.",
+      schema = list(
+        type = "integer"
+      ),
+      optional = TRUE
     )
   ),
   returns = eo_datacube,
-  operation = function(id, spatial_extent, crs = 4326, temporal_extent, bands = NULL, job) {
+  operation = function(id, spatial_extent, crs = 4326, temporal_extent, bands = NULL, resolution = 30, job) {
     # temporal extent preprocess
     t0 <- temporal_extent[[1]]
     t1 <- temporal_extent[[2]]
@@ -190,7 +198,7 @@ load_collection <- Process$new(
     crs <- c("EPSG", crs)
     crs <- paste(crs, collapse = ":")
     v.overview <- gdalcubes::cube_view(
-      srs = crs, dx = 30, dy = 30, dt = "P15D",
+      srs = crs, dx = resolution, dy = resolution, dt = "P15D",
       aggregation = "median", resampling = "average",
       extent = list(
         t0 = t0, t1 = t1,
@@ -206,6 +214,9 @@ load_collection <- Process$new(
       cube <- gdalcubes::select_bands(cube, bands)
     }

+    message(gdalcubes::dimensions(cube))
+
+
     message("The data cube is created...")
     message(gdalcubes::as_json(cube))
     return(cube)
@@ -1142,6 +1153,7 @@ save_result <- Process$new(
 )

+
 # Train ML Model
 train_model <- Process$new(
@@ -1657,3 +1669,4 @@ predict_model <- Process$new(
     return(output_dataframe)
   }
 )
+
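For reference, the effect of the new resolution parameter at the gdalcubes level: it feeds dx/dy of the cube view, so callers can trade spatial detail for speed, with 30 remaining the default. A minimal sketch with placeholder extent values — note that dx/dy are in the units of the target srs (metres for EPSG:3857):

```
library(gdalcubes)

# a coarser 100 m grid instead of the default 30 m
v = gdalcubes::cube_view(
  srs = "EPSG:3857",
  dx = 100, dy = 100, dt = "P15D",
  aggregation = "median", resampling = "average",
  extent = list(
    t0 = "2022-06-01", t1 = "2022-06-30",
    left = 835000, right = 860000,
    top = 6760000, bottom = 6730000
  )
)
v
```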