Skip to content

Commit

Permalink
issue #1708 addressed code review comments and fixed reindexing as it…
Browse files Browse the repository at this point in the history
… is required for storing compartment references

Signed-off-by: Robin Arnold <robin.arnold23@ibm.com>
  • Loading branch information
punktilious committed Nov 22, 2020
1 parent d8aca7d commit 77fd427
Show file tree
Hide file tree
Showing 18 changed files with 317 additions and 240 deletions.
8 changes: 4 additions & 4 deletions docs/src/pages/guides/FHIRServerUsersGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ More information about multi-tenant support can be found in [Section 4.9 Multi-t
## 3.3.1 Compartment Search Performance
The IBM FHIR Server now supports the ability to compute and store compartment membership values during ingestion. Once stored, these values can help accelerate compartment-related search queries. To use this feature, update the IBM FHIR Server to the latest version and run a reindex operation. See the release notes for Release 4.5.0 for details. The reindex operation reprocesses the resources stored in the database, computing and storing the new compartment reference values. After the reindex operation has completed, add the following configuration element to the relevant tenant fhir-server-config.json file to allow the search queries to use the pre-computed values:
The IBM FHIR Server now supports the ability to compute and store compartment membership values during ingestion. Once stored, these values can help accelerate compartment-related search queries. To use this feature, update the IBM FHIR Server at least version 4.5.1 and run a reindex operation as described in the release notes for Release 4.5.0. The reindex operation reprocesses the resources stored in the database, computing and storing the new compartment reference values. After the reindex operation has completed, add the `useStoredCompartmentParam` configuration element to the relevant tenant fhir-server-config.json file to allow the search queries to use the pre-computed values:
```
{
Expand All @@ -186,9 +186,9 @@ The IBM FHIR Server now supports the ability to compute and store compartment me
```
## 3.4 Persistence layer configuration
The IBM FHIR server allows deployers to select a persistence layer implementation that fits their needs. Currently, the server includes a JDBC persistence layer which supports Apache Derby, IBM Db2, and PostgreSQL. However, Apache Derby is not recommended for production usage.
The IBM FHIR Server allows deployers to select a persistence layer implementation that fits their needs. Currently, the server includes a JDBC persistence layer which supports Apache Derby, IBM Db2, and PostgreSQL. However, Apache Derby is not recommended for production usage.
The FHIR server is delivered with a default configuration that is already configured to use the JDBC persistence layer implementation with an Embedded Derby database. This provides the easiest out-of-the-box experience since it requires very little setup. The sections that follow in this chapter will focus on how to configure the JDBC persistence layer implementation with either Embedded Derby or Db2.
The IBM FHIR Server is delivered with a default configuration that is already configured to use the JDBC persistence layer implementation with an Embedded Derby database. This provides the easiest out-of-the-box experience since it requires very little setup. The sections that follow in this chapter will focus on how to configure the JDBC persistence layer implementation with either Embedded Derby or Db2.
### 3.4.1 Configuring the JDBC persistence layer
#### 3.4.1.1 Database preparation
Expand Down Expand Up @@ -1864,7 +1864,7 @@ This section contains reference information about each of the configuration prop
|`fhirServer/audit/serviceProperties/geoState`|string|The Geo State configure for CADF audit logging service.|
|`fhirServer/audit/serviceProperties/geoCounty`|string|The Geo Country configure for CADF audit logging service.|
|`fhirServer/search/useBoundingRadius`|boolean|True, the bounding area is a Radius, else the bounding area is a box.|
|`fhirServer/search/useStoredCompartmentParam`|boolean|False, Compute and store parameter to accelerate compartment searches. Requires reindex using latest IBM FHIR Server version before this feature is enabled |
|`fhirServer/search/useStoredCompartmentParam`|boolean|False, Compute and store parameter to accelerate compartment searches. Requires reindex using at least IBM FHIR Server version 4.5.1 before this feature is enabled |
|`fhirServer/bulkdata/applicationName`| string|Fixed value, always set to fhir-bulkimportexport-webapp |
|`fhirServer/bulkdata/moduleName`|string| Fixed value, always set to fhir-bulkimportexport.war |
|`fhirServer/bulkdata/jobParameters/cos.bucket.name`|string|Object store bucket name |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,39 @@
import com.ibm.fhir.model.resource.OperationOutcome;
import com.ibm.fhir.model.resource.OperationOutcome.Issue;
import com.ibm.fhir.model.resource.Parameters;
import com.ibm.fhir.model.resource.Resource;
import com.ibm.fhir.model.resource.Parameters.Parameter;
import com.ibm.fhir.model.resource.Resource;

/**
* Drives the $reindex custom operation in parallel. Each thread keeps running
* until the OperationOutcome indicates that no work remains to be processed.
*/
public class DriveReindexOperation {
private static final Logger logger = Logger.getLogger(DriveReindexOperation.class.getName());

// the maximum number of requests we permit
private final int maxConcurrentRequests;

// flag to indicate if we should be running
private volatile boolean running = true;

private volatile boolean active = false;

// count of how many threads are currently running
private AtomicInteger currentlyRunning = new AtomicInteger();

// thread pool for processing requests
private final ExecutorService pool = Executors.newCachedThreadPool();

private final FHIRBucketClient fhirClient;

private final String url = "$reindex";

// The serialized Parameters resource sent with each POST
private final String requestBody;

private Thread monitorThread;

/**
* Public constructor
* @param client the FHIR client
Expand All @@ -65,13 +65,13 @@ public DriveReindexOperation(FHIRBucketClient fhirClient, int maxConcurrentReque
this.maxConcurrentRequests = maxConcurrentRequests;

Parameters parameters = Parameters.builder()
.parameter(Parameter.builder().name(str("_tstamp")).value(str(tstampParam)).build())
.parameter(Parameter.builder().name(str("_resourceCount")).value(intValue(resourceCountParam)).build())
.parameter(Parameter.builder().name(str("tstamp")).value(str(tstampParam)).build())
.parameter(Parameter.builder().name(str("resourceCount")).value(intValue(resourceCountParam)).build())
.build();

// Serialize into the requestBody string used by all the threads
this.requestBody = FHIRBucketClientUtil.resourceToString(parameters);

if (logger.isLoggable(Level.FINE)) {
logger.fine("Reindex request parameters: " + requestBody);
}
Expand All @@ -85,7 +85,7 @@ public DriveReindexOperation(FHIRBucketClient fhirClient, int maxConcurrentReque
private static com.ibm.fhir.model.type.String str(String str) {
return com.ibm.fhir.model.type.String.of(str);
}

private static com.ibm.fhir.model.type.Integer intValue(int val) {
return com.ibm.fhir.model.type.Integer.of(val);
}
Expand All @@ -97,7 +97,7 @@ public void init() {
if (!running) {
throw new IllegalStateException("Already shutdown");
}

// Initiate the monitorThread. This will fill the pool
// with worker threads, and monitor for completion or failure
logger.info("Starting monitor thread");
Expand All @@ -106,7 +106,7 @@ public void init() {
}

/**
* The main monitor loop.
* The main monitor loop.
*/
public void monitorLoop() {
while (this.running) {
Expand All @@ -121,11 +121,11 @@ public void monitorLoop() {
// should be OK now to fill the pool with workers
logger.info("Test probe successful - filling worker pool");
this.active = true;

for (int i=0; i<this.maxConcurrentRequests && this.running && this.active; i++) {
this.currentlyRunning.addAndGet(1);
pool.execute(() -> callReindexOperation());

// Slow down the ramp-up so we don't hit a new server with
// hundreds of requests in one go
safeSleep(1000);
Expand All @@ -140,7 +140,7 @@ public void monitorLoop() {
logger.info("Waiting for current threads to complete before restart: " + currentThreadCount);
safeSleep(5000);
}

} else { // active
// worker threads are active, so sleep for a bit before we check again
safeSleep(5000);
Expand All @@ -165,25 +165,25 @@ protected void safeSleep(long ms) {
*/
public void signalStop() {
this.running = false;

// make sure the pool doesn't start new work
pool.shutdown();
}

/**
* Wait until things are stopped
*/
public void waitForStop() {
if (this.running) {
signalStop();
}

try {
pool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException x) {
logger.warning("Wait for pool shutdown interrupted");
}

try {
// break any sleep inside the monitorThread
this.monitorThread.interrupt();
Expand All @@ -205,25 +205,25 @@ private void callReindexOperation() {
this.active = false;
}
}

this.currentlyRunning.decrementAndGet();
}

/**
* Make one call to the FHIR server $reindex operation
* @return true if the call was successful (200 OK)
*/
private boolean callOnce() {
boolean result = false;

// tell the FHIR Server to reindex a number of resources
long start = System.nanoTime();
FhirServerResponse response = fhirClient.post(url, requestBody);
long end = System.nanoTime();

double elapsed = (end - start) / 1e9;
logger.info(String.format("called $reindex: %d %s [took %5.3f s]", response.getStatusCode(), response.getStatusMessage(), elapsed));

if (response.getStatusCode() == HttpStatus.SC_OK) {
Resource resource = response.getResource();
if (resource != null) {
Expand All @@ -243,7 +243,7 @@ private boolean callOnce() {
// Stop as soon as we hit an error
logger.severe("FHIR Server reindex operation returned an error: " + response.getStatusCode() + " " + response.getStatusMessage());
}

return result;
}

Expand All @@ -257,7 +257,7 @@ private void checkResult(OperationOutcome result) {
Issue one = issues.get(0);
if ("Reindex complete".equals(one.getDiagnostics().getValue())) {
logger.info("Reindex - all done");

// tell all the running threads they can stop now
this.running = false;
}
Expand Down
Loading

0 comments on commit 77fd427

Please sign in to comment.