Skip to content

Commit

Permalink
Merge pull request #1116 from IBM/albert-master
Browse files Browse the repository at this point in the history
issue #779 #1150 partitionize bulkexport JavaBatch Job & fix CWWKY0041W warnings
  • Loading branch information
albertwang-ibm authored May 26, 2020
2 parents 9e19a8a + 356ab6a commit 85e7102
Show file tree
Hide file tree
Showing 30 changed files with 705 additions and 378 deletions.
6 changes: 6 additions & 0 deletions docs/src/pages/guides/FHIRServerUsersGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,8 @@ This section contains reference information about each of the configuration prop
|`fhirServer/bulkdata/validBaseUrls`|string|The list of supported urls which are approved for the fhir server to access|
|`fhirServer/bulkdata/validBaseUrlsDisabled`|boolean|Disables the URL checking feature|
|`fhirServer/bulkdata/maxInputPerRequest`|integer|The maximum inputs per bulk import|
|`fhirServer/bulkdata/cosFileMaxResources`|integer|The maximum number of FHIR resources per COS file; "-1" means no limit; the default value is 500000|
|`fhirServer/bulkdata/cosFileMaxSize`|integer|The maximum COS file size in bytes; "-1" means no limit; the default value is 209715200 (200 MB)|


### 5.1.2 Default property values
Expand Down Expand Up @@ -1532,6 +1534,8 @@ This section contains reference information about each of the configuration prop
|`fhirServer/audit/serviceProperties/geoCounty`|US|
|`fhirServer/bulkdata/isExportPublic`|true|
|`fhirServer/bulkdata/validBaseUrlsDisabled`|false|
|`fhirServer/bulkdata/cosFileMaxResources`|500000|
|`fhirServer/bulkdata/cosFileMaxSize`|209715200|


### 5.1.3 Property attributes
Expand Down Expand Up @@ -1596,6 +1600,8 @@ must restart the server for that change to take effect.
|`fhirServer/bulkdata/validBaseUrls`|Y|Y|
|`fhirServer/bulkdata/maxInputPerRequest`|Y|Y|
|`fhirServer/bulkdata/validBaseUrlsDisabled`|Y|Y|
|`fhirServer/bulkdata/cosFileMaxResources`|N|Y|
|`fhirServer/bulkdata/cosFileMaxSize`|N|Y|

## 5.2 Keystores, truststores, and the FHIR server

Expand Down
4 changes: 4 additions & 0 deletions fhir-bulkimportexport-webapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>org.glassfish</groupId>
<artifactId>jakarta.json</artifactId>
</dependency>
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>fhir-persistence</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" id="bulkexportchunkjob" restartable="true" version="1.0">
<properties>
<property name="cos.pagesperobject" value="#{jobParameters['cos.pagesperobject']}?:10;" />
</properties>
<listeners>
<listener ref="com.ibm.fhir.bulkexport.system.ExportJobListener"/>
</listeners>
<step id="step1">
<chunk checkpoint-policy="custom" item-count="#{jobProperties['cos.pagesperobject']}">
<chunk checkpoint-policy="custom">
<reader ref="com.ibm.fhir.bulkexport.system.ChunkReader">
<properties >
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
<property name="fhir.tenant" value="#{jobParameters['fhir.tenant']}"/>
<property name="fhir.datastoreid" value="#{jobParameters['fhir.datastoreid']}"/>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
<property name="fhir.search.fromdate" value="#{jobParameters['fhir.search.fromdate']}"/>
<property name="fhir.search.todate" value="#{jobParameters['fhir.search.todate']}"/>
<property name="fhir.search.pagesize" value="#{jobParameters['fhir.search.pagesize']}"/>
<property name="cos.bucket.objectname" value="#{jobParameters['cos.bucket.objectname']}"/>
<property name="fhir.typeFilters" value="#{jobParameters['fhir.typeFilters']}"/>
</properties>
</reader>
Expand All @@ -26,16 +25,24 @@
<property name="cos.credential.ibm" value="#{jobParameters['cos.credential.ibm']}"/>
<property name="cos.bucket.name" value="#{jobParameters['cos.bucket.name']}"/>
<property name="cos.bucket.pathprefix" value="#{jobParameters['cos.bucket.pathprefix']}"/>
<property name="cos.bucket.objectname" value="#{jobParameters['cos.bucket.objectname']}"/>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
</properties>
</writer>
<checkpoint-algorithm ref="com.ibm.fhir.bulkexport.common.CheckPointAlgorithm">
<properties>
<property name="cos.pagesperobject" value="#{jobParameters['cos.pagesperobject']}"/>
<property name="cos.bucket.maxfilesize" value="#{jobParameters['cos.bucket.maxfilesize']}"/>
<property name="cos.bucket.filemaxsize" value="#{jobParameters['cos.bucket.filemaxsize']}"/>
<property name="cos.bucket.filemaxresources" value="#{jobParameters['cos.bucket.filemaxresources']}"/>
</properties>
</checkpoint-algorithm>
</chunk>
<partition>
<mapper ref="com.ibm.fhir.bulkexport.system.SystemExportPartitionMapper">
<properties>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
</properties>
</mapper>
<collector ref="com.ibm.fhir.bulkexport.system.ExportPartitionCollector"/>
<analyzer ref="com.ibm.fhir.bulkexport.system.ExportPartitionAnalyzer"/>
</partition>
</step>
</job>
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" id="bulkgroupexportchunkjob" restartable="true" version="1.0">
<listeners>
<listener ref="com.ibm.fhir.bulkexport.system.ExportJobListener"/>
</listeners>
<step id="step1">
<chunk checkpoint-policy="item" item-count="1">
<chunk checkpoint-policy="custom">
<reader ref="com.ibm.fhir.bulkexport.group.ChunkReader">
<properties >
<property name="fhir.tenant" value="#{jobParameters['fhir.tenant']}"/>
<property name="fhir.datastoreid" value="#{jobParameters['fhir.datastoreid']}"/>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
<property name="fhir.search.fromdate" value="#{jobParameters['fhir.search.fromdate']}"/>
<property name="fhir.search.todate" value="#{jobParameters['fhir.search.todate']}"/>
<property name="fhir.search.pagesize" value="#{jobParameters['fhir.search.pagesize']}"/>
<property name="fhir.search.patientgroupid" value="#{jobParameters['fhir.search.patientgroupid']}"/>
<property name="fhir.typeFilters" value="#{jobParameters['fhir.typeFilters']}"/>
</properties>
</reader>
<writer ref="com.ibm.fhir.bulkexport.patient.ChunkWriter">
<writer ref="com.ibm.fhir.bulkexport.system.ChunkWriter">
<properties>
<property name="cos.api.key" value="#{jobParameters['cos.api.key']}"/>
<property name="cos.srvinst.id" value="#{jobParameters['cos.srvinst.id']}"/>
Expand All @@ -23,9 +26,24 @@
<property name="cos.credential.ibm" value="#{jobParameters['cos.credential.ibm']}"/>
<property name="cos.bucket.name" value="#{jobParameters['cos.bucket.name']}"/>
<property name="cos.bucket.pathprefix" value="#{jobParameters['cos.bucket.pathprefix']}"/>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
</properties>
</writer>
<checkpoint-algorithm ref="com.ibm.fhir.bulkexport.common.CheckPointAlgorithm">
<properties>
<property name="cos.bucket.filemaxsize" value="#{jobParameters['cos.bucket.filemaxsize']}"/>
<property name="cos.bucket.filemaxresources" value="#{jobParameters['cos.bucket.filemaxresources']}"/>
</properties>
</checkpoint-algorithm>
</chunk>
<partition>
<mapper ref="com.ibm.fhir.bulkexport.patient.PatientExportPartitionMapper">
<properties>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
</properties>
</mapper>
<collector ref="com.ibm.fhir.bulkexport.system.ExportPartitionCollector"/>
<analyzer ref="com.ibm.fhir.bulkexport.system.ExportPartitionAnalyzer"/>
</partition>
</step>
</job>
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" id="bulkpatientexportchunkjob" restartable="true" version="1.0">
<properties>
<property name="cos.pagesperobject" value="#{jobParameters['cos.pagesperobject']}?:10;" />
</properties>
<listeners>
<listener ref="com.ibm.fhir.bulkexport.system.ExportJobListener"/>
</listeners>
<step id="step1">
<chunk checkpoint-policy="custom" item-count="#{jobProperties['cos.pagesperobject']}">
<chunk checkpoint-policy="custom">
<reader ref="com.ibm.fhir.bulkexport.patient.ChunkReader">
<properties >
<property name="fhir.tenant" value="#{jobParameters['fhir.tenant']}"/>
<property name="fhir.datastoreid" value="#{jobParameters['fhir.datastoreid']}"/>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
<property name="fhir.search.fromdate" value="#{jobParameters['fhir.search.fromdate']}"/>
<property name="fhir.search.todate" value="#{jobParameters['fhir.search.todate']}"/>
<property name="fhir.search.pagesize" value="#{jobParameters['fhir.search.pagesize']}"/>
<property name="fhir.typeFilters" value="#{jobParameters['fhir.typeFilters']}"/>
</properties>
</reader>
<writer ref="com.ibm.fhir.bulkexport.patient.ChunkWriter">
<writer ref="com.ibm.fhir.bulkexport.system.ChunkWriter">
<properties>
<property name="cos.api.key" value="#{jobParameters['cos.api.key']}"/>
<property name="cos.srvinst.id" value="#{jobParameters['cos.srvinst.id']}"/>
Expand All @@ -25,15 +25,24 @@
<property name="cos.credential.ibm" value="#{jobParameters['cos.credential.ibm']}"/>
<property name="cos.bucket.name" value="#{jobParameters['cos.bucket.name']}"/>
<property name="cos.bucket.pathprefix" value="#{jobParameters['cos.bucket.pathprefix']}"/>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
</properties>
</writer>
<checkpoint-algorithm ref="com.ibm.fhir.bulkexport.common.CheckPointAlgorithm">
<properties>
<property name="cos.pagesperobject" value="#{jobParameters['cos.pagesperobject']}"/>
<property name="cos.bucket.maxfilesize" value="#{jobParameters['cos.bucket.maxfilesize']}"/>
<property name="cos.bucket.filemaxsize" value="#{jobParameters['cos.bucket.filemaxsize']}"/>
<property name="cos.bucket.filemaxresources" value="#{jobParameters['cos.bucket.filemaxresources']}"/>
</properties>
</checkpoint-algorithm>
</chunk>
<partition>
<mapper ref="com.ibm.fhir.bulkexport.patient.PatientExportPartitionMapper">
<properties>
<property name="fhir.resourcetype" value="#{jobParameters['fhir.resourcetype']}"/>
</properties>
</mapper>
<collector ref="com.ibm.fhir.bulkexport.system.ExportPartitionCollector"/>
<analyzer ref="com.ibm.fhir.bulkexport.system.ExportPartitionAnalyzer"/>
</partition>
</step>
</job>
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
<chunk checkpoint-policy="item" item-count="#{jobProperties['fhir.cosreadsperdbbatch']}">
<reader ref="com.ibm.fhir.bulkimport.ChunkReader">
<properties>
<property name="import.partiton.workitem" value="#{partitionPlan['import.partiton.workitem']}"/>
<property name="import.partiton.resourcetype" value="#{partitionPlan['import.partiton.resourcetype']}"/>
<property name="import.partition.workitem" value="#{partitionPlan['import.partition.workitem']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
<property name="cos.api.key" value="#{jobParameters['cos.api.key']}"/>
<property name="cos.srvinst.id" value="#{jobParameters['cos.srvinst.id']}"/>
<property name="cos.endpointurl" value="#{jobParameters['cos.endpointurl']}"/>
Expand All @@ -29,7 +29,7 @@
<properties >
<property name="fhir.tenant" value="#{jobParameters['fhir.tenant']}"/>
<property name="fhir.datastoreid" value="#{jobParameters['fhir.datastoreid']}"/>
<property name="import.partiton.resourcetype" value="#{partitionPlan['import.partiton.resourcetype']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
<property name="cos.api.key" value="#{jobParameters['cos.api.key']}"/>
<property name="cos.srvinst.id" value="#{jobParameters['cos.srvinst.id']}"/>
<property name="cos.endpointurl" value="#{jobParameters['cos.endpointurl']}"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -48,6 +50,7 @@
import com.ibm.cloud.objectstorage.services.s3.model.GetObjectRequest;
import com.ibm.cloud.objectstorage.services.s3.model.InitiateMultipartUploadRequest;
import com.ibm.cloud.objectstorage.services.s3.model.InitiateMultipartUploadResult;
import com.ibm.cloud.objectstorage.services.s3.model.ObjectMetadata;
import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
import com.ibm.cloud.objectstorage.services.s3.model.S3Object;
import com.ibm.cloud.objectstorage.services.s3.model.S3ObjectInputStream;
Expand Down Expand Up @@ -88,6 +91,11 @@ public static String startPartUpload(AmazonS3 cosClient, String bucketName, Stri
InitiateMultipartUploadRequest initMultipartUploadReq = new InitiateMultipartUploadRequest(bucketName, itemName);
if (isPublicAccess) {
initMultipartUploadReq.setCannedACL(CannedAccessControlList.PublicRead);
ObjectMetadata metadata = new ObjectMetadata();
// Set the expiration time to 2 hours (7200 seconds).
// Note: IBM COS does not honor this setting, but it does not fail on it either.
metadata.setExpirationTime(Date.from(Instant.now().plusSeconds(7200)));
initMultipartUploadReq.setObjectMetadata(metadata);
}

InitiateMultipartUploadResult mpResult = cosClient.initiateMultipartUpload(initMultipartUploadReq);
Expand Down Expand Up @@ -420,7 +428,7 @@ public static Map<Class<? extends Resource>, List<Map<String, List<String>>>> ge
}
return searchParametersForResoureTypes;
}

public static JsonArray getDataSourcesFromJobInput(String dataSourcesInfo) {
try (JsonReader reader =
Json.createReader(new StringReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ public class Constants {
// The minimum part size (5 MB, in bytes) for a COS multipart upload.
public static final int COS_PART_MINIMALSIZE = 5242880;
public static final int DEFAULT_SEARCH_PAGE_SIZE = 1000;
public static final int DEFAULT_NUMOFPAGES_EACH_COS_OBJECT = 10;
public static final int DEFAULT_NUMOFOBJECTS_PERREAD = 1;
public static final int DEFAULT_MAXCOSFILE_SIZE = 104857600;
public static final int DEFAULT_COSFILE_MAX_SIZE = 209715200;
public static final int DEFAULT_COSFILE_MAX_RESOURCESNUMBER = 500000;
public static final String FHIR_SEARCH_LASTUPDATED = "_lastUpdated";
public static final byte[] NDJSON_LINESEPERATOR = "\r\n".getBytes();

public static final int IMPORT_MAX_PARTITIONPROCESSING_THREADNUMBER = 10;
public static final int EXPORT_MAX_PARTITIONPROCESSING_THREADNUMBER = 10;
// The number of resources to commit to the DB in each batch; the slower the DB connection,
// the smaller this value should be.
public static final int IMPORT_NUMOFFHIRRESOURCES_PERREAD = 20;
Expand All @@ -40,17 +40,26 @@ public class Constants {
public static final String COS_LOCATION = "cos.location";
public static final String COS_BUCKET_NAME = "cos.bucket.name";
public static final String COS_IS_IBM_CREDENTIAL = "cos.credential.ibm";
public static final String COS_BUCKET_FILE_MAX_SIZE = "cos.bucket.filemaxsize";
public static final String COS_BUCKET_FILE_MAX_RESOURCES = "cos.bucket.filemaxresources";
// COS bucket for import OperationOutcomes
public static final String COS_OPERATIONOUTCOMES_BUCKET_NAME = "cos.operationoutcomes.bucket.name";
public static final String FHIR_TENANT = "fhir.tenant";
public static final String FHIR_DATASTORE_ID = "fhir.datastoreid";
public static final String FHIR_RESOURCETYPES = "fhir.resourcetype";
public static final String IMPORT_FHIR_STORAGE_TYPE = "import.fhir.storagetype";
public static final String IMPORT_FHIR_IS_VALIDATION_ON = "import.fhir.validation";
public static final String IMPORT_FHIR_DATASOURCES = "fhir.dataSourcesInfo";

public static final String EXPORT_FHIR_SEARCH_FROMDATE = "fhir.search.fromdate";
public static final String EXPORT_FHIR_SEARCH_TODATE = "fhir.search.todate";
public static final String EXPORT_FHIR_SEARCH_PAGESIZE = "fhir.search.pagesize";
public static final String EXPORT_FHIR_SEARCH_TYPEFILTERS = "fhir.typeFilters";
public static final String EXPORT_FHIR_SEARCH_PATIENTGROUPID = "fhir.search.patientgroupid";
public static final String EXPORT_COS_OBJECT_PATHPREFIX = "cos.bucket.pathprefix";

// Partition work item info generated in ImportPartitionMapper.
public static final String IMPORT_PARTITTION_WORKITEM = "import.partiton.workitem";
public static final String IMPORT_PARTITTION_RESOURCE_TYPE = "import.partiton.resourcetype";
public static final String IMPORT_PARTITTION_WORKITEM = "import.partition.workitem";
public static final String PARTITION_RESOURCE_TYPE = "partition.resourcetype";

// Control if push OperationOutcomes to COS/S3.
public static final boolean IMPORT_IS_COLLECT_OPERATIONOUTCOMES = true;
Expand Down
Loading

0 comments on commit 85e7102

Please sign in to comment.