From 0694a550adac3efff2277ddeda6c51bb1c0014fd Mon Sep 17 00:00:00 2001 From: Albert Wang Date: Mon, 18 May 2020 15:59:40 -0400 Subject: [PATCH 1/6] issue #779 partitionize bulkexport javabatch job Signed-off-by: Albert Wang --- .../batch-jobs/FhirBulkExportChunkJob.xml | 13 +++- .../FhirBulkExportGroupChunkJob.xml | 13 +++- .../FhirBulkExportPatientChunkJob.xml | 13 +++- .../batch-jobs/FhirBulkImportChunkJob.xml | 6 +- .../com/ibm/fhir/bulkcommon/Constants.java | 15 +++- .../bulkexport/common/CheckPointUserData.java | 23 +++--- .../bulkexport/common/TransientUserData.java | 6 +- .../fhir/bulkexport/group/ChunkReader.java | 24 +++---- .../fhir/bulkexport/patient/ChunkReader.java | 71 ++++++------------- .../fhir/bulkexport/patient/ChunkWriter.java | 47 ------------ .../patient/PatientExportPartitionMapper.java | 66 +++++++++++++++++ .../fhir/bulkexport/system/ChunkReader.java | 65 ++++++----------- .../fhir/bulkexport/system/ChunkWriter.java | 65 ++++++++--------- .../system/ExportPartitionAnalyzer.java | 49 +++++++++++++ .../system/ExportPartitionCollector.java | 44 ++++++++++++ .../system/SystemExportPartitionMapper.java | 60 ++++++++++++++++ .../com/ibm/fhir/bulkimport/ChunkReader.java | 2 +- .../com/ibm/fhir/bulkimport/ChunkWriter.java | 2 +- .../bulkimport/ImportPartitionMapper.java | 8 +-- 19 files changed, 365 insertions(+), 227 deletions(-) delete mode 100644 fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkWriter.java create mode 100644 fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java create mode 100644 fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java create mode 100644 fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java create mode 100644 fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java diff --git 
a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml index af4fd04b28f..4214fa2d49f 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml +++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml @@ -7,9 +7,9 @@ + - @@ -27,7 +27,7 @@ - + @@ -37,5 +37,14 @@ + + + + + + + + + \ No newline at end of file diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml index b675578fc47..ad8fc52fa1b 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml +++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml @@ -6,7 +6,7 @@ - + @@ -14,7 +14,7 @@ - + @@ -23,9 +23,16 @@ - + + + + + + + + \ No newline at end of file diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml index 43f3dc26813..a66457c6ee2 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml +++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml @@ -9,14 +9,14 @@ - + - + @@ -25,7 +25,7 @@ - + @@ -35,5 +35,12 @@ + + + + + + + \ No newline at end of file diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkImportChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkImportChunkJob.xml index 61daffbc45e..aaea80f622f 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkImportChunkJob.xml +++ 
b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkImportChunkJob.xml @@ -14,8 +14,8 @@ - - + + @@ -29,7 +29,7 @@ - + diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java index 1a95f597ff7..0e1d8290741 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java @@ -25,6 +25,7 @@ public class Constants { public static final byte[] NDJSON_LINESEPERATOR = "\r\n".getBytes(); public static final int IMPORT_MAX_PARTITIONPROCESSING_THREADNUMBER = 10; + public static final int EXPORT_MAX_PARTITIONPROCESSING_THREADNUMBER = 10; // The number of resources to commit to DB in each batch, the slower the DB connection, the smaller // this value should be set. public static final int IMPORT_NUMOFFHIRRESOURCES_PERREAD = 20; @@ -44,13 +45,21 @@ public class Constants { public static final String COS_OPERATIONOUTCOMES_BUCKET_NAME = "cos.operationoutcomes.bucket.name"; public static final String FHIR_TENANT = "fhir.tenant"; public static final String FHIR_DATASTORE_ID = "fhir.datastoreid"; + public static final String FHIR_RESOURCETYPES = "fhir.resourcetype"; public static final String IMPORT_FHIR_STORAGE_TYPE = "import.fhir.storagetype"; public static final String IMPORT_FHIR_IS_VALIDATION_ON = "import.fhir.validation"; public static final String IMPORT_FHIR_DATASOURCES = "fhir.dataSourcesInfo"; - + public static final String EXPORT_FHIR_SEARCH_FROMDATE = "fhir.search.fromdate"; + public static final String EXPORT_FHIR_SEARCH_TODATE = "fhir.search.todate"; + public static final String EXPORT_FHIR_SEARCH_PAGESIZE = "fhir.search.pagesize"; + public static final String EXPORT_FHIR_SEARCH_TYPEFILTERS = "fhir.typeFilters"; + public static final String EXPORT_FHIR_SEARCH_PATIENTGROUPID = "fhir.search.patientgroupid"; + 
public static final String EXPORT_COS_OBJECTNAME = "cos.bucket.objectname"; + public static final String EXPORT_COS_OBJECT_PATHPREFIX = "cos.bucket.pathprefix"; + // Partition work item info generated in ImportPartitionMapper. - public static final String IMPORT_PARTITTION_WORKITEM = "import.partiton.workitem"; - public static final String IMPORT_PARTITTION_RESOURCE_TYPE = "import.partiton.resourcetype"; + public static final String IMPORT_PARTITTION_WORKITEM = "import.partition.workitem"; + public static final String PARTITION_RESOURCE_TYPE = "partition.resourcetype"; // Control if push OperationOutcomes to COS/S3. public static final boolean IMPORT_IS_COLLECT_OPERATIONOUTCOMES = true; diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java index 6c19f3a79d8..6cd1a3fd436 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java @@ -22,24 +22,23 @@ public class CheckPointUserData implements java.io.Serializable { private String uploadId; private boolean isSingleCosObject = false; private List cosDataPacks; - private int indexOfCurrentResourceType; private int currentPartResourceNum = 0; // One resource type can have 0 to multiple typeFilters, indexOfCurrentTypeFilter is used to tell the currently processed typeFilter. 
private int indexOfCurrentTypeFilter; + private String resourceTypeSummary = null; - public CheckPointUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentResourceType, int indexOfCurrentTypeFilter) { + public CheckPointUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter) { super(); this.pageNum = pageNum; this.uploadId = uploadId; this.cosDataPacks = cosDataPacks; this.partNum = partNum; - this.indexOfCurrentResourceType = indexOfCurrentResourceType; this.indexOfCurrentTypeFilter = indexOfCurrentTypeFilter; } public static CheckPointUserData fromTransientUserData(TransientUserData userData) { return new CheckPointUserData(userData.getPageNum(), userData.getUploadId(), userData.getCosDataPacks(), - userData.getPartNum(), userData.getIndexOfCurrentResourceType(), userData.getIndexOfCurrentTypeFilter()); + userData.getPartNum(), userData.getIndexOfCurrentTypeFilter()); } public int getPageNum() { @@ -90,14 +89,6 @@ public void setLastPageNum(int lastPageNum) { this.lastPageNum = lastPageNum; } - public int getIndexOfCurrentResourceType() { - return indexOfCurrentResourceType; - } - - public void setIndexOfCurrentResourceType(int indexOfCurrentResourceType) { - this.indexOfCurrentResourceType = indexOfCurrentResourceType; - } - public int getCurrentPartResourceNum() { return currentPartResourceNum; } @@ -114,4 +105,12 @@ public void setIndexOfCurrentTypeFilter(int indexOfCurrentTypeFilter) { this.indexOfCurrentTypeFilter = indexOfCurrentTypeFilter; } + public String getResourceTypeSummary() { + return resourceTypeSummary; + } + + public void setResourceTypeSummary(String resourceTypeSummary) { + this.resourceTypeSummary = resourceTypeSummary; + } + } diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java index 107d9ff2403..d8f06187d5e 
100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java @@ -19,13 +19,13 @@ public class TransientUserData extends CheckPointUserData { private static final long serialVersionUID = -5892726731783560418L; private ByteArrayOutputStream bufferStream = new ByteArrayOutputStream(); - public TransientUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentResourceType, int indexOfCurrentTypeFilter) { - super(pageNum, uploadId, cosDataPacks, partNum, indexOfCurrentResourceType, indexOfCurrentTypeFilter); + public TransientUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter) { + super(pageNum, uploadId, cosDataPacks, partNum, indexOfCurrentTypeFilter); } public static TransientUserData fromCheckPointUserData(CheckPointUserData checkPointData) { return new TransientUserData(checkPointData.getPageNum(), checkPointData.getUploadId(), - checkPointData.getCosDataPacks(), checkPointData.getPartNum(), checkPointData.getIndexOfCurrentResourceType(), checkPointData.getIndexOfCurrentTypeFilter()); + checkPointData.getCosDataPacks(), checkPointData.getPartNum(), checkPointData.getIndexOfCurrentTypeFilter()); } public ByteArrayOutputStream getBufferStream() { diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java index 9d509f98c1c..a0a19d3723a 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java @@ -16,10 +16,11 @@ import java.util.stream.Collectors; import javax.batch.api.BatchProperty; -import javax.batch.runtime.context.JobContext; +import 
javax.batch.runtime.context.StepContext; import javax.inject.Inject; import com.ibm.cloud.objectstorage.services.s3.model.PartETag; +import com.ibm.fhir.bulkcommon.Constants; import com.ibm.fhir.bulkexport.common.TransientUserData; import com.ibm.fhir.model.resource.Group; import com.ibm.fhir.model.resource.Group.Member; @@ -42,11 +43,11 @@ public class ChunkReader extends com.ibm.fhir.bulkexport.patient.ChunkReader { * Fhir search patient group id. */ @Inject - @BatchProperty(name = "fhir.search.patientgroupid") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_PATIENTGROUPID) String fhirSearchPatientGroupId; @Inject - JobContext jobContext; + StepContext stepCtx; public ChunkReader() { super(); @@ -98,17 +99,9 @@ public Object readItem() throws Exception { throw new Exception("readItem: missing group id for this group export job!"); } - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); if (chunkData != null && pageNum > chunkData.getLastPageNum()) { - if (resourceTypes.size() == indexOfCurrentResourceType + 1) { - // No more resource type and page to read, so return null to end the reading. - return null; - } else { - // More resource types to read, so reset pageNum, partNum and move resource type index to the next. 
- pageNum = 1; - chunkData.setPartNum(1); - indexOfCurrentResourceType++; - } + return null; } if (patientMembers == null) { @@ -123,10 +116,9 @@ public Object readItem() throws Exception { pageNum++; if (chunkData == null) { - chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, 0); - jobContext.setTransientUserData(chunkData); + chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0); + stepCtx.setTransientUserData(chunkData); } else { - chunkData.setIndexOfCurrentResourceType(indexOfCurrentResourceType); chunkData.setPageNum(pageNum); } chunkData.setLastPageNum((patientMembers.size() + pageSize -1)/pageSize ); diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java index 1570b7d0c1f..6ee50890dbc 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java @@ -21,7 +21,7 @@ import javax.batch.api.BatchProperty; import javax.batch.api.chunk.AbstractItemReader; -import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; import javax.inject.Inject; import com.ibm.cloud.objectstorage.services.s3.model.PartETag; @@ -41,7 +41,6 @@ import com.ibm.fhir.persistence.context.FHIRPersistenceContextFactory; import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper; import com.ibm.fhir.persistence.helper.FHIRTransactionHelper; -import com.ibm.fhir.search.compartment.CompartmentUtil; import com.ibm.fhir.search.context.FHIRSearchContext; import com.ibm.fhir.search.util.SearchUtil; @@ -52,12 +51,11 @@ public class ChunkReader extends AbstractItemReader { private final static Logger logger = Logger.getLogger(ChunkReader.class.getName()); protected int pageNum = 1; - protected int indexOfCurrentResourceType = 0; // Control the 
number of records to read in each "item". protected int pageSize = Constants.DEFAULT_SEARCH_PAGE_SIZE; protected FHIRPersistence fhirPersistence; - protected List resourceTypes; + Class resourceType; // Search parameters for resource types gotten from fhir.typeFilters job parameter. Map, List>>> searchParametersForResoureTypes = null; @@ -71,7 +69,7 @@ public class ChunkReader extends AbstractItemReader { * Fhir tenant id. */ @Inject - @BatchProperty(name = "fhir.tenant") + @BatchProperty(name = Constants.FHIR_TENANT) protected String fhirTenant; @@ -79,62 +77,58 @@ public class ChunkReader extends AbstractItemReader { * Fhir data store id. */ @Inject - @BatchProperty(name = "fhir.datastoreid") + @BatchProperty(name = Constants.FHIR_DATASTORE_ID) protected String fhirDatastoreId; /** - * Fhir ResourceType. + * Fhir resource type to process. */ @Inject - @BatchProperty(name = "fhir.resourcetype") - protected String fhirResourceType; + @BatchProperty(name = Constants.PARTITION_RESOURCE_TYPE) + String fhirResourceType; /** * Fhir Search from date. */ @Inject - @BatchProperty(name = "fhir.search.fromdate") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_FROMDATE) String fhirSearchFromDate; /** * Fhir search to date. */ @Inject - @BatchProperty(name = "fhir.search.todate") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_TODATE) String fhirSearchToDate; /** * Fhir search page size. */ @Inject - @BatchProperty(name = "fhir.search.pagesize") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_PAGESIZE) String fhirSearchPageSize; /** * Fhir export type filters. 
*/ @Inject - @BatchProperty(name = "fhir.typeFilters") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_TYPEFILTERS) String fhirTypeFilters; @Inject - JobContext jobContext; + StepContext stepCtx; - /** - * @see AbstractItemReader#AbstractItemReader() - */ public ChunkReader() { super(); } protected void fillChunkDataBuffer(List patientIds) throws Exception { - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); int indexOfCurrentTypeFilter = 0; int compartmentPageNum = 1; int resSubTotal = 0; FHIRSearchContext searchContext; - Class resourceType = ModelSupport.getResourceType(resourceTypes.get(indexOfCurrentResourceType)); if (chunkData != null) { do { @@ -163,8 +157,7 @@ protected void fillChunkDataBuffer(List patientIds) throws Exception { for (String patientId : patientIds) { - searchContext = SearchUtil.parseQueryParameters("Patient", patientId, - ModelSupport.getResourceType(resourceTypes.get(indexOfCurrentResourceType)), queryParameters, true); + searchContext = SearchUtil.parseQueryParameters("Patient", patientId, resourceType, queryParameters, true); do { searchContext.setPageSize(pageSize); searchContext.setPageNumber(compartmentPageNum); @@ -221,19 +214,10 @@ protected void fillChunkDataBuffer(List patientIds) throws Exception { @Override public Object readItem() throws Exception { - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); if (chunkData != null && pageNum > chunkData.getLastPageNum()) { - if (resourceTypes.size() == indexOfCurrentResourceType + 1) { - // No more resource type and page to read, so return null to end the reading. + // No more page to read, so return null to end the reading. 
return null; - } else { - // More resource types to read, so reset pageNum, partNum and move resource type index to the next. - pageNum = 1; - chunkData.setPartNum(1); - indexOfCurrentResourceType++; - isDoDuplicationCheck = false; - loadedResourceIds.clear(); - } } FHIRSearchContext searchContext; @@ -266,17 +250,16 @@ public Object readItem() throws Exception { pageNum++; if (chunkData == null) { - chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, 0); + chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0); chunkData.setLastPageNum(searchContext.getLastPageNumber()); - jobContext.setTransientUserData(chunkData); + stepCtx.setTransientUserData(chunkData); } else { chunkData.setPageNum(pageNum); - chunkData.setIndexOfCurrentResourceType(indexOfCurrentResourceType); chunkData.setLastPageNum(searchContext.getLastPageNumber()); } if (resources != null) { - logger.fine("readItem(" + resourceTypes.get(indexOfCurrentResourceType) + "): loaded patients number - " + resources.size()); + logger.fine("readItem(" + fhirResourceType + "): loaded patients number - " + resources.size()); List patientIds = resources.stream().filter(item -> item.getId() != null).map(item -> item.getId()).collect(Collectors.toList()); if (patientIds != null && patientIds.size() > 0) { @@ -294,8 +277,7 @@ public void open(Serializable checkpoint) throws Exception { if (checkpoint != null) { CheckPointUserData checkPointData = (CheckPointUserData) checkpoint; pageNum = checkPointData.getPageNum(); - indexOfCurrentResourceType = checkPointData.getIndexOfCurrentResourceType(); - jobContext.setTransientUserData(TransientUserData.fromCheckPointUserData(checkPointData)); + stepCtx.setTransientUserData(TransientUserData.fromCheckPointUserData(checkPointData)); } if (fhirTenant == null) { @@ -319,16 +301,7 @@ public void open(Serializable checkpoint) throws Exception { fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation(); 
searchParametersForResoureTypes = BulkDataUtils.getSearchParemetersFromTypeFilters(fhirTypeFilters); - List allCompartmentResourceTypes = CompartmentUtil.getCompartmentResourceTypes("Patient"); - if (fhirResourceType == null ) { - resourceTypes = allCompartmentResourceTypes; - } else { - List tmpResourceTypes = Arrays.asList(fhirResourceType.split("\\s*,\\s*")); - resourceTypes = tmpResourceTypes.stream().filter(item-> allCompartmentResourceTypes.contains(item)).collect(Collectors.toList()); - if (resourceTypes == null || resourceTypes.isEmpty()) { - throw new Exception("open: None of the input resource types is valid!"); - } - } + resourceType = ModelSupport.getResourceType(fhirResourceType); } @Override @@ -338,7 +311,7 @@ public void close() throws Exception { @Override public Serializable checkpointInfo() throws Exception { - return CheckPointUserData.fromTransientUserData((TransientUserData) jobContext.getTransientUserData()); + return CheckPointUserData.fromTransientUserData((TransientUserData) stepCtx.getTransientUserData()); } } diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkWriter.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkWriter.java deleted file mode 100644 index 6f5521902e5..00000000000 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkWriter.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * (C) Copyright IBM Corp. 2019, 2020 - * - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.ibm.fhir.bulkexport.patient; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import javax.batch.api.BatchProperty; -import javax.inject.Inject; - -import com.ibm.fhir.search.compartment.CompartmentUtil; - -/** - * Bulk patient export Chunk implementation - the Writer. - * - */ - -public class ChunkWriter extends com.ibm.fhir.bulkexport.system.ChunkWriter { - - /** - * Fhir ResourceType. 
- */ - @Inject - @BatchProperty(name = "fhir.resourcetype") - String fhirResourceType; - - @Override - protected List getResourceTypes() throws Exception { - List resourceTypes; - List allCompartmentResourceTypes = CompartmentUtil.getCompartmentResourceTypes("Patient"); - if (fhirResourceType == null ) { - resourceTypes = allCompartmentResourceTypes; - } else { - List tmpResourceTypes = Arrays.asList(fhirResourceType.split("\\s*,\\s*")); - resourceTypes = tmpResourceTypes.stream().filter(item-> allCompartmentResourceTypes.contains(item)).collect(Collectors.toList()); - if (resourceTypes == null || resourceTypes.isEmpty()) { - throw new Exception("readItem: None of the input resource types is valid!"); - } - } - return resourceTypes; - } -} diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java new file mode 100644 index 00000000000..78528f37d5c --- /dev/null +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java @@ -0,0 +1,66 @@ +/* + * (C) Copyright IBM Corp. 2020 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.bulkexport.patient; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +import javax.batch.api.BatchProperty; +import javax.batch.api.partition.PartitionMapper; +import javax.batch.api.partition.PartitionPlan; +import javax.batch.api.partition.PartitionPlanImpl; +import javax.inject.Inject; + +import com.ibm.fhir.bulkcommon.Constants; +import com.ibm.fhir.search.compartment.CompartmentUtil; + +public class PatientExportPartitionMapper implements PartitionMapper { + + /** + * Fhir ResourceType. 
+ */ + @Inject + @BatchProperty(name = Constants.FHIR_RESOURCETYPES) + String fhirResourceType; + + + public PatientExportPartitionMapper() { + // No Operation + } + + @Override + public PartitionPlan mapPartitions() throws Exception { + List resourceTypes; + List allCompartmentResourceTypes = CompartmentUtil.getCompartmentResourceTypes("Patient"); + if (fhirResourceType == null ) { + resourceTypes = allCompartmentResourceTypes; + } else { + List tmpResourceTypes = Arrays.asList(fhirResourceType.split("\\s*,\\s*")); + resourceTypes = tmpResourceTypes.stream().filter(item-> allCompartmentResourceTypes.contains(item)).collect(Collectors.toList()); + if (resourceTypes == null || resourceTypes.isEmpty()) { + throw new Exception("open: None of the input resource types is valid!"); + } + } + + PartitionPlanImpl pp = new PartitionPlanImpl(); + pp.setPartitions(resourceTypes.size()); + pp.setThreads(Math.min(Constants.EXPORT_MAX_PARTITIONPROCESSING_THREADNUMBER, resourceTypes.size())); + Properties[] partitionProps = new Properties[resourceTypes.size()]; + + int propCount = 0; + for (String resourceType : resourceTypes) { + Properties p = new Properties(); + p.setProperty(Constants.PARTITION_RESOURCE_TYPE, resourceType); + partitionProps[propCount++] = p; + } + pp.setPartitionProperties(partitionProps); + + return pp; + } +} \ No newline at end of file diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java index 98ee0eb897f..a04ca84bcdd 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java @@ -20,7 +20,7 @@ import javax.batch.api.BatchProperty; import javax.batch.api.chunk.AbstractItemReader; -import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; 
import javax.inject.Inject; import com.ibm.cloud.objectstorage.services.s3.model.PartETag; @@ -50,7 +50,6 @@ public class ChunkReader extends AbstractItemReader { private final static Logger logger = Logger.getLogger(ChunkReader.class.getName()); boolean isSingleCosObject = false; int pageNum = 1; - int indexOfCurrentResourceType = 0; // Control the number of records to read in each "item". int pageSize = Constants.DEFAULT_SEARCH_PAGE_SIZE; // Search parameters for resource types gotten from fhir.typeFilters job parameter. @@ -64,73 +63,73 @@ public class ChunkReader extends AbstractItemReader { boolean isDoDuplicationCheck = false; FHIRPersistence fhirPersistence; - List resourceTypes; + Class resourceType; /** * Fhir tenant id. */ @Inject - @BatchProperty(name = "fhir.tenant") + @BatchProperty(name = Constants.FHIR_TENANT) String fhirTenant; /** * Fhir data store id. */ @Inject - @BatchProperty(name = "fhir.datastoreid") + @BatchProperty(name = Constants.FHIR_DATASTORE_ID) String fhirDatastoreId; /** - * Fhir ResourceType. + * Fhir resource type to process. */ @Inject - @BatchProperty(name = "fhir.resourcetype") + @BatchProperty(name = Constants.PARTITION_RESOURCE_TYPE) String fhirResourceType; /** * Fhir Search from date. */ @Inject - @BatchProperty(name = "fhir.search.fromdate") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_FROMDATE) String fhirSearchFromDate; /** * Fhir search to date. */ @Inject - @BatchProperty(name = "fhir.search.todate") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_TODATE) String fhirSearchToDate; /** * Fhir export type filters. */ @Inject - @BatchProperty(name = "fhir.typeFilters") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_TYPEFILTERS) String fhirTypeFilters; /** * Fhir search page size. */ @Inject - @BatchProperty(name = "fhir.search.pagesize") + @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_PAGESIZE) String fhirSearchPageSize; /** * The Cos object name. 
*/ @Inject - @BatchProperty(name = "cos.bucket.objectname") + @BatchProperty(name = Constants.EXPORT_COS_OBJECTNAME) String cosBucketObjectName; @Inject - JobContext jobContext; + StepContext stepCtx; public ChunkReader() { super(); } private void fillChunkDataBuffer(List resources) throws Exception { - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); int resSubTotal = 0; if (chunkData != null) { for (Resource res : resources) { @@ -170,25 +169,11 @@ private void fillChunkDataBuffer(List resources) throws Exception { @Override public Object readItem() throws Exception { - - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); - // If the search already reaches the last page, then check if need to move to the next typeFilter or next resource type. + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); + // If the search already reaches the last page, then check if need to move to the next typeFilter. if (chunkData != null && pageNum > chunkData.getLastPageNum()) { - Class resourceType = ModelSupport.getResourceType(resourceTypes.get(indexOfCurrentResourceType)); if (searchParametersForResoureTypes.get(resourceType) == null || searchParametersForResoureTypes.get(resourceType).size() <= indexOfCurrentTypeFilter + 1) { - // If there is no more typeFilter to process for current resource type, then check if there is any more resource type to process. - if (resourceTypes.size() == indexOfCurrentResourceType + 1) { - // No more resource type and page to read, so return null to end the reading. - return null; - } else { - // More resource types to read, so reset pageNum, partNum and move resource type index to the next and reset indexOfCurrentTypeFilter. 
- pageNum = 1; - chunkData.setPartNum(1); - indexOfCurrentResourceType++; - indexOfCurrentTypeFilter = 0; - isDoDuplicationCheck = false; - loadedResourceIds.clear(); - } + return null; } else { // If there is more typeFilter to process for current resource type, then reset pageNum only and move to the next typeFilter. pageNum = 1; @@ -196,8 +181,6 @@ public Object readItem() throws Exception { } } - Class resourceType = ModelSupport - .getResourceType(resourceTypes.get(indexOfCurrentResourceType)); FHIRSearchContext searchContext; FHIRPersistenceContext persistenceContext; Map> queryParameters = new HashMap<>(); @@ -236,15 +219,14 @@ public Object readItem() throws Exception { pageNum++; if (chunkData == null) { - chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, 0); + chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0); chunkData.setLastPageNum(searchContext.getLastPageNumber()); if (isSingleCosObject) { chunkData.setSingleCosObject(true); } - jobContext.setTransientUserData(chunkData); + stepCtx.setTransientUserData(chunkData); } else { chunkData.setPageNum(pageNum); - chunkData.setIndexOfCurrentResourceType(indexOfCurrentResourceType); chunkData.setIndexOfCurrentTypeFilter(indexOfCurrentTypeFilter); chunkData.setLastPageNum(searchContext.getLastPageNumber()); } @@ -264,9 +246,8 @@ public void open(Serializable checkpoint) throws Exception { if (checkpoint != null) { CheckPointUserData checkPointData = (CheckPointUserData) checkpoint; pageNum = checkPointData.getPageNum(); - indexOfCurrentResourceType = checkPointData.getIndexOfCurrentResourceType(); indexOfCurrentTypeFilter = checkPointData.getIndexOfCurrentTypeFilter(); - jobContext.setTransientUserData(TransientUserData.fromCheckPointUserData(checkPointData)); + stepCtx.setTransientUserData(TransientUserData.fromCheckPointUserData(checkPointData)); } if (fhirTenant == null) { @@ -287,9 +268,7 @@ public void open(Serializable checkpoint) throws Exception { } if 
(cosBucketObjectName != null - && cosBucketObjectName.trim().length() > 0 - // Single COS object uploading is for single resource type export only. - && resourceTypes.size() == 1) { + && cosBucketObjectName.trim().length() > 0) { isSingleCosObject = true; logger.info("open: Use single COS object for uploading!"); } @@ -298,8 +277,8 @@ public void open(Serializable checkpoint) throws Exception { FHIRPersistenceHelper fhirPersistenceHelper = new FHIRPersistenceHelper(); fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation(); - resourceTypes = Arrays.asList(fhirResourceType.split("\\s*,\\s*")); searchParametersForResoureTypes = BulkDataUtils.getSearchParemetersFromTypeFilters(fhirTypeFilters); + resourceType = ModelSupport.getResourceType(fhirResourceType); } @Override @@ -309,7 +288,7 @@ public void close() throws Exception { @Override public Serializable checkpointInfo() throws Exception { - return CheckPointUserData.fromTransientUserData((TransientUserData) jobContext.getTransientUserData()); + return CheckPointUserData.fromTransientUserData((TransientUserData) stepCtx.getTransientUserData()); } } diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java index aac74f1d0c1..78c4c8a2f31 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java @@ -9,7 +9,6 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.time.Instant; -import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.logging.Logger; @@ -17,6 +16,7 @@ import javax.batch.api.BatchProperty; import javax.batch.api.chunk.AbstractItemWriter; import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; import 
javax.inject.Inject; import com.ibm.cloud.objectstorage.services.s3.AmazonS3; @@ -43,64 +43,67 @@ public class ChunkWriter extends AbstractItemWriter { * The IBM COS API key or S3 access key. */ @Inject - @BatchProperty(name = "cos.api.key") + @BatchProperty(name = Constants.COS_API_KEY) String cosApiKeyProperty; /** * The IBM COS service instance id or s3 secret key. */ @Inject - @BatchProperty(name = "cos.srvinst.id") + @BatchProperty(name = Constants.COS_SRVINST_ID) String cosSrvinstId; /** * The Cos End point URL. */ @Inject - @BatchProperty(name = "cos.endpointurl") + @BatchProperty(name = Constants.COS_ENDPOINT_URL) String cosEndpointUrl; /** * The Cos End point location. */ @Inject - @BatchProperty(name = "cos.location") + @BatchProperty(name = Constants.COS_LOCATION) String cosLocation; /** * The Cos bucket name. */ @Inject - @BatchProperty(name = "cos.bucket.name") + @BatchProperty(name = Constants.COS_BUCKET_NAME) String cosBucketName; /** * The Cos bucket path prefix. */ @Inject - @BatchProperty(name = "cos.bucket.pathprefix") + @BatchProperty(name = Constants.EXPORT_COS_OBJECT_PATHPREFIX) String cosBucketPathPrefix; /** * If use IBM credential or Amazon secret keys. */ @Inject - @BatchProperty(name = "cos.credential.ibm") + @BatchProperty(name = Constants.COS_IS_IBM_CREDENTIAL) String cosCredentialIbm; /** - * The Cos object name(only used by system export for exporting single resource type) + * Fhir resource type to process. */ @Inject - @BatchProperty(name = "cos.bucket.objectname") - String cosBucketObjectName; + @BatchProperty(name = Constants.PARTITION_RESOURCE_TYPE) + String fhirResourceType; /** - * Fhir ResourceType. 
+ * The Cos object name(only used by system export for exporting single resource type) */ @Inject - @BatchProperty(name = "fhir.resourcetype") - String fhirResourceType; + @BatchProperty(name = Constants.EXPORT_COS_OBJECTNAME) + String cosBucketObjectName; + + @Inject + StepContext stepCtx; @Inject JobContext jobContext; @@ -112,20 +115,13 @@ public ChunkWriter() { super(); } - - protected List getResourceTypes() throws Exception { - return Arrays.asList(fhirResourceType.split("\\s*,\\s*")); - } - private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception { if (cosClient == null) { logger.warning("pushFhirJsons2Cos: no cosClient!"); throw new Exception("pushFhirJsons2Cos: no cosClient!"); } - List ResourceTypes = getResourceTypes(); - - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); if (chunkData == null) { logger.warning("pushFhirJsons2Cos: chunkData is null, this should never happen!"); throw new Exception("pushFhirJsons2Cos: chunkData is null, this should never happen!"); @@ -145,7 +141,7 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception if (chunkData.getPageNum() > chunkData.getLastPageNum()) { BulkDataUtils.finishMultiPartUpload(cosClient, cosBucketName, cosBucketObjectName, chunkData.getUploadId(), chunkData.getCosDataPacks()); - jobContext.setExitStatus(cosBucketObjectName + "; " + ResourceTypes.get(chunkData.getIndexOfCurrentResourceType()) + stepCtx.setExitStatus(cosBucketObjectName + "; " + fhirResourceType + "[" + chunkData.getCurrentPartResourceNum() + "]"); } @@ -156,8 +152,7 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception String itemName; PutObjectRequest req; if (cosBucketPathPrefix != null && cosBucketPathPrefix.trim().length() > 0) { - itemName = cosBucketPathPrefix + "/" + ResourceTypes.get(chunkData.getIndexOfCurrentResourceType()) - + 
"_" + chunkData.getPartNum() + ".ndjson"; + itemName = cosBucketPathPrefix + "/" + fhirResourceType + "_" + chunkData.getPartNum() + ".ndjson"; if (isExportPublic) { // Set expiration time to 2 hours(7200 seconds). // Note: IBM COS doesn't honor this but also doesn't fail on this. @@ -172,8 +167,7 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception } } else { - itemName = "job" + jobContext.getExecutionId() + "/" + ResourceTypes.get(chunkData.getIndexOfCurrentResourceType()) - + "_" + chunkData.getPartNum() + ".ndjson"; + itemName = "job" + jobContext.getExecutionId() + "/" + fhirResourceType + "_" + chunkData.getPartNum() + ".ndjson"; req = new PutObjectRequest(cosBucketName, itemName, in, metadata); } @@ -181,22 +175,19 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception logger.info( "pushFhirJsons2Cos: " + itemName + "(" + dataLength + " bytes) was successfully written to COS"); // Job exit status, e.g, Patient[1000,1000,200]:Observation[1000,1000,200] - if (jobContext.getExitStatus() == null) { - jobContext.setExitStatus(ResourceTypes.get(chunkData.getIndexOfCurrentResourceType()) - + "[" + chunkData.getCurrentPartResourceNum()); + if (chunkData.getResourceTypeSummary() == null) { + chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + fhirResourceType + "[" + chunkData.getCurrentPartResourceNum()); if (chunkData.getPageNum() > chunkData.getLastPageNum()) { - jobContext.setExitStatus(jobContext.getExitStatus() + "]"); + chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "]"); } } else { if (chunkData.getPartNum() == 1) { - jobContext.setExitStatus(jobContext.getExitStatus() + ":" - + ResourceTypes.get(chunkData.getIndexOfCurrentResourceType()) + "[" - + chunkData.getCurrentPartResourceNum()); + chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + ":" + fhirResourceType + "[" + chunkData.getCurrentPartResourceNum()); } else { - 
jobContext.setExitStatus(jobContext.getExitStatus() + "," + chunkData.getCurrentPartResourceNum()); + chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "," + chunkData.getCurrentPartResourceNum()); } if (chunkData.getPageNum() > chunkData.getLastPageNum()) { - jobContext.setExitStatus(jobContext.getExitStatus() + "]"); + chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "]"); } } chunkData.setPartNum(chunkData.getPartNum() + 1); @@ -229,7 +220,7 @@ public void writeItems(List arg0) throws Exception { cosClient.createBucket(req); } - TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData(); + TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData(); if (chunkData == null) { logger.warning("writeItems: chunkData is null, this should never happen!"); throw new Exception("writeItems: chunkData is null, this should never happen!"); diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java new file mode 100644 index 00000000000..e8969671c5b --- /dev/null +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java @@ -0,0 +1,49 @@ +/* + * (C) Copyright IBM Corp. 
2020 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.bulkexport.system; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +import javax.batch.api.partition.PartitionAnalyzer; +import javax.batch.runtime.BatchStatus; +import javax.batch.runtime.context.JobContext; +import javax.inject.Inject; + +import com.ibm.fhir.bulkexport.common.CheckPointUserData; + +public class ExportPartitionAnalyzer implements PartitionAnalyzer { + private static final Logger logger = Logger.getLogger(ExportPartitionAnalyzer.class.getName()); + @Inject + JobContext jobContext; + + private List partitionSummaries = new ArrayList<>(); + + public ExportPartitionAnalyzer() { + } + + @Override + public void analyzeStatus(BatchStatus batchStatus, String exitStatus) { + + } + + @Override + public void analyzeCollectorData(Serializable data) { + if (data == null) { + return; + } + CheckPointUserData partitionSummary = (CheckPointUserData) data; + + if (partitionSummary != null) { + partitionSummaries.add(partitionSummary); + jobContext.setTransientUserData(partitionSummaries); + } + + } +} diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java new file mode 100644 index 00000000000..e0b13f66e55 --- /dev/null +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java @@ -0,0 +1,44 @@ +/* + * (C) Copyright IBM Corp. 
2020 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.bulkexport.system; + +import java.io.Serializable; +import java.util.logging.Logger; + +import javax.batch.api.partition.PartitionCollector; +import javax.batch.runtime.BatchStatus; +import javax.batch.runtime.context.StepContext; +import javax.inject.Inject; + +import com.ibm.fhir.bulkexport.common.CheckPointUserData; +import com.ibm.fhir.bulkexport.common.TransientUserData; + +public class ExportPartitionCollector implements PartitionCollector { + private static final Logger logger = Logger.getLogger(ExportPartitionCollector.class.getName()); + @Inject + StepContext stepCtx; + + public ExportPartitionCollector() { + // The injected properties are not available at class construction time + // These values are lazy injected BEFORE calling 'collectPartitionData'. + } + + @Override + public Serializable collectPartitionData() throws Exception { + TransientUserData partitionSummaryData = (TransientUserData)stepCtx.getTransientUserData(); + BatchStatus batchStatus = stepCtx.getBatchStatus(); + + // If the job is being stopped or in other status except for "started", then collect nothing. + if (!batchStatus.equals(BatchStatus.STARTED)) { + return null; + } + + CheckPointUserData partitionSummaryForMetrics = CheckPointUserData.fromTransientUserData(partitionSummaryData); + return partitionSummaryForMetrics; + } + +} diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java new file mode 100644 index 00000000000..12aa4f24f3c --- /dev/null +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java @@ -0,0 +1,60 @@ +/* + * (C) Copyright IBM Corp. 
2020 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.bulkexport.system; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +import javax.batch.api.BatchProperty; +import javax.batch.api.partition.PartitionMapper; +import javax.batch.api.partition.PartitionPlan; +import javax.batch.api.partition.PartitionPlanImpl; +import javax.inject.Inject; + +import com.ibm.fhir.bulkcommon.Constants; +import com.ibm.fhir.model.util.ModelSupport; + +public class SystemExportPartitionMapper implements PartitionMapper { + + /** + * Fhir ResourceType. + */ + @Inject + @BatchProperty(name = Constants.FHIR_RESOURCETYPES) + String fhirResourceType; + + + public SystemExportPartitionMapper() { + // No Operation + } + + @Override + public PartitionPlan mapPartitions() throws Exception { + List resourceTypes = Arrays.asList(fhirResourceType.split("\\s*,\\s*")); + resourceTypes = resourceTypes.stream().filter(item-> ModelSupport.isResourceType(item)).collect(Collectors.toList()); + if (resourceTypes == null || resourceTypes.isEmpty()) { + throw new Exception("open: None of the input resource types is valid!"); + } + + PartitionPlanImpl pp = new PartitionPlanImpl(); + pp.setPartitions(resourceTypes.size()); + pp.setThreads(Math.min(Constants.IMPORT_MAX_PARTITIONPROCESSING_THREADNUMBER, resourceTypes.size())); + Properties[] partitionProps = new Properties[resourceTypes.size()]; + + int propCount = 0; + for (String resourceType : resourceTypes) { + Properties p = new Properties(); + p.setProperty(Constants.PARTITION_RESOURCE_TYPE, resourceType); + partitionProps[propCount++] = p; + } + pp.setPartitionProperties(partitionProps); + + return pp; + } +} \ No newline at end of file diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java index f4537da6727..854c746f324 100644 --- 
a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java @@ -95,7 +95,7 @@ public class ChunkReader extends AbstractItemReader { * Fhir resource type to process. */ @Inject - @BatchProperty(name = Constants.IMPORT_PARTITTION_RESOURCE_TYPE) + @BatchProperty(name = Constants.PARTITION_RESOURCE_TYPE) String importPartitionResourceType; public ChunkReader() { diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java index 017ad7f9214..1e91086f5f4 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java @@ -106,7 +106,7 @@ public class ChunkWriter extends AbstractItemWriter { * Fhir resource type to process. */ @Inject - @BatchProperty(name = Constants.IMPORT_PARTITTION_RESOURCE_TYPE) + @BatchProperty(name = Constants.PARTITION_RESOURCE_TYPE) String importPartitionResourceType; diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java index cde984ba60b..a43104733a3 100644 --- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java +++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java @@ -48,7 +48,7 @@ public class ImportPartitionMapper implements PartitionMapper { *

* https *

- * + * *
        [{
          "type": "Patient",
@@ -61,7 +61,7 @@ public class ImportPartitionMapper implements PartitionMapper {
      * 

* ibm-cos or aws-s3 *

- * + * *
        [{
           "type": "Patient",
@@ -74,7 +74,7 @@ public class ImportPartitionMapper implements PartitionMapper {
      * 

* file *

- * + * *
        [{
           "type": "Patient",
@@ -268,7 +268,7 @@ public PartitionPlan mapPartitions() throws Exception {
         for (FhirDataSource fhirDataSource : fhirDataSources) {
             Properties p = new Properties();
             p.setProperty(Constants.IMPORT_PARTITTION_WORKITEM, fhirDataSource.getUrl());
-            p.setProperty(Constants.IMPORT_PARTITTION_RESOURCE_TYPE, fhirDataSource.getType());
+            p.setProperty(Constants.PARTITION_RESOURCE_TYPE, fhirDataSource.getType());
 
             partitionProps[propCount++] = p;
         }

From 17dab0f293a6af33394834dc60cc5e0e27ecedb2 Mon Sep 17 00:00:00 2001
From: Albert Wang 
Date: Thu, 21 May 2020 08:41:01 -0400
Subject: [PATCH 2/6] issue #779 bulkexport results in job exit status and
 simple metrics

Signed-off-by: Albert Wang 
---
 .../batch-jobs/FhirBulkExportChunkJob.xml     |  3 +
 .../FhirBulkExportGroupChunkJob.xml           |  8 ++
 .../FhirBulkExportPatientChunkJob.xml         |  5 +
 .../bulkexport/common/CheckPointUserData.java | 29 +++++-
 .../bulkexport/common/TransientUserData.java  |  8 +-
 .../fhir/bulkexport/group/ChunkReader.java    |  8 +-
 .../fhir/bulkexport/patient/ChunkReader.java  | 14 ++-
 .../fhir/bulkexport/system/ChunkReader.java   | 14 ++-
 .../fhir/bulkexport/system/ChunkWriter.java   | 14 +--
 .../bulkexport/system/ExportJobListener.java  | 98 +++++++++++++++++++
 .../system/ExportPartitionAnalyzer.java       | 14 +--
 .../system/ExportPartitionCollector.java      | 15 +--
 12 files changed, 190 insertions(+), 40 deletions(-)
 create mode 100644 fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java

diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
index 4214fa2d49f..2e83a189a40 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
@@ -3,6 +3,9 @@
     
         
     
+    
+        
+    
     
         
             
diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml
index ad8fc52fa1b..e6dbe22825c 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml
@@ -1,5 +1,11 @@
 
 
+    
+        
+    
+    
+        
+    
     
         
             
@@ -33,6 +39,8 @@
                     
                 
             
+            
+            
         
     
 
\ No newline at end of file
diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml
index a66457c6ee2..014dd53d28b 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml
@@ -3,6 +3,9 @@
     
         
     
+    
+        
+    
     
         
             
@@ -41,6 +44,8 @@
                     
                 
             
+            
+            
         
     
 
\ No newline at end of file
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java
index 6cd1a3fd436..123db7e45e0 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java
@@ -23,22 +23,31 @@ public class CheckPointUserData implements java.io.Serializable {
     private boolean isSingleCosObject = false;
     private List cosDataPacks;
     private int currentPartResourceNum = 0;
+    private int totalResourcesNum = 0;
     // One resource type can have 0 to multiple typeFilters, indexOfCurrentTypeFilter is used to tell the currently processed typeFilter.
     private int indexOfCurrentTypeFilter;
+    // Partition status for the exported resources, e.g., Patient[1000,1000,200]
     private String resourceTypeSummary = null;
+    // Used to mark the completion of the partition.
+    private boolean isMoreToExport = true;
 
-    public CheckPointUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter) {
+    public CheckPointUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter,
+            String resourceTypeSummary, int totalResourcesNum, int currentPartResourceNum) {
         super();
         this.pageNum = pageNum;
         this.uploadId = uploadId;
         this.cosDataPacks = cosDataPacks;
         this.partNum = partNum;
         this.indexOfCurrentTypeFilter = indexOfCurrentTypeFilter;
+        this.resourceTypeSummary = resourceTypeSummary;
+        this.totalResourcesNum = totalResourcesNum;
+        this.currentPartResourceNum = currentPartResourceNum;
     }
 
     public static CheckPointUserData fromTransientUserData(TransientUserData userData) {
         return new CheckPointUserData(userData.getPageNum(), userData.getUploadId(), userData.getCosDataPacks(),
-                userData.getPartNum(), userData.getIndexOfCurrentTypeFilter());
+                userData.getPartNum(), userData.getIndexOfCurrentTypeFilter(), userData.getResourceTypeSummary(),
+                userData.getTotalResourcesNum(), userData.getCurrentPartResourceNum());
     }
 
     public int getPageNum() {
@@ -113,4 +122,20 @@ public void setResourceTypeSummary(String resourceTypeSummary) {
         this.resourceTypeSummary = resourceTypeSummary;
     }
 
+    public boolean isMoreToExport() {
+        return isMoreToExport;
+    }
+
+    public void setMoreToExport(boolean isMoreToExport) {
+        this.isMoreToExport = isMoreToExport;
+    }
+
+    public int getTotalResourcesNum() {
+        return totalResourcesNum;
+    }
+
+    public void setTotalResourcesNum(int totalResourcesNum) {
+        this.totalResourcesNum = totalResourcesNum;
+    }
+
 }
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java
index d8f06187d5e..7069420a310 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java
@@ -19,13 +19,15 @@ public class TransientUserData extends CheckPointUserData {
     private static final long serialVersionUID = -5892726731783560418L;
     private ByteArrayOutputStream bufferStream = new ByteArrayOutputStream();
 
-    public TransientUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter) {
-        super(pageNum, uploadId, cosDataPacks, partNum, indexOfCurrentTypeFilter);
+    public TransientUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter,
+            String resourceTypeSummary, int totalResourcesNum, int currentPartResourceNum) {
+        super(pageNum, uploadId, cosDataPacks, partNum, indexOfCurrentTypeFilter, resourceTypeSummary, totalResourcesNum, currentPartResourceNum);
     }
 
     public static TransientUserData fromCheckPointUserData(CheckPointUserData checkPointData) {
         return new TransientUserData(checkPointData.getPageNum(), checkPointData.getUploadId(),
-                checkPointData.getCosDataPacks(), checkPointData.getPartNum(), checkPointData.getIndexOfCurrentTypeFilter());
+                checkPointData.getCosDataPacks(), checkPointData.getPartNum(), checkPointData.getIndexOfCurrentTypeFilter(),
+                checkPointData.getResourceTypeSummary(), checkPointData.getTotalResourcesNum(), checkPointData.getCurrentPartResourceNum());
     }
 
     public ByteArrayOutputStream getBufferStream() {
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java
index a0a19d3723a..d6c05527a54 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java
@@ -12,6 +12,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 
@@ -101,6 +102,7 @@ public Object readItem() throws Exception {
 
         TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData();
         if (chunkData != null && pageNum > chunkData.getLastPageNum()) {
+            chunkData.setMoreToExport(false);
             return null;
         }
 
@@ -116,7 +118,7 @@ public Object readItem() throws Exception {
         pageNum++;
 
         if (chunkData == null) {
-            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0);
+            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0);
             stepCtx.setTransientUserData(chunkData);
         } else {
             chunkData.setPageNum(pageNum);
@@ -124,7 +126,9 @@ public Object readItem() throws Exception {
         chunkData.setLastPageNum((patientMembers.size() + pageSize -1)/pageSize );
 
         if (!patientPageMembers.isEmpty()) {
-            logger.fine("readItem: loaded patients number - " + patientMembers.size());
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine("readItem: loaded patients number - " + patientMembers.size());
+            }
 
             List patientIds = patientPageMembers.stream().filter(patientRef -> patientRef != null).map(patientRef
                     -> patientRef.getEntity().getReference().getValue().substring(8)).collect(Collectors.toList());
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java
index 6ee50890dbc..5f3de3991e8 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java
@@ -200,6 +200,7 @@ protected void fillChunkDataBuffer(List patientIds) throws Exception {
             } while (searchParametersForResoureTypes.get(resourceType) != null && indexOfCurrentTypeFilter < searchParametersForResoureTypes.get(resourceType).size());
 
             chunkData.setCurrentPartResourceNum(chunkData.getCurrentPartResourceNum() + resSubTotal);
+            chunkData.setTotalResourcesNum(chunkData.getTotalResourcesNum() + resSubTotal);
             if (logger.isLoggable(Level.FINE)) {
                 logger.fine("fillChunkDataBuffer: Processed resources - " + resSubTotal + "; Bufferred data size - "
                         + chunkData.getBufferStream().size());
@@ -217,6 +218,7 @@ public Object readItem() throws Exception {
         TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData();
         if (chunkData != null && pageNum > chunkData.getLastPageNum()) {
                 // No more page to read, so return null to end the reading.
+                chunkData.setMoreToExport(false);
                 return null;
         }
 
@@ -250,7 +252,7 @@ public Object readItem() throws Exception {
         pageNum++;
 
         if (chunkData == null) {
-            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0);
+            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0);
             chunkData.setLastPageNum(searchContext.getLastPageNumber());
             stepCtx.setTransientUserData(chunkData);
         } else {
@@ -259,7 +261,9 @@ public Object readItem() throws Exception {
         }
 
         if (resources != null) {
-            logger.fine("readItem(" + fhirResourceType + "): loaded patients number - " + resources.size());
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine("readItem(" + fhirResourceType + "): loaded patients number - " + resources.size());
+            }
 
             List patientIds = resources.stream().filter(item -> item.getId() != null).map(item -> item.getId()).collect(Collectors.toList());
             if (patientIds != null && patientIds.size() > 0) {
@@ -291,7 +295,9 @@ public void open(Serializable checkpoint) throws Exception {
         if (fhirSearchPageSize != null) {
             try {
                 pageSize = Integer.parseInt(fhirSearchPageSize);
-                logger.fine("open: Set page size to " + pageSize + ".");
+                if (logger.isLoggable(Level.FINE)) {
+                    logger.fine("open: Set page size to " + pageSize + ".");
+                }
             } catch (Exception e) {
                 logger.warning("open: Set page size to default(" + Constants.DEFAULT_SEARCH_PAGE_SIZE + ").");
             }
@@ -306,7 +312,7 @@ public void open(Serializable checkpoint) throws Exception {
 
     @Override
     public void close() throws Exception {
-
+        // do nothing
     }
 
     @Override
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java
index a04ca84bcdd..4ed7a6a8ba8 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java
@@ -158,6 +158,7 @@ private void fillChunkDataBuffer(List resources) throws Exception {
                 }
             }
             chunkData.setCurrentPartResourceNum(chunkData.getCurrentPartResourceNum() + resSubTotal);
+            chunkData.setTotalResourcesNum(chunkData.getTotalResourcesNum() + resSubTotal);
             logger.fine("fillChunkDataBuffer: Processed resources - " + resSubTotal + "; Bufferred data size - "
                     + chunkData.getBufferStream().size());
         } else {
@@ -173,6 +174,7 @@ public Object readItem() throws Exception {
         // If the search already reaches the last page, then check if need to move to the next typeFilter.
         if (chunkData != null && pageNum > chunkData.getLastPageNum()) {
             if (searchParametersForResoureTypes.get(resourceType) == null || searchParametersForResoureTypes.get(resourceType).size() <= indexOfCurrentTypeFilter + 1) {
+                chunkData.setMoreToExport(false);
                 return null;
             } else {
              // If there is more typeFilter to process for current resource type, then reset pageNum only and move to the next typeFilter.
@@ -219,7 +221,7 @@ public Object readItem() throws Exception {
         pageNum++;
 
         if (chunkData == null) {
-            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0);
+            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0);
             chunkData.setLastPageNum(searchContext.getLastPageNumber());
             if (isSingleCosObject) {
                 chunkData.setSingleCosObject(true);
@@ -232,7 +234,9 @@ public Object readItem() throws Exception {
         }
 
         if (resources != null) {
-            logger.fine("readItem: loaded resources number - " + resources.size());
+            if (logger.isLoggable(Level.FINE)) {
+                logger.fine("readItem: loaded resources number - " + resources.size());
+            }
             fillChunkDataBuffer(resources);
         } else {
             logger.fine("readItem: End of reading!");
@@ -261,7 +265,9 @@ public void open(Serializable checkpoint) throws Exception {
         if (fhirSearchPageSize != null) {
             try {
                 pageSize = Integer.parseInt(fhirSearchPageSize);
-                logger.fine("open: Set page size to " + pageSize + ".");
+                if (logger.isLoggable(Level.FINE)) {
+                    logger.fine("open: Set page size to " + pageSize + ".");
+                }
             } catch (Exception e) {
                 logger.warning("open: Set page size to default(" + Constants.DEFAULT_SEARCH_PAGE_SIZE + ").");
             }
@@ -283,7 +289,7 @@ public void open(Serializable checkpoint) throws Exception {
 
     @Override
     public void close() throws Exception {
-
+        // do nothing.
     }
 
     @Override
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java
index 78c4c8a2f31..f079c09b2fc 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java
@@ -141,8 +141,7 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception
             if (chunkData.getPageNum() > chunkData.getLastPageNum()) {
                 BulkDataUtils.finishMultiPartUpload(cosClient, cosBucketName, cosBucketObjectName, chunkData.getUploadId(),
                         chunkData.getCosDataPacks());
-                stepCtx.setExitStatus(cosBucketObjectName + "; " + fhirResourceType
-                    + "[" + chunkData.getCurrentPartResourceNum() + "]");
+                chunkData.setResourceTypeSummary(fhirResourceType + "[" + chunkData.getCurrentPartResourceNum() + "]");
             }
 
         } else {
@@ -174,20 +173,17 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception
             cosClient.putObject(req);
             logger.info(
                     "pushFhirJsons2Cos: " + itemName + "(" + dataLength + " bytes) was successfully written to COS");
-            // Job exit status, e.g, Patient[1000,1000,200]:Observation[1000,1000,200]
+            // Partition status for the exported resources, e.g., Patient[1000,1000,200]
             if (chunkData.getResourceTypeSummary() == null) {
-                chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + fhirResourceType + "[" + chunkData.getCurrentPartResourceNum());
+                chunkData.setResourceTypeSummary(fhirResourceType + "[" + chunkData.getCurrentPartResourceNum());
                 if (chunkData.getPageNum() > chunkData.getLastPageNum()) {
                     chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "]");
                 }
             } else {
-                if (chunkData.getPartNum() == 1) {
-                    chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + ":" + fhirResourceType + "[" + chunkData.getCurrentPartResourceNum());
-                } else {
-                    chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "," + chunkData.getCurrentPartResourceNum());
-                }
+                chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "," + chunkData.getCurrentPartResourceNum());
                 if (chunkData.getPageNum() > chunkData.getLastPageNum()) {
                     chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "]");
+                    stepCtx.setTransientUserData(chunkData);
                 }
             }
             chunkData.setPartNum(chunkData.getPartNum() + 1);
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java
new file mode 100644
index 00000000000..a56391879a2
--- /dev/null
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java
@@ -0,0 +1,98 @@
+/*
+ * (C) Copyright IBM Corp. 2020
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.ibm.fhir.bulkexport.system;
+
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.listener.JobListener;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.JobExecution;
+import javax.batch.runtime.context.JobContext;
+import javax.inject.Inject;
+
+import com.ibm.fhir.bulkcommon.Constants;
+import com.ibm.fhir.bulkexport.common.CheckPointUserData;
+
+public class ExportJobListener implements JobListener {
+    private static final Logger logger = Logger.getLogger(ExportJobListener.class.getName());
+
+    long currentExecutionStartTimeInMS;
+
+    @Inject
+    JobContext jobContext;
+
+    @Inject
+    @BatchProperty(name = Constants.IMPORT_FHIR_DATASOURCES)
+    String dataSourcesInfo;
+
+    public ExportJobListener() {
+        // do nothing
+    }
+
+
+    @Override
+    public void afterJob() {
+        long currentExecutionEndTimeInMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+
+        // Used for generating the response for all the exported data resources.
+        @SuppressWarnings("unchecked")
+        List partitionSummaries = (List)jobContext.getTransientUserData();
+
+        JobOperator jobOperator = BatchRuntime.getJobOperator();
+        long totalJobExecutionMilliSeconds = 0;
+        // The job can be started, stopped and then started again, so we need to add them all to get the whole job execution duration.
+        for ( JobExecution jobExecution: jobOperator.getJobExecutions(jobOperator.getJobInstance(jobContext.getExecutionId()))) {
+            // For the current execution, jobExecution.getEndTime() is either null or incorrect because the current execution is not
+            // finished yet, so always use system time for both job execution start time and end time.
+            if (jobExecution.getExecutionId()  == jobContext.getExecutionId()) {
+                totalJobExecutionMilliSeconds += (currentExecutionEndTimeInMS - currentExecutionStartTimeInMS);
+            } else {
+                totalJobExecutionMilliSeconds += (jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime());
+            }
+        }
+
+        // If the job is stopped before any partition is finished, then nothing to show.
+        if (partitionSummaries == null) {
+            return;
+        }
+
+        double jobProcessingSeconds = totalJobExecutionMilliSeconds / 1000.0;
+        jobProcessingSeconds = jobProcessingSeconds < 1 ? 1.0 : jobProcessingSeconds;
+
+        // log the simple metrics.
+        logger.info(" ---- Fhir resources exported in " + jobProcessingSeconds + "seconds ----");
+        logger.info("ResourceType \t| Exported");
+        int totalExportedFhirResources = 0;
+        List resourceTypeSummaries = new ArrayList<>();
+        for (CheckPointUserData partitionSummary : partitionSummaries) {
+            logger.info(partitionSummary.getResourceTypeSummary() + "\t|"
+                  + partitionSummary.getTotalResourcesNum());
+            resourceTypeSummaries.add(partitionSummary.getResourceTypeSummary());
+            totalExportedFhirResources += partitionSummary.getTotalResourcesNum();
+        }
+
+        logger.info(" ---- Total: " + totalExportedFhirResources
+                + " ExportRate: " + new DecimalFormat("#0.00").format(totalExportedFhirResources/jobProcessingSeconds) + " ----");
+
+        if (resourceTypeSummaries.size() > 0) {
+            // e.g., Patient[1000,1000,200]:Observation[1000,250]
+            jobContext.setExitStatus(String.join(":", resourceTypeSummaries));
+        }
+    }
+
+    @Override
+    public void beforeJob() {
+        currentExecutionStartTimeInMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+    }
+
+}
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java
index e8969671c5b..bfa75222d0b 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java
@@ -9,7 +9,6 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.logging.Logger;
 
 import javax.batch.api.partition.PartitionAnalyzer;
 import javax.batch.runtime.BatchStatus;
@@ -19,18 +18,18 @@
 import com.ibm.fhir.bulkexport.common.CheckPointUserData;
 
 public class ExportPartitionAnalyzer implements PartitionAnalyzer {
-    private static final Logger logger = Logger.getLogger(ExportPartitionAnalyzer.class.getName());
     @Inject
     JobContext jobContext;
 
     private List partitionSummaries = new ArrayList<>();
 
     public ExportPartitionAnalyzer() {
+        // do nothing.
     }
 
     @Override
     public void analyzeStatus(BatchStatus batchStatus, String exitStatus) {
-
+        // do nothing.
     }
 
     @Override
@@ -38,12 +37,9 @@ public void analyzeCollectorData(Serializable data) {
         if (data == null) {
             return;
         }
-        CheckPointUserData partitionSummary  = (CheckPointUserData) data;
-
-        if (partitionSummary != null) {
-            partitionSummaries.add(partitionSummary);
-            jobContext.setTransientUserData(partitionSummaries);
-        }
 
+        CheckPointUserData partitionSummary  = (CheckPointUserData) data;
+        partitionSummaries.add(partitionSummary);
+        jobContext.setTransientUserData(partitionSummaries);
     }
 }
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java
index e0b13f66e55..de194124dc0 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java
@@ -7,7 +7,6 @@
 package com.ibm.fhir.bulkexport.system;
 
 import java.io.Serializable;
-import java.util.logging.Logger;
 
 import javax.batch.api.partition.PartitionCollector;
 import javax.batch.runtime.BatchStatus;
@@ -18,7 +17,6 @@
 import com.ibm.fhir.bulkexport.common.TransientUserData;
 
 public class ExportPartitionCollector implements PartitionCollector {
-    private static final Logger logger = Logger.getLogger(ExportPartitionCollector.class.getName());
     @Inject
     StepContext stepCtx;
 
@@ -29,16 +27,19 @@ public ExportPartitionCollector() {
 
     @Override
     public Serializable collectPartitionData() throws Exception {
-        TransientUserData partitionSummaryData  = (TransientUserData)stepCtx.getTransientUserData();
+        TransientUserData transientUserData  = (TransientUserData)stepCtx.getTransientUserData();
         BatchStatus batchStatus = stepCtx.getBatchStatus();
 
-        // If the job is being stopped or in other status except for "started", then collect nothing.
-        if (!batchStatus.equals(BatchStatus.STARTED)) {
+        // If the job is being stopped or in any status other than "started", or if there are more pages to process, then collect nothing.
+        if (!batchStatus.equals(BatchStatus.STARTED)
+            || transientUserData.isMoreToExport()
+            || transientUserData.getResourceTypeSummary() == null)
+        {
             return null;
         }
 
-        CheckPointUserData partitionSummaryForMetrics = CheckPointUserData.fromTransientUserData(partitionSummaryData);
-        return partitionSummaryForMetrics;
+        CheckPointUserData partitionSummary = CheckPointUserData.fromTransientUserData(transientUserData);
+        return partitionSummary;
     }
 
 }

From 4cfcdfa1cb9508e8df9acecf6085c6cc832fa57d Mon Sep 17 00:00:00 2001
From: Albert Wang 
Date: Thu, 21 May 2020 14:16:19 -0400
Subject: [PATCH 3/6] issue #779 checkpoint algorithm updates

Signed-off-by: Albert Wang 
---
 .../batch-jobs/FhirBulkExportChunkJob.xml     |  9 ++--
 .../FhirBulkExportGroupChunkJob.xml           | 11 ++--
 .../FhirBulkExportPatientChunkJob.xml         |  9 ++--
 .../com/ibm/fhir/bulkcommon/Constants.java    |  6 ++-
 .../common/CheckPointAlgorithm.java           | 52 ++++++++++---------
 5 files changed, 45 insertions(+), 42 deletions(-)

diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
index 2e83a189a40..c976021d364 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
@@ -1,13 +1,10 @@
 
 
-    
-        
-    
     
         
     
     
-        
+        
             
                 
                     
@@ -35,8 +32,8 @@
             
             
                 
-                    
-                    
+                    
+                    
                 
             
         
diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml
index e6dbe22825c..164e95a4a6d 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportGroupChunkJob.xml
@@ -1,13 +1,10 @@
 
 
-    
-        
-    
     
         
     
     
-        
+        
             
                 
                     
@@ -32,6 +29,12 @@
                     
                 
             
+            
+                
+                    
+                    
+                
+            
         
         
             
diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml
index 014dd53d28b..9b0b44d9895 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportPatientChunkJob.xml
@@ -1,13 +1,10 @@
 
 
-    
-        
-    
     
         
     
     
-        
+        
             
                 
                     
@@ -33,8 +30,8 @@
             
             
                 
-                    
-                    
+                    
+                    
                 
             
         
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
index 0e1d8290741..154d07bd729 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
@@ -18,9 +18,9 @@ public class Constants {
     // The minimal size (5M bytes) for COS multiple-parts upload.
     public static final int COS_PART_MINIMALSIZE = 5242880;
     public static final int DEFAULT_SEARCH_PAGE_SIZE = 1000;
-    public static final int DEFAULT_NUMOFPAGES_EACH_COS_OBJECT = 10;
     public static final int DEFAULT_NUMOFOBJECTS_PERREAD = 1;
-    public static final int DEFAULT_MAXCOSFILE_SIZE = 104857600;
+    public static final int DEFAULT_COSFILE_MAX_SIZE = 104857600;
+    public static final int DEFAULT_COSFILE_MAX_RESOURCESNUMBER = 50000;
     public static final String FHIR_SEARCH_LASTUPDATED = "_lastUpdated";
     public static final byte[] NDJSON_LINESEPERATOR = "\r\n".getBytes();
 
@@ -41,6 +41,8 @@ public class Constants {
     public static final String COS_LOCATION = "cos.location";
     public static final String COS_BUCKET_NAME = "cos.bucket.name";
     public static final String COS_IS_IBM_CREDENTIAL = "cos.credential.ibm";
+    public static final String COS_BUCKET_FILE_MAX_SZIE = "cos.bucket.filemaxsize";
+    public static final String COS_BUCKET_FILE_MAX_RESOURCES = "cos.bucket.filemaxresources";
     // COS bucket for import OperationOutcomes
     public static final String COS_OPERATIONOUTCOMES_BUCKET_NAME = "cos.operationoutcomes.bucket.name";
     public static final String FHIR_TENANT = "fhir.tenant";
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
index 11f5bdbd811..7bd3f8f30f3 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
@@ -6,11 +6,12 @@
 
 package com.ibm.fhir.bulkexport.common;
 
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.batch.api.BatchProperty;
 import javax.batch.api.chunk.CheckpointAlgorithm;
-import javax.batch.runtime.context.JobContext;
+import javax.batch.runtime.context.StepContext;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkcommon.Constants;
@@ -23,21 +24,21 @@
 public class CheckPointAlgorithm implements CheckpointAlgorithm {
     private final static Logger logger = Logger.getLogger(CheckPointAlgorithm.class.getName());
     @Inject
-    JobContext jobContext;
+    StepContext stepCtx;
 
     /**
-     * The cos.pagesperobject.
+     * The maximum number of FHIR resources per COS file when exporting to multiple COS files.
      */
     @Inject
-    @BatchProperty(name = "cos.pagesperobject")
-    String pagesPerCosObject;
+    @BatchProperty(name = Constants.COS_BUCKET_FILE_MAX_RESOURCES)
+    String cosBucketFileMaxResources;
 
     /**
      * The file size limit when exporting to multiple COS files.
      */
     @Inject
-    @BatchProperty(name = "cos.bucket.maxfilesize")
-    String cosBucketMaxFileSize;
+    @BatchProperty(name = Constants.COS_BUCKET_FILE_MAX_SZIE)
+    String cosBucketFileMaxSize;
 
     /**
      * Default constructor.
@@ -75,39 +76,42 @@ public void beginCheckpoint() {
      */
     @Override
     public boolean isReadyToCheckpoint() {
-        TransientUserData chunkData = (TransientUserData) jobContext.getTransientUserData();
+        TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData();
 
         if (chunkData != null) {
             if (chunkData.isSingleCosObject()) {
                 return chunkData.getBufferStream().size() > Constants.COS_PART_MINIMALSIZE
                         || chunkData.getPageNum() > chunkData.getLastPageNum();
             } else {
-                int numofPagePerCosObject, cosMaxFileSize;
-                if (cosBucketMaxFileSize != null) {
+                int cosFileMaxResources = Constants.DEFAULT_COSFILE_MAX_RESOURCESNUMBER, cosFileMaxSize = Constants.DEFAULT_COSFILE_MAX_SIZE;
+                if (cosBucketFileMaxSize != null) {
                     try {
-                        cosMaxFileSize = Integer.parseInt(cosBucketMaxFileSize);
-                        logger.fine("isReadyToCheckpoint: Set max COS file size to " + cosMaxFileSize + ".");
+                        cosFileMaxSize = Integer.parseInt(cosBucketFileMaxSize);
+                        if (logger.isLoggable(Level.FINE)) {
+                            logger.fine("isReadyToCheckpoint: Set max COS file size to " + cosFileMaxSize + ".");
+                        }
                     } catch (Exception e) {
-                        cosMaxFileSize = Constants.DEFAULT_MAXCOSFILE_SIZE;
                         logger.warning("isReadyToCheckpoint: Set max COS file size to default("
-                                + Constants.DEFAULT_MAXCOSFILE_SIZE + ").");
+                                + Constants.DEFAULT_COSFILE_MAX_SIZE + ").");
                     }
-                    return (chunkData.getBufferStream().size() >= cosMaxFileSize
-                            || chunkData.getPageNum() > chunkData.getLastPageNum());
+
                 } else {
-                    if (pagesPerCosObject != null) {
+                    if (cosBucketFileMaxResources != null) {
                         try {
-                            numofPagePerCosObject = Integer.parseInt(pagesPerCosObject);
-                            logger.fine("isReadyToCheckpoint: " + numofPagePerCosObject + " pages per COS object!");
+                            cosFileMaxResources = Integer.parseInt(cosBucketFileMaxResources);
+                            if (logger.isLoggable(Level.FINE)) {
+                                logger.fine("isReadyToCheckpoint: " + cosFileMaxResources + " resources per COS file!");
+                            }
                         } catch (Exception e) {
-                            numofPagePerCosObject = Constants.DEFAULT_NUMOFPAGES_EACH_COS_OBJECT;
-                            logger.warning("isReadyToCheckpoint: Set number of pages per COS object to default("
-                                    + Constants.DEFAULT_NUMOFPAGES_EACH_COS_OBJECT + ").");
+                            logger.warning("isReadyToCheckpoint: Set number of resources per COS file to default("
+                                    + Constants.DEFAULT_COSFILE_MAX_RESOURCESNUMBER + ").");
                         }
-                        return ((chunkData.getPageNum() - 1) % numofPagePerCosObject == 0
-                                || chunkData.getPageNum() > chunkData.getLastPageNum());
                     }
                 }
+
+                return (chunkData.getPageNum() > chunkData.getLastPageNum()
+                        || chunkData.getBufferStream().size() >= cosFileMaxSize
+                        || chunkData.getCurrentPartResourceNum() >= cosFileMaxResources);
             }
         }
         return true;

From c4a64643681f778b9ae7e49f5fa97a5b99edc598 Mon Sep 17 00:00:00 2001
From: Albert Wang 
Date: Tue, 26 May 2020 08:57:06 -0400
Subject: [PATCH 4/6] issue #779 #1150 further refactor and fix CWWKY0041W
 warnings

Signed-off-by: Albert Wang 
---
 docs/src/pages/guides/FHIRServerUsersGuide.md |  6 ++
 .../batch-jobs/FhirBulkExportChunkJob.xml     |  2 -
 .../ibm/fhir/bulkcommon/BulkDataUtils.java    | 10 ++-
 .../com/ibm/fhir/bulkcommon/Constants.java    |  6 +-
 .../common/CheckPointAlgorithm.java           | 66 +++++++-------
 .../bulkexport/common/CheckPointUserData.java | 55 ++++++++----
 .../bulkexport/common/TransientUserData.java  |  9 +-
 .../fhir/bulkexport/group/ChunkReader.java    |  4 +-
 .../fhir/bulkexport/patient/ChunkReader.java  |  6 +-
 .../patient/PatientExportPartitionMapper.java |  2 +
 .../fhir/bulkexport/system/ChunkReader.java   | 23 ++---
 .../fhir/bulkexport/system/ChunkWriter.java   | 89 +++++++------------
 .../bulkexport/system/ExportJobListener.java  |  2 +
 .../system/ExportPartitionAnalyzer.java       |  2 +
 .../system/ExportPartitionCollector.java      |  2 +
 .../system/SystemExportPartitionMapper.java   |  2 +
 .../com/ibm/fhir/bulkimport/ChunkReader.java  |  2 +
 .../com/ibm/fhir/bulkimport/ChunkWriter.java  |  2 +
 .../fhir/bulkimport/ImportJobListener.java    |  6 +-
 .../bulkimport/ImportPartitionAnalyzer.java   |  2 +
 .../bulkimport/ImportPartitionCollector.java  |  4 +-
 .../bulkimport/ImportPartitionMapper.java     |  2 +
 .../ibm/fhir/config/FHIRConfiguration.java    |  2 +
 23 files changed, 165 insertions(+), 141 deletions(-)

diff --git a/docs/src/pages/guides/FHIRServerUsersGuide.md b/docs/src/pages/guides/FHIRServerUsersGuide.md
index 0cb615ce41a..45b8cc84d84 100644
--- a/docs/src/pages/guides/FHIRServerUsersGuide.md
+++ b/docs/src/pages/guides/FHIRServerUsersGuide.md
@@ -1486,6 +1486,8 @@ This section contains reference information about each of the configuration prop
 |`fhirServer/bulkdata/validBaseUrls`|string|The list of supported urls which are approved for the fhir server to access|
 |`fhirServer/bulkdata/validBaseUrlsDisabled`|boolean|Disables the URL checking feature|
 |`fhirServer/bulkdata/maxInputPerRequest`|integer|The maximum inputs per bulk import|
+|`fhirServer/bulkdata/cosFileMaxResources`|int|Max FHIR resources per COS file, "-1" means no limit |
+|`fhirServer/bulkdata/cosFileMaxSize`|int|Max COS file size in bytes, "-1" means no limit |
 
 
 ### 5.1.2 Default property values
@@ -1532,6 +1534,8 @@ This section contains reference information about each of the configuration prop
 |`fhirServer/audit/serviceProperties/geoCounty`|US|
 |`fhirServer/bulkdata/isExportPublic`|true|
 |`fhirServer/bulkdata/validBaseUrlsDisabled`|false|
+|`fhirServer/bulkdata/cosFileMaxResources`|500000|
+|`fhirServer/bulkdata/cosFileMaxSize`|209715200|
 
 
 ### 5.1.3 Property attributes
@@ -1596,6 +1600,8 @@ must restart the server for that change to take effect.
 |`fhirServer/bulkdata/validBaseUrls`|Y|Y|
 |`fhirServer/bulkdata/maxInputPerRequest`|Y|Y|
 |`fhirServer/bulkdata/validBaseUrlsDisabled`|Y|Y|
+|`fhirServer/bulkdata/cosFileMaxResources`|N|Y|
+|`fhirServer/bulkdata/cosFileMaxSize`|N|Y|
 
 ## 5.2 Keystores, truststores, and the FHIR server
 
diff --git a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
index c976021d364..1649d6bce04 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
+++ b/fhir-bulkimportexport-webapp/src/main/java/META-INF/batch-jobs/FhirBulkExportChunkJob.xml
@@ -13,7 +13,6 @@
                     
                     
                     
-                    
                                        
                 
             
@@ -26,7 +25,6 @@
                     
                     
                     
-                    
                     
                 
             
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/BulkDataUtils.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/BulkDataUtils.java
index ee937fb4ba7..97f1d281103 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/BulkDataUtils.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/BulkDataUtils.java
@@ -16,9 +16,11 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +50,7 @@
 import com.ibm.cloud.objectstorage.services.s3.model.GetObjectRequest;
 import com.ibm.cloud.objectstorage.services.s3.model.InitiateMultipartUploadRequest;
 import com.ibm.cloud.objectstorage.services.s3.model.InitiateMultipartUploadResult;
+import com.ibm.cloud.objectstorage.services.s3.model.ObjectMetadata;
 import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
 import com.ibm.cloud.objectstorage.services.s3.model.S3Object;
 import com.ibm.cloud.objectstorage.services.s3.model.S3ObjectInputStream;
@@ -88,6 +91,11 @@ public static String startPartUpload(AmazonS3 cosClient, String bucketName, Stri
             InitiateMultipartUploadRequest initMultipartUploadReq = new InitiateMultipartUploadRequest(bucketName, itemName);
             if (isPublicAccess) {
                 initMultipartUploadReq.setCannedACL(CannedAccessControlList.PublicRead);
+                ObjectMetadata metadata = new ObjectMetadata();
+                // Set expiration time to 2 hours(7200 seconds).
+                // Note: IBM COS doesn't honor this but also doesn't fail on this.
+                metadata.setExpirationTime(Date.from(Instant.now().plusSeconds(7200)));
+                initMultipartUploadReq.setObjectMetadata(metadata);
             }
 
             InitiateMultipartUploadResult mpResult = cosClient.initiateMultipartUpload(initMultipartUploadReq);
@@ -420,7 +428,7 @@ public static Map, List>>> ge
         }
         return searchParametersForResoureTypes;
     }
-    
+
     public static JsonArray getDataSourcesFromJobInput(String dataSourcesInfo) {
         try (JsonReader reader =
                 Json.createReader(new StringReader(
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
index 154d07bd729..4d614d440a6 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
@@ -18,9 +18,8 @@ public class Constants {
     // The minimal size (5M bytes) for COS multiple-parts upload.
     public static final int COS_PART_MINIMALSIZE = 5242880;
     public static final int DEFAULT_SEARCH_PAGE_SIZE = 1000;
-    public static final int DEFAULT_NUMOFOBJECTS_PERREAD = 1;
-    public static final int DEFAULT_COSFILE_MAX_SIZE = 104857600;
-    public static final int DEFAULT_COSFILE_MAX_RESOURCESNUMBER = 50000;
+    public static final int DEFAULT_COSFILE_MAX_SIZE = 209715200;
+    public static final int DEFAULT_COSFILE_MAX_RESOURCESNUMBER = 500000;
     public static final String FHIR_SEARCH_LASTUPDATED = "_lastUpdated";
     public static final byte[] NDJSON_LINESEPERATOR = "\r\n".getBytes();
 
@@ -56,7 +55,6 @@ public class Constants {
     public static final String EXPORT_FHIR_SEARCH_PAGESIZE = "fhir.search.pagesize";
     public static final String EXPORT_FHIR_SEARCH_TYPEFILTERS = "fhir.typeFilters";
     public static final String EXPORT_FHIR_SEARCH_PATIENTGROUPID = "fhir.search.patientgroupid";
-    public static final String EXPORT_COS_OBJECTNAME = "cos.bucket.objectname";
     public static final String EXPORT_COS_OBJECT_PATHPREFIX = "cos.bucket.pathprefix";
 
     // Partition work item info generated in ImportPartitionMapper.
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
index 7bd3f8f30f3..a481bebda4f 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
@@ -12,17 +12,23 @@
 import javax.batch.api.BatchProperty;
 import javax.batch.api.chunk.CheckpointAlgorithm;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkcommon.Constants;
+import com.ibm.fhir.config.FHIRConfigHelper;
+import com.ibm.fhir.config.FHIRConfiguration;
 
 /**
  * Bulk export Chunk implementation - custom checkpoint algorithm.
  *
  */
-
+@Dependent
 public class CheckPointAlgorithm implements CheckpointAlgorithm {
     private final static Logger logger = Logger.getLogger(CheckPointAlgorithm.class.getName());
+    private int cosFileMaxResources = FHIRConfigHelper.getIntProperty(FHIRConfiguration.PROPERTY_BULKDATA_BATCHJOB_COSFILEMAXRESOURCES, Constants.DEFAULT_COSFILE_MAX_RESOURCESNUMBER);
+    private int cosFileMaxSize = FHIRConfigHelper.getIntProperty(FHIRConfiguration.PROPERTY_BULKDATA_BATCHJOB_COSFILEMAXSIZE  , Constants.DEFAULT_COSFILE_MAX_SIZE);
+
     @Inject
     StepContext stepCtx;
 
@@ -79,42 +85,38 @@ public boolean isReadyToCheckpoint() {
         TransientUserData chunkData = (TransientUserData) stepCtx.getTransientUserData();
 
         if (chunkData != null) {
-            if (chunkData.isSingleCosObject()) {
-                return chunkData.getBufferStream().size() > Constants.COS_PART_MINIMALSIZE
-                        || chunkData.getPageNum() > chunkData.getLastPageNum();
-            } else {
-                int cosFileMaxResources = Constants.DEFAULT_COSFILE_MAX_RESOURCESNUMBER, cosFileMaxSize = Constants.DEFAULT_COSFILE_MAX_SIZE;
-                if (cosBucketFileMaxSize != null) {
-                    try {
-                        cosFileMaxSize = Integer.parseInt(cosBucketFileMaxSize);
-                        if (logger.isLoggable(Level.FINE)) {
-                            logger.fine("isReadyToCheckpoint: Set max COS file size to " + cosFileMaxSize + ".");
-                        }
-                    } catch (Exception e) {
-                        logger.warning("isReadyToCheckpoint: Set max COS file size to default("
-                                + Constants.DEFAULT_COSFILE_MAX_SIZE + ").");
+            if (cosBucketFileMaxSize != null) {
+                try {
+                    cosFileMaxSize = Integer.parseInt(cosBucketFileMaxSize);
+                    if (logger.isLoggable(Level.FINE)) {
+                        logger.fine("isReadyToCheckpoint: Set max COS file size to " + cosFileMaxSize + ".");
                     }
+                } catch (Exception e) {
+                    logger.warning("isReadyToCheckpoint: Set max COS file size to default("
+                            + Constants.DEFAULT_COSFILE_MAX_SIZE + ").");
+                }
+
+            }
 
-                } else {
-                    if (cosBucketFileMaxResources != null) {
-                        try {
-                            cosFileMaxResources = Integer.parseInt(cosBucketFileMaxResources);
-                            if (logger.isLoggable(Level.FINE)) {
-                                logger.fine("isReadyToCheckpoint: " + cosFileMaxResources + " resources per COS file!");
-                            }
-                        } catch (Exception e) {
-                            logger.warning("isReadyToCheckpoint: Set number of resources per COS file to default("
-                                    + Constants.DEFAULT_COSFILE_MAX_RESOURCESNUMBER + ").");
-                        }
+            if (cosBucketFileMaxResources != null) {
+                try {
+                    cosFileMaxResources = Integer.parseInt(cosBucketFileMaxResources);
+                    if (logger.isLoggable(Level.FINE)) {
+                        logger.fine("isReadyToCheckpoint: " + cosFileMaxResources + " resources per COS file!");
                     }
+                } catch (Exception e) {
+                    logger.warning("isReadyToCheckpoint: Set number of resources per COS file to default("
+                            + Constants.DEFAULT_COSFILE_MAX_RESOURCESNUMBER + ").");
                 }
+            }
+
+            chunkData.setFinishCurrentUpload(cosFileMaxSize != -1 && chunkData.getCurrentUploadSize() >= cosFileMaxSize
+                    || cosFileMaxResources != -1 && chunkData.getCurrentUploadResourceNum() >= cosFileMaxResources);
 
-                return (chunkData.getPageNum() > chunkData.getLastPageNum()
-                        || chunkData.getBufferStream().size() >= cosFileMaxSize
-                        || chunkData.getCurrentPartResourceNum() >= cosFileMaxResources);
+            return (chunkData.getPageNum() > chunkData.getLastPageNum()
+                    || chunkData.getBufferStream().size() > Constants.COS_PART_MINIMALSIZE
+                    || chunkData.isFinishCurrentUpload());
             }
+            return false;
         }
-        return true;
-    }
-
 }
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java
index 123db7e45e0..6ebc36607ed 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointUserData.java
@@ -20,9 +20,11 @@ public class CheckPointUserData implements java.io.Serializable {
     private int lastPageNum;
     private int partNum;
     private String uploadId;
-    private boolean isSingleCosObject = false;
+    private int uploadCount = 1;
     private List cosDataPacks;
-    private int currentPartResourceNum = 0;
+    private int currentUploadResourceNum = 0;
+    private int currentUploadSize = 0;
+    private boolean isFinishCurrentUpload = false;
     private int totalResourcesNum = 0;
     // One resource type can have 0 to multiple typeFilters, indexOfCurrentTypeFilter is used to tell the currently processed typeFilter.
     private int indexOfCurrentTypeFilter;
@@ -32,7 +34,7 @@ public class CheckPointUserData implements java.io.Serializable {
     private boolean isMoreToExport = true;
 
     public CheckPointUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter,
-            String resourceTypeSummary, int totalResourcesNum, int currentPartResourceNum) {
+            String resourceTypeSummary, int totalResourcesNum, int currentUploadResourceNum, int currentUploadSize, int uploadCount) {
         super();
         this.pageNum = pageNum;
         this.uploadId = uploadId;
@@ -41,13 +43,16 @@ public CheckPointUserData(int pageNum, String uploadId, List cosDataPa
         this.indexOfCurrentTypeFilter = indexOfCurrentTypeFilter;
         this.resourceTypeSummary = resourceTypeSummary;
         this.totalResourcesNum = totalResourcesNum;
-        this.currentPartResourceNum = currentPartResourceNum;
+        this.currentUploadResourceNum = currentUploadResourceNum;
+        this.currentUploadSize = currentUploadSize;
+        this.uploadCount = uploadCount;
     }
 
     public static CheckPointUserData fromTransientUserData(TransientUserData userData) {
         return new CheckPointUserData(userData.getPageNum(), userData.getUploadId(), userData.getCosDataPacks(),
                 userData.getPartNum(), userData.getIndexOfCurrentTypeFilter(), userData.getResourceTypeSummary(),
-                userData.getTotalResourcesNum(), userData.getCurrentPartResourceNum());
+                userData.getTotalResourcesNum(), userData.getCurrentUploadResourceNum(), userData.getCurrentUploadSize(),
+                userData.getUploadCount());
     }
 
     public int getPageNum() {
@@ -74,14 +79,6 @@ public void setCosDataPacks(List cosDataPacks) {
         this.cosDataPacks = cosDataPacks;
     }
 
-    public boolean isSingleCosObject() {
-        return isSingleCosObject;
-    }
-
-    public void setSingleCosObject(boolean isSingleCosObject) {
-        this.isSingleCosObject = isSingleCosObject;
-    }
-
     public int getPartNum() {
         return partNum;
     }
@@ -98,12 +95,12 @@ public void setLastPageNum(int lastPageNum) {
         this.lastPageNum = lastPageNum;
     }
 
-    public int getCurrentPartResourceNum() {
-        return currentPartResourceNum;
+    public int getCurrentUploadResourceNum() {
+        return currentUploadResourceNum;
     }
 
-    public void setCurrentPartResourceNum(int currentPartResourceNum) {
-        this.currentPartResourceNum = currentPartResourceNum;
+    public void setCurrentUploadResourceNum(int currentUploadResourceNum) {
+        this.currentUploadResourceNum = currentUploadResourceNum;
     }
 
     public int getIndexOfCurrentTypeFilter() {
@@ -138,4 +135,28 @@ public void setTotalResourcesNum(int totalResourcesNum) {
         this.totalResourcesNum = totalResourcesNum;
     }
 
+    public int getCurrentUploadSize() {
+        return currentUploadSize;
+    }
+
+    public void setCurrentUploadSize(int currentUploadSize) {
+        this.currentUploadSize = currentUploadSize;
+    }
+
+    public boolean isFinishCurrentUpload() {
+        return isFinishCurrentUpload;
+    }
+
+    public void setFinishCurrentUpload(boolean isFinishCurrentUpload) {
+        this.isFinishCurrentUpload = isFinishCurrentUpload;
+    }
+
+    public int getUploadCount() {
+        return uploadCount;
+    }
+
+    public void setUploadCount(int uploadCount) {
+        this.uploadCount = uploadCount;
+    }
+
 }
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java
index 7069420a310..9684bb5a4a6 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/TransientUserData.java
@@ -20,14 +20,17 @@ public class TransientUserData extends CheckPointUserData {
     private ByteArrayOutputStream bufferStream = new ByteArrayOutputStream();
 
     public TransientUserData(int pageNum, String uploadId, List cosDataPacks, int partNum, int indexOfCurrentTypeFilter,
-            String resourceTypeSummary, int totalResourcesNum, int currentPartResourceNum) {
-        super(pageNum, uploadId, cosDataPacks, partNum, indexOfCurrentTypeFilter, resourceTypeSummary, totalResourcesNum, currentPartResourceNum);
+            String resourceTypeSummary, int totalResourcesNum, int currentPartResourceNum, int currentPartSize, int uploadCount) {
+        super(pageNum, uploadId, cosDataPacks, partNum, indexOfCurrentTypeFilter, resourceTypeSummary,
+                totalResourcesNum, currentPartResourceNum, currentPartSize, uploadCount);
     }
 
     public static TransientUserData fromCheckPointUserData(CheckPointUserData checkPointData) {
         return new TransientUserData(checkPointData.getPageNum(), checkPointData.getUploadId(),
                 checkPointData.getCosDataPacks(), checkPointData.getPartNum(), checkPointData.getIndexOfCurrentTypeFilter(),
-                checkPointData.getResourceTypeSummary(), checkPointData.getTotalResourcesNum(), checkPointData.getCurrentPartResourceNum());
+                checkPointData.getResourceTypeSummary(), checkPointData.getTotalResourcesNum(),
+                checkPointData.getCurrentUploadResourceNum(), checkPointData.getCurrentUploadSize(),
+                checkPointData.getUploadCount());
     }
 
     public ByteArrayOutputStream getBufferStream() {
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java
index d6c05527a54..b665752ecdc 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/group/ChunkReader.java
@@ -18,6 +18,7 @@
 
 import javax.batch.api.BatchProperty;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
@@ -35,6 +36,7 @@
 /**
  * Bulk patient group export Chunk implementation - the Reader.
  */
+@Dependent
 public class ChunkReader extends com.ibm.fhir.bulkexport.patient.ChunkReader {
     private final static Logger logger = Logger.getLogger(ChunkReader.class.getName());
     // List for the patients
@@ -118,7 +120,7 @@ public Object readItem() throws Exception {
         pageNum++;
 
         if (chunkData == null) {
-            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0);
+            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0, 0, 1);
             stepCtx.setTransientUserData(chunkData);
         } else {
             chunkData.setPageNum(pageNum);
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java
index 5f3de3991e8..de211604917 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/ChunkReader.java
@@ -22,6 +22,7 @@
 import javax.batch.api.BatchProperty;
 import javax.batch.api.chunk.AbstractItemReader;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
@@ -48,6 +49,7 @@
  * Bulk patient export Chunk implementation - the Reader.
  *
  */
+@Dependent
 public class ChunkReader extends AbstractItemReader {
     private final static Logger logger = Logger.getLogger(ChunkReader.class.getName());
     protected int pageNum = 1;
@@ -199,7 +201,7 @@ protected void fillChunkDataBuffer(List patientIds) throws Exception {
                 indexOfCurrentTypeFilter++;
             } while (searchParametersForResoureTypes.get(resourceType) != null && indexOfCurrentTypeFilter < searchParametersForResoureTypes.get(resourceType).size());
 
-            chunkData.setCurrentPartResourceNum(chunkData.getCurrentPartResourceNum() + resSubTotal);
+            chunkData.setCurrentUploadResourceNum(chunkData.getCurrentUploadResourceNum() + resSubTotal);
             chunkData.setTotalResourcesNum(chunkData.getTotalResourcesNum() + resSubTotal);
             if (logger.isLoggable(Level.FINE)) {
                 logger.fine("fillChunkDataBuffer: Processed resources - " + resSubTotal + "; Bufferred data size - "
@@ -252,7 +254,7 @@ public Object readItem() throws Exception {
         pageNum++;
 
         if (chunkData == null) {
-            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0);
+            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0, 0, 1);
             chunkData.setLastPageNum(searchContext.getLastPageNumber());
             stepCtx.setTransientUserData(chunkData);
         } else {
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java
index 78528f37d5c..dab268687d5 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/patient/PatientExportPartitionMapper.java
@@ -15,11 +15,13 @@
 import javax.batch.api.partition.PartitionMapper;
 import javax.batch.api.partition.PartitionPlan;
 import javax.batch.api.partition.PartitionPlanImpl;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkcommon.Constants;
 import com.ibm.fhir.search.compartment.CompartmentUtil;
 
+@Dependent
 public class PatientExportPartitionMapper implements PartitionMapper {
 
     /**
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java
index 4ed7a6a8ba8..0321e4db43f 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkReader.java
@@ -21,6 +21,7 @@
 import javax.batch.api.BatchProperty;
 import javax.batch.api.chunk.AbstractItemReader;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
@@ -46,6 +47,7 @@
  * Bulk system export Chunk implementation - the Reader.
  *
  */
+@Dependent
 public class ChunkReader extends AbstractItemReader {
     private final static Logger logger = Logger.getLogger(ChunkReader.class.getName());
     boolean isSingleCosObject = false;
@@ -114,13 +116,6 @@ public class ChunkReader extends AbstractItemReader {
     @BatchProperty(name = Constants.EXPORT_FHIR_SEARCH_PAGESIZE)
     String fhirSearchPageSize;
 
-    /**
-     * The Cos object name.
-     */
-    @Inject
-    @BatchProperty(name = Constants.EXPORT_COS_OBJECTNAME)
-    String cosBucketObjectName;
-
     @Inject
     StepContext stepCtx;
 
@@ -157,7 +152,8 @@ private void fillChunkDataBuffer(List resources) throws Exception {
                     throw e;
                 }
             }
-            chunkData.setCurrentPartResourceNum(chunkData.getCurrentPartResourceNum() + resSubTotal);
+            chunkData.setCurrentUploadResourceNum(chunkData.getCurrentUploadResourceNum() + resSubTotal);
+            chunkData.setCurrentUploadSize(chunkData.getCurrentUploadSize() + chunkData.getBufferStream().size());
             chunkData.setTotalResourcesNum(chunkData.getTotalResourcesNum() + resSubTotal);
             logger.fine("fillChunkDataBuffer: Processed resources - " + resSubTotal + "; Bufferred data size - "
                     + chunkData.getBufferStream().size());
@@ -221,11 +217,8 @@ public Object readItem() throws Exception {
         pageNum++;
 
         if (chunkData == null) {
-            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0);
+            chunkData = new TransientUserData(pageNum, null, new ArrayList(), 1, 0, null, 0, 0, 0, 1);
             chunkData.setLastPageNum(searchContext.getLastPageNumber());
-            if (isSingleCosObject) {
-                chunkData.setSingleCosObject(true);
-            }
             stepCtx.setTransientUserData(chunkData);
         } else {
             chunkData.setPageNum(pageNum);
@@ -273,12 +266,6 @@ public void open(Serializable checkpoint) throws Exception {
             }
         }
 
-        if (cosBucketObjectName != null
-                && cosBucketObjectName.trim().length() > 0) {
-            isSingleCosObject = true;
-            logger.info("open: Use single COS object for uploading!");
-        }
-
         FHIRRequestContext.set(new FHIRRequestContext(fhirTenant, fhirDatastoreId));
         FHIRPersistenceHelper fhirPersistenceHelper = new FHIRPersistenceHelper();
         fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation();
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java
index f079c09b2fc..7c94759545f 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ChunkWriter.java
@@ -8,8 +8,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
-import java.time.Instant;
-import java.util.Date;
 import java.util.List;
 import java.util.logging.Logger;
 
@@ -17,13 +15,11 @@
 import javax.batch.api.chunk.AbstractItemWriter;
 import javax.batch.runtime.context.JobContext;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
-import com.ibm.cloud.objectstorage.services.s3.model.CannedAccessControlList;
 import com.ibm.cloud.objectstorage.services.s3.model.CreateBucketRequest;
-import com.ibm.cloud.objectstorage.services.s3.model.ObjectMetadata;
-import com.ibm.cloud.objectstorage.services.s3.model.PutObjectRequest;
 import com.ibm.fhir.bulkcommon.BulkDataUtils;
 import com.ibm.fhir.bulkcommon.Constants;
 import com.ibm.fhir.bulkexport.common.TransientUserData;
@@ -34,6 +30,7 @@
  * Bulk export Chunk implementation - the Writer.
  *
  */
+@Dependent
 public class ChunkWriter extends AbstractItemWriter {
     private static final Logger logger = Logger.getLogger(ChunkWriter.class.getName());
     private AmazonS3 cosClient = null;
@@ -95,13 +92,6 @@ public class ChunkWriter extends AbstractItemWriter {
     @BatchProperty(name = Constants.PARTITION_RESOURCE_TYPE)
     String fhirResourceType;
 
-    /**
-     * The Cos object name(only used by system export for exporting single resource type)
-     */
-    @Inject
-    @BatchProperty(name = Constants.EXPORT_COS_OBJECTNAME)
-    String cosBucketObjectName;
-
     @Inject
     StepContext stepCtx;
 
@@ -126,69 +116,52 @@ private void pushFhirJsonsToCos(InputStream in, int dataLength) throws Exception
             logger.warning("pushFhirJsons2Cos: chunkData is null, this should never happen!");
             throw new Exception("pushFhirJsons2Cos: chunkData is null, this should never happen!");
         }
-        if (chunkData.isSingleCosObject()) {
-            if (chunkData.getUploadId() == null) {
-                chunkData.setUploadId(BulkDataUtils.startPartUpload(cosClient, cosBucketName, cosBucketObjectName, false));
-            }
 
-            chunkData.getCosDataPacks().add(BulkDataUtils.multiPartUpload(cosClient, cosBucketName, cosBucketObjectName,
-                    chunkData.getUploadId(), in, dataLength, chunkData.getPartNum()));
-            logger.info("pushFhirJsons2Cos: " + dataLength + " bytes were successfully appended to COS object - "
-                    + cosBucketObjectName);
-            chunkData.setPartNum(chunkData.getPartNum() + 1);
-            chunkData.getBufferStream().reset();
-
-            if (chunkData.getPageNum() > chunkData.getLastPageNum()) {
-                BulkDataUtils.finishMultiPartUpload(cosClient, cosBucketName, cosBucketObjectName, chunkData.getUploadId(),
-                        chunkData.getCosDataPacks());
-                chunkData.setResourceTypeSummary(fhirResourceType + "[" + chunkData.getCurrentPartResourceNum() + "]");
-            }
+        String itemName;
+        if (cosBucketPathPrefix != null && cosBucketPathPrefix.trim().length() > 0) {
+          itemName = cosBucketPathPrefix + "/" + fhirResourceType + "_" + chunkData.getUploadCount() + ".ndjson";
 
         } else {
-            ObjectMetadata metadata = new ObjectMetadata();
-            metadata.setContentLength(dataLength);
-
-            String itemName;
-            PutObjectRequest req;
-            if (cosBucketPathPrefix != null && cosBucketPathPrefix.trim().length() > 0) {
-                itemName = cosBucketPathPrefix + "/" + fhirResourceType + "_" + chunkData.getPartNum() + ".ndjson";
-                if (isExportPublic) {
-                    // Set expiration time to 2 hours(7200 seconds).
-                    // Note: IBM COS doesn't honor this but also doesn't fail on this.
-                    metadata.setExpirationTime(Date.from(Instant.now().plusSeconds(7200)));
-                }
-
-                req = new PutObjectRequest(cosBucketName, itemName, in, metadata);
+          itemName = "job" + jobContext.getExecutionId() + "/" + fhirResourceType + "_" + chunkData.getUploadCount() + ".ndjson";
+        }
 
-                if (isExportPublic) {
-                    // Give public read only access.
-                    req.setCannedAcl(CannedAccessControlList.PublicRead);
-                }
+        if (chunkData.getUploadId() == null) {
+            chunkData.setUploadId(BulkDataUtils.startPartUpload(cosClient, cosBucketName, itemName, isExportPublic));
+        }
 
-            } else {
-                itemName = "job" + jobContext.getExecutionId() + "/" + fhirResourceType + "_" + chunkData.getPartNum() + ".ndjson";
-                req = new PutObjectRequest(cosBucketName, itemName, in, metadata);
-            }
+        chunkData.getCosDataPacks().add(BulkDataUtils.multiPartUpload(cosClient, cosBucketName, itemName,
+                chunkData.getUploadId(), in, dataLength, chunkData.getPartNum()));
+        logger.info("pushFhirJsons2Cos: " + dataLength + " bytes were successfully appended to COS object - "
+                + itemName);
+        chunkData.setPartNum(chunkData.getPartNum() + 1);
+        chunkData.getBufferStream().reset();
 
-            cosClient.putObject(req);
-            logger.info(
-                    "pushFhirJsons2Cos: " + itemName + "(" + dataLength + " bytes) was successfully written to COS");
+        if (chunkData.getPageNum() > chunkData.getLastPageNum() || chunkData.isFinishCurrentUpload()) {
+            BulkDataUtils.finishMultiPartUpload(cosClient, cosBucketName, itemName, chunkData.getUploadId(),
+                    chunkData.getCosDataPacks());
             // Partition status for the exported resources, e.g, Patient[1000,1000,200]
             if (chunkData.getResourceTypeSummary() == null) {
-                chunkData.setResourceTypeSummary(fhirResourceType + "[" + chunkData.getCurrentPartResourceNum());
+                chunkData.setResourceTypeSummary(fhirResourceType + "[" + chunkData.getCurrentUploadResourceNum());
                 if (chunkData.getPageNum() > chunkData.getLastPageNum()) {
                     chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "]");
                 }
             } else {
-                chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "," + chunkData.getCurrentPartResourceNum());
+                chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "," + chunkData.getCurrentUploadResourceNum());
                 if (chunkData.getPageNum() > chunkData.getLastPageNum()) {
                     chunkData.setResourceTypeSummary(chunkData.getResourceTypeSummary() + "]");
                     stepCtx.setTransientUserData(chunkData);
                 }
             }
-            chunkData.setPartNum(chunkData.getPartNum() + 1);
-            chunkData.getBufferStream().reset();
-            chunkData.setCurrentPartResourceNum(0);
+
+            if (chunkData.getPageNum() <= chunkData.getLastPageNum()) {
+                chunkData.setPartNum(1);
+                chunkData.setUploadId(null);
+                chunkData.setCurrentUploadResourceNum(0);
+                chunkData.setCurrentUploadSize(0);
+                chunkData.setFinishCurrentUpload(false);
+                chunkData.getCosDataPacks().clear();
+                chunkData.setUploadCount(chunkData.getUploadCount() + 1);
+            }
         }
     }
 
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java
index a56391879a2..8228c98ab4b 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportJobListener.java
@@ -18,11 +18,13 @@
 import javax.batch.runtime.BatchRuntime;
 import javax.batch.runtime.JobExecution;
 import javax.batch.runtime.context.JobContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkcommon.Constants;
 import com.ibm.fhir.bulkexport.common.CheckPointUserData;
 
+@Dependent
 public class ExportJobListener implements JobListener {
     private static final Logger logger = Logger.getLogger(ExportJobListener.class.getName());
 
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java
index bfa75222d0b..02e2db1abe8 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionAnalyzer.java
@@ -13,10 +13,12 @@
 import javax.batch.api.partition.PartitionAnalyzer;
 import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.context.JobContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkexport.common.CheckPointUserData;
 
+@Dependent
 public class ExportPartitionAnalyzer implements PartitionAnalyzer {
     @Inject
     JobContext jobContext;
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java
index de194124dc0..a50a1fab2ca 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/ExportPartitionCollector.java
@@ -11,11 +11,13 @@
 import javax.batch.api.partition.PartitionCollector;
 import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkexport.common.CheckPointUserData;
 import com.ibm.fhir.bulkexport.common.TransientUserData;
 
+@Dependent
 public class ExportPartitionCollector implements PartitionCollector {
     @Inject
     StepContext stepCtx;
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java
index 12aa4f24f3c..e74f6979581 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/system/SystemExportPartitionMapper.java
@@ -15,11 +15,13 @@
 import javax.batch.api.partition.PartitionMapper;
 import javax.batch.api.partition.PartitionPlan;
 import javax.batch.api.partition.PartitionPlanImpl;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkcommon.Constants;
 import com.ibm.fhir.model.util.ModelSupport;
 
+@Dependent
 public class SystemExportPartitionMapper implements PartitionMapper {
 
     /**
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java
index 854c746f324..f3cd5cd39a4 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkReader.java
@@ -16,6 +16,7 @@
 import javax.batch.api.chunk.AbstractItemReader;
 import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
@@ -27,6 +28,7 @@
  * Bulk import Chunk implementation - the Reader.
  *
  */
+@Dependent
 public class ChunkReader extends AbstractItemReader {
     private static final Logger logger = Logger.getLogger(ChunkReader.class.getName());
     private AmazonS3 cosClient = null;
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java
index 1e91086f5f4..6e28021d03e 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ChunkWriter.java
@@ -16,6 +16,7 @@
 import javax.batch.api.BatchProperty;
 import javax.batch.api.chunk.AbstractItemWriter;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
@@ -39,6 +40,7 @@
  * Bulk import Chunk implementation - the Writer.
  *
  */
+@Dependent
 public class ChunkWriter extends AbstractItemWriter {
     private static final Logger logger = Logger.getLogger(ChunkWriter.class.getName());
     AmazonS3 cosClient = null;
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportJobListener.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportJobListener.java
index 6780efb4f74..c5a30044681 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportJobListener.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportJobListener.java
@@ -20,6 +20,7 @@
 import javax.batch.runtime.BatchRuntime;
 import javax.batch.runtime.JobExecution;
 import javax.batch.runtime.context.JobContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 import javax.json.JsonArray;
 import javax.json.JsonObject;
@@ -28,6 +29,7 @@
 import com.ibm.fhir.bulkcommon.BulkDataUtils;
 import com.ibm.fhir.bulkcommon.Constants;
 
+@Dependent
 public class ImportJobListener implements JobListener {
     private static final Logger logger = Logger.getLogger(ImportJobListener.class.getName());
 
@@ -35,7 +37,7 @@ public class ImportJobListener implements JobListener {
 
     @Inject
     JobContext jobContext;
-    
+
     @Inject
     @BatchProperty(name = Constants.IMPORT_FHIR_DATASOURCES)
     String dataSourcesInfo;
@@ -93,7 +95,7 @@ public void afterJob() {
         }
 
         jobContext.setExitStatus(Arrays.toString(resultInExitStatus));
-        
+
         for (ImportCheckPointData partitionSummary : partitionSummaries) {
             ImportCheckPointData partitionSummaryInMap = importedResourceTypeSummaries.get(partitionSummary.getImportPartitionResourceType());
             if (partitionSummaryInMap == null) {
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionAnalyzer.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionAnalyzer.java
index 8c17b66266a..35d993fc130 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionAnalyzer.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionAnalyzer.java
@@ -16,10 +16,12 @@
 import javax.batch.api.partition.PartitionAnalyzer;
 import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.context.JobContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.fhir.bulkcommon.Constants;
 
+@Dependent
 public class ImportPartitionAnalyzer implements PartitionAnalyzer {
     private static final Logger logger = Logger.getLogger(ImportPartitionAnalyzer.class.getName());
     @Inject
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionCollector.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionCollector.java
index 039eda0b58b..22354b6438f 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionCollector.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionCollector.java
@@ -15,12 +15,14 @@
 import javax.batch.api.partition.PartitionCollector;
 import javax.batch.runtime.BatchStatus;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
 import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
 import com.ibm.fhir.bulkcommon.BulkDataUtils;
 import com.ibm.fhir.bulkcommon.Constants;
 
+@Dependent
 public class ImportPartitionCollector implements PartitionCollector {
     private static final Logger logger = Logger.getLogger(ImportPartitionCollector.class.getName());
     AmazonS3 cosClient = null;
@@ -71,7 +73,7 @@ public class ImportPartitionCollector implements PartitionCollector {
 
     public ImportPartitionCollector() {
         // The injected properties are not available at class construction time
-        // These values are lazy injected BEFORE calling 'collectPartitionData'. 
+        // These values are lazy injected BEFORE calling 'collectPartitionData'.
     }
 
     @Override
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java
index a43104733a3..70447a4d3de 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkimport/ImportPartitionMapper.java
@@ -16,6 +16,7 @@
 import javax.batch.api.partition.PartitionPlan;
 import javax.batch.api.partition.PartitionPlanImpl;
 import javax.batch.runtime.context.StepContext;
+import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 import javax.json.JsonArray;
 import javax.json.JsonObject;
@@ -28,6 +29,7 @@
 import com.ibm.fhir.bulkcommon.BulkDataUtils;
 import com.ibm.fhir.bulkcommon.Constants;
 
+@Dependent
 public class ImportPartitionMapper implements PartitionMapper {
     private static final Logger logger = Logger.getLogger(ImportPartitionMapper.class.getName());
     private AmazonS3 cosClient = null;
diff --git a/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java b/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java
index 2a6a9231324..e4b6a88fe49 100644
--- a/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java
+++ b/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java
@@ -100,6 +100,8 @@ public class FHIRConfiguration {
     public static final String PROPERTY_BULKDATA_BATCHJOB_VALID_URLS_DISABLED = "fhirServer/bulkdata/validBaseUrlsDisabled";
     public static final String PROPERTY_BULKDATA_BATCHJOB_MAX_INPUT_PER_TENANT =
             "fhirServer/bulkdata/maxInputPerRequest";
+    public static final String PROPERTY_BULKDATA_BATCHJOB_COSFILEMAXSIZE = "fhirServer/bulkdata/cosFileMaxSize";
+    public static final String PROPERTY_BULKDATA_BATCHJOB_COSFILEMAXRESOURCES = "fhirServer/bulkdata/cosFileMaxResources";
 
     // Custom header names
     public static final String DEFAULT_TENANT_ID_HEADER_NAME = "X-FHIR-TENANT-ID";

From 066c797643da029380943400aae495f9296c15da Mon Sep 17 00:00:00 2001
From: Albert Wang 
Date: Tue, 26 May 2020 09:18:39 -0400
Subject: [PATCH 5/6] issue #1150 add cdi-api dependency to fix CWWKY0041

Signed-off-by: Albert Wang 
---
 fhir-bulkimportexport-webapp/pom.xml | 4 ++++
 fhir-parent/pom.xml                  | 6 ++++++
 2 files changed, 10 insertions(+)

diff --git a/fhir-bulkimportexport-webapp/pom.xml b/fhir-bulkimportexport-webapp/pom.xml
index 3cf7ab78af8..17a0caafe1a 100644
--- a/fhir-bulkimportexport-webapp/pom.xml
+++ b/fhir-bulkimportexport-webapp/pom.xml
@@ -33,6 +33,10 @@
             org.glassfish
             jakarta.json
         
+        
+            javax.enterprise
+            cdi-api
+        
         
             ${project.groupId}
             fhir-persistence
diff --git a/fhir-parent/pom.xml b/fhir-parent/pom.xml
index f3e07be67e7..cee42177a2d 100644
--- a/fhir-parent/pom.xml
+++ b/fhir-parent/pom.xml
@@ -297,6 +297,12 @@
                 jakarta.inject-api
                 1.0
             
+            
+                javax.enterprise
+                cdi-api
+                2.0.SP1
+                provided
+            
             
                 com.ibm.cos
                 ibm-cos-java-sdk

From 356ab6a372fe748746c03937f97d61a93a9a5bba Mon Sep 17 00:00:00 2001
From: Albert Wang 
Date: Tue, 26 May 2020 10:15:56 -0400
Subject: [PATCH 6/6] issue #779 adding fhir-server-config.json and updates per
 review

Signed-off-by: Albert Wang 
---
 docs/src/pages/guides/FHIRServerUsersGuide.md               | 4 ++--
 fhir-bulkimportexport-webapp/pom.xml                        | 4 ++--
 .../src/main/java/com/ibm/fhir/bulkcommon/Constants.java    | 2 +-
 .../com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java | 2 +-
 fhir-parent/pom.xml                                         | 6 +++---
 .../liberty-config/config/default/fhir-server-config.json   | 2 ++
 6 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/docs/src/pages/guides/FHIRServerUsersGuide.md b/docs/src/pages/guides/FHIRServerUsersGuide.md
index 45b8cc84d84..0d144544ee2 100644
--- a/docs/src/pages/guides/FHIRServerUsersGuide.md
+++ b/docs/src/pages/guides/FHIRServerUsersGuide.md
@@ -1486,8 +1486,8 @@ This section contains reference information about each of the configuration prop
 |`fhirServer/bulkdata/validBaseUrls`|string|The list of supported urls which are approved for the fhir server to access|
 |`fhirServer/bulkdata/validBaseUrlsDisabled`|boolean|Disables the URL checking feature|
 |`fhirServer/bulkdata/maxInputPerRequest`|integer|The maximum inputs per bulk import|
-|`fhirServer/bulkdata/cosFileMaxResources`|int|Max FHIR resources per COS file, "-1" means no limit |
-|`fhirServer/bulkdata/cosFileMaxSize`|int|Max COS file size in bytes, "-1" means no limit |
+|`fhirServer/bulkdata/cosFileMaxResources`|int|The maximum number of FHIR resources per COS file, "-1" means no limit, the default value is 500000 |
+|`fhirServer/bulkdata/cosFileMaxSize`|int|The maximum COS file size in bytes, "-1" means no limit, the default value is 209715200 (200M) |
 
 
 ### 5.1.2 Default property values
diff --git a/fhir-bulkimportexport-webapp/pom.xml b/fhir-bulkimportexport-webapp/pom.xml
index 17a0caafe1a..28d87e63776 100644
--- a/fhir-bulkimportexport-webapp/pom.xml
+++ b/fhir-bulkimportexport-webapp/pom.xml
@@ -34,8 +34,8 @@
             jakarta.json
         
         
-            javax.enterprise
-            cdi-api
+            jakarta.enterprise
+            jakarta.enterprise.cdi-api
         
         
             ${project.groupId}
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
index 4d614d440a6..b59dd3a56d7 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkcommon/Constants.java
@@ -40,7 +40,7 @@ public class Constants {
     public static final String COS_LOCATION = "cos.location";
     public static final String COS_BUCKET_NAME = "cos.bucket.name";
     public static final String COS_IS_IBM_CREDENTIAL = "cos.credential.ibm";
-    public static final String COS_BUCKET_FILE_MAX_SZIE = "cos.bucket.filemaxsize";
+    public static final String COS_BUCKET_FILE_MAX_SIZE = "cos.bucket.filemaxsize";
     public static final String COS_BUCKET_FILE_MAX_RESOURCES = "cos.bucket.filemaxresources";
     // COS bucket for import OperationOutcomes
     public static final String COS_OPERATIONOUTCOMES_BUCKET_NAME = "cos.operationoutcomes.bucket.name";
diff --git a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
index a481bebda4f..37f424bd98e 100644
--- a/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
+++ b/fhir-bulkimportexport-webapp/src/main/java/com/ibm/fhir/bulkexport/common/CheckPointAlgorithm.java
@@ -43,7 +43,7 @@ public class CheckPointAlgorithm implements CheckpointAlgorithm {
      * The file size limit when exporting to multiple COS files.
      */
     @Inject
-    @BatchProperty(name = Constants.COS_BUCKET_FILE_MAX_SZIE)
+    @BatchProperty(name = Constants.COS_BUCKET_FILE_MAX_SIZE)
     String cosBucketFileMaxSize;
 
     /**
diff --git a/fhir-parent/pom.xml b/fhir-parent/pom.xml
index cee42177a2d..00dae15c0ea 100644
--- a/fhir-parent/pom.xml
+++ b/fhir-parent/pom.xml
@@ -298,9 +298,9 @@
                 1.0
             
             
-                javax.enterprise
-                cdi-api
-                2.0.SP1
+                jakarta.enterprise
+                jakarta.enterprise.cdi-api
+                2.0.2
                 provided
             
             
diff --git a/fhir-server/liberty-config/config/default/fhir-server-config.json b/fhir-server/liberty-config/config/default/fhir-server-config.json
index fb78925d610..7367a19e080 100644
--- a/fhir-server/liberty-config/config/default/fhir-server-config.json
+++ b/fhir-server/liberty-config/config/default/fhir-server-config.json
@@ -142,6 +142,8 @@
             "batch-truststore": "resources/security/fhirTrustStore.p12",
             "batch-truststore-password": "change-password",
             "isExportPublic": true,
+            "cosFileMaxResources": 500000,
+            "cosFileMaxSize": 209715200,
             "validBaseUrls": []
         }
     }