Skip to content

Commit

Permalink
Merge pull request #2559 from IBM/azure-preflight-a
Browse files Browse the repository at this point in the history
Add Azure Preflight
  • Loading branch information
tbieste authored Jun 28, 2021
2 parents 9995630 + f1e5c82 commit 45ba58d
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 26 deletions.
4 changes: 2 additions & 2 deletions build/docker/db2/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# ----------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2019, 2020
# (C) Copyright IBM Corp. 2019, 2021
#
# SPDX-License-Identifier: Apache-2.0
# ----------------------------------------------------------------------------

FROM ibmcom/db2
FROM ibmcom/db2:11.5.5.1

# Create a non-admin user for the FHIR server to use to access db2
RUN groupadd -g 1002 fhir && \
Expand Down
2 changes: 1 addition & 1 deletion build/reindex/db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# SPDX-License-Identifier: Apache-2.0
# ----------------------------------------------------------------------------

FROM ibmcom/db2
FROM ibmcom/db2:11.5.5.1

# Create a non-admin user for the FHIR server to use to access db2
RUN groupadd -g 1002 fhir && \
Expand Down
6 changes: 6 additions & 0 deletions docs/src/pages/guides/FHIRServerUsersGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,8 @@ This section contains reference information about each of the configuration prop
|`fhirServer/bulkdata/core/file/writeTriggerSizeMB`|number|The size, in megabytes, at which to write the buffer to file.|
|`fhirServer/bulkdata/core/file/sizeThresholdMB`|number|The size, in megabytes, at which to finish writing a given file. Use `0` to indicate that all resources of a given type should be written to a single file.|
|`fhirServer/bulkdata/core/file/resourceCountThreshold`|number|The number of resources at which to finish writing a given file. The actual number of resources written to a single file may be slightly above this number, dependent on the configured page size. Use `0` to indicate that there is no limit to the number of resources to be written to a single file.|
|`fhirServer/bulkdata/core/azure/objectSizeThresholdMB`|number|The size, in megabytes, at which to finish writing a given object.|
|`fhirServer/bulkdata/core/azure/objectResourceCountThreshold`|number|The number of resources at which to finish writing a given object. The actual number of resources written to a single object may be slightly above this number, dependent on the configured page size.|
|`fhirServer/bulkdata/core/batchIdEncryptionKey`|string|Encryption key for the JavaBatch job id |
|`fhirServer/bulkdata/core/pageSize`|number|The search page size for patient/group export and the legacy export; the default value is 1000 |
|`fhirServer/bulkdata/core/maxPartitions`|number| The maximum number of simultaneous partitions that are processed per Export and Import |
Expand Down Expand Up @@ -2214,6 +2216,8 @@ This section contains reference information about each of the configuration prop
|`fhirServer/bulkdata/core/cos/socketTimeout`|120|
|`fhirServer/bulkdata/core/cos/useServerTruststore`|false|
|`fhirServer/bulkdata/core/cos/presignedExpiry`|86400|
|`fhirServer/bulkdata/core/azure/objectSizeThresholdMB`|200|
|`fhirServer/bulkdata/core/azure/objectResourceCountThreshold`|200000|
|`fhirServer/bulkdata/core/pageSize`|1000|
|`fhirServer/bulkdata/core/maxPartitions`|5|
|`fhirServer/bulkdata/core/maxInputs`|5|
Expand Down Expand Up @@ -2354,6 +2358,8 @@ must restart the server for that change to take effect.
|`fhirServer/bulkdata/core/cos/socketTimeout`|N|N|
|`fhirServer/bulkdata/core/cos/useServerTruststore`|Y|Y|
|`fhirServer/bulkdata/core/cos/presignedExpiry`|Y|Y|
|`fhirServer/bulkdata/core/azure/objectSizeThresholdMB`|N|N|
|`fhirServer/bulkdata/core/azure/objectResourceCountThreshold`|N|N|
|`fhirServer/bulkdata/core/batchIdEncryptionKey`|Y|N|
|`fhirServer/bulkdata/core/pageSize`|Y|Y|
|`fhirServer/bulkdata/core/maxPartitions`|Y|Y|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ExportCheckpointAlgorithm implements CheckpointAlgorithm {
StepContext stepCtx;

Boolean isFileExport;
Boolean isAzureExport;

ConfigurationAdapter config = ConfigurationFactory.getInstance();

Expand All @@ -61,6 +62,14 @@ public void beginCheckpoint() {
isFileExport = "file".equals(config.getStorageProviderType(ctx.getSource()));
}

if (isAzureExport == null) {
JobOperator jobOperator = BatchRuntime.getJobOperator();
JobExecution jobExecution = jobOperator.getJobExecution(jobCtx.getExecutionId());
BatchContextAdapter contextAdapter = new BatchContextAdapter(jobExecution.getJobParameters());
BulkDataContext ctx = contextAdapter.getStepContextForSystemChunkWriter();
isAzureExport = "azure-blob".equals(config.getStorageProviderType(ctx.getSource()));
}

if (logger.isLoggable(Level.FINE)) {
ExportTransientUserData chunkData = (ExportTransientUserData) stepCtx.getTransientUserData();
if (chunkData != null) {
Expand Down Expand Up @@ -102,12 +111,23 @@ public boolean isReadyToCheckpoint() {
return false;
}

long resourceCountThreshold = isFileExport ? config.getCoreFileResourceCountThreshold()
: config.getCoreCosObjectResourceCountThreshold();
long sizeThreshold = isFileExport ? config.getCoreFileSizeThreshold()
: config.getCoreCosObjectSizeThreshold();
long writeTrigger = isFileExport ? config.getCoreFileWriteTriggerSize()
: config.getCoreCosPartUploadTriggerSize();
// Split for our three types of Providers
long resourceCountThreshold;
long sizeThreshold;
long writeTrigger;
if (isAzureExport) {
resourceCountThreshold = config.getCoreAzureObjectResourceCountThreshold();
sizeThreshold = config.getCoreAzureObjectSizeThreshold();
writeTrigger = config.getCoreCosPartUploadTriggerSize();
} else if (isFileExport) {
resourceCountThreshold = config.getCoreFileResourceCountThreshold();
sizeThreshold = config.getCoreFileSizeThreshold();
writeTrigger = config.getCoreFileWriteTriggerSize();
} else {
resourceCountThreshold = config.getCoreCosObjectResourceCountThreshold();
sizeThreshold = config.getCoreCosObjectSizeThreshold();
writeTrigger = config.getCoreCosPartUploadTriggerSize();
}

// Set to true if we have enough bytes to write a part
boolean readyToWrite = chunkData.getBufferStream().size() >= writeTrigger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ public void open(Serializable checkpoint) throws Exception {
cosBucketName = adapter.getStorageProviderBucketName(source);
cosBucketPathPrefix = ctx.getCosBucketPathPrefix();

// Azure Provider needs some specific configuration points that are unique.
if (StorageType.AZURE.value().equals(adapter.getStorageProviderType(source))) {
maxObjectSize = adapter.getCoreAzureObjectSizeThreshold();
resourcesPerObject = adapter.getCoreAzureObjectResourceCountThreshold();
}

// Initialize the configuration from the injected string values
String fhirSearchFromDate = ctx.getFhirSearchFromDate();
if (fhirSearchFromDate != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.ibm.fhir.persistence.context.FHIRPersistenceContext;
import com.ibm.fhir.persistence.context.FHIRPersistenceContextFactory;
import com.ibm.fhir.persistence.exception.FHIRPersistenceException;
import com.ibm.fhir.persistence.exception.FHIRPersistenceResourceDeletedException;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.validation.exception.FHIRValidationException;
Expand Down Expand Up @@ -299,7 +300,13 @@ public OperationOutcome conditionalFingerprintUpdate(ImportTransientUserData chu
if (oldBaseLine == null) {
// Go get the latest resource in the database and fingerprint the resource.
// If the resource exists, then we need to fingerprint.
oldResource = persistence.read(context, resource.getClass(), logicalId).getResource();
try {
// This execution is in a try-catch-block since we want to catch
// the resource deleted exception.
oldResource = persistence.read(context, resource.getClass(), logicalId).getResource();
} catch (FHIRPersistenceResourceDeletedException fpde) {
logger.throwing("ChunkWriter", "conditionalFingerprintUpdate", fpde);
}
if (oldResource != null) {
ResourceFingerprintVisitor fpOld = new ResourceFingerprintVisitor();
oldResource.accept(fpOld);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.ibm.fhir.model.resource.Resource;
import com.ibm.fhir.model.type.code.IssueType;
import com.ibm.fhir.model.util.FHIRUtil;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationAdapter;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationFactory;

/**
Expand Down Expand Up @@ -184,7 +185,7 @@ public void writeDirectly(String workItem, InputStream in, int size) throws Exce
initializeBlobClient(workItem);

AppendBlobClient aClient = client.getAppendBlobClient();
if (!client.exists().booleanValue()) {
if (!client.exists().booleanValue() && size > 0) {
aClient.create();
}

Expand All @@ -199,7 +200,9 @@ public void writeDirectly(String workItem, InputStream in, int size) throws Exce
tmpPayload = Arrays.copyOfRange(payload, 0, len);
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(tmpPayload)) {
aClient.appendBlock(bais, len);
if (len > 0) {
aClient.appendBlock(bais, len);
}
}
len = in.read(payload, 0, MAX_BLOCK_SIZE);
}
Expand All @@ -218,11 +221,13 @@ public void writeResources(String mediaType, List<ReadResultDTO> dtos) throws Ex
initializeBlobClient(workItem);

AppendBlobClient aClient = client.getAppendBlobClient();
if (!client.exists().booleanValue()) {
aClient.create();
}

byte[] baos = chunkData.getBufferStream().toByteArray();

// Only create if it's not empty.
if (!client.exists().booleanValue() && baos.length > 0) {
aClient.create();
}
int current = 0;
for (int i = 0; i <= (Math.ceil(baos.length/MAX_BLOCK_SIZE)); i++) {
int payloadLength = MAX_BLOCK_SIZE;
Expand All @@ -237,13 +242,19 @@ public void writeResources(String mediaType, List<ReadResultDTO> dtos) throws Ex
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Byte Progress: current='" + current + "' total='" + baos.length + "' payload='" + payload.length);
}
aClient.appendBlock(
new ByteArrayInputStream(payload),
payload.length);
if (payload.length > 0) {
aClient.appendBlock(
new ByteArrayInputStream(payload),
payload.length);
}
}

LOG.fine(() -> "Export Write is finished");

if (dtos != null) {
dtos.clear();
}
chunkData.setPartNum(chunkData.getPartNum() + 1);
chunkData.getBufferStream().reset();

if (chunkData.isFinishCurrentUpload()) {
Expand All @@ -260,6 +271,9 @@ public void writeResources(String mediaType, List<ReadResultDTO> dtos) throws Ex
}
}

ConfigurationAdapter config = ConfigurationFactory.getInstance();
long resourceCountThreshold = config.getCoreAzureObjectResourceCountThreshold();
long sizeThreshold = config.getCoreAzureObjectSizeThreshold();
if (chunkData.getPageNum() < chunkData.getLastPageNum()) {
chunkData.setPartNum(1);
chunkData.setUploadId(null);
Expand All @@ -268,6 +282,8 @@ public void writeResources(String mediaType, List<ReadResultDTO> dtos) throws Ex
chunkData.setFinishCurrentUpload(false);
chunkData.getCosDataPacks().clear();
chunkData.setUploadCount(chunkData.getUploadCount() + 1);
} else if (chunkData.getCurrentUploadSize() >= sizeThreshold || resourceCountThreshold >= chunkData.getCurrentUploadResourceNum()){
chunkData.setUploadCount(chunkData.getUploadCount() + 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertFalse;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
import jakarta.json.JsonReaderFactory;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
Expand All @@ -47,6 +45,11 @@
import com.ibm.fhir.model.type.Url;
import com.ibm.fhir.server.test.FHIRServerTestBase;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
import jakarta.json.JsonReaderFactory;

/**
* These tests exercise the $import operation a bulkdata proposal
*
Expand Down Expand Up @@ -222,7 +225,45 @@ public Response polling(String statusUrl) throws InterruptedException {
return response;
}

@Test(groups = { TEST_GROUP_NAME })
/*
 * Creates, reads, and then deletes Patient "12345-DELETED-12345" so that the
 * subsequent $import tests run against a logical id whose latest version is
 * deleted (the import NDJSON includes this id — presumably to exercise the
 * deleted-resource handling in the import ChunkWriter; confirm against the
 * bundled test data).
 *
 * NOTE: this method carries TEST_GROUP_NAME because the import tests declare
 * dependsOnMethods on it under that group; without the group, a by-group run
 * excludes this method and TestNG fails or skips the dependent tests.
 */
@Test(groups = { TEST_GROUP_NAME })
public void testCreateDeletedPatient() throws Exception {
    WebTarget target = getWebTarget();

    // Build a new Patient with a well-known id and call the update-as-create API.
    Patient patient = TestUtil.readLocalResource("Patient_JohnDoe.json");
    patient = patient.toBuilder().id("12345-DELETED-12345").build();
    Entity<Patient> entity = Entity.entity(patient, FHIRMediaType.APPLICATION_FHIR_JSON);
    //@formatter:off
    Response response =
            target.path("Patient/12345-DELETED-12345")
                .request()
                .header("X-FHIR-TENANT-ID", tenantName)
                .header("X-FHIR-DSID", dataStoreId)
                .put(entity, Response.class);
    //@formatter:on
    assertResponse(response, Response.Status.CREATED.getStatusCode());
    URI location = response.getLocation();
    assertNotNull(location);
    assertNotNull(location.toString());
    assertFalse(location.toString().isEmpty());

    // Next, call the 'read' API to verify the patient exists before deleting it.
    //@formatter:off
    response = target.path("Patient/12345-DELETED-12345")
            .request(FHIRMediaType.APPLICATION_FHIR_JSON)
            .header("X-FHIR-TENANT-ID", tenantName)
            .header("X-FHIR-DSID", dataStoreId)
            .get();
    //@formatter:on
    assertResponse(response, Response.Status.OK.getStatusCode());

    // Finally, delete the patient so its latest version is a deletion marker.
    response = target.path("Patient/12345-DELETED-12345")
            .request(FHIRMediaType.APPLICATION_FHIR_JSON)
            .header("X-FHIR-TENANT-ID", tenantName)
            .header("X-FHIR-DSID", dataStoreId)
            .delete();
    assertResponse(response, Response.Status.OK.getStatusCode());
}

@Test(groups = { TEST_GROUP_NAME }, dependsOnMethods = {"testCreateDeletedPatient"})
public void testImportFromFileDefault() throws Exception {
if (ON) {
String path = BASE_VALID_URL;
Expand Down Expand Up @@ -251,7 +292,7 @@ public void testImportFromFileDefault() throws Exception {
}
}

@Test(groups = { TEST_GROUP_NAME })
@Test(groups = { TEST_GROUP_NAME }, dependsOnMethods = {"testCreateDeletedPatient"})
public void testImportFromFileDefaultEmpty() throws Exception {
if (ON) {
String path = BASE_VALID_URL;
Expand Down Expand Up @@ -336,7 +377,7 @@ public void checkOnResource(String id) {
assertTrue(bundle.getEntry().size() >= 1);
}

@Test(groups = { TEST_GROUP_NAME })
@Test(groups = { TEST_GROUP_NAME }, dependsOnMethods = {"testCreateDeletedPatient"})
public void testImportFromS3() throws Exception {
if (ON) {
String path = BASE_VALID_URL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
{"resourceType":"Patient","id":"72b0d93c-93d0-43d9-94a8-d5154ce07152","meta":{"versionId":"1","lastUpdated":"2020-04-17T00:09:36.910431Z"},"name":[{"family":"Doe","given":["import-1"]}],"telecom":[{"system":"phone","value":"555-1122","use":"home"}],"birthDate":"1970-01-01"}
{"resourceType":"Patient","id":"72b0d93c-93d0-43d9-94a8-d5154ce07152","meta":{"versionId":"1","lastUpdated":"2020-04-17T00:09:36.910431Z"},"name":[{"family":"Doe","given":["import-1"]}],"telecom":[{"system":"phone","value":"555-1122","use":"home"}],"birthDate":"1970-01-01"}
{"resourceType":"Patient","id":"72b0d93c-93d0-43d9-94a8-d5154ce07152","meta":{"versionId":"1","lastUpdated":"2020-04-17T00:09:36.910431Z"},"name":[{"family":"Doe","given":["import-1"]}],"telecom":[{"system":"phone","value":"555-1122","use":"home"}],"birthDate":"1970-01-01"}
{"resourceType":"Patient","id":"72b0d93c-93d0-43d9-94a8-d5154ce07152","meta":{"versionId":"1","lastUpdated":"2020-04-17T00:09:36.910431Z"},"name":[{"family":"Doe","given":["import-1"]}],"telecom":[{"system":"phone","value":"555-1122","use":"home"}],"birthDate":"1970-01-01"}
{"resourceType":"Patient","id":"72b0d93c-93d0-43d9-94a8-d5154ce07152","meta":{"versionId":"1","lastUpdated":"2020-04-17T00:09:36.910431Z"},"name":[{"family":"Doe","given":["import-1"]}],"telecom":[{"system":"phone","value":"555-1122","use":"home"}],"birthDate":"1970-01-01"}
{"resourceType":"Patient","id":"12345-DELETED-12345","meta":{"versionId":"1","lastUpdated":"2020-04-17T00:09:36.910431Z"},"name":[{"family":"Doe","given":["import-1"]}],"telecom":[{"system":"phone","value":"555-1122","use":"home"}],"birthDate":"1970-01-01"}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@ public interface ConfigurationAdapter {
*/
int getCoreCosObjectResourceCountThreshold();

/**
 * The size (in bytes) at which to finish writing to a given AzureBlob object.
 * Configured via {@code fhirServer/bulkdata/core/azure/objectSizeThresholdMB}
 * (note: the property is expressed in megabytes; presumably the implementation
 * converts to bytes — confirm against the adapter implementation).
 *
 * @implNote System value.
 *
 * @return the object size threshold in bytes
 */
long getCoreAzureObjectSizeThreshold();

/**
 * The number of resources at which to finish writing to a given AzureBlob object.
 * Configured via {@code fhirServer/bulkdata/core/azure/objectResourceCountThreshold}.
 *
 * @implNote System value.
 *
 * @return the resource count threshold per object
 */
int getCoreAzureObjectResourceCountThreshold();

/**
* @implNote System value.
* @return the system wide setting for using the server truststore.
Expand Down
Loading

0 comments on commit 45ba58d

Please sign in to comment.