diff --git a/docs/src/pages/guides/FHIRPerformanceGuide.md b/docs/src/pages/guides/FHIRPerformanceGuide.md index e6ec4706303..f8a0e27daba 100644 --- a/docs/src/pages/guides/FHIRPerformanceGuide.md +++ b/docs/src/pages/guides/FHIRPerformanceGuide.md @@ -202,7 +202,7 @@ The following table summarizes how the transaction timeout is used for different | POST/PUT | Single transaction scope for entire request | | Batch Bundle | Transaction per bundle entry. Request processing time can therefore exceed totalTranLifetimeTimeout | | Transaction Bundle | Single transaction scope for entire request | -| $reindex | One HTTP call can request multiple resources to be reindexed. Each resource is reindexed in the scope of its own transaction. Reindexing is a relatively quick operation per resource - usually well under 1s - so transaction timeouts are unlikely. Reduce the number of resources processed per reindex operation to avoid read timeouts. Use concurrent requests to increase overall throughput. | +| $reindex | One HTTP call can request multiple resources to be reindexed. By default, each resource is reindexed in the scope of its own transaction. Reindexing is a relatively quick operation per resource - usually well under 1s - so transaction timeouts are unlikely. However, if a list of index IDs is specified, all those resources will be reindexed within a single tranaction, so reduce the number index IDs specified if transaction timeouts occur. Use concurrent requests to increase overall throughput. | Because some requests use multiple transactions under the covers, the overall request response time can sometimes be greater than the transaction timeout. There is no server-side tuneable property for the overall request processing time. Tuning of the client read timeout and/or network configuration may be required when extending the maximum transaction time to more than 2 minutes, or supporting multi-transaction requests which also exceed 2 minutes. @@ -356,11 +356,11 @@ See the [PostgreSQL Query Planning](https://www.postgresql.org/docs/12/runtime-c ### 4.1.1. Fillfactor -In PostgreSQL, the default `fillfactor` for each table is 100 - no room is reserved for updates. This maximizes storage utilization, but impacts performance for updates which occur when new versions of a resource are ingested. Update statements are also used frequently during the reindex process. +In PostgreSQL, the default `fillfactor` for each table is 100 - no room is reserved for updates. This maximizes storage utilization, but impacts performance for updates which occur when new versions of a resource are ingested. Update statements are also used frequently during the reindex process, if index IDs are not specified. To provide space for updates, all the `_logical_resources` should be configured with a `fillfactor` of 80 as a starting point. DBAs may specify their own `fillfactor` values based on their own knowledge and understanding of the system. -The `fillfactor` for the `logical_resources` table may benefit from an even lower value to support the heavy update load during a reindex operation. This is a special case due to the fact that every row in the table is updated once. +The `fillfactor` for the `logical_resources` table may benefit from an even lower value to support the heavy update load during a reindex operation, if index IDs are not specified. This is a special case due to the fact that every row in the table is updated once. To change the fillfactor for existing data, a `VACUUM FULL` operation is required: @@ -374,7 +374,7 @@ This should only be performed during a maintenance window when there is no load ### 4.1.2. Tuning Auto-vacuum -When running reindex operations (after a search parameter configuration change, for example), the `logical_resources` table undergoes frequent updates to an indexed column. Due to the nature of how PostgreSQL handles updates, this results in a significant amount of old index blocks which slows progress. The table storage parameters may need to be tuned to vacuum the `logical_resources` table more aggressively. To address this, tune the storage parameters for this table as follows: +When running reindex operations (after a search parameter configuration change, for example) without specifying index IDs, the `logical_resources` table undergoes frequent updates to an indexed column. Due to the nature of how PostgreSQL handles updates, this results in a significant amount of old index blocks which slows progress. The table storage parameters may need to be tuned to vacuum the `logical_resources` table more aggressively. To address this, tune the storage parameters for this table as follows: ``` sql -- Lower the trigger threshold for starting work @@ -384,7 +384,7 @@ alter table fhirdata.logical_resources SET (autovacuum_vacuum_scale_factor = 0.0 alter table fhirdata.logical_resources SET (autovacuum_vacuum_cost_limit=2000); ``` -The default value for autovacuum_vacuum_cost_limit is likely too restrictive for a system with good IO performance. Increasing the value to 2000 increases the throttling threshold 10x, significantly improving throughput and helping the `logical_resources` vacuuming to be completed before it negatively impacts reindexing performance. +The default value for autovacuum_vacuum_cost_limit is likely too restrictive for a system with good IO performance. Increasing the value to 2000 increases the throttling threshold 10x, significantly improving throughput and helping the `logical_resources` vacuuming to be completed before it negatively impacts performance of the reindex operation (when index IDs are not specified on the reindex operation). See the [PostSQL VACUUM documentation](https://www.postgresql.org/docs/12/sql-vacuum.html) for more details. diff --git a/docs/src/pages/guides/FHIRSearchConfiguration.md b/docs/src/pages/guides/FHIRSearchConfiguration.md index 9a956bbedd7..02fd29275b9 100644 --- a/docs/src/pages/guides/FHIRSearchConfiguration.md +++ b/docs/src/pages/guides/FHIRSearchConfiguration.md @@ -265,62 +265,43 @@ Reindexing is implemented as a custom operation that tells the IBM FHIR Server t The `$reindex` operation can be invoked via an HTTP(s) POST to `[base]/$reindex`, `[base]/[type]/$reindex`, or `[base]/[type]/[instance]/$reindex`. By default, the operation at the System-level or Type-level selects 10 resources and re-extract their search parameters values based on the current configuration of the server. The operation supports the following parameters to control the behavior: +Reindexing is resource-intensive and can take several hours or even days to complete depending on the approach used, the number of resources currently in the system, and the capability of the hosting platform. + +### 2.1 Server-side-driven approach +By default, the operation will select 10 resources and re-extract their search parameters values based on the current configuration of the server. The operation supports the following parameters to control the behavior: + |name|type|description| |----|----|-----------| -|`tstamp`|string|Reindex any resource not previously reindexed before this timestamp. Format as a date YYYY-MM-DD or time YYYY-MM-DDTHH:MM:DDZ.| +|`tstamp`|string|Reindex only resources not previously reindexed since this timestamp. Format as a date YYYY-MM-DD or time YYYY-MM-DDTHH:MM:SSZ.| |`resourceCount`|integer|The maximum number of resources to reindex in this call. If this number is too large, the processing time might exceed the transaction timeout and fail.| -|`resourceLogicalId`|string|The ResourceType or the ResourceType/Logical id for targetted reindexing, only valid at System-level| - -An example request is: - -``` sh -curl --location --request POST 'https://localhost:9443/fhir-server/api/v4/$reindex' \ ---header 'X-FHIR-TENANT-ID: default' \ ---header 'Content-Type: application/fhir+json' \ --u 'fhiruser:change-password' \ ---data-raw '{ - "resourceType": "Parameters", - "parameter": [ - { - "name": "resourceCount", - "valueInteger": 100 - }, - { - "name": "tstamp", - "valueString": "2021-01-01" - } - ] -}' -``` -An example response when processing in a loop: +The IBM FHIR Server tracks when a resource was last reindexed and only resources with a reindex_tstamp value less than the given tstamp parameter will be processed. When a resource is reindexed, its reindex_tstamp is set to the given tstamp value. In most cases, using the current date (for example "2020-10-27") is the best option for this value. -``` json -{ - "resourceType": "OperationOutcome", - "issue": [ - { - "severity": "information", - "code": "informational", - "diagnostics": "Processed Patient/1795df2b501-04f88a35-9f2f-4871-a05e-ba8090fa18f5" - } - ] -} -``` +### 2.2 Client-side-driven approach +Another option is to have the client utilize the `$retrieve-index` and `$reindex` in parallel to drive the processing. -An example response when processing is complete: +The `$retrieve-index` operation is called to retrieve index IDs of resources available for reindexing. This can be done repeatedly by using the `_count` and `afterIndexId` parameters for pagination. The operation supports the following parameters to control the behavior: -``` json -{"resourceType":"OperationOutcome","issue":[{"severity":"information","code":"informational","diagnostics":"Reindex complete"}]} -``` +|name|type|description| +|----|----|-----------| +|`_count`|string|The maximum number of index IDs to retrieve. This may not exceed 1000. If not specified, the maxinum number retrieved is 1000.| +|`notModifiedAfter`|string|Only retrieve index IDs for resources not last updated after this timestamp. Format as a date YYYY-MM-DD or time YYYY-MM-DDTHH:MM:SSZ.| +|`afterIndexId`|string|Retrieve index IDs starting with the first index ID after this index ID. If this parameter is not specified, the retrieved index IDs start with the first index ID.| -The IBM FHIR Server tracks when a resource was last reindexed and only resources with a reindex_tstamp value less than the given tstamp parameter will be processed. When a resource is reindexed, its reindex_tstamp is set to the given tstamp value. In most cases, using the current date (for example "2020-10-27") is the best option for this value. +The `$retrieve-index` operation can be invoked via an HTTP(s) POST to `[base]/$retrieve-index` or `[base]/[type]/$retrieve-index`. Invoking this operation at the type-level only retrieves indexIDs for resources of that type. -To aid in the re-indexing process, the IBM FHIR Server team has expanded the fhir-bucket resource-loading tool to support driving the reindex. The fhir-bucket tool uses a thread-pool to make concurrent POST requests to the IBM FHIR Server `$reindex` custom operation. +The `$reindex` operation is called to reindex the resources with index IDs in the specified list. The operation supports the following parameters to control the behavior: -For more information on driving the reindex operation from fhir-bucket, see https://github.com/IBM/FHIR/tree/main/fhir-bucket#driving-the-reindex-custom-operation. +|name|type|description| +|----|----|-----------| +|`indexIds`|string|Reindex only resources with an index ID in the specified list, formatted as a comma-delimited list of strings. If number of index IDs in the list is too large, the processing time might exceed the transaction timeout and fail.| + +By specifying the index IDs on the `$reindex` operation, the IBM FHIR Server avoids the database overhead of choosing the next resource to reindex and updating the reindex_tstamp. Though it requires the client side to track the reindex progress, it should allow for an overall faster reindex. -Reindexing is resource-intensive and can take several hours or even days to complete depending on the number of resources currently in the system and the capability of the hosting platform. +### 2.3 fhir-bucket +To aid in the re-indexing process, the IBM FHIR Server team has expanded the fhir-bucket resource-loading tool to support driving the reindex, with the option of using either the server-side-driven or client-side-driven approach. The fhir-bucket tool uses a thread-pool to make concurrent POST requests to the IBM FHIR Server `$retrieve-index` and `$reindex` custom operations. + +For more information on driving the reindex operation from fhir-bucket, see https://github.com/IBM/FHIR/tree/main/fhir-bucket#driving-the-reindex-custom-operation. --- FHIR® is the registered trademark of HL7 and is used with the permission of HL7. diff --git a/docs/src/pages/guides/FHIRServerUsersGuide.md b/docs/src/pages/guides/FHIRServerUsersGuide.md index 2650558bcb4..3eaa739a315 100644 --- a/docs/src/pages/guides/FHIRServerUsersGuide.md +++ b/docs/src/pages/guides/FHIRServerUsersGuide.md @@ -511,7 +511,7 @@ In addition to the standard REST API (create, update, search, and so forth), the ### 4.1.1 Packaged operations The FHIR team provides implementations for the standard `$validate`, `$document`, `$everything`, `$expand`, `$lookup`, `$subsumes`, `$closure`, `$export`, `$import`, `$convert`, `$apply` and `$translate` operations, as well as a custom operation named `$healthcheck`, which queries the configured persistence layer to report its health. -The server also bundles `$reindex` to reindex instances of Resources so they are searchable, and `$erase` to hard delete instances of Resources. To learn more about the $erase operation, read the [design document](https://github.com/IBM/FHIR/tree/main/operation/fhir-operation-erase/README.md). +The server also bundles `$reindex` to reindex instances of Resources so they are searchable, `$retrieve-index` to retrieve lists of resources available to be reindexed, and `$erase` to hard delete instances of Resources. To learn more about the $erase operation, read the [design document](https://github.com/IBM/FHIR/tree/main/operation/fhir-operation-erase/README.md). To extend the server with additional operations, see [Section 4.1.2 Custom operations](#412-custom-operations) diff --git a/fhir-bucket/README.md b/fhir-bucket/README.md index c4a3a61f6ce..cc9a01c3262 100644 --- a/fhir-bucket/README.md +++ b/fhir-bucket/README.md @@ -355,13 +355,15 @@ java.util.logging.FileHandler.pattern=fhirbucket-%u-%g.log When the IBM FHIR Server stores a FHIR resource, it extracts a configurable set of searchable parameter values and stores them in specially indexed tables which are used to support search queries. When the search parameter configuration is changed (perhaps because a profile has been updated), users may want to apply this new configuration to resources already stored. By default, such configuration changes only apply to new resources. -The IBM FHIR Server supports a custom operation to rebuild or "reindex" the search parameters extracted from resources currently stored. The user selects a date or timestamp as the reindex "marker". This value is used to determine which resources have been reindexed, and which still need to be reindexed. When a resource is successfully reindexed, it is marked with this user-selected timestamp. Each reindex REST call will process up to the requested number of resources and return an OperationOutcome resource containing issues describing which resources were processed. When there are no resources left to update, the call returns an OperationOutcome with one issue indicating that the reindex is complete. +The IBM FHIR Server supports a custom operation to rebuild or "reindex" the search parameters extracted from resources currently stored. There are two approaches for driving the reindex, server-side-driven or client-side-driven. Using server-side-driven is the default; to use client-side-driven, include the `--index-client-side-driven` parameter. -To avoid read timeouts, the number of resources processed in a single reindex call can be limited. Reindex calls can be made in parallel to increase throughput. The best number for concurrent requests depends on the capabilities of the underlying platform and any desire to balance load with other users. Concurrency up to 200 threads have been tested. Monitor the IBM FHIR Server response times when increasing concurrency. Also, make sure that the connection pool configured in the FHIR server cluster can support the required number of threads. This also means that the database needs to be configured to support this number of connections (sessions) plus any overhead. +With server-side-driven, the fhir-bucket will repeatedly call the `$reindex` operation. The user selects a date or timestamp as the reindex "marker", which is used to determine which resources have been reindexed, and which still need to be reindexed. When a resource is successfully reindexed, it is marked with this user-selected timestamp. Each reindex REST call will process up to the requested number of resources and return an OperationOutcome resource containing issues describing which resources were processed. When there are no resources left to update, the call returns an OperationOutcome with one issue, with an issue diagnostic value "Reindex complete", indicating that the reindex is complete. +With client-side-driven, the fhir-bucket will repeatedly call two operations in parallel; the `$retrieve-index` operation to determine the list of resources available to reindex, and the `$reindex` operation with a list of resources to reindex. Driving the reindex this way avoids database contention associated with updating the reindex timestamp of each resource with reindex "marker", which is used by the server-side-driven approach to keep track of the next resource to reindex. -The fhir-bucket main app has been extended to support driving a reindex operation with high concurrency. Processing will stop once the FHIR server returns the OperationOutcome with an issue diagnostic value "Reindex complete". +To avoid read timeouts, the number of resources processed in a single reindex call can be limited. Reindex calls can be made in parallel to increase throughput. The best number for concurrent requests depends on the capabilities of the underlying platform and any desire to balance load with other users. Concurrency up to 200 threads have been tested. Monitor the IBM FHIR Server response times when increasing concurrency. Also, make sure that the connection pool configured in the FHIR server cluster can support the required number of threads. This also means that the database needs to be configured to support this number of connections (sessions) plus any overhead. +The fhir-bucket main app has been extended to support driving a reindex operation with high concurrency. ``` java \ @@ -373,7 +375,8 @@ java \ --no-scan \ --reindex-tstamp 2020-12-01T00:00:00Z \ --reindex-resource-count 50 \ - --reindex-concurrent-requests 20 + --reindex-concurrent-requests 20 \ + --reindex-client-side-driven ``` The format of the reindex timestamp can be a date `YYYY-MM-DD` representing `00:00:00` UTC on the given day, or an ISO timestamp `YYYY-MM-DDThh:mm:ssZ`. diff --git a/fhir-bucket/src/main/java/com/ibm/fhir/bucket/app/Main.java b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/app/Main.java index 0bdb3ef6b35..46de866e6c9 100644 --- a/fhir-bucket/src/main/java/com/ibm/fhir/bucket/app/Main.java +++ b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/app/Main.java @@ -35,7 +35,9 @@ import com.ibm.fhir.bucket.persistence.FhirBucketSchema; import com.ibm.fhir.bucket.persistence.MergeResourceTypes; import com.ibm.fhir.bucket.persistence.MergeResourceTypesPostgres; +import com.ibm.fhir.bucket.reindex.ClientDrivenReindexOperation; import com.ibm.fhir.bucket.reindex.DriveReindexOperation; +import com.ibm.fhir.bucket.reindex.ServerDrivenReindexOperation; import com.ibm.fhir.bucket.scanner.BundleBreakerResourceProcessor; import com.ibm.fhir.bucket.scanner.COSReader; import com.ibm.fhir.bucket.scanner.CosScanner; @@ -201,6 +203,9 @@ public class Main { // How many times should we cycle through the patient buffer before refilling private int bufferRecycleCount = 1; + // Whether to use client-side-driven reindex, which uses $retrieve-index and $reindex in parallel + private boolean clientSideDrivenReindex = false; + /** * Parse command line arguments * @param args @@ -417,6 +422,9 @@ public void parseArgs(String[] args) { throw new IllegalArgumentException("missing value for --reindex-concurrent-requests"); } break; + case "--reindex-client-side-driven": + this.clientSideDrivenReindex = true; + break; default: throw new IllegalArgumentException("Bad arg: " + arg); } @@ -862,7 +870,11 @@ protected void scanAndLoad() { // Optionally start the $reindex loops if (this.reindexTstampParam != null) { - this.driveReindexOperation = new DriveReindexOperation(fhirClient, reindexConcurrentRequests, reindexTstampParam, reindexResourceCount); + if (this.clientSideDrivenReindex) { + this.driveReindexOperation = new ClientDrivenReindexOperation(fhirClient, reindexConcurrentRequests, reindexTstampParam, reindexResourceCount); + } else { + this.driveReindexOperation = new ServerDrivenReindexOperation(fhirClient, reindexConcurrentRequests, reindexTstampParam, reindexResourceCount); + } this.driveReindexOperation.init(); } diff --git a/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/ClientDrivenReindexOperation.java b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/ClientDrivenReindexOperation.java new file mode 100644 index 00000000000..812aef63388 --- /dev/null +++ b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/ClientDrivenReindexOperation.java @@ -0,0 +1,365 @@ +/* + * (C) Copyright IBM Corp. 2021 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.bucket.reindex; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import org.apache.http.HttpStatus; + +import com.ibm.fhir.bucket.client.FHIRBucketClient; +import com.ibm.fhir.bucket.client.FHIRBucketClientUtil; +import com.ibm.fhir.bucket.client.FhirServerResponse; +import com.ibm.fhir.database.utils.api.DataAccessException; +import com.ibm.fhir.model.resource.Parameters; +import com.ibm.fhir.model.resource.Parameters.Builder; +import com.ibm.fhir.model.resource.Parameters.Parameter; +import com.ibm.fhir.model.resource.Resource; + +/** + * Drives the $reindex custom operation in parallel from the client side via use of the $retrieve-index operation. + * Processing continues until index IDs indicate that no resources remain to be reindexed. + */ +public class ClientDrivenReindexOperation extends DriveReindexOperation { + private static final Logger logger = Logger.getLogger(ClientDrivenReindexOperation.class.getName()); + + private static final String COUNT_PARAM = "_count"; + private static final String NOT_MODIFIED_AFTER_PARAM = "notModifiedAfter"; + private static final String AFTER_INDEX_ID_PARAM = "afterIndexId"; + private static final String INDEX_IDS_PARAM = "indexIds"; + private static final int MAX_RETRIEVE_COUNT = 1000; + private static final int OFFER_TIMEOUT_IN_SEC = 30; + private static final int POLL_TIMEOUT_IN_SEC = 5; + private static final String RETRIEVE_INDEX_URL = "$retrieve-index"; + private static final String REINDEX_URL = "$reindex"; + + // Flags to indicate if we should be running + private volatile boolean running = true; + private volatile boolean active = false; + private volatile boolean doneRetrieving = false; + + // FHIR client + private final FHIRBucketClient fhirClient; + + // Maximum number of concurrent requests + private final int maxConcurrentRequests; + + // Timestamp the reindex began + private final String reindexTimestamp; + + // Maximum number of resources reindexed per thread + private final int maxResourceCount; + + // Queue for holding index IDs of resources to reindex + private BlockingQueue blockingQueue; + + // Last index ID found by monitor thread + private String lastIndexId; + + // Monitor thread + private Thread monitorThread; + + // Thread pool for processing requests + private final ExecutorService pool = Executors.newCachedThreadPool(); + + // Number of threads currently running + private AtomicInteger currentlyRunning = new AtomicInteger(); + + /** + * Public constructor. + * @param fhirClient the FHIR client + * @param maxConcurrentRequests the number of threads to spin up + * @param reindexTimestamp timestamp the reindex began + * @param maxResourceCount resources processed per request per thread + */ + public ClientDrivenReindexOperation(FHIRBucketClient fhirClient, int maxConcurrentRequests, String reindexTimestamp, int maxResourceCount) { + this.fhirClient = fhirClient; + this.maxConcurrentRequests = maxConcurrentRequests; + this.reindexTimestamp = reindexTimestamp; + this.maxResourceCount = maxResourceCount; + this.blockingQueue = new LinkedBlockingDeque<>(MAX_RETRIEVE_COUNT + (maxResourceCount * maxConcurrentRequests)); + } + + /** + * Start the main loop + */ + @Override + public void init() { + if (!running) { + throw new IllegalStateException("Already shutdown"); + } + + // Initiate the monitorThread. This will fill the pool + // with worker threads, and monitor for completion or failure + logger.info("Starting monitor thread"); + this.monitorThread = new Thread(() -> monitorLoop()); + this.monitorThread.start(); + } + + /** + * Program is stopping, so tell the threads they can stop too + */ + @Override + public void signalStop() { + this.running = false; + + // make sure the pool doesn't start new work + pool.shutdown(); + } + + /** + * Wait until things are stopped + */ + @Override + public void waitForStop() { + if (this.running) { + signalStop(); + } + + try { + pool.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException x) { + logger.warning("Wait for pool shutdown interrupted"); + } + + try { + // break any sleep inside the monitorThread + this.monitorThread.interrupt(); + this.monitorThread.join(); + } catch (InterruptedException x) { + logger.warning("Interrupted waiting for monitorThread completion"); + } + } + + /** + * The main monitor loop. + */ + public void monitorLoop() { + while (this.running) { + if (!this.active) { + // See if we can make one successful request before filling the pool + // with hundreds of parallel requests + int currentThreadCount = this.currentlyRunning.get(); + if (currentThreadCount == 0) { + // Nothing currently running, so make one test call to verify things are working + doneRetrieving = doneRetrieving || !callRetrieveIndex(); + if (!blockingQueue.isEmpty()) { + // Should be OK now to fill the pool with workers + logger.info("Index IDs available for processing - filling worker pool"); + this.active = true; + } + + for (int i=0; i callReindexOperationInLoop()); + + // Slow down the ramp-up so we don't hit a new server with + // hundreds of requests in one go + safeSleep(1000); + } + + // Keep attempting to retrieve index IDs if we need to + while (this.active && this.running) { + doneRetrieving = doneRetrieving || !callRetrieveIndex(); + if (doneRetrieving) { + if (blockingQueue.isEmpty()) { + // Tell all the running threads they can stop now + logger.info("Nothing left to do, tell all the worker threads to exit"); + this.running = false; + } else { + // Worker threads are still processing, so sleep for a bit before we check again + safeSleep(1000); + } + } + } + + } else { + // Need to wait for all the existing threads to die off before we try to restart. This + // could take a while because we have a long tx timeout in Liberty. + logger.info("Waiting for current threads to complete before restart: " + currentThreadCount); + safeSleep(5000); + } + } else { // active + // Worker threads are active, so sleep for a bit before we check again + safeSleep(5000); + } + } + } + + /** + * Call the FHIR server $retrieve-index operation. + * @return true if the call was successful (200 OK) and index IDs were found, otherwise false + */ + private boolean callRetrieveIndex() { + boolean result = false; + + Builder builder = Parameters.builder(); + builder.parameter(Parameter.builder().name(str(COUNT_PARAM)).value(intValue(MAX_RETRIEVE_COUNT)).build()); + builder.parameter(Parameter.builder().name(str(NOT_MODIFIED_AFTER_PARAM)).value(str(reindexTimestamp)).build()); + if (lastIndexId != null) { + builder.parameter(Parameter.builder().name(str(AFTER_INDEX_ID_PARAM)).value(str(lastIndexId)).build()); + } + Parameters parameters = builder.build(); + String requestBody = FHIRBucketClientUtil.resourceToString(parameters); + + // Get index IDs of resources available to be reindexed + long start = System.nanoTime(); + FhirServerResponse response = fhirClient.post(RETRIEVE_INDEX_URL, requestBody); + long end = System.nanoTime(); + + double elapsed = (end - start) / 1e9; + logger.info(String.format("called $retrieve-index: %d %s [took %5.3f s]", response.getStatusCode(), response.getStatusMessage(), elapsed)); + + if (response.getStatusCode() == HttpStatus.SC_OK) { + Resource resource = response.getResource(); + if (resource != null) { + if (resource.is(Parameters.class)) { + // Check the result to see if we should keep running + result = extractIndexIds((Parameters) resource); + if (!result) { + logger.info("No more index IDs to retrieve"); + } + } else { + logger.severe("FHIR Server retrieve-index response is not an Parameters: " + response.getStatusCode() + " " + response.getStatusMessage()); + logger.severe("Actual response: " + FHIRBucketClientUtil.resourceToString(resource)); + } + } else { + // This would be a bit weird + logger.severe("FHIR Server retrieve-index operation returned no Parameters: " + response.getStatusCode() + " " + response.getStatusMessage()); + } + } else { + // Stop as soon as we hit an error + logger.severe("FHIR Server retrieve-index operation returned an error: " + response.getStatusCode() + " " + response.getStatusMessage()); + } + + return result; + } + + /** + * Extract the index IDs from the retrieve-index operation output. + * @param output the retrieve-index operation output + * @return true if index IDs were found (even if not successfully queued), otherwise if no index IDs found + */ + private boolean extractIndexIds(Parameters output) { + for (Parameter parameter : output.getParameter()) { + if (INDEX_IDS_PARAM.equals(parameter.getName().getValue())) { + String indexIdsString = parameter.getValue().as(com.ibm.fhir.model.type.String.class).getValue(); + if (indexIdsString != null) { + String[] indexIdsArray = indexIdsString.split(","); + for (String indexId : indexIdsArray) { + boolean queued = false; + while (!queued && this.running) { + try { + queued = blockingQueue.offer(indexId, OFFER_TIMEOUT_IN_SEC, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // NOP + } + + if (queued) { + lastIndexId = indexId; + } else { + logger.warning("Unable to add indexId '" + indexId + "' to queue"); + if (!this.active) { + logger.warning("Worker threads are not active yet, so try adding again later"); + return true; + } + } + } + } + return indexIdsArray.length > 0; + } + } + } + return false; + } + + /** + * Thread to repeatedly call the $reindex operation until done or error. + */ + private void callReindexOperationInLoop() { + while (this.running && this.active) { + String indexIds = getIndexIdsToReindex(); + if (!indexIds.isEmpty()) { + boolean ok = false; + try { + ok = callReindexOperation(indexIds); + } catch (DataAccessException x) { + // allow active be set to false. This will notify monitorLoop something is wrong. + // Probably all threads will encounter the same exception and monitorLoop will + // try to refill the pool if all threads exit. + logger.severe("DataAccessException caught when contacting FHIR server. FHIR client thread will exit." + x.toString() ); + } catch (IllegalStateException x) { + // Fail for this exception too. fhir-bucket fhir client suggests this exception results from config error. + // So probably this will be caught first time monitorLoop calls callOnce and not here. + logger.severe("IllegalStateException caught. FHIR client thread will exit." + x.toString() ); + } + if (!ok) { + // stop everything on the first failure + this.active = false; + } + } + } + + this.currentlyRunning.decrementAndGet(); + } + + /** + * Call the FHIR server $reindex operation. + * @param indexIds the index IDs to reindex + * @return true if the call was successful (200 OK), otherwise false + */ + private boolean callReindexOperation(String indexIds) { + boolean result = true; + + Builder builder = Parameters.builder(); + builder.parameter(Parameter.builder().name(str(INDEX_IDS_PARAM)).value(str(indexIds)).build()); + Parameters parameters = builder.build(); + String requestBody = FHIRBucketClientUtil.resourceToString(parameters); + + // Tell the FHIR Server to reindex the specified resources + long start = System.nanoTime(); + FhirServerResponse response = fhirClient.post(REINDEX_URL, requestBody); + long end = System.nanoTime(); + + double elapsed = (end - start) / 1e9; + logger.info(String.format("called $reindex: %d %s [took %5.3f s]", response.getStatusCode(), response.getStatusMessage(), elapsed)); + + if (response.getStatusCode() != HttpStatus.SC_OK) { + // Stop as soon as we hit an error + logger.severe("FHIR Server reindex operation returned an error: " + response.getStatusCode() + " " + response.getStatusMessage()); + result = false; + } + + return result; + } + + /** + * Get a comma-delimited string of the next index IDs to reindex. + * @return a comma-delimited string + */ + private String getIndexIdsToReindex() { + List drainToList = new ArrayList<>(maxResourceCount); + try { + String indexId = blockingQueue.poll(POLL_TIMEOUT_IN_SEC, TimeUnit.SECONDS); + if (indexId != null) { + drainToList.add(indexId); + blockingQueue.drainTo(drainToList, maxResourceCount - 1); + } + } catch (InterruptedException e) { + // NOP + } + return drainToList.stream().collect(Collectors.joining(",")); + } +} diff --git a/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/DriveReindexOperation.java b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/DriveReindexOperation.java index bd7817016c0..9fcf0ba6787 100644 --- a/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/DriveReindexOperation.java +++ b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/DriveReindexOperation.java @@ -1,158 +1,34 @@ /* - * (C) Copyright IBM Corp. 2020 + * (C) Copyright IBM Corp. 2021 * * SPDX-License-Identifier: Apache-2.0 */ package com.ibm.fhir.bucket.reindex; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.http.HttpStatus; - -import com.ibm.fhir.bucket.client.FHIRBucketClient; -import com.ibm.fhir.bucket.client.FHIRBucketClientUtil; -import com.ibm.fhir.bucket.client.FhirServerResponse; -import com.ibm.fhir.model.resource.OperationOutcome; -import com.ibm.fhir.model.resource.OperationOutcome.Issue; -import com.ibm.fhir.model.resource.Parameters; -import com.ibm.fhir.model.resource.Parameters.Parameter; -import com.ibm.fhir.model.resource.Resource; - -import com.ibm.fhir.database.utils.api.DataAccessException; - /** - * Drives the $reindex custom operation in parallel. Each thread keeps running - * until the OperationOutcome indicates that no work remains to be processed. + * Drives the $reindex custom operation in parallel. */ -public class DriveReindexOperation { - private static final Logger logger = Logger.getLogger(DriveReindexOperation.class.getName()); - - // the maximum number of requests we permit - private final int maxConcurrentRequests; - - // flag to indicate if we should be running - private volatile boolean running = true; - - private volatile boolean active = false; - - // count of how many threads are currently running - private AtomicInteger currentlyRunning = new AtomicInteger(); - - // thread pool for processing requests - private final ExecutorService pool = Executors.newCachedThreadPool(); - - private final FHIRBucketClient fhirClient; - - private final String url = "$reindex"; - - // The serialized Parameters resource sent with each POST - private final String requestBody; - - private Thread monitorThread; - - /** - * Public constructor - * @param client the FHIR client - * @param maxConcurrentRequests the number of threads to spin up - */ - public DriveReindexOperation(FHIRBucketClient fhirClient, int maxConcurrentRequests, String tstampParam, int resourceCountParam) { - this.fhirClient = fhirClient; - this.maxConcurrentRequests = maxConcurrentRequests; - - Parameters parameters = Parameters.builder() - .parameter(Parameter.builder().name(str("tstamp")).value(str(tstampParam)).build()) - .parameter(Parameter.builder().name(str("resourceCount")).value(intValue(resourceCountParam)).build()) - .build(); - - // Serialize into the requestBody string used by all the threads - this.requestBody = FHIRBucketClientUtil.resourceToString(parameters); - - if (logger.isLoggable(Level.FINE)) { - logger.fine("Reindex request parameters: " + requestBody); - } - } +public abstract class DriveReindexOperation { /** - * Syntactic sugar for providing string values - * @param str - * @return + * Start the main loop. */ - private static com.ibm.fhir.model.type.String str(String str) { - return com.ibm.fhir.model.type.String.of(str); - } - - private static com.ibm.fhir.model.type.Integer intValue(int val) { - return com.ibm.fhir.model.type.Integer.of(val); - } + public abstract void init(); /** - * Start the main loop + * Program is stopping, so tell the threads they can stop, too. */ - public void init() { - if (!running) { - throw new IllegalStateException("Already shutdown"); - } - - // Initiate the monitorThread. This will fill the pool - // with worker threads, and monitor for completion or failure - logger.info("Starting monitor thread"); - this.monitorThread = new Thread(() -> monitorLoop()); - this.monitorThread.start(); - } + public abstract void signalStop(); /** - * The main monitor loop. + * Wait until things are stopped. */ - public void monitorLoop() { - while (this.running) { - if (!this.active) { - // See if we can make one successful request before filling the pool - // with hundreds of parallel requests - int currentThreadCount = this.currentlyRunning.get(); - if (currentThreadCount == 0) { - // Nothing currently running, so make one test call to verify things are working - logger.info("monitor probe - checking reindex operation"); - if (callOnce() && this.running) { - // should be OK now to fill the pool with workers - logger.info("Test probe successful - filling worker pool"); - this.active = true; - - for (int i=0; i callReindexOperation()); - - // Slow down the ramp-up so we don't hit a new server with - // hundreds of requests in one go - safeSleep(1000); - } - } else if (this.running) { - // call failed, so wait a bit before we try again - safeSleep(5000); - } - } else { - // need to wait for all the existing threads to die off before we try to restart. This - // could take a while because we have a long tx timeout in Liberty. - logger.info("Waiting for current threads to complete before restart: " + currentThreadCount); - safeSleep(5000); - } - - } else { // active - // worker threads are active, so sleep for a bit before we check again - safeSleep(5000); - } - } - } + public abstract void waitForStop(); /** - * Sleep for the given number of ms, or until interrupted - * @param ms + * Sleep for the given number of milliseconds, or until interrupted. + * @param ms milliseconds */ protected void safeSleep(long ms) { try { @@ -163,118 +39,20 @@ protected void safeSleep(long ms) { } /** - * Program is stopping, so tell the threads they can stop too + * Wrapper for strings. + * @param str the string + * @return the string */ - public void signalStop() { - this.running = false; - - // make sure the pool doesn't start new work - pool.shutdown(); - } - - /** - * Wait until things are stopped - */ - public void waitForStop() { - if (this.running) { - signalStop(); - } - - try { - pool.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException x) { - logger.warning("Wait for pool shutdown interrupted"); - } - - try { - // break any sleep inside the monitorThread - this.monitorThread.interrupt(); - this.monitorThread.join(); - } catch (InterruptedException x) { - logger.warning("Interrupted waiting for monitorThread completion"); - } - } - - /** - * Thread to repeatedly call the $reindex operation until the response - * indicates all the work is complete. - */ - private void callReindexOperation() { - while (this.running && this.active) { - boolean ok = false; - try { - ok = callOnce(); - } catch (DataAccessException x) { - // allow active be set to false. This will notify monitorLoop something is wrong. - // Probably all threads will encounter the same exception and monitorLoop will - // try to refill the pool if all threads exit. - logger.severe("DataAccessException caught when contacting FHIR server. FHIR client thread will exit." + x.toString() ); - } catch (IllegalStateException x) { - // Fail for this exception too. fhir-bucket fhir client suggests this exception results from config error. - // So probably this will be caught first time monitorLoop calls callOnce and not here. - logger.severe("IllegalStateException caught. FHIR client thread will exit." + x.toString() ); - } - if (!ok) { - // stop everything on the first failure - this.active = false; - } - } - - this.currentlyRunning.decrementAndGet(); - } - - /** - * Make one call to the FHIR server $reindex operation - * @return true if the call was successful (200 OK) - */ - private boolean callOnce() { - boolean result = false; - - // tell the FHIR Server to reindex a number of resources - long start = System.nanoTime(); - FhirServerResponse response = fhirClient.post(url, requestBody); - long end = System.nanoTime(); - - double elapsed = (end - start) / 1e9; - logger.info(String.format("called $reindex: %d %s [took %5.3f s]", response.getStatusCode(), response.getStatusMessage(), elapsed)); - - if (response.getStatusCode() == HttpStatus.SC_OK) { - Resource resource = response.getResource(); - if (resource != null) { - if (resource.is(OperationOutcome.class)) { - // check the result to see if we should stop running - checkResult((OperationOutcome)resource); - result = true; - } else { - logger.severe("FHIR Server reindex response is not an OperationOutcome: " + response.getStatusCode() + " " + response.getStatusMessage()); - logger.severe("Actual response: " + FHIRBucketClientUtil.resourceToString(resource)); - } - } else { - // this would be a bit weird - logger.severe("FHIR Server reindex operation returned no OperationOutcome: " + response.getStatusCode() + " " + response.getStatusMessage()); - } - } else { - // Stop as soon as we hit an error - logger.severe("FHIR Server reindex operation returned an error: " + response.getStatusCode() + " " + response.getStatusMessage()); - } - - return result; + protected static com.ibm.fhir.model.type.String str(String str) { + return com.ibm.fhir.model.type.String.of(str); } /** - * Check the result to see if the server is telling us it's done - * @param result + * Wrapper for integers. + * @param val the integer + * @return the integer */ - private void checkResult(OperationOutcome result) { - List issues = result.getIssue(); - if (issues.size() == 1) { - Issue one = issues.get(0); - if ("Reindex complete".equals(one.getDiagnostics().getValue())) { - logger.info("Reindex - all done"); - - // tell all the running threads they can stop now - this.running = false; - } - } + protected static com.ibm.fhir.model.type.Integer intValue(int val) { + return com.ibm.fhir.model.type.Integer.of(val); } } diff --git a/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/ServerDrivenReindexOperation.java b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/ServerDrivenReindexOperation.java new file mode 100644 index 00000000000..70498d59cab --- /dev/null +++ b/fhir-bucket/src/main/java/com/ibm/fhir/bucket/reindex/ServerDrivenReindexOperation.java @@ -0,0 +1,270 @@ +/* + * (C) Copyright IBM Corp. 2020, 2021 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.bucket.reindex; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.http.HttpStatus; + +import com.ibm.fhir.bucket.client.FHIRBucketClient; +import com.ibm.fhir.bucket.client.FHIRBucketClientUtil; +import com.ibm.fhir.bucket.client.FhirServerResponse; +import com.ibm.fhir.database.utils.api.DataAccessException; +import com.ibm.fhir.model.resource.OperationOutcome; +import com.ibm.fhir.model.resource.OperationOutcome.Issue; +import com.ibm.fhir.model.resource.Parameters; +import com.ibm.fhir.model.resource.Parameters.Parameter; +import com.ibm.fhir.model.resource.Resource; + +/** + * Drives the $reindex custom operation in parallel. Each thread keeps running + * until the OperationOutcome indicates that no work remains to be processed. + */ +public class ServerDrivenReindexOperation extends DriveReindexOperation { + private static final Logger logger = Logger.getLogger(ServerDrivenReindexOperation.class.getName()); + + // the maximum number of requests we permit + private final int maxConcurrentRequests; + + // flag to indicate if we should be running + private volatile boolean running = true; + + private volatile boolean active = false; + + // count of how many threads are currently running + private AtomicInteger currentlyRunning = new AtomicInteger(); + + // thread pool for processing requests + private final ExecutorService pool = Executors.newCachedThreadPool(); + + private final FHIRBucketClient fhirClient; + + private final String url = "$reindex"; + + // The serialized Parameters resource sent with each POST + private final String requestBody; + + private Thread monitorThread; + + /** + * Public constructor + * @param client the FHIR client + * @param maxConcurrentRequests the number of threads to spin up + */ + public ServerDrivenReindexOperation(FHIRBucketClient fhirClient, int maxConcurrentRequests, String tstampParam, int resourceCountParam) { + this.fhirClient = fhirClient; + this.maxConcurrentRequests = maxConcurrentRequests; + + Parameters parameters = Parameters.builder() + .parameter(Parameter.builder().name(str("tstamp")).value(str(tstampParam)).build()) + .parameter(Parameter.builder().name(str("resourceCount")).value(intValue(resourceCountParam)).build()) + .build(); + + // Serialize into the requestBody string used by all the threads + this.requestBody = FHIRBucketClientUtil.resourceToString(parameters); + + if (logger.isLoggable(Level.FINE)) { + logger.fine("Reindex request parameters: " + requestBody); + } + } + + /** + * Start the main loop + */ + @Override + public void init() { + if (!running) { + throw new IllegalStateException("Already shutdown"); + } + + // Initiate the monitorThread. This will fill the pool + // with worker threads, and monitor for completion or failure + logger.info("Starting monitor thread"); + this.monitorThread = new Thread(() -> monitorLoop()); + this.monitorThread.start(); + } + + /** + * The main monitor loop. + */ + public void monitorLoop() { + while (this.running) { + if (!this.active) { + // See if we can make one successful request before filling the pool + // with hundreds of parallel requests + int currentThreadCount = this.currentlyRunning.get(); + if (currentThreadCount == 0) { + // Nothing currently running, so make one test call to verify things are working + logger.info("monitor probe - checking reindex operation"); + if (callOnce() && this.running) { + // should be OK now to fill the pool with workers + logger.info("Test probe successful - filling worker pool"); + this.active = true; + + for (int i=0; i callReindexOperation()); + + // Slow down the ramp-up so we don't hit a new server with + // hundreds of requests in one go + safeSleep(1000); + } + } else if (this.running) { + // call failed, so wait a bit before we try again + safeSleep(5000); + } + } else { + // need to wait for all the existing threads to die off before we try to restart. This + // could take a while because we have a long tx timeout in Liberty. + logger.info("Waiting for current threads to complete before restart: " + currentThreadCount); + safeSleep(5000); + } + + } else { // active + // worker threads are active, so sleep for a bit before we check again + safeSleep(5000); + } + } + } + + /** + * Sleep for the given number of ms, or until interrupted + * @param ms + */ + @Override + protected void safeSleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException x) { + // NOP + } + } + + /** + * Program is stopping, so tell the threads they can stop too + */ + @Override + public void signalStop() { + this.running = false; + + // make sure the pool doesn't start new work + pool.shutdown(); + } + + /** + * Wait until things are stopped + */ + @Override + public void waitForStop() { + if (this.running) { + signalStop(); + } + + try { + pool.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException x) { + logger.warning("Wait for pool shutdown interrupted"); + } + + try { + // break any sleep inside the monitorThread + this.monitorThread.interrupt(); + this.monitorThread.join(); + } catch (InterruptedException x) { + logger.warning("Interrupted waiting for monitorThread completion"); + } + } + + /** + * Thread to repeatedly call the $reindex operation until the response + * indicates all the work is complete. + */ + private void callReindexOperation() { + while (this.running && this.active) { + boolean ok = false; + try { + ok = callOnce(); + } catch (DataAccessException x) { + // allow active be set to false. This will notify monitorLoop something is wrong. + // Probably all threads will encounter the same exception and monitorLoop will + // try to refill the pool if all threads exit. + logger.severe("DataAccessException caught when contacting FHIR server. FHIR client thread will exit." + x.toString() ); + } catch (IllegalStateException x) { + // Fail for this exception too. fhir-bucket fhir client suggests this exception results from config error. + // So probably this will be caught first time monitorLoop calls callOnce and not here. + logger.severe("IllegalStateException caught. FHIR client thread will exit." + x.toString() ); + } + if (!ok) { + // stop everything on the first failure + this.active = false; + } + } + + this.currentlyRunning.decrementAndGet(); + } + + /** + * Make one call to the FHIR server $reindex operation + * @return true if the call was successful (200 OK) + */ + private boolean callOnce() { + boolean result = false; + + // tell the FHIR Server to reindex a number of resources + long start = System.nanoTime(); + FhirServerResponse response = fhirClient.post(url, requestBody); + long end = System.nanoTime(); + + double elapsed = (end - start) / 1e9; + logger.info(String.format("called $reindex: %d %s [took %5.3f s]", response.getStatusCode(), response.getStatusMessage(), elapsed)); + + if (response.getStatusCode() == HttpStatus.SC_OK) { + Resource resource = response.getResource(); + if (resource != null) { + if (resource.is(OperationOutcome.class)) { + // check the result to see if we should stop running + checkResult((OperationOutcome)resource); + result = true; + } else { + logger.severe("FHIR Server reindex response is not an OperationOutcome: " + response.getStatusCode() + " " + response.getStatusMessage()); + logger.severe("Actual response: " + FHIRBucketClientUtil.resourceToString(resource)); + } + } else { + // this would be a bit weird + logger.severe("FHIR Server reindex operation returned no OperationOutcome: " + response.getStatusCode() + " " + response.getStatusMessage()); + } + } else { + // Stop as soon as we hit an error + logger.severe("FHIR Server reindex operation returned an error: " + response.getStatusCode() + " " + response.getStatusMessage()); + } + + return result; + } + + /** + * Check the result to see if the server is telling us it's done + * @param result + */ + private void checkResult(OperationOutcome result) { + List issues = result.getIssue(); + if (issues.size() == 1) { + Issue one = issues.get(0); + if ("Reindex complete".equals(one.getDiagnostics().getValue())) { + logger.info("Reindex - all done"); + + // tell all the running threads they can stop now + this.running = false; + } + } + } +} diff --git a/fhir-client/src/main/java/com/ibm/fhir/client/impl/FHIRClientImpl.java b/fhir-client/src/main/java/com/ibm/fhir/client/impl/FHIRClientImpl.java index b3edb44d63d..543c777e05d 100644 --- a/fhir-client/src/main/java/com/ibm/fhir/client/impl/FHIRClientImpl.java +++ b/fhir-client/src/main/java/com/ibm/fhir/client/impl/FHIRClientImpl.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Properties; -import jakarta.json.JsonObject; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLSession; import javax.ws.rs.RuntimeType; @@ -52,6 +51,8 @@ import com.ibm.fhir.provider.FHIRJsonProvider; import com.ibm.fhir.provider.FHIRProvider; +import jakarta.json.JsonObject; + /** * Provides an implementation of the FHIRClient interface, which can be used as a high-level API for invoking FHIR REST * APIs. @@ -528,7 +529,8 @@ public FHIRResponse validate(Resource resource, FHIRRequestHeader... headers) th if (resource == null) { throw new IllegalArgumentException("The 'resource' argument is required but was null."); } - return _validate(resource, headers); + String resourceType = resource.getClass().getSimpleName(); + return _validate(resource, resourceType, headers); } @Override @@ -536,13 +538,14 @@ public FHIRResponse validate(JsonObject resource, FHIRRequestHeader... headers) if (resource == null) { throw new IllegalArgumentException("The 'resource' argument is required but was null."); } - return _validate(resource, headers); + String resourceType = resource.getString("resourceType"); + return _validate(resource, resourceType, headers); } - private FHIRResponse _validate(T resource, FHIRRequestHeader...headers) throws Exception { + private FHIRResponse _validate(T resource, String resourceType, FHIRRequestHeader...headers) throws Exception { WebTarget endpoint = getWebTarget(); Entity entity = Entity.entity(resource, getDefaultMimeType()); - Invocation.Builder builder = endpoint.path("Resource").path("$validate").request(getDefaultMimeType()); + Invocation.Builder builder = endpoint.path(resourceType).path("$validate").request(getDefaultMimeType()); builder = addRequestHeaders(builder, headers); Response response = builder.post(entity); return new FHIRResponseImpl(response); diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/ReindexResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/ReindexResourceDAO.java index af48042e009..3d6df64c5f6 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/ReindexResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/ReindexResourceDAO.java @@ -48,14 +48,20 @@ public class ReindexResourceDAO extends ResourceDAOImpl { private final ParameterDAO parameterDao; - // Note that currently the global logical_resources table does not carry - // the is_deleted flag. Until it does, the queries will return deleted - // resources, which can be skipped for reindex. (issue-2055) + private static final String PICK_SINGLE_LOGICAL_RESOURCE = "" + + " SELECT lr.logical_resource_id, lr.resource_type_id, lr.logical_id, lr.reindex_txid " + + " FROM logical_resources lr " + + " WHERE lr.logical_resource_id = ? " + + " AND lr.is_deleted = 'N' " + + " AND lr.reindex_tstamp < ? " + ; + private static final String PICK_SINGLE_RESOURCE = "" + " SELECT lr.logical_resource_id, lr.resource_type_id, lr.logical_id, lr.reindex_txid " + " FROM logical_resources lr " + " WHERE lr.resource_type_id = ? " + " AND lr.logical_id = ? " + + " AND lr.is_deleted = 'N' " + " AND lr.reindex_tstamp < ? " ; @@ -63,6 +69,7 @@ public class ReindexResourceDAO extends ResourceDAOImpl { + " SELECT lr.logical_resource_id, lr.resource_type_id, lr.logical_id, lr.reindex_txid " + " FROM logical_resources lr " + " WHERE lr.resource_type_id = ? " + + " AND lr.is_deleted = 'N' " + " AND lr.reindex_tstamp < ? " + "OFFSET ? ROWS FETCH FIRST 1 ROWS ONLY " ; @@ -70,7 +77,8 @@ public class ReindexResourceDAO extends ResourceDAOImpl { private static final String PICK_ANY_RESOURCE = "" + " SELECT lr.logical_resource_id, lr.resource_type_id, lr.logical_id, lr.reindex_txid " + " FROM logical_resources lr " - + " WHERE lr.reindex_tstamp < ? " + + " WHERE lr.is_deleted = 'N' " + + " AND lr.reindex_tstamp < ? " + "OFFSET ? ROWS FETCH FIRST 1 ROWS ONLY " ; @@ -112,15 +120,56 @@ public ReindexResourceDAO(Connection connection, IDatabaseTranslator translator, * Getter for the translator currently held by this DAO * @return */ + @Override protected IDatabaseTranslator getTranslator() { return this.translator; } /** - * Pick the next resource to process resource and lock it. Specializations for different - * databases may use different techniques to optimize locking/concurrency control - * @param reindexTstamp - * @return + * Pick a specific resource to process by logicalResourceId (primary key). + * Since the logicalResourceId is specified, we avoid updating the record as the caller of $reindex operation + * is passing in an explicit list of resources, so no need to lock for the purpose of picking a random resource. + * This can improve performance (especially with PostgreSQL by avoiding the generation of tombstones). + * @param reindexTstamp only get resource with a reindex_tstamp less than this + * @param logicalResourceId the logical resource ID (primary key) of a specific resource + * @return the resource record, or null when the resource is not found + * @throws Exception + */ + protected ResourceIndexRecord getResource(Instant reindexTstamp, Long logicalResourceId) throws Exception { + ResourceIndexRecord result = null; + + // no need to close + Connection connection = getConnection(); + IDatabaseTranslator translator = getTranslator(); + + final String select = PICK_SINGLE_LOGICAL_RESOURCE; + + try (PreparedStatement stmt = connection.prepareStatement(select)) { + if (logicalResourceId != null) { + // specific resource by logical resource ID (primary key) + stmt.setLong(1, logicalResourceId); + stmt.setTimestamp(2, Timestamp.from(reindexTstamp)); + } + ResultSet rs = stmt.executeQuery(); + if (rs.next()) { + result = new ResourceIndexRecord(rs.getLong(1), rs.getInt(2), rs.getString(3), rs.getLong(4)); + } + } catch (SQLException x) { + logger.log(Level.SEVERE, select, x); + throw translator.translate(x); + } + + return result; + } + + /** + * Pick the next resource to process, then also lock it. Specializations for different databases may use + * different techniques to optimize locking/concurrency control. + * @param random used to generate a random number + * @param reindexTstamp only get resource with a reindex_tstamp less than this + * @param resourceTypeId the resource type ID of a specific resource type, or null + * @param logicalId the resource ID of a specific resource, or null + * @return the resource record, or null when there is nothing left to do * @throws Exception */ protected ResourceIndexRecord getNextResource(SecureRandom random, Instant reindexTstamp, Integer resourceTypeId, String logicalId) throws Exception { @@ -218,19 +267,27 @@ protected ResourceIndexRecord getNextResource(SecureRandom random, Instant reind * Get the resource record we want to reindex. This might take a few attempts, because * there could be hundreds of threads all trying to do the same thing, and we may see * collisions which will cause the FOR UPDATE to block, then return no rows. - * @param reindexTstamp - * @param resourceCount + * @param reindexTstamp only get resource with an index_tstamp less than this + * @param logicalResourceId logical resource ID (primary key) of resource to reindex, or null + * @param resourceTypeId the resource type ID of a specific resource type, or null; + * this parameter is ignored if the logicalResourceId parameter value is non-null + * @param logicalId the resource ID of a specific resource, or null; + * this parameter is ignored if the logicalResourceId parameter value is non-null * @return the resource record, or null when there is nothing left to do * @throws Exception */ - public ResourceIndexRecord getResourceToReindex(Instant reindexTstamp, Integer resourceTypeId, String logicalId) throws Exception { + public ResourceIndexRecord getResourceToReindex(Instant reindexTstamp, Long logicalResourceId, Integer resourceTypeId, String logicalId) throws Exception { ResourceIndexRecord result = null; // no need to close Connection connection = getConnection(); // Get a resource which needs to be reindexed - result = getNextResource(RANDOM, reindexTstamp, resourceTypeId, logicalId); + if (logicalResourceId != null) { + result = getResource(reindexTstamp, logicalResourceId); + } else { + result = getNextResource(RANDOM, reindexTstamp, resourceTypeId, logicalId); + } if (result != null) { @@ -249,6 +306,10 @@ public ResourceIndexRecord getResourceToReindex(Instant reindexTstamp, Integer r ResultSet rs = stmt.executeQuery(); if (rs.next()) { result.setResourceType(rs.getString(1)); + } else if (logicalResourceId != null) { + // When logicalResourceId is specified, the resource is not locked, so it could disappear + logger.fine("Logical resource no longer exists: logical_resource_id=" + result.getLogicalResourceId()); + result = null; } else { // Can't really happen, because the resource is selected for update, so it can't disappear logger.severe("Logical resource no longer exists: logical_resource_id=" + result.getLogicalResourceId()); diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/RetrieveIndexDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/RetrieveIndexDAO.java new file mode 100644 index 00000000000..7c83f85af62 --- /dev/null +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/RetrieveIndexDAO.java @@ -0,0 +1,129 @@ +/* + * (C) Copyright IBM Corp. 2021 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.persistence.jdbc.dao.impl; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.ibm.fhir.database.utils.api.IDatabaseTranslator; +import com.ibm.fhir.persistence.exception.FHIRPersistenceException; +import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; + +/** + * Simple DAO to retrieve index IDs (i.e. logical resource IDs) from the LOGICAL_RESOURCES table. + */ +public class RetrieveIndexDAO { + private static final Logger logger = Logger.getLogger(RetrieveIndexDAO.class.getName()); + private static final Calendar UTC_CALENDAR = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + + private final IDatabaseTranslator translator; + private final String schemaName; + private final String resourceTypeName; + private final int count; + private final Long afterLogicalResourceId; + private final Instant notModifiedAfter; + private final FHIRPersistenceJDBCCache cache; + + /** + * Public constructor. + * @param tx translator + * @param schemaName schema name + * @param resourceTypeName the resource type name of index IDs to return, or null + * @param count maximum number of index IDs to return + * @param notModifiedAfter only return resources last updated at or before the specified instant, or null + * @param afterIndexId only return index IDs after this index ID, or null + * @param cache the cache + */ + public RetrieveIndexDAO(IDatabaseTranslator tx, String schemaName, String resourceTypeName, int count, Instant notModifiedAfter, Long afterIndexId, FHIRPersistenceJDBCCache cache) { + this.translator = tx; + this.schemaName = schemaName; + this.resourceTypeName = resourceTypeName; + this.count = count; + this.afterLogicalResourceId = afterIndexId; + this.notModifiedAfter = notModifiedAfter; + this.cache = cache; + } + + /** + * Run the DAO command on the database connection. + * @param c connection + * @return list of logical resource IDs + * @throws FHIRPersistenceException + */ + public List run(Connection c) throws FHIRPersistenceException { + List logicalResourceIds = new ArrayList<>(); + + // Attempt to get resource type ID from cache, but since it is possible it doesn't find it in the cache, + // since the cache is not loaded synchonously, fall back to using resource type name, if provided + Integer resourceTypeId = resourceTypeName != null ? cache.getResourceTypeCache().getId(resourceTypeName) : null; + + StringBuilder query = new StringBuilder(); + query.append(" SELECT lr.logical_resource_id"); + query.append(" FROM "); + if (resourceTypeId == null && resourceTypeName != null) { + query.append(schemaName).append(".resource_types rt, "); + } + query.append(schemaName).append(".logical_resources lr "); + query.append(" WHERE lr.is_deleted = 'N' "); + if (resourceTypeId != null) { + query.append(" AND lr.resource_type_id = ? "); + } else if (resourceTypeName != null) { + query.append(" AND rt.resource_type = ? "); + query.append(" AND rt.resource_type_id = lr.resource_type_id "); + } + if (notModifiedAfter != null) { + query.append(" AND lr.last_updated <= ? "); + } + if (afterLogicalResourceId != null) { + query.append(" AND lr.logical_resource_id > ? "); + } + + query.append(" ORDER BY lr.logical_resource_id "); + query.append(translator.limit(Integer.toString(count))); + + final String SQL = query.toString(); + + if (logger.isLoggable(Level.FINE)) { + logger.fine("RETRIEVE LOGICAL RESOURCE IDS: " + SQL + "; [" + resourceTypeName + ", " + notModifiedAfter + ", " + afterLogicalResourceId + "]"); + } + + try (PreparedStatement ps = c.prepareStatement(SQL)) { + int i = 1; + if (resourceTypeId != null) { + ps.setInt(i++, resourceTypeId); + } else if (resourceTypeName != null) { + ps.setString(i++, resourceTypeName); + } + if (notModifiedAfter != null) { + ps.setTimestamp(i++, Timestamp.from(notModifiedAfter), UTC_CALENDAR); + } + if (afterLogicalResourceId != null) { + ps.setLong(i++, afterLogicalResourceId); + } + + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + logicalResourceIds.add(rs.getLong(1)); + } + } catch (SQLException x) { + logger.log(Level.SEVERE, "Retrieve logical resource IDs query failed: " + SQL, x); + throw new FHIRPersistenceException("Retrieve index failed"); + } + + return logicalResourceIds; + } +} diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java index 43e9aa2f138..5484b7361ef 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java @@ -129,6 +129,7 @@ import com.ibm.fhir.persistence.jdbc.dao.impl.ResourceProfileRec; import com.ibm.fhir.persistence.jdbc.dao.impl.ResourceReferenceDAO; import com.ibm.fhir.persistence.jdbc.dao.impl.ResourceTokenValueRec; +import com.ibm.fhir.persistence.jdbc.dao.impl.RetrieveIndexDAO; import com.ibm.fhir.persistence.jdbc.dao.impl.TransactionDataImpl; import com.ibm.fhir.persistence.jdbc.dto.CompositeParmVal; import com.ibm.fhir.persistence.jdbc.dto.DateParmVal; @@ -2453,8 +2454,8 @@ public boolean isReindexSupported() { } @Override - public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, java.time.Instant tstamp, String resourceLogicalId) - throws FHIRPersistenceException { + public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, java.time.Instant tstamp, List indexIds, + String resourceLogicalId) throws FHIRPersistenceException { final String METHODNAME = "reindex"; log.entering(CLASSNAME, METHODNAME); @@ -2494,13 +2495,14 @@ public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oper // Look up the optional resourceTypeId for the given resourceType parameter resourceTypeId = cache.getResourceTypeCache().getId(resourceType); } + int indexIdsProcessed = 0; - // Need to skip over deleted resources so we have to loop until we find something not - // deleted, or reach the end. + // If list of indexIds was specified, loop over those. Otherwise, since we skip over + // deleted resources we have to loop until we find something not deleted, or reach the end. ResourceIndexRecord rir; do { long start = System.nanoTime(); - rir = reindexDAO.getResourceToReindex(tstamp, resourceTypeId, logicalId); + rir = reindexDAO.getResourceToReindex(tstamp, indexIds != null ? indexIds.get(indexIdsProcessed++) : null, resourceTypeId, logicalId); long end = System.nanoTime(); if (log.isLoggable(Level.FINER)) { @@ -2523,7 +2525,7 @@ public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oper // result is only 0 if getResourceToReindex doesn't give us anything because this indicates // there's nothing left to do - result = 1; + result++; } else { // Skip this particular resource because it has been deleted if (log.isLoggable(Level.FINE)) { @@ -2532,7 +2534,7 @@ public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oper rir.setDeleted(true); } } - } while (rir != null && rir.isDeleted()); + } while ((indexIds != null && indexIdsProcessed < indexIds.size()) || (indexIds == null && rir != null && rir.isDeleted())); } catch(FHIRPersistenceFKVException e) { getTransaction().setRollbackOnly(); @@ -2763,4 +2765,24 @@ private boolean allSearchParmsAreGlobal(List queryParms) { return true; } + @Override + public List retrieveIndex(int count, java.time.Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException { + final String METHODNAME = "retrieveIndex"; + log.entering(CLASSNAME, METHODNAME); + + try (Connection connection = openConnection()) { + doCachePrefill(connection); + IDatabaseTranslator translator = FHIRResourceDAOFactory.getTranslatorForFlavor(connectionStrategy.getFlavor()); + RetrieveIndexDAO dao = new RetrieveIndexDAO(translator, schemaNameSupplier.getSchemaForRequestContext(connection), resourceTypeName, count, notModifiedAfter, afterIndexId, this.cache); + return dao.run(connection); + } catch(FHIRPersistenceException e) { + throw e; + } catch(Throwable e) { + FHIRPersistenceException fx = new FHIRPersistenceException("Unexpected error while retrieving logical resource IDs."); + log.log(Level.SEVERE, fx.getMessage(), e); + throw fx; + } finally { + log.exiting(CLASSNAME, METHODNAME); + } + } } \ No newline at end of file diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresReindexResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresReindexResourceDAO.java index 2a6bcaa962a..96eff1ce80d 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresReindexResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresReindexResourceDAO.java @@ -33,9 +33,6 @@ public class PostgresReindexResourceDAO extends ReindexResourceDAO { private static final Logger logger = Logger.getLogger(PostgresReindexResourceDAO.class.getName()); - // Note that currently the global logical_resources table does not carry - // the is_deleted flag. Until it does, the queries will return deleted - // resources, which can be skipped for reindex. (issue-2055) private static final String PICK_SINGLE_RESOURCE = "" + " UPDATE logical_resources " + " SET reindex_tstamp = ?," @@ -45,6 +42,7 @@ public class PostgresReindexResourceDAO extends ReindexResourceDAO { + " FROM logical_resources lr " + " WHERE lr.resource_type_id = ? " + " AND lr.logical_id = ? " + + " AND lr.is_deleted = 'N' " + " AND lr.reindex_tstamp < ? " + " ORDER BY lr.reindex_tstamp " + " FOR UPDATE SKIP LOCKED LIMIT 1) " @@ -59,6 +57,7 @@ public class PostgresReindexResourceDAO extends ReindexResourceDAO { + " SELECT lr.logical_resource_id " + " FROM logical_resources lr " + " WHERE lr.resource_type_id = ? " + + " AND lr.is_deleted = 'N' " + " AND lr.reindex_tstamp < ? " + " ORDER BY lr.reindex_tstamp " + " FOR UPDATE SKIP LOCKED LIMIT 1) " @@ -72,7 +71,8 @@ public class PostgresReindexResourceDAO extends ReindexResourceDAO { + " WHERE logical_resource_id = ( " + " SELECT lr.logical_resource_id " + " FROM logical_resources lr " - + " WHERE lr.reindex_tstamp < ? " + + " WHERE lr.is_deleted = 'N' " + + " AND lr.reindex_tstamp < ? " + " ORDER BY lr.reindex_tstamp " + " FOR UPDATE SKIP LOCKED LIMIT 1) " + "RETURNING logical_resource_id, resource_type_id, logical_id, reindex_txid " diff --git a/fhir-persistence-scout/src/main/java/com/ibm/fhir/persistence/scout/FHIRPersistenceScoutImpl.java b/fhir-persistence-scout/src/main/java/com/ibm/fhir/persistence/scout/FHIRPersistenceScoutImpl.java index 59b69a074c4..3b441e36681 100644 --- a/fhir-persistence-scout/src/main/java/com/ibm/fhir/persistence/scout/FHIRPersistenceScoutImpl.java +++ b/fhir-persistence-scout/src/main/java/com/ibm/fhir/persistence/scout/FHIRPersistenceScoutImpl.java @@ -1,5 +1,5 @@ /** - * (C) Copyright IBM Corp. 2020,2021 + * (C) Copyright IBM Corp. 2020, 2021 * * SPDX-License-Identifier: Apache-2.0 */ @@ -456,7 +456,8 @@ public String generateResourceId() { } @Override - public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, java.time.Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException { + public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, java.time.Instant tstamp, List indexIds, + String resourceLogicalId) throws FHIRPersistenceException { return 0; } @@ -472,4 +473,9 @@ public List changes(int resourceCount, java.time.Instan throws FHIRPersistenceException { throw new FHIRPersistenceNotSupportedException("API not supported at this time"); } + + @Override + public List retrieveIndex(int count, java.time.Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException { + throw new FHIRPersistenceNotSupportedException("API not supported at this time"); + } } \ No newline at end of file diff --git a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/FHIRPersistence.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/FHIRPersistence.java index 02672f2e31d..7414289d948 100644 --- a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/FHIRPersistence.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/FHIRPersistence.java @@ -152,21 +152,20 @@ default boolean isReindexSupported() { } /** - * Initiates reindexing for resources not yet processed. Limits the number of resources - * processed to resourceCount. The number processed is returned in the OperationOutcome. + * Initiates reindexing for either a specified list of index IDs, + * or a randomly chosen resource. The number of resources processed is returned. * This can be used by a controller to continue processing until everything is complete. - * Increasing resourceCount reduces the number of calls required to reindex an entire - * database, but larger values risk exceeding the transaction timeout. Values around 100 - * are a good starting point for most systems. * @param context the FHIRPersistenceContext instance associated with the current request. - * @param operationOutcomeResult accumulate issues in this {@link Builder} - * @param tstamp reindex any resources with an index_tstamp less than this. - * @param resourceLogicalId optional resourceType/logicalId value to reindex a specific resource - * @return count of the number of resources reindexed by this call (0 or 1) + * @param operationOutcomeResult accumulate issues in this {@link OperationOutcome.Builder} + * @param tstamp only reindex resources with a reindex_tstamp less than this + * @param indexIds list of index IDs of resources to reindex, or null + * @param resourceLogicalId resourceType/logicalId value of a specific resource to reindex, or null; + * this parameter is ignored if the indexIds parameter value is non-null + * @return count of the number of resources reindexed by this call * @throws FHIRPersistenceException */ - int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId) - throws FHIRPersistenceException; + int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, List indexIds, + String resourceLogicalId) throws FHIRPersistenceException; /** * Special function for high speed export of resource payloads. The process @@ -213,4 +212,16 @@ default boolean isChangesSupported() { default ResourceEraseRecord erase(EraseDTO eraseDto) throws FHIRPersistenceException { throw new FHIRPersistenceException("Erase is not supported"); } + + /** + * Retrieves a list of index IDs available for reindexing. + * @param count the maximum nuber of index IDs to retrieve + * @param notModifiedAfter only retrieve index IDs for resources not last updated after the specified timestamp + * @param afterIndexId retrieve index IDs starting after this specified index ID, or null to start with first index ID + * @param resourceTypeName the resource type of index IDs to return, or null + * @return list of index IDs available for reindexing + * @throws FHIRPersistenceException + */ + List retrieveIndex(int count, Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException; + } \ No newline at end of file diff --git a/fhir-persistence/src/test/java/com/ibm/fhir/persistence/test/MockPersistenceImpl.java b/fhir-persistence/src/test/java/com/ibm/fhir/persistence/test/MockPersistenceImpl.java index 4e42fe09cad..b522c28d181 100644 --- a/fhir-persistence/src/test/java/com/ibm/fhir/persistence/test/MockPersistenceImpl.java +++ b/fhir-persistence/src/test/java/com/ibm/fhir/persistence/test/MockPersistenceImpl.java @@ -1,5 +1,5 @@ /* - * (C) Copyright IBM Corp. 2017,2021 + * (C) Copyright IBM Corp. 2017, 2021 * * SPDX-License-Identifier: Apache-2.0 */ @@ -77,7 +77,8 @@ public OperationOutcome getHealth() throws FHIRPersistenceException { } @Override - public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException { + public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, Instant tstamp, List indexIds, + String resourceLogicalId) throws FHIRPersistenceException { return 0; } @@ -98,4 +99,9 @@ public List changes(int resourceCount, Instant fromLast throws FHIRPersistenceException { return Collections.emptyList(); } + + @Override + public List retrieveIndex(int count, java.time.Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException { + return Collections.emptyList(); + } } \ No newline at end of file diff --git a/fhir-server-test/src/test/java/com/ibm/fhir/server/test/FHIRValidateOperationTest.java b/fhir-server-test/src/test/java/com/ibm/fhir/server/test/FHIRValidateOperationTest.java index 2c1999a75ce..87b05f7f8ec 100644 --- a/fhir-server-test/src/test/java/com/ibm/fhir/server/test/FHIRValidateOperationTest.java +++ b/fhir-server-test/src/test/java/com/ibm/fhir/server/test/FHIRValidateOperationTest.java @@ -13,9 +13,6 @@ import java.io.StringWriter; import java.util.Collections; -import jakarta.json.Json; -import jakarta.json.JsonBuilderFactory; -import jakarta.json.JsonObject; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; @@ -32,6 +29,10 @@ import com.ibm.fhir.model.type.Uri; import com.ibm.fhir.model.type.code.IssueSeverity; +import jakarta.json.Json; +import jakarta.json.JsonBuilderFactory; +import jakarta.json.JsonObject; + public class FHIRValidateOperationTest extends FHIRServerTestBase { private static final JsonBuilderFactory BUILDER_FACTORY = Json.createBuilderFactory(null); @Test(groups = { "validate-operation" }) @@ -40,7 +41,7 @@ public void testValidatePatient() { Entity entity = Entity.entity(patient, FHIRMediaType.APPLICATION_JSON); WebTarget target = getWebTarget(); - Response response = target.path("Resource/$validate").request().post(entity, Response.class); + Response response = target.path("Patient/$validate").request().post(entity, Response.class); assertResponse(response, Response.Status.OK.getStatusCode()); OperationOutcome operationOutcome = response.readEntity(OperationOutcome.class); @@ -56,7 +57,7 @@ public void testValidateInvalidPatient() { Entity entity = Entity.entity(patient, FHIRMediaType.APPLICATION_JSON); WebTarget target = getWebTarget(); - Response response = target.path("Resource/$validate").request().post(entity, Response.class); + Response response = target.path("Patient/$validate").request().post(entity, Response.class); assertResponse(response, Response.Status.OK.getStatusCode()); OperationOutcome operationOutcome = response.readEntity(OperationOutcome.class); @@ -92,7 +93,7 @@ public void testValidateValidObsProfile() throws Exception { FHIRGenerator.generator(Format.JSON).generate(parameters, writer); WebTarget target = getWebTarget(); - Response response = target.path("Resource/$validate").request().post(Entity.json(writer.toString()), Response.class); + Response response = target.path("Observation/$validate").request().post(Entity.json(writer.toString()), Response.class); assertResponse(response, Response.Status.OK.getStatusCode()); OperationOutcome operationOutcome = response.readEntity(OperationOutcome.class); @@ -122,7 +123,7 @@ public void testValidateValidRespRate() throws Exception { FHIRGenerator.generator(Format.JSON).generate(parameters, writer); WebTarget target = getWebTarget(); - Response response = target.path("Resource/$validate").request().post(Entity.json(writer.toString()), Response.class); + Response response = target.path("Observation/$validate").request().post(Entity.json(writer.toString()), Response.class); assertResponse(response, Response.Status.OK.getStatusCode()); OperationOutcome operationOutcome = response.readEntity(OperationOutcome.class); @@ -152,7 +153,7 @@ public void testValidateInvalidObsProfile() throws Exception { FHIRGenerator.generator(Format.JSON).generate(parameters, writer); WebTarget target = getWebTarget(); - Response response = target.path("Resource/$validate").request().post(Entity.json(writer.toString()), Response.class); + Response response = target.path("Observation/$validate").request().post(Entity.json(writer.toString()), Response.class); assertResponse(response, Response.Status.OK.getStatusCode()); OperationOutcome operationOutcome = response.readEntity(OperationOutcome.class); diff --git a/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/ReindexOperationTest.java b/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/ReindexOperationTest.java index 8648e58b5e2..6629d907c24 100644 --- a/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/ReindexOperationTest.java +++ b/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/ReindexOperationTest.java @@ -437,4 +437,30 @@ public void testReindexWithPatientResourceType() { assertEquals(r.getStatus(), Status.OK.getStatusCode()); } + + @Test + public void testReindex_indexIds() { + List parameters = new ArrayList<>(); + parameters.add( + Parameter.builder() + .name(string("indexIds")) + .value(string("2,4,6,8,10")) + .build()); + + Parameters.Builder builder = Parameters.builder(); + builder.id(UUID.randomUUID().toString()); + builder.parameter(parameters); + Parameters ps = builder.build(); + + Entity entity = Entity.entity(ps, FHIRMediaType.APPLICATION_FHIR_JSON); + + Response r = getWebTarget() + .path("/$reindex") + .request(FHIRMediaType.APPLICATION_FHIR_JSON) + .header("X-FHIR-TENANT-ID", "default") + .header("X-FHIR-DSID", "default") + .post(entity, Response.class); + + assertEquals(r.getStatus(), Status.OK.getStatusCode()); + } } \ No newline at end of file diff --git a/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/RetrieveIndexOperationTest.java b/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/RetrieveIndexOperationTest.java new file mode 100644 index 00000000000..74185c0ad68 --- /dev/null +++ b/fhir-server-test/src/test/java/com/ibm/fhir/server/test/operation/RetrieveIndexOperationTest.java @@ -0,0 +1,145 @@ +/* + * (C) Copyright IBM Corp. 2021 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.server.test.operation; + +import static com.ibm.fhir.model.type.Integer.of; +import static com.ibm.fhir.model.type.String.string; +import static org.testng.Assert.assertEquals; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.testng.annotations.Test; + +import com.ibm.fhir.core.FHIRMediaType; +import com.ibm.fhir.model.resource.Parameters; +import com.ibm.fhir.model.resource.Parameters.Parameter; +import com.ibm.fhir.server.test.FHIRServerTestBase; + +/** + * This class tests the $retrieve-index operation. + */ +public class RetrieveIndexOperationTest extends FHIRServerTestBase { + + @Test + public void testRetrieveIndex() { + WebTarget target = getWebTarget(); + target = target.path("/$retrieve-index"); + + List parameters = new ArrayList<>(); + parameters.add( + Parameter.builder() + .name(string("_count")) + .value(of(5)) + .build()); + parameters.add( + Parameter.builder() + .name(string("notModifiedAfter")) + .value(string(Instant.now().toString())) + .build()); + parameters.add( + Parameter.builder() + .name(string("afterIndexId")) + .value(string("8")) + .build()); + + Parameters.Builder builder = Parameters.builder(); + builder.id(UUID.randomUUID().toString()); + builder.parameter(parameters); + Parameters ps = builder.build(); + + Entity entity = Entity.entity(ps, FHIRMediaType.APPLICATION_FHIR_JSON); + + Response r = target.request(FHIRMediaType.APPLICATION_FHIR_JSON) + .header("X-FHIR-TENANT-ID", "default") + .header("X-FHIR-DSID", "default") + .post(entity, Response.class); + + assertEquals(r.getStatus(), Status.OK.getStatusCode()); + } + + @Test + public void testRetrieveIndex_type() { + WebTarget target = getWebTarget(); + target = target.path("/Patient/$retrieve-index"); + + List parameters = new ArrayList<>(); + parameters.add( + Parameter.builder() + .name(string("_count")) + .value(of(5)) + .build()); + parameters.add( + Parameter.builder() + .name(string("notModifiedAfter")) + .value(string(Instant.now().toString())) + .build()); + parameters.add( + Parameter.builder() + .name(string("afterIndexId")) + .value(string("8")) + .build()); + + Parameters.Builder builder = Parameters.builder(); + builder.id(UUID.randomUUID().toString()); + builder.parameter(parameters); + Parameters ps = builder.build(); + + Entity entity = Entity.entity(ps, FHIRMediaType.APPLICATION_FHIR_JSON); + + Response r = target.request(FHIRMediaType.APPLICATION_FHIR_JSON) + .header("X-FHIR-TENANT-ID", "default") + .header("X-FHIR-DSID", "default") + .post(entity, Response.class); + + assertEquals(r.getStatus(), Status.OK.getStatusCode()); + } + + @Test + public void testRetrieveIndex_invalidType() { + WebTarget target = getWebTarget(); + target = target.path("/Resource/$retrieve-index"); + + List parameters = new ArrayList<>(); + parameters.add( + Parameter.builder() + .name(string("_count")) + .value(of(5)) + .build()); + parameters.add( + Parameter.builder() + .name(string("notModifiedAfter")) + .value(string(Instant.now().toString())) + .build()); + parameters.add( + Parameter.builder() + .name(string("afterIndexId")) + .value(string("8")) + .build()); + + Parameters.Builder builder = Parameters.builder(); + builder.id(UUID.randomUUID().toString()); + builder.parameter(parameters); + Parameters ps = builder.build(); + + Entity entity = Entity.entity(ps, FHIRMediaType.APPLICATION_FHIR_JSON); + + Response r = target.request(FHIRMediaType.APPLICATION_FHIR_JSON) + .header("X-FHIR-TENANT-ID", "default") + .header("X-FHIR-DSID", "default") + .post(entity, Response.class); + + assertEquals(r.getStatus(), Status.BAD_REQUEST.getStatusCode()); + } +} \ No newline at end of file diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/AbstractOperation.java b/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/AbstractOperation.java index 7296a4ec0dd..54ebd4bf4e2 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/AbstractOperation.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/AbstractOperation.java @@ -147,27 +147,14 @@ protected void validateOperationContext(FHIROperationContext operationContext, C String msg = "Operation context INSTANCE is not allowed for operation: '" + getName() + "'"; throw buildExceptionWithIssue(msg, IssueType.INVALID); } + validateResourceType(operationContext, resourceType); break; case RESOURCE_TYPE: if (definition.getType().getValue() == false) { String msg = "Operation context RESOURCE_TYPE is not allowed for operation: '" + getName() + "'"; throw buildExceptionWithIssue(msg, IssueType.INVALID); - } else { - if (resourceType == null) { - String actualPath = "null-path"; - Object val = operationContext.getProperty(FHIROperationContext.PROPNAME_PATH_PARAMETER); - if (val instanceof java.lang.String) { - actualPath = (String) val; - } - throw buildUnsupportedResourceTypeException(actualPath); - } - String resourceTypeName = resourceType.getSimpleName(); - List resourceTypeNames = getResourceTypeNames(); - if (!resourceTypeNames.contains(resourceTypeName) && !resourceTypeNames.contains("Resource")) { - String msg = "Resource type: '" + resourceTypeName + "' is not allowed for operation: '" + getName() + "'"; - throw buildExceptionWithIssue(msg, IssueType.INVALID); - } } + validateResourceType(operationContext, resourceType); break; case SYSTEM: if (definition.getSystem().getValue() == false) { @@ -180,6 +167,34 @@ protected void validateOperationContext(FHIROperationContext operationContext, C } } + /** + * Determines if the operation disallows abstract resource types, Resource and DomainResource. + * TODO: Remove this method when Issue #2526 is implemented, at which time, abstract resource types + * will be disallowed for any operation. + * @return true or false + */ + protected boolean isAbstractResourceTypesDisallowed() { + return false; + } + + private void validateResourceType(FHIROperationContext operationContext, Class resourceType) throws FHIROperationException { + if (resourceType == null) { + String actualPath = "null-path"; + Object val = operationContext.getProperty(FHIROperationContext.PROPNAME_PATH_PARAMETER); + if (val instanceof java.lang.String) { + actualPath = (String) val; + } + throw buildUnsupportedResourceTypeException(actualPath); + } + String resourceTypeName = resourceType.getSimpleName(); + List resourceTypeNames = getResourceTypeNames(); + if ((isAbstractResourceTypesDisallowed() && !ModelSupport.isConcreteResourceType(resourceTypeName)) || + (!resourceTypeNames.contains(resourceTypeName) && !resourceTypeNames.contains("Resource"))) { + String msg = "Resource type: '" + resourceTypeName + "' is not allowed for operation: '" + getName() + "'"; + throw buildExceptionWithIssue(msg, IssueType.INVALID); + } + } + private FHIROperationException buildUnsupportedResourceTypeException(String resourceTypeName) { String msg = "'" + resourceTypeName + "' is not a valid resource type."; Issue issue = OperationOutcome.Issue.builder() diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/FHIRResourceHelpers.java b/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/FHIRResourceHelpers.java index ded5f779edd..76aaa0189d4 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/FHIRResourceHelpers.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/operation/spi/FHIRResourceHelpers.java @@ -7,6 +7,7 @@ package com.ibm.fhir.server.operation.spi; import java.time.Instant; +import java.util.List; import javax.ws.rs.core.MultivaluedMap; @@ -309,16 +310,19 @@ Resource doInvoke(FHIROperationContext operationContext, String resourceTypeName FHIRPersistenceTransaction getTransaction() throws Exception; /** - * Invoke the FHIR persistence reindex operation for a randomly chosen resource which was - * last reindexed before the given date - * @param operationContext - * @param operationOutcomeResult - * @param tstamp - * @param resourceLogicalId a reference to a resource e.g. "Patient/abc123". Can be null - * @return number of resources reindexed (0 if no resources were found to reindex) + * Invoke the FHIR persistence reindex operation for either a specified list of indexIds, + * or a randomly chosen resource, last reindexed before the given timestamp. + * @param operationContext the operation context + * @param operationOutcomeResult accumulate issues in this {@link OperationOutcome.Builder} + * @param tstamp only reindex resources with a reindex_tstamp less than this + * @param indexIds list of index IDs of resources to reindex, or null + * @param resourceLogicalId resourceType (e.g. "Patient"), or resourceType/logicalId a specific resource (e.g. "Patient/abc123"), to reindex, or null; + * this parameter is ignored if the indexIds parameter value is non-null + * @return count of the number of resources reindexed by this call * @throws Exception */ - int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId) throws Exception; + int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, List indexIds, + String resourceLogicalId) throws Exception; /** * Invoke the FHIR Persistence erase operation for a specific instance of the erase. @@ -333,4 +337,17 @@ default ResourceEraseRecord doErase(FHIROperationContext operationContext, Erase */ throw new FHIROperationException("Unsupported for the given platform"); } + + /** + * Invoke the FHIR persistence retrieve index operation to retrieve a list of indexIds available for reindexing. + * @param operationContext the operation context + * @param resourceTypeName the resource type of index IDs to return, or null + * @param count the maximum nuber of index IDs to retrieve + * @param notModifiedAfter only retrieve index IDs for resources not last updated after the specified timestamp + * @param afterIndexId retrieve index IDs starting after this specified ID, or null to start with first ID + * @return list of index IDs available for reindexing + * @throws Exception + */ + List doRetrieveIndex(FHIROperationContext operationContext, String resourceTypeName, int count, Instant notModifiedAfter, Long afterIndexId) throws Exception; + } \ No newline at end of file diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIROperationUtil.java b/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIROperationUtil.java index e7f7ffbce71..a10b741b357 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIROperationUtil.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIROperationUtil.java @@ -28,6 +28,7 @@ import com.ibm.fhir.model.type.Code; import com.ibm.fhir.model.type.Date; import com.ibm.fhir.model.type.DateTime; +import com.ibm.fhir.model.type.Element; import com.ibm.fhir.model.type.Id; import com.ibm.fhir.model.type.Instant; import com.ibm.fhir.model.type.Oid; @@ -179,6 +180,7 @@ public static Parameters getOutputParameters(Resource resource) { return getOutputParameters("return", resource); } + /** * generates an output parameter with a specific name. * @@ -195,6 +197,23 @@ public static Parameters getOutputParameters(String name, Resource resource) { .build(); } + /** + * Generates an output parameters, with a parameter for a specified element. + * @param name the parameter name + * @param element the element, or null + * @return output parameters + */ + public static Parameters getOutputParameters(String name, Element element) { + Parameters.Builder builder = Parameters.builder(); + if (element != null) { + builder.parameter(Parameter.builder() + .name(string(name)) + .value(element) + .build()); + } + return builder.build(); + } + public static boolean hasSingleResourceOutputParameter(Parameters parameters) { if (parameters == null) { return false; diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java b/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java index a99d02bcf38..9d2f9959d67 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java @@ -3069,7 +3069,8 @@ private void setOperationContextProperties(FHIROperationContext operationContext } @Override - public int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId) throws Exception { + public int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, List indexIds, + String resourceLogicalId) throws Exception { int result = 0; // handle some retries in case of deadlock exceptions final int TX_ATTEMPTS = 5; @@ -3079,7 +3080,7 @@ public int doReindex(FHIROperationContext operationContext, OperationOutcome.Bui txn.begin(); try { FHIRPersistenceContext persistenceContext = null; - result = persistence.reindex(persistenceContext, operationOutcomeResult, tstamp, resourceLogicalId); + result = persistence.reindex(persistenceContext, operationOutcomeResult, tstamp, indexIds, resourceLogicalId); attempt = TX_ATTEMPTS; // end the retry loop } catch (FHIRPersistenceDataAccessException x) { if (x.isTransactionRetryable() && attempt < TX_ATTEMPTS) { @@ -3474,4 +3475,22 @@ public ResourceEraseRecord doErase(FHIROperationContext operationContext, EraseD } while (attempt++ < TX_ATTEMPTS); return eraseRecord; } + + @Override + public List doRetrieveIndex(FHIROperationContext operationContext, String resourceTypeName, int count, Instant notModifiedAfter, Long afterIndexId) throws Exception { + List indexIds = null; + + FHIRTransactionHelper txn = null; + try { + txn = new FHIRTransactionHelper(getTransaction()); + txn.begin(); + indexIds = persistence.retrieveIndex(count, notModifiedAfter, afterIndexId, resourceTypeName); + } finally { + if (txn != null) { + txn.end(); + } + } + + return indexIds; + } } \ No newline at end of file diff --git a/fhir-server/src/test/java/com/ibm/fhir/server/test/MockPersistenceImpl.java b/fhir-server/src/test/java/com/ibm/fhir/server/test/MockPersistenceImpl.java index 1ff15392345..18a08f95f33 100644 --- a/fhir-server/src/test/java/com/ibm/fhir/server/test/MockPersistenceImpl.java +++ b/fhir-server/src/test/java/com/ibm/fhir/server/test/MockPersistenceImpl.java @@ -135,7 +135,8 @@ public String generateResourceId() { } @Override - public int reindex(FHIRPersistenceContext context, Builder operationOutcomeResult, java.time.Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException { + public int reindex(FHIRPersistenceContext context, Builder operationOutcomeResult, java.time.Instant tstamp, List indexIds, + String resourceLogicalId) throws FHIRPersistenceException { return 0; } @@ -162,4 +163,10 @@ public List changes(int resourceCount, java.time.Instan // NOP return null; } + + @Override + public List retrieveIndex(int count, java.time.Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException { + // NOP + return null; + } } \ No newline at end of file diff --git a/fhir-server/src/test/java/com/ibm/fhir/server/test/ServerResolveFunctionTest.java b/fhir-server/src/test/java/com/ibm/fhir/server/test/ServerResolveFunctionTest.java index 09f4ca219c1..95839ebec45 100644 --- a/fhir-server/src/test/java/com/ibm/fhir/server/test/ServerResolveFunctionTest.java +++ b/fhir-server/src/test/java/com/ibm/fhir/server/test/ServerResolveFunctionTest.java @@ -369,6 +369,7 @@ public int reindex( FHIRPersistenceContext context, Builder operationOutcomeResult, java.time.Instant tstamp, + List indexIds, String resourceLogicalId) throws FHIRPersistenceException { throw new UnsupportedOperationException(); } @@ -419,6 +420,11 @@ private SingleResourceResult createOrUpdate(T resource) return resultBuilder.build(); } + + @Override + public List retrieveIndex(int count, java.time.Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException { + throw new UnsupportedOperationException(); + } } public static class PersistenceTransactionImpl implements FHIRPersistenceTransaction { diff --git a/fhir-smart/src/test/java/com/ibm/fhir/smart/test/MockPersistenceImpl.java b/fhir-smart/src/test/java/com/ibm/fhir/smart/test/MockPersistenceImpl.java index e8ef8fcaeff..478084cbd3d 100644 --- a/fhir-smart/src/test/java/com/ibm/fhir/smart/test/MockPersistenceImpl.java +++ b/fhir-smart/src/test/java/com/ibm/fhir/smart/test/MockPersistenceImpl.java @@ -152,7 +152,8 @@ public OperationOutcome getHealth() throws FHIRPersistenceException { } @Override - public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException { + public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, Instant tstamp, List indexIds, + String resourceLogicalId) throws FHIRPersistenceException { return 0; } @@ -172,4 +173,9 @@ public List changes(int resourceCount, Instant fromLast throws FHIRPersistenceException { return null; } + + @Override + public List retrieveIndex(int count, java.time.Instant notModifiedAfter, Long afterIndexId, String resourceTypeName) throws FHIRPersistenceException { + return null; + } } diff --git a/operation/fhir-operation-erase/src/test/java/com/ibm/fhir/operation/erase/mock/MockFHIRResourceHelpers.java b/operation/fhir-operation-erase/src/test/java/com/ibm/fhir/operation/erase/mock/MockFHIRResourceHelpers.java index 01c36ac06fb..f0f5559ec4e 100644 --- a/operation/fhir-operation-erase/src/test/java/com/ibm/fhir/operation/erase/mock/MockFHIRResourceHelpers.java +++ b/operation/fhir-operation-erase/src/test/java/com/ibm/fhir/operation/erase/mock/MockFHIRResourceHelpers.java @@ -7,6 +7,7 @@ package com.ibm.fhir.operation.erase.mock; import java.time.Instant; +import java.util.List; import javax.ws.rs.core.MultivaluedMap; @@ -63,8 +64,8 @@ public FHIRPersistenceTransaction getTransaction() throws Exception { } @Override - public int doReindex(FHIROperationContext operationContext, Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId) - throws Exception { + public int doReindex(FHIROperationContext operationContext, Builder operationOutcomeResult, Instant tstamp, List indexIds, + String resourceLogicalId) throws Exception { return 0; } @@ -159,4 +160,10 @@ public Bundle doBundle(Bundle bundle, boolean skippableUpdates) throws Exception return null; } + + @Override + public List doRetrieveIndex(FHIROperationContext operationContext, String resourceTypeName, int count, java.time.Instant notModifiedAfter, Long afterIndexId) throws Exception { + + return null; + } } diff --git a/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/ReindexOperation.java b/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/ReindexOperation.java index fb6db31c0dd..62fe2a8949a 100644 --- a/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/ReindexOperation.java +++ b/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/ReindexOperation.java @@ -12,7 +12,10 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; +import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -39,6 +42,7 @@ public class ReindexOperation extends AbstractOperation { private static final Logger logger = Logger.getLogger(ReindexOperation.class.getName()); private static final String PARAM_TSTAMP = "tstamp"; + private static final String PARAM_INDEX_IDS = "indexIds"; private static final String PARAM_RESOURCE_COUNT = "resourceCount"; private static final String PARAM_RESOURCE_LOGICAL_ID = "resourceLogicalId"; @@ -64,6 +68,11 @@ protected OperationDefinition buildOperationDefinition() { } } + @Override + protected boolean isAbstractResourceTypesDisallowed() { + return true; + } + @Override protected Parameters doInvoke(FHIROperationContext operationContext, Class resourceType, String logicalId, String versionId, Parameters parameters, FHIRResourceHelpers resourceHelper) @@ -77,16 +86,17 @@ protected Parameters doInvoke(FHIROperationContext operationContext, Class indexIds = null; int resourceCount = 10; String resourceLogicalId = null; - String specificResourceType = null; + boolean hasSpecificResourceType = false; if (resourceType != null) { - specificResourceType = resourceType.getSimpleName(); - resourceLogicalId = specificResourceType; + resourceLogicalId = resourceType.getSimpleName(); if (logicalId != null) { resourceLogicalId += "/" + logicalId; } + hasSpecificResourceType = true; } if (parameters != null) { @@ -106,6 +116,23 @@ protected Parameters doInvoke(FHIROperationContext operationContext, Class lrIdSet = new LinkedHashSet<>(); + String[] lrIdArray = lrIdsString.split("\\s*,\\s*"); + if (lrIdArray.length == 0) { + lrIdSet.add(Long.valueOf(lrIdsString)); + } + for (String lrIdString : lrIdArray) { + lrIdSet.add(Long.valueOf(lrIdString)); + } + indexIds = new ArrayList<>(lrIdSet); + if (indexIds.size() > MAX_RESOURCE_COUNT) { + throw FHIROperationUtil.buildExceptionWithIssue("The specified number of index IDs exceeds the maximum allowed number of resources to reindex", IssueType.INVALID); + } + } } else if (PARAM_RESOURCE_COUNT.equals(parameter.getName().getValue())) { Integer val = parameter.getValue().as(com.ibm.fhir.model.type.Integer.class).getValue(); if (val != null) { @@ -116,32 +143,37 @@ protected Parameters doInvoke(FHIROperationContext operationContext, Class 0; i++) { - processed = resourceHelper.doReindex(operationContext, result, tstamp, resourceLogicalId); - totalProcessed += processed; + if (indexIds != null) { + // All resources in one transaction + totalProcessed = resourceHelper.doReindex(operationContext, result, tstamp, indexIds, null); + } else { + int processed = 1; + // One resource per transaction + for (int i=0; i 0; i++) { + processed = resourceHelper.doReindex(operationContext, result, tstamp, null, resourceLogicalId); + totalProcessed += processed; + } } if (totalProcessed == 0) { diff --git a/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/RetrieveIndexOperation.java b/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/RetrieveIndexOperation.java new file mode 100644 index 00000000000..c2fbb947c77 --- /dev/null +++ b/operation/fhir-operation-reindex/src/main/java/com/ibm/fhir/operation/reindex/RetrieveIndexOperation.java @@ -0,0 +1,140 @@ +/* + * (C) Copyright IBM Corp. 2021 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.operation.reindex; + +import static com.ibm.fhir.model.type.String.string; + +import java.io.InputStream; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import com.ibm.fhir.exception.FHIROperationException; +import com.ibm.fhir.model.format.Format; +import com.ibm.fhir.model.parser.FHIRParser; +import com.ibm.fhir.model.resource.OperationDefinition; +import com.ibm.fhir.model.resource.Parameters; +import com.ibm.fhir.model.resource.Resource; +import com.ibm.fhir.server.operation.spi.AbstractOperation; +import com.ibm.fhir.server.operation.spi.FHIROperationContext; +import com.ibm.fhir.server.operation.spi.FHIRResourceHelpers; +import com.ibm.fhir.server.util.FHIROperationUtil; + +/** + * Custom operation to invoke the persistence layer to retrieve a list of index IDs. + */ +public class RetrieveIndexOperation extends AbstractOperation { + private static final Logger logger = Logger.getLogger(RetrieveIndexOperation.class.getName()); + + private static final String PARAM_COUNT = "_count"; + private static final String PARAM_AFTER_INDEX_ID = "afterIndexId"; + private static final String PARAM_NOT_MODIFIED_AFTER = "notModifiedAfter"; + private static final String PARAM_INDEX_IDS = "indexIds"; + + // The max number of index IDs we allow to be retrieved by one request + private static final int MAX_COUNT = 1000; + + static final DateTimeFormatter DAY_FORMAT = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd") + .parseDefaulting(ChronoField.NANO_OF_DAY, 0) + .toFormatter() + .withZone(ZoneId.of("UTC")); + + public RetrieveIndexOperation() { + super(); + } + + @Override + protected OperationDefinition buildOperationDefinition() { + try (InputStream in = getClass().getClassLoader().getResourceAsStream("retrieve-index.json")) { + return FHIRParser.parser(Format.JSON).parse(in); + } catch (Exception e) { + throw new Error(e); + } + } + + @Override + protected boolean isAbstractResourceTypesDisallowed() { + return true; + } + + @Override + protected Parameters doInvoke(FHIROperationContext operationContext, Class resourceType, + String logicalId, String versionId, Parameters parameters, FHIRResourceHelpers resourceHelper) + throws FHIROperationException { + + // Only POST is allowed + String method = (String) operationContext.getProperty(FHIROperationContext.PROPNAME_METHOD_TYPE); + if (!"POST".equalsIgnoreCase(method)) { + throw new FHIROperationException("HTTP method not supported: " + method); + } + + try { + String indexIdsString = ""; + int count = MAX_COUNT; + Long afterIndexId = null; + Instant notModifiedAfter = Instant.now(); + String resourceTypeName = resourceType != null ? resourceType.getSimpleName() : null; + + if (parameters != null) { + for (Parameters.Parameter parameter : parameters.getParameter()) { + if (parameter.getValue() != null && logger.isLoggable(Level.FINE)) { + logger.fine("retrieve-index param: " + parameter.getName().getValue() + " = " + parameter.getValue().toString()); + } + + if (PARAM_COUNT.equals(parameter.getName().getValue())) { + Integer val = parameter.getValue().as(com.ibm.fhir.model.type.Integer.class).getValue(); + if (val != null) { + if (val > MAX_COUNT) { + logger.info("Clamping _count '" + val + "' to max allowed: " + MAX_COUNT); + val = MAX_COUNT; + } + count = val; + } + } else if (PARAM_NOT_MODIFIED_AFTER.equals(parameter.getName().getValue())) { + // Only retrieve index IDs for resources not last updated after the specified timestamp + String val = parameter.getValue().as(com.ibm.fhir.model.type.String.class).getValue(); + if (val.length() == 10) { + notModifiedAfter = DAY_FORMAT.parse(val, Instant::from); + } else { + // assume full ISO format + notModifiedAfter = Instant.parse(val); + } + } else if (PARAM_AFTER_INDEX_ID.equals(parameter.getName().getValue())) { + // Start retrieving index IDs after this specified index ID + afterIndexId = Long.valueOf(parameter.getValue().as(com.ibm.fhir.model.type.String.class).getValue()); + } + } + } + + // Get index IDs + List indexIds = resourceHelper.doRetrieveIndex(operationContext, resourceTypeName, count, notModifiedAfter, afterIndexId); + if (indexIds != null) { + indexIdsString = indexIds.stream().map(l -> String.valueOf(l)).collect(Collectors.joining(",")); + } + + // Return output + return FHIROperationUtil.getOutputParameters(PARAM_INDEX_IDS, !indexIdsString.isEmpty() ? string(indexIdsString) : null); + + } catch (FHIROperationException e) { + throw e; + } catch (Throwable t) { + throw new FHIROperationException("Unexpected error occurred while processing request for operation '" + + getName() + "': " + getCausedByMessage(t), t); + } + } + + private String getCausedByMessage(Throwable throwable) { + return throwable.getClass().getName() + ": " + throwable.getMessage(); + } +} diff --git a/operation/fhir-operation-reindex/src/main/resources/META-INF/services/com.ibm.fhir.server.operation.spi.FHIROperation b/operation/fhir-operation-reindex/src/main/resources/META-INF/services/com.ibm.fhir.server.operation.spi.FHIROperation index 77edf50b20a..4f231399c5b 100644 --- a/operation/fhir-operation-reindex/src/main/resources/META-INF/services/com.ibm.fhir.server.operation.spi.FHIROperation +++ b/operation/fhir-operation-reindex/src/main/resources/META-INF/services/com.ibm.fhir.server.operation.spi.FHIROperation @@ -1 +1,2 @@ com.ibm.fhir.operation.reindex.ReindexOperation +com.ibm.fhir.operation.reindex.RetrieveIndexOperation diff --git a/operation/fhir-operation-reindex/src/main/resources/reindex.json b/operation/fhir-operation-reindex/src/main/resources/reindex.json index 237c1e9cb17..45abc9f7ce3 100644 --- a/operation/fhir-operation-reindex/src/main/resources/reindex.json +++ b/operation/fhir-operation-reindex/src/main/resources/reindex.json @@ -22,27 +22,35 @@ "affectsState": true, "parameter": [ { - "name": "resourceCount", + "name": "tstamp", "use": "in", "min": 0, "max": "1", - "documentation": "The maximum number of resources to reindex in this call. If this number is too large, the processing time might exceed the transaction timeout and fail.", - "type": "integer" + "documentation": "Reindex only resources not previously reindexed since this timestamp. Format as a date YYYY-MM-DD or time YYYY-MM-DDTHH:MM:SSZ.", + "type": "string" }, { - "name": "tstamp", - "use": "in", + "name": "indexIds", + "use": "in", "min": 0, "max": "1", - "documentation": "Reindex any resource not previously reindexed before this timestamp. Format as a date YYYY-MM-DD or time YYYY-MM-DDTHH:MM:DDZ.", + "documentation": "Reindex only resources with an index ID in the specified list, formatted as a comma-delimited list of strings. If number of index IDs in the list is too large, the processing time might exceed the transaction timeout and fail.", "type": "string" }, + { + "name": "resourceCount", + "use": "in", + "min": 0, + "max": "1", + "documentation": "The maximum number of resources to reindex in this call. If this number is too large, the processing time might exceed the transaction timeout and fail. If indexIds is specified, this parameter is not used.", + "type": "integer" + }, { "name": "resourceLogicalId", "use": "in", "min": 0, "max": "1", - "documentation": "Reindex only the specified resource or resources of the given resource type when no id is provided. Format as Patient/abc123 or Patient", + "documentation": "Reindex only the specified resource or resources of the given resource type when no id is provided. Format as Patient/abc123 or Patient. If indexIds is specified, this parameter is not used.", "type": "string" } ] diff --git a/operation/fhir-operation-reindex/src/main/resources/retrieve-index.json b/operation/fhir-operation-reindex/src/main/resources/retrieve-index.json new file mode 100644 index 00000000000..668c24ecfc8 --- /dev/null +++ b/operation/fhir-operation-reindex/src/main/resources/retrieve-index.json @@ -0,0 +1,56 @@ +{ + "resourceType": "OperationDefinition", + "id": "retrieve-index", + "text": { + "status": "generated", + "div": "

retrieve-index

OPERATION: Retrieve index IDs of resources available to reindex

The retrieve-index operation retrieves the index IDs from the database. These index IDs can then be passed into the reindex operation to indicate the specific resources to reindex.

URL: [base]/$retrieve-index

" + }, + "url": "http://ibm.com/fhir/OperationDefinition/retrieve-index", + "name": "retrieve-index", + "status": "draft", + "kind": "operation", + "publisher": "IBM FHIR Server", + "date": "2021-06-11", + "description": "The retrieve-index operation retrieves the index IDs from the database.", + "code": "retrieve-index", + "system": true, + "type": true, + "instance": false, + "resource": [ + "Resource" + ], + "parameter": [ + { + "name": "_count", + "use": "in", + "min": 0, + "max": "1", + "documentation": "The maximum number of index IDs to retrieve. This may not exceed 1000. If not specified, the maxinum number retrieved is 1000.", + "type": "integer" + }, + { + "name": "notModifiedAfter", + "use": "in", + "min": 0, + "max": "1", + "documentation": "Only retrieve index IDs for resources not last updated after this timestamp. Format as a date YYYY-MM-DD or time YYYY-MM-DDTHH:MM:SSZ.", + "type": "string" + }, + { + "name": "afterIndexId", + "use": "in", + "min": 0, + "max": "1", + "documentation": "Retrieve index IDs starting with the first index ID after this index ID. If this parameter is not specified, the retrieved index IDs start with the first index ID.", + "type": "string" + }, + { + "name": "indexIds", + "use": "out", + "min": 0, + "max": "1", + "documentation": "The index IDs, formatted as a comma-delimited list of strings. This parameter is not returned if there are no index IDs retrieved.", + "type": "string" + } + ] +} \ No newline at end of file