diff --git a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/CitusAdapter.java b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/CitusAdapter.java index bda0eac3005..82257514882 100644 --- a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/CitusAdapter.java +++ b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/CitusAdapter.java @@ -200,10 +200,12 @@ public void distributeFunction(String schemaName, String functionName, int distr throw new IllegalArgumentException("invalid distributeByParamNumber value: " + distributeByParamNumber); } // Need to get the signature text first in order to build the create_distribution_function - // statement + // statement. Note the cast to ::regprocedure will return a string like this: + // "fhirdata.add_logical_resource_ident(integer,character varying)" + // which can be passed in to the Citus create_distributed_function procedure final String objectName = DataDefinitionUtil.getQualifiedName(schemaName, functionName); final String SELECT = - "SELECT p.oid::regproc || '(' || pg_get_function_identity_arguments(p.oid) || ')' " + + "SELECT p.oid::regprocedure " + " FROM pg_catalog.pg_proc p " + " WHERE p.oid::regproc::text = LOWER(?)"; @@ -216,14 +218,23 @@ public void distributeFunction(String schemaName, String functionName, int distr if (rs.next()) { functionSig = rs.getString(1); } + + if (rs.next()) { + final String fn = DataDefinitionUtil.getQualifiedName(schemaName, functionName); + logger.severe("Overloaded function signature: " + fn + " " + functionSig); + functionSig = rs.getString(1); + logger.severe("Overloaded function signature: " + fn + " " + functionSig); + throw new IllegalStateException("Overloading not supported for function '" + fn + "'"); + } } if (functionSig != null) { - final String DISTRIBUTE = "SELECT create_distributed_function(?, ?)"; + logger.info("Distributing function: " + functionSig); + final String DISTRIBUTE = "SELECT 
create_distributed_function(?::regprocedure, ?::text)"; try (PreparedStatement ps = c.prepareStatement(DISTRIBUTE)) { ps.setString(1, functionSig); ps.setString(2, "$" + distributeByParamNumber); - ps.executeQuery(DISTRIBUTE); + ps.execute(); } } else { logger.warning("No matching function found for '" + objectName + "'"); diff --git a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/PreparedStatementHelper.java b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/PreparedStatementHelper.java index 7be7e257cf1..d281b44af66 100644 --- a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/PreparedStatementHelper.java +++ b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/PreparedStatementHelper.java @@ -6,6 +6,8 @@ package com.ibm.fhir.database.utils.common; +import java.io.InputStream; +import java.sql.CallableStatement; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; @@ -101,6 +103,23 @@ public PreparedStatementHelper setString(String value) throws SQLException { return this; } + /** + * Set the (possibly null) InputStream value at the current position + * and increment the position by 1 + * @param value + * @return this instance + * @throws SQLException + */ + public PreparedStatementHelper setBinaryStream(InputStream value) throws SQLException { + if (value != null) { + ps.setBinaryStream(index, value); + } else { + ps.setNull(index, Types.BINARY); + } + index++; + return this; + } + /** * Set the (possibly null) int value at the current position * and increment the position by 1 @@ -118,6 +137,23 @@ public PreparedStatementHelper setTimestamp(Timestamp value) throws SQLException return this; } + /** + * Register an OUT parameter, assuming the delegate is a CallableStatement + * @param parameterType from {@link java.sql.Types} + * @return the parameter index of the OUT parameter + * @throws SQLException + */ + public int registerOutParameter(int 
parameterType) throws SQLException { + int idx = index++; + if (ps instanceof CallableStatement) { + CallableStatement cs = (CallableStatement)ps; + cs.registerOutParameter(idx, parameterType); + } else { + throw new IllegalStateException("Delegate is not a CallableStatement"); + } + return idx; + } + /** * Add a new batch entry based on the current state of the {@link PreparedStatement}. * Note that we don't return this on purpose...because addBatch should be last in diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java index fc2c352abe4..76be6e2c4fb 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java @@ -6,22 +6,39 @@ package com.ibm.fhir.persistence.jdbc.citus; +import java.sql.CallableStatement; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.Types; import java.util.List; +import java.util.Objects; +import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; import javax.transaction.TransactionSynchronizationRegistry; +import com.ibm.fhir.database.utils.common.PreparedStatementHelper; +import com.ibm.fhir.persistence.InteractionStatus; import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; +import com.ibm.fhir.persistence.exception.FHIRPersistenceException; +import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException; +import com.ibm.fhir.persistence.index.FHIRRemoteIndexService; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; import com.ibm.fhir.persistence.jdbc.connection.FHIRDbFlavor; +import 
com.ibm.fhir.persistence.jdbc.dao.api.FHIRDAOConstants; import com.ibm.fhir.persistence.jdbc.dao.api.IResourceReferenceDAO; +import com.ibm.fhir.persistence.jdbc.dao.api.JDBCIdentityCache; +import com.ibm.fhir.persistence.jdbc.dao.api.ParameterDAO; +import com.ibm.fhir.persistence.jdbc.dao.impl.JDBCIdentityCacheImpl; +import com.ibm.fhir.persistence.jdbc.dao.impl.ParameterVisitorBatchDAO; +import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; +import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; import com.ibm.fhir.persistence.jdbc.postgres.PostgresResourceDAO; @@ -33,6 +50,14 @@ public class CitusResourceDAO extends PostgresResourceDAO { private static final String CLASSNAME = CitusResourceDAO.class.getName(); private static final Logger log = Logger.getLogger(CLASSNAME); + // @formatter:off + // 0 1 + // 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 + // @formatter:on + // Don't forget that we must account for IN and OUT parameters. + private static final String SQL_INSERT_WITH_PARAMETERS = "{ CALL %s.add_any_resource(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) }"; + private static final String SQL_LOGICAL_RESOURCE_IDENT = "{ CALL %s.add_logical_resource_ident(?,?,?) 
}"; + // Read the current version of the resource (even if the resource has been deleted) private static final String SQL_READ = "" + "SELECT R.RESOURCE_ID, R.LOGICAL_RESOURCE_ID, R.VERSION_ID, R.LAST_UPDATED, R.IS_DELETED, R.DATA, LR.LOGICAL_ID, R.RESOURCE_PAYLOAD_KEY " @@ -171,4 +196,129 @@ public Resource versionRead(String logicalId, String resourceType, int versionId } + @Override + public Resource insert(Resource resource, List parameters, String parameterHashB64, + ParameterDAO parameterDao, Integer ifNoneMatch) + throws FHIRPersistenceException { + final String METHODNAME = "insert(Resource, List"; + log.entering(CLASSNAME, METHODNAME); + + final Connection connection = getConnection(); // do not close + long dbCallStartTime; + double dbCallDuration; + + try { + // Just make sure this resource type is known to the database before we + // hit the procedure + Integer resourceTypeId = getResourceTypeId(resource.getResourceType()); + Objects.requireNonNull(resourceTypeId); + + // For Citus, we first make a call to establish the logical_resource_ident record + long logicalResourceId = createOrLockLogicalResourceIdent(resourceTypeId, resource.getLogicalId()); + + final String stmtString = String.format(SQL_INSERT_WITH_PARAMETERS, getSchemaName()); + try (CallableStatement stmt = connection.prepareCall(stmtString)) { + PreparedStatementHelper psh = new PreparedStatementHelper(stmt); + + psh.setLong(logicalResourceId); + psh.setInt(resourceTypeId); + psh.setString(resource.getResourceType()); + psh.setString(resource.getLogicalId()); + psh.setBinaryStream(resource.getDataStream() != null ? resource.getDataStream().inputStream() : null); + psh.setTimestamp(resource.getLastUpdated()); + psh.setString(resource.isDeleted() ? 
"Y": "N"); + psh.setString(UUID.randomUUID().toString()); + psh.setInt(resource.getVersionId()); + psh.setString(parameterHashB64); + psh.setInt(ifNoneMatch); + psh.setString(resource.getResourcePayloadKey()); + + final int oldParameterHashIndex = psh.registerOutParameter(Types.VARCHAR); + final int interactionStatusIndex = psh.registerOutParameter(Types.INTEGER); + final int ifNoneMatchVersionIndex = psh.registerOutParameter(Types.INTEGER); + + dbCallStartTime = System.nanoTime(); + stmt.execute(); + dbCallDuration = (System.nanoTime()-dbCallStartTime)/1e6; + + resource.setLogicalResourceId(logicalResourceId); + if (stmt.getInt(interactionStatusIndex) == 1) { // interaction status + // no change, so skip parameter updates + resource.setInteractionStatus(InteractionStatus.IF_NONE_MATCH_EXISTED); + resource.setIfNoneMatchVersion(stmt.getInt(ifNoneMatchVersionIndex)); // current version + } else { + resource.setInteractionStatus(InteractionStatus.MODIFIED); + + // Parameter time + // To keep things simple for the postgresql use-case, we just use a visitor to + // handle inserts of parameters directly in the resource parameter tables. 
+ // Note we don't get any parameters for the resource soft-delete operation + // Bypass the parameter insert here if we have the remoteIndexService configured + FHIRRemoteIndexService remoteIndexService = FHIRRemoteIndexService.getServiceInstance(); + final String currentParameterHash = stmt.getString(oldParameterHashIndex); + if (remoteIndexService == null + && parameters != null && (parameterHashB64 == null || parameterHashB64.isEmpty() + || !parameterHashB64.equals(currentParameterHash))) { + // postgresql doesn't support partitioned multi-tenancy, so we disable it on the DAO: + JDBCIdentityCache identityCache = new JDBCIdentityCacheImpl(getCache(), this, parameterDao, getResourceReferenceDAO()); + try (ParameterVisitorBatchDAO pvd = new ParameterVisitorBatchDAO(connection, null, resource.getResourceType(), false, resource.getLogicalResourceId(), 100, + identityCache, getResourceReferenceDAO(), getTransactionData())) { + for (ExtractedParameterValue p: parameters) { + p.accept(pvd); + } + } + } + } + if (log.isLoggable(Level.FINE)) { + log.fine("Successfully inserted Resource. 
logicalResourceId=" + resource.getLogicalResourceId() + " executionTime=" + dbCallDuration + "ms"); + } + } + } catch(FHIRPersistenceDBConnectException | FHIRPersistenceDataAccessException e) { + throw e; + } catch(SQLIntegrityConstraintViolationException e) { + FHIRPersistenceFKVException fx = new FHIRPersistenceFKVException("Encountered FK violation while inserting Resource."); + throw severe(log, fx, e); + } catch(SQLException e) { + if (FHIRDAOConstants.SQLSTATE_WRONG_VERSION.equals(e.getSQLState())) { + // this is just a concurrency update, so there's no need to log the SQLException here + throw new FHIRPersistenceVersionIdMismatchException("Encountered version id mismatch while inserting Resource"); + } else { + FHIRPersistenceDataAccessException fx = new FHIRPersistenceDataAccessException("SQLException encountered while inserting Resource."); + throw severe(log, fx, e); + } + } catch(Throwable e) { + FHIRPersistenceDataAccessException fx = new FHIRPersistenceDataAccessException("Failure inserting Resource."); + throw severe(log, fx, e); + } finally { + log.exiting(CLASSNAME, METHODNAME); + } + + return resource; + } + + /** + * Call the ADD_LOGICAL_RESOURCE_IDENT procedure to create or lock (select for update) + * the logical_resource_ident record. For Citus we run this step first because this + * function is distributed by the logical_id parameter. 
+ * @param resourceTypeId + * @param logicalId + * @return + * @throws SQLException + */ + protected long createOrLockLogicalResourceIdent(int resourceTypeId, String logicalId) throws SQLException { + long logicalResourceId; + + final String stmtString = String.format(SQL_LOGICAL_RESOURCE_IDENT, getSchemaName()); + try (CallableStatement cs = getConnection().prepareCall(stmtString)) { + PreparedStatementHelper psh = new PreparedStatementHelper(cs); + psh.setInt(resourceTypeId); + psh.setString(logicalId); + int idxLogicalResourceId = psh.registerOutParameter(Types.BIGINT); + cs.execute(); + logicalResourceId = cs.getLong(idxLogicalResourceId); + } + + // At this point the logical_resource_ident record will be locked for update + return logicalResourceId; + } } \ No newline at end of file diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java index 5833512dc9d..7645db9f9ad 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java @@ -279,7 +279,7 @@ protected Resource createDTO(ResultSet resultSet, boolean hasResourceTypeId) thr if (payloadData != null) { resource.setDataStream(new InputOutputByteStream(payloadData, payloadData.length)); } - resource.setId(resultSet.getLong(IDX_RESOURCE_ID)); + resource.setResourceId(resultSet.getLong(IDX_RESOURCE_ID)); resource.setLogicalResourceId(resultSet.getLong(IDX_LOGICAL_RESOURCE_ID)); resource.setLastUpdated(resultSet.getTimestamp(IDX_LAST_UPDATED, CalendarHelper.getCalendarForUTC())); resource.setLogicalId(resultSet.getString(IDX_LOGICAL_ID)); @@ -541,7 +541,7 @@ public Resource insert(Resource resource, List paramete long latestTime = System.nanoTime(); double dbCallDuration = (latestTime-dbCallStartTime)/1e6; - 
resource.setId(stmt.getLong(10)); + resource.setLogicalResourceId(stmt.getLong(10)); final long versionedResourceRowId = stmt.getLong(11); final String currentHash = stmt.getString(12); final int interactionStatus = stmt.getInt(13); @@ -580,7 +580,7 @@ public Resource insert(Resource resource, List paramete || !parameterHashB64.equals(currentHash))) { JDBCIdentityCache identityCache = new JDBCIdentityCacheImpl(cache, this, parameterDao, getResourceReferenceDAO()); try (ParameterVisitorBatchDAO pvd = new ParameterVisitorBatchDAO(connection, "FHIR_ADMIN", resource.getResourceType(), true, - resource.getId(), 100, identityCache, resourceReferenceDAO, this.transactionData)) { + resource.getLogicalResourceId(), 100, identityCache, resourceReferenceDAO, this.transactionData)) { for (ExtractedParameterValue p: parameters) { p.accept(pvd); } @@ -591,7 +591,7 @@ public Resource insert(Resource resource, List paramete latestTime = System.nanoTime(); double totalDuration = (latestTime - dbCallStartTime) / 1e6; double paramInsertDuration = (latestTime-paramInsertStartTime)/1e6; - log.fine("Successfully inserted Resource. id=" + resource.getId() + " total=" + totalDuration + "ms, proc=" + dbCallDuration + "ms, param=" + paramInsertDuration + "ms"); + log.fine("Successfully inserted Resource. 
logicalResourceId=" + resource.getLogicalResourceId() + " total=" + totalDuration + "ms, proc=" + dbCallDuration + "ms, param=" + paramInsertDuration + "ms"); } } } catch (FHIRPersistenceDBConnectException | diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java index 9e13a2f1714..b157878c475 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java @@ -108,7 +108,7 @@ public Resource insert(Resource resource, List paramete AtomicInteger outInteractionStatus = new AtomicInteger(); AtomicInteger outIfNoneMatchVersion = new AtomicInteger(); - long resourceId = this.storeResource(resource.getResourceType(), + long logicalResourceId = this.storeResource(resource.getResourceType(), parameters, resource.getLogicalId(), resource.getDataStream() != null ? resource.getDataStream().inputStream() : null, @@ -132,11 +132,11 @@ public Resource insert(Resource resource, List paramete resource.setIfNoneMatchVersion(outIfNoneMatchVersion.get()); } else { resource.setInteractionStatus(InteractionStatus.MODIFIED); - resource.setId(resourceId); + resource.setLogicalResourceId(logicalResourceId); } if (logger.isLoggable(Level.FINE)) { - logger.fine("Successfully inserted Resource. id=" + resource.getId() + " executionTime=" + dbCallDuration + "ms"); + logger.fine("Successfully inserted Resource. 
logicalResourceId=" + resource.getLogicalResourceId() + " executionTime=" + dbCallDuration + "ms"); } } catch(FHIRPersistenceDBConnectException | FHIRPersistenceDataAccessException e) { throw e; @@ -199,7 +199,7 @@ public Resource insert(Resource resource, List paramete * @param resourcePayloadKey * @param outInteractionStatus * @param outIfNoneMatchVersion - * @return the resource_id for the entry we created + * @return the logical_resource_id for the entry we created * @throws Exception */ public long storeResource(String tablePrefix, List parameters, @@ -299,7 +299,7 @@ public long storeResource(String tablePrefix, List para // insert the logical_resource_ident record (which we now do our locking on) final String INS_IDENT = "INSERT INTO logical_resource_ident (resource_type_id, logical_id, logical_resource_id) VALUES (?, ?, ?)"; if (logger.isLoggable(Level.FINEST)) { - logger.finest("Creating new logical_resource_ident row for: " + v_resource_type + "/" + p_logical_id); + logger.finest("Creating new logical_resource_ident row for: " + v_resource_type + "/" + p_logical_id + " => logical_resource_id=" + v_logical_resource_id); } try (PreparedStatement stmt = conn.prepareStatement(INS_IDENT)) { @@ -577,7 +577,7 @@ identityCache, getResourceReferenceDAO(), getTransactionData())) { } logger.exiting(CLASSNAME, METHODNAME); - return v_resource_id; + return v_logical_resource_id; } /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dto/Resource.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dto/Resource.java index 7cbd092be82..6602ad59dbd 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dto/Resource.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dto/Resource.java @@ -1,5 +1,5 @@ /* - * (C) Copyright IBM Corp. 2017,2021 + * (C) Copyright IBM Corp. 
2017, 2022 * * SPDX-License-Identifier: Apache-2.0 */ @@ -17,14 +17,14 @@ public class Resource { /** - * This is the _RESOURCES.RESOURCE_ID column + * This is the _RESOURCES.RESOURCE_ID column. It is unique for a specific version + * of a resource. It is not used during create/update interactions. */ - private long id; + private long resourceId; /** - * This is the _LOGICAL_RESOURCES.LOGICAL_RESOURCE_ID column. It is only - * set when this DTO is used to read table data. It is not set when the DTO is - * used to insert/update. + * This is the _LOGICAL_RESOURCES.LOGICAL_RESOURCE_ID column. It is used during + * create/update interactions as well as read interactions */ private long logicalResourceId; @@ -114,18 +114,34 @@ public Integer getIfNoneMatchVersion() { return this.ifNoneMatchVersion; } - public long getId() { - return id; + /** + * Getter for the database xx_resources.resource_id value + * @return + */ + public long getResourceId() { + return resourceId; } - public void setId(long id) { - this.id = id; + /** + * Setter for the database xx_resources.resource_id value + * @param id + */ + public void setResourceId(long id) { + this.resourceId = id; } + /** + * Getter for the logical_resources.logical_resource_id value + * @return + */ public long getLogicalResourceId() { return logicalResourceId; } - + + /** + * Setter for the logical_resources.logical_resource_id value + * @param logicalResourceId + */ public void setLogicalResourceId(long logicalResourceId) { this.logicalResourceId = logicalResourceId; } @@ -156,7 +172,7 @@ public void setDeleted(boolean deleted) { @Override public String toString() { - return "Resource [id=" + id + ", logicalResourceId=" + logicalResourceId + ", logicalId=" + logicalId + + return "Resource [id=" + resourceId + ", logicalResourceId=" + logicalResourceId + ", logicalId=" + logicalId + ", versionId=" + versionId + ", resourceType=" + resourceType + ", lastUpdated=" + lastUpdated + ", deleted=" + deleted + "]"; } diff --git 
a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java index 80be6df1200..c3da80886fe 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java @@ -421,12 +421,12 @@ public SingleResourceResult create(FHIRPersistenceContex ExtractedSearchParameters searchParameters = this.extractSearchParameters(updatedResource, resourceDTO); resourceDao.insert(resourceDTO, searchParameters.getParameters(), searchParameters.getParameterHashB64(), parameterDao, context.getIfNoneMatch()); if (log.isLoggable(Level.FINE)) { - log.fine("Persisted FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' id=" + resourceDTO.getId() + log.fine("Persisted FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' logicalResourceId=" + resourceDTO.getLogicalResourceId() + ", version=" + resourceDTO.getVersionId()); } if (resourceDTO.getInteractionStatus() == InteractionStatus.MODIFIED) { - sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getId(), + sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getLogicalResourceId(), resourceDTO.getVersionId(), resourceDTO.getLastUpdated().toInstant(), context.getRequestShard(), searchParameters); } SingleResourceResult.Builder resultBuilder = new SingleResourceResult.Builder() @@ -674,17 +674,17 @@ public SingleResourceResult update(FHIRPersistenceContex if (log.isLoggable(Level.FINE)) { if (resourceDTO.getInteractionStatus() == InteractionStatus.IF_NONE_MATCH_EXISTED) { - log.fine("If-None-Match: Existing FHIR Resource '" + resourceDTO.getResourceType() + "/" + 
resourceDTO.getLogicalId() + "' id=" + resourceDTO.getId() + log.fine("If-None-Match: Existing FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' logicalResourceId=" + resourceDTO.getLogicalResourceId() + ", version=" + resourceDTO.getVersionId()); } else { - log.fine("Persisted FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' id=" + resourceDTO.getId() + log.fine("Persisted FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' logicalResourceId=" + resourceDTO.getLogicalResourceId() + ", version=" + resourceDTO.getVersionId()); } } // If configured, send the extracted parameters to the remote indexing service if (resourceDTO.getInteractionStatus() == InteractionStatus.MODIFIED) { - sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getId(), + sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getLogicalResourceId(), resourceDTO.getVersionId(), resourceDTO.getLastUpdated().toInstant(), context.getRequestShard(), searchParameters); } @@ -886,7 +886,7 @@ private List newSearchForIncludeReso List allIncludeResources = new ArrayList<>(); // Used for de-duplication - Set allResourceIds = resourceDTOList.stream().map(r -> r.getId()).collect(Collectors.toSet()); + Set allResourceIds = resourceDTOList.stream().map(r -> r.getResourceId()).collect(Collectors.toSet()); // This is a map of iterations to query results. The query results is a map of // search resource type to returned logical resource IDs. 
The logical resource IDs @@ -909,7 +909,7 @@ private List newSearchForIncludeReso baseLogicalResourceIds, queryResultMap, resourceDao, 1, allResourceIds); // Add new ids to de-dup list - allResourceIds.addAll(includeResources.stream().map(r -> r.getId()).collect(Collectors.toSet())); + allResourceIds.addAll(includeResources.stream().map(r -> r.getResourceId()).collect(Collectors.toSet())); // Add resources to list allIncludeResources.addAll(includeResources); @@ -931,7 +931,7 @@ private List newSearchForIncludeReso baseLogicalResourceIds, queryResultMap, resourceDao, 1, allResourceIds); // Add new ids to de-dup list - allResourceIds.addAll(revincludeResources.stream().map(r -> r.getId()).collect(Collectors.toSet())); + allResourceIds.addAll(revincludeResources.stream().map(r -> r.getResourceId()).collect(Collectors.toSet())); // Add resources to list allIncludeResources.addAll(revincludeResources); @@ -975,7 +975,7 @@ private List newSearchForIncludeReso SearchConstants.INCLUDE, queryIds, queryResultMap, resourceDao, i+1, allResourceIds); // Add new ids to de-dup list - allResourceIds.addAll(includeResources.stream().map(r -> r.getId()).collect(Collectors.toSet())); + allResourceIds.addAll(includeResources.stream().map(r -> r.getResourceId()).collect(Collectors.toSet())); // Add resources to list allIncludeResources.addAll(includeResources); @@ -1000,7 +1000,7 @@ private List newSearchForIncludeReso SearchConstants.REVINCLUDE, queryIds, queryResultMap, resourceDao, i+1, allResourceIds); // Add new ids to de-dup list - allResourceIds.addAll(revincludeResources.stream().map(r -> r.getId()).collect(Collectors.toSet())); + allResourceIds.addAll(revincludeResources.stream().map(r -> r.getResourceId()).collect(Collectors.toSet())); // Add resources to list allIncludeResources.addAll(revincludeResources); @@ -1053,7 +1053,7 @@ private List runIncludeQuery(Class includeDTOs = - resourceDao.search(includeQuery).stream().filter(r -> 
!allResourceIds.contains(r.getId())).collect(Collectors.toList()); + resourceDao.search(includeQuery).stream().filter(r -> !allResourceIds.contains(r.getResourceId())).collect(Collectors.toList()); // Add query result to map. // The logical resource IDs are pulled from the returned DTOs and saved in a @@ -1146,7 +1146,7 @@ public void delete(FHIRPersistenceContext context, Class resourceDao.insert(resourceDTO, null, null, null, IF_NONE_MATCH_NULL); if (log.isLoggable(Level.FINE)) { - log.fine("Deleted FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' id=" + resourceDTO.getId() + log.fine("Deleted FHIR Resource '" + resourceDTO.getResourceType() + "/" + resourceDTO.getLogicalId() + "' logicalResourceId=" + resourceDTO.getLogicalResourceId() + ", version=" + resourceDTO.getVersionId()); } } catch(FHIRPersistenceException e) { @@ -1484,7 +1484,7 @@ protected List buildSortedResourceDT // Store each ResourceDTO in its proper position in the returned sorted list. 
for (com.ibm.fhir.persistence.jdbc.dto.Resource resourceDTO : resourceDTOList) { - sortIndex = idPositionMap.get(resourceDTO.getId()); + sortIndex = idPositionMap.get(resourceDTO.getResourceId()); sortedResourceDTOs[sortIndex] = resourceDTO; } diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java index bf39a82b4d4..8c1fa2700e8 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java @@ -107,7 +107,6 @@ public Resource insert(Resource resource, List paramete logger.entering(CLASSNAME, METHODNAME); final Connection connection = getConnection(); // do not close - CallableStatement stmt = null; String stmtString = null; Timestamp lastUpdated; long dbCallStartTime; @@ -126,70 +125,72 @@ public Resource insert(Resource resource, List paramete } else { stmtString = String.format(SQL_INSERT_WITH_PARAMETERS, getSchemaName()); } - stmt = connection.prepareCall(stmtString); - int arg = 1; - if (getFlavor().getSchemaType() == SchemaType.SHARDED) { - stmt.setShort(arg++, shardKey); - } - stmt.setString(arg++, resource.getResourceType()); - stmt.setString(arg++, resource.getLogicalId()); - - if (resource.getDataStream() != null) { - stmt.setBinaryStream(arg++, resource.getDataStream().inputStream()); - } else { - // payload was offloaded to another data store - stmt.setNull(arg++, Types.BINARY); - } - - lastUpdated = resource.getLastUpdated(); - stmt.setTimestamp(arg++, lastUpdated, CalendarHelper.getCalendarForUTC()); - stmt.setString(arg++, resource.isDeleted() ? 
"Y": "N"); - stmt.setString(arg++, UUID.randomUUID().toString()); - stmt.setInt(arg++, resource.getVersionId()); - stmt.setString(arg++, parameterHashB64); - setInt(stmt, arg++, ifNoneMatch); - setString(stmt, arg++, resource.getResourcePayloadKey()); - - // TODO use a helper function which can return the arg index to help clean up the syntax - stmt.registerOutParameter(arg, Types.BIGINT); final int resourceIdIndex = arg++; - stmt.registerOutParameter(arg, Types.VARCHAR); final int oldParameterHashIndex = arg++; - stmt.registerOutParameter(arg, Types.INTEGER); final int interactionStatusIndex = arg++; - stmt.registerOutParameter(arg, Types.INTEGER); final int ifNoneMatchVersionIndex = arg++; - - dbCallStartTime = System.nanoTime(); - stmt.execute(); - dbCallDuration = (System.nanoTime()-dbCallStartTime)/1e6; - resource.setId(stmt.getLong(resourceIdIndex)); - if (stmt.getInt(interactionStatusIndex) == 1) { // interaction status - // no change, so skip parameter updates - resource.setInteractionStatus(InteractionStatus.IF_NONE_MATCH_EXISTED); - resource.setIfNoneMatchVersion(stmt.getInt(ifNoneMatchVersionIndex)); // current version - } else { - resource.setInteractionStatus(InteractionStatus.MODIFIED); + try (CallableStatement stmt = connection.prepareCall(stmtString)) { + int arg = 1; + if (getFlavor().getSchemaType() == SchemaType.SHARDED) { + stmt.setShort(arg++, shardKey); + } + stmt.setString(arg++, resource.getResourceType()); + stmt.setString(arg++, resource.getLogicalId()); + + if (resource.getDataStream() != null) { + stmt.setBinaryStream(arg++, resource.getDataStream().inputStream()); + } else { + // payload was offloaded to another data store + stmt.setNull(arg++, Types.BINARY); + } + + lastUpdated = resource.getLastUpdated(); + stmt.setTimestamp(arg++, lastUpdated, CalendarHelper.getCalendarForUTC()); + stmt.setString(arg++, resource.isDeleted() ? 
"Y": "N"); + stmt.setString(arg++, UUID.randomUUID().toString()); + stmt.setInt(arg++, resource.getVersionId()); + stmt.setString(arg++, parameterHashB64); + setInt(stmt, arg++, ifNoneMatch); + setString(stmt, arg++, resource.getResourcePayloadKey()); + + // TODO use a helper function which can return the arg index to help clean up the syntax + stmt.registerOutParameter(arg, Types.BIGINT); final int logicalResourceIdIndex = arg++; + stmt.registerOutParameter(arg, Types.VARCHAR); final int oldParameterHashIndex = arg++; + stmt.registerOutParameter(arg, Types.INTEGER); final int interactionStatusIndex = arg++; + stmt.registerOutParameter(arg, Types.INTEGER); final int ifNoneMatchVersionIndex = arg++; + + dbCallStartTime = System.nanoTime(); + stmt.execute(); + dbCallDuration = (System.nanoTime()-dbCallStartTime)/1e6; - // Parameter time - // To keep things simple for the postgresql use-case, we just use a visitor to - // handle inserts of parameters directly in the resource parameter tables. 
- // Note we don't get any parameters for the resource soft-delete operation - // Bypass the parameter insert here if we have the remoteIndexService configured - FHIRRemoteIndexService remoteIndexService = FHIRRemoteIndexService.getServiceInstance(); - final String currentParameterHash = stmt.getString(oldParameterHashIndex); - if (remoteIndexService == null - && parameters != null && (parameterHashB64 == null || parameterHashB64.isEmpty() - || !parameterHashB64.equals(currentParameterHash))) { - // postgresql doesn't support partitioned multi-tenancy, so we disable it on the DAO: - JDBCIdentityCache identityCache = new JDBCIdentityCacheImpl(getCache(), this, parameterDao, getResourceReferenceDAO()); - try (ParameterVisitorBatchDAO pvd = new ParameterVisitorBatchDAO(connection, null, resource.getResourceType(), false, resource.getId(), 100, - identityCache, getResourceReferenceDAO(), getTransactionData())) { - for (ExtractedParameterValue p: parameters) { - p.accept(pvd); + resource.setLogicalResourceId(stmt.getLong(logicalResourceIdIndex)); + if (stmt.getInt(interactionStatusIndex) == 1) { // interaction status + // no change, so skip parameter updates + resource.setInteractionStatus(InteractionStatus.IF_NONE_MATCH_EXISTED); + resource.setIfNoneMatchVersion(stmt.getInt(ifNoneMatchVersionIndex)); // current version + } else { + resource.setInteractionStatus(InteractionStatus.MODIFIED); + + // Parameter time + // To keep things simple for the postgresql use-case, we just use a visitor to + // handle inserts of parameters directly in the resource parameter tables. 
+ // Note we don't get any parameters for the resource soft-delete operation + // Bypass the parameter insert here if we have the remoteIndexService configured + FHIRRemoteIndexService remoteIndexService = FHIRRemoteIndexService.getServiceInstance(); + final String currentParameterHash = stmt.getString(oldParameterHashIndex); + if (remoteIndexService == null + && parameters != null && (parameterHashB64 == null || parameterHashB64.isEmpty() + || !parameterHashB64.equals(currentParameterHash))) { + // postgresql doesn't support partitioned multi-tenancy, so we disable it on the DAO: + JDBCIdentityCache identityCache = new JDBCIdentityCacheImpl(getCache(), this, parameterDao, getResourceReferenceDAO()); + try (ParameterVisitorBatchDAO pvd = new ParameterVisitorBatchDAO(connection, null, resource.getResourceType(), false, resource.getLogicalResourceId(), 100, + identityCache, getResourceReferenceDAO(), getTransactionData())) { + for (ExtractedParameterValue p: parameters) { + p.accept(pvd); + } } } } - } - if (logger.isLoggable(Level.FINE)) { - logger.fine("Successfully inserted Resource. id=" + resource.getId() + " executionTime=" + dbCallDuration + "ms"); + if (logger.isLoggable(Level.FINE)) { + logger.fine("Successfully inserted Resource. 
logicalResourceId=" + resource.getLogicalResourceId() + " executionTime=" + dbCallDuration + "ms"); + } } } catch(FHIRPersistenceDBConnectException | FHIRPersistenceDataAccessException e) { throw e; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java index 34fde9eeb23..bff777ad2ec 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java @@ -100,7 +100,7 @@ public Resource insert(Resource resource, List paramete AtomicInteger outInteractionStatus = new AtomicInteger(); AtomicInteger outIfNoneMatchVersion = new AtomicInteger(); - long resourceId = this.storeResource(resource.getResourceType(), + long logicalResourceId = this.storeResource(resource.getResourceType(), parameters, resource.getLogicalId(), resource.getDataStream().inputStream(), @@ -125,11 +125,11 @@ public Resource insert(Resource resource, List paramete resource.setIfNoneMatchVersion(outIfNoneMatchVersion.get()); } else { resource.setInteractionStatus(InteractionStatus.MODIFIED); - resource.setId(resourceId); + resource.setLogicalResourceId(logicalResourceId); } if (logger.isLoggable(Level.FINE)) { - logger.fine("Successfully inserted Resource. id=" + resource.getId() + " executionTime=" + dbCallDuration + "ms"); + logger.fine("Successfully inserted Resource. 
logicalResourceId=" + resource.getLogicalResourceId() + " executionTime=" + dbCallDuration + "ms"); } } catch(FHIRPersistenceDBConnectException | FHIRPersistenceDataAccessException e) { throw e; @@ -459,7 +459,7 @@ identityCache, getResourceReferenceDAO(), getTransactionData())) { } logger.exiting(CLASSNAME, METHODNAME); - return v_resource_id; + return v_logical_resource_id; } /** diff --git a/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/app/Main.java b/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/app/Main.java index 2d738a0821e..e443fecd44d 100644 --- a/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/app/Main.java +++ b/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/app/Main.java @@ -533,7 +533,7 @@ protected void buildFhirDataSchemaModel(PhysicalDataModel pdm) { gen.buildDatabaseSpecificArtifactsPostgres(pdm); break; case CITUS: - gen.buildDatabaseSpecificArtifactsPostgres(pdm); + gen.buildDatabaseSpecificArtifactsCitus(pdm); break; default: throw new IllegalStateException("Unsupported db type: " + dbType); @@ -764,8 +764,11 @@ protected boolean updateSchema(PhysicalDataModel pdm, SchemaType schemaType) { } /** - * Apply any table distribution rules in one transaction and then add all the - * FK constraints that are needed + * Apply any table distribution rules then add all the + * FK constraints that are needed. Applying all the distribution rules + * in one transaction causes issues with Citus/PostgreSQL (out of shared memory + * errors) so instead we provide a function to allow the visitor to break + * things up into smaller transactions. 
* @param pdm */ private void applyDistributionRules(PhysicalDataModel pdm, SchemaType schemaType) { diff --git a/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/build/DistributedSchemaAdapter.java b/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/build/DistributedSchemaAdapter.java index a56e0b41908..237fc96a4e1 100644 --- a/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/build/DistributedSchemaAdapter.java +++ b/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/build/DistributedSchemaAdapter.java @@ -79,9 +79,4 @@ public void applyDistributionRules(String schemaName, String tableName, Distribu DistributionContext dc = createContext(distributionType, distributionColumnName); databaseAdapter.applyDistributionRules(schemaName, tableName, dc); } - - @Override - public void distributeFunction(String schemaName, String functionName, int distributeByParamNumber) { - // NOP for now - } } diff --git a/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/control/FhirSchemaGenerator.java b/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/control/FhirSchemaGenerator.java index 5ffd17e61c4..2dd8904f59b 100644 --- a/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/control/FhirSchemaGenerator.java +++ b/fhir-persistence-schema/src/main/java/com/ibm/fhir/schema/control/FhirSchemaGenerator.java @@ -142,6 +142,7 @@ public class FhirSchemaGenerator { private static final String ADD_PARAMETER_NAME = "ADD_PARAMETER_NAME"; private static final String ADD_RESOURCE_TYPE = "ADD_RESOURCE_TYPE"; private static final String ADD_ANY_RESOURCE = "ADD_ANY_RESOURCE"; + private static final String ADD_LOGICAL_RESOURCE_IDENT = "ADD_LOGICAL_RESOURCE_IDENT"; // Special procedure for Citus database support private static final String ADD_LOGICAL_RESOURCE = "ADD_LOGICAL_RESOURCE"; @@ -545,6 +546,83 @@ public void buildDatabaseSpecificArtifactsPostgres(PhysicalDataModel model) { fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); } + /** + * Add 
stored procedures/functions for Citus (largely based on PostgreSQL, but some functions are distributed + * based on a parameter to make them more efficient). + * @implNote https://docs.microsoft.com/en-us/azure/postgresql/hyperscale/reference-functions#create_distributed_function + * @param model + */ + public void buildDatabaseSpecificArtifactsCitus(PhysicalDataModel model) { + // Have to use different object names from DB2, because the group processing doesn't support 2 objects with the same name. + final String ROOT_DIR = "postgres/"; + final String CITUS_ROOT_DIR = "citus/"; + FunctionDef fd = model.addFunction(this.schemaName, + ADD_CODE_SYSTEM, + FhirSchemaVersion.V0001.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, ROOT_DIR + ADD_CODE_SYSTEM.toLowerCase() + ".sql", null), + Arrays.asList(fhirSequence, codeSystemsTable, allTablesComplete), + procedurePrivileges); + fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + + fd = model.addFunction(this.schemaName, + ADD_PARAMETER_NAME, + FhirSchemaVersion.V0001.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, ROOT_DIR + ADD_PARAMETER_NAME.toLowerCase() + + ".sql", null), + Arrays.asList(fhirSequence, parameterNamesTable, allTablesComplete), procedurePrivileges); + fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + + fd = model.addFunction(this.schemaName, + ADD_RESOURCE_TYPE, + FhirSchemaVersion.V0001.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, ROOT_DIR + ADD_RESOURCE_TYPE.toLowerCase() + + ".sql", null), + Arrays.asList(fhirSequence, resourceTypesTable, allTablesComplete), procedurePrivileges); + fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + + // We currently only support functions with PostgreSQL, although this is really just a procedure + final String deleteResourceParametersScript; + final String addAnyResourceScript; + final String eraseResourceScript; + final String schemaTypeSuffix = getSchemaTypeSuffix(); + addAnyResourceScript 
= CITUS_ROOT_DIR + ADD_ANY_RESOURCE.toLowerCase() + schemaTypeSuffix; + deleteResourceParametersScript = ROOT_DIR + DELETE_RESOURCE_PARAMETERS.toLowerCase() + ".sql"; + eraseResourceScript = ROOT_DIR + ERASE_RESOURCE.toLowerCase() + ".sql"; + final String addLogicalResourceIdentScript = CITUS_ROOT_DIR + ADD_LOGICAL_RESOURCE_IDENT.toLowerCase() + ".sql"; + + FunctionDef deleteResourceParameters = model.addFunction(this.schemaName, + DELETE_RESOURCE_PARAMETERS, + FhirSchemaVersion.V0020.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, deleteResourceParametersScript, null), + Arrays.asList(fhirSequence, resourceTypesTable, allTablesComplete), + procedurePrivileges, 2); // distributed by p_logical_resource_id + deleteResourceParameters.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + + // For Citus, we use an additional function to create/lock the logical_resource_ident record + // Function is distributed by p_logical_id (parameter 2) + fd = model.addFunction(this.schemaName, + ADD_LOGICAL_RESOURCE_IDENT, + FhirSchemaVersion.V0001.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, addLogicalResourceIdentScript, null), + Arrays.asList(fhirSequence, resourceTypesTable, deleteResourceParameters, allTablesComplete), procedurePrivileges, 2); + fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + + // Function is distributed by p_logical_resource_id (parameter 1) + fd = model.addFunction(this.schemaName, + ADD_ANY_RESOURCE, + FhirSchemaVersion.V0001.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, addAnyResourceScript, null), + Arrays.asList(fhirSequence, resourceTypesTable, deleteResourceParameters, allTablesComplete), procedurePrivileges, 1); + fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + + fd = model.addFunction(this.schemaName, + ERASE_RESOURCE, + FhirSchemaVersion.V0013.vid(), + () -> SchemaGeneratorUtil.readTemplate(adminSchemaName, schemaName, eraseResourceScript, null), + 
Arrays.asList(fhirSequence, resourceTypesTable, deleteResourceParameters, allTablesComplete), procedurePrivileges); + fd.addTag(SCHEMA_GROUP_TAG, FHIRDATA_GROUP); + } + /** * Get the suffix to select the appropriate procedure/function script * for the schema type diff --git a/fhir-persistence-schema/src/main/resources/citus/add_any_resource.sql b/fhir-persistence-schema/src/main/resources/citus/add_any_resource.sql new file mode 100644 index 00000000000..5c719b387cf --- /dev/null +++ b/fhir-persistence-schema/src/main/resources/citus/add_any_resource.sql @@ -0,0 +1,190 @@ +------------------------------------------------------------------------------- +-- (C) Copyright IBM Corp. 2020, 2022 +-- +-- SPDX-License-Identifier: Apache-2.0 +------------------------------------------------------------------------------- + +-- ---------------------------------------------------------------------------- +-- Procedure to add a resource version and its associated parameters. These +-- parameters only ever point to the latest version of a resource, never to +-- previous versions, which are kept to support history queries. +-- From V0027, we now use a logical_resource_ident table for locking. Records +-- can be created in this table either by this procedure, or as part of +-- reference parameter processing. +-- +-- This variant is for use with the Citus distributed variant of the schema. +-- This function is distributed by logical_resource_id (the first parameter) +-- because all SQL/DML it executes uses logical_resource_id. The +-- logical_resource_ident record must already exist and be locked for update +-- before this function is called. +-- +-- Because this function is distributed all object names must be fully +-- qualified. +-- +-- implNote - Conventions: +-- p_... prefix used to represent input parameters +-- v_... prefix used to represent declared variables +-- t_... prefix used to represent temp variables +-- o_... 
prefix used to represent output parameters +-- Parameters: +-- p_logical_resource_id: the logical_resource_ident primary key value for the resource +-- p_resource_type_id: the resource_type_id from resource_types +-- p_resource_type: the resource type name +-- p_logical_id: the logical id given to the resource by the FHIR server +-- p_payload: the BLOB (of JSON) which is the resource content +-- p_last_updated: the last_updated time given by the FHIR server +-- p_is_deleted: the soft delete flag +-- p_version: the intended new version id of the resource (matching the JSON payload) +-- p_parameter_hash_b64: the Base64 encoded hash of parameter values +-- p_if_none_match: the encoded If-None-Match value +-- o_current_parameter_hash: Base64 current parameter hash if existing resource +-- o_interaction_status: output indicating whether a change was made or IfNoneMatch hit +-- o_if_none_match_version: output revealing the version found when o_interaction_status is 1 (IfNoneMatch) +-- Exceptions: +-- SQLSTATE 99001: on version conflict (concurrency) +-- SQLSTATE 99002: missing expected row (data integrity) +-- SQLSTATE 99004: delete a currently deleted resource (data integrity) +-- ---------------------------------------------------------------------------- + ( IN p_logical_resource_id BIGINT, + IN p_resource_type_id INT, + IN p_resource_type VARCHAR( 36), + IN p_logical_id VARCHAR(255), + IN p_payload BYTEA, + IN p_last_updated TIMESTAMP, + IN p_is_deleted CHAR( 1), + IN p_source_key VARCHAR( 64), + IN p_version INT, + IN p_parameter_hash_b64 VARCHAR( 44), + IN p_if_none_match INT, + IN p_resource_payload_key VARCHAR( 36), + OUT o_current_parameter_hash VARCHAR( 44), + OUT o_interaction_status INT, + OUT o_if_none_match_version INT) + LANGUAGE plpgsql + AS $$ + + DECLARE + v_schema_name VARCHAR(128); + t_logical_resource_id BIGINT := NULL; + v_current_resource_id BIGINT := NULL; + v_resource_id BIGINT := NULL; + v_currently_deleted CHAR(1) := NULL; + v_new_resource INT 
:= 0; + v_duplicate INT := 0; + v_current_version INT := 0; + v_ghost_resource INT := 0; + v_change_type CHAR(1) := NULL; + +BEGIN + -- default value unless we hit If-None-Match + o_interaction_status := 0; + + -- LOADED ON: {{DATE}} + v_schema_name := '{{SCHEMA_NAME}}'; + + -- Grab the new resource_id so that we can use it right away (and skip an update to xx_logical_resources later) + SELECT NEXTVAL('{{SCHEMA_NAME}}.fhir_sequence') INTO v_resource_id; + + -- Read the record from logical_resources to see if this is an existing resource + SELECT logical_resource_id, parameter_hash, is_deleted + INTO t_logical_resource_id, o_current_parameter_hash, v_currently_deleted + FROM {{SCHEMA_NAME}}.logical_resources + WHERE logical_resource_id = p_logical_resource_id; + IF (t_logical_resource_id IS NULL) + THEN + v_new_resource := 1; + -- we already own the lock on the ident record, so we can safely create + -- the corresponding records in the logical_resources and resource-type-specific + -- xx_logical_resources tables + INSERT INTO {{SCHEMA_NAME}}.logical_resources (logical_resource_id, resource_type_id, logical_id, reindex_tstamp, is_deleted, last_updated, parameter_hash) + VALUES (p_logical_resource_id, p_resource_type_id, p_logical_id, '1970-01-01', p_is_deleted, p_last_updated, p_parameter_hash_b64) ON CONFLICT DO NOTHING; + + EXECUTE 'INSERT INTO ' || v_schema_name || '.' || p_resource_type || '_logical_resources (logical_resource_id, logical_id, is_deleted, last_updated, version_id, current_resource_id) ' + || ' VALUES ($1, $2, $3, $4, $5, $6)' USING p_logical_resource_id, p_logical_id, p_is_deleted, p_last_updated, p_version, v_resource_id; + + -- Since the resource did not previously exist, make sure o_current_parameter_hash is null + o_current_parameter_hash := NULL; + ELSE + -- as this is an existing resource, we need to know the current resource id. 
+ -- This is only available at the resource-specific logical_resources level + EXECUTE + 'SELECT current_resource_id, version_id FROM ' || v_schema_name || '.' || p_resource_type || '_logical_resources ' + || ' WHERE logical_resource_id = $1 ' + INTO v_current_resource_id, v_current_version USING p_logical_resource_id; + + IF v_current_resource_id IS NULL OR v_current_version IS NULL + THEN + -- our concurrency protection means that this shouldn't happen + RAISE 'Schema data corruption - missing logical resource' USING ERRCODE = '99002'; + END IF; + + -- If-None-Match does not apply if the resource is currently deleted + IF v_currently_deleted = 'N' AND p_if_none_match = 0 + THEN + -- If-None-Match hit. Raising an exception here causes PostgreSQL to mark the + -- connection with a fatal error, so instead we use an out parameter to + -- indicate the match + o_interaction_status := 1; + o_if_none_match_version := v_current_version; + RETURN; + END IF; + + -- Concurrency check: + -- the version parameter we've been given (which is also embedded in the JSON payload) must be + -- one greater than the current version, otherwise we've hit a concurrent update race condition + IF p_version != v_current_version + 1 + THEN + RAISE 'Concurrent update - mismatch of version in JSON' USING ERRCODE = '99001'; + END IF; + + -- Prevent creating a new deletion marker if the resource is currently deleted + IF v_currently_deleted = 'Y' AND p_is_deleted = 'Y' + THEN + RAISE 'Unexpected attempt to delete a Resource which is currently deleted' USING ERRCODE = '99004'; + END IF; + + IF o_current_parameter_hash IS NULL OR p_parameter_hash_b64 != o_current_parameter_hash + THEN + -- existing resource, so need to delete all its parameters (select because it's a function, not a procedure) + -- TODO patch parameter sets instead of all delete/all insert. 
+ EXECUTE 'SELECT {{SCHEMA_NAME}}.delete_resource_parameters($1, $2)' + USING p_resource_type, p_logical_resource_id; + END IF; -- end if check parameter hash + END IF; -- end if existing resource + + -- create the new resource version entry in xx_resources + EXECUTE + 'INSERT INTO ' || v_schema_name || '.' || p_resource_type || '_resources (resource_id, logical_resource_id, version_id, data, last_updated, is_deleted, resource_payload_key) ' + || ' VALUES ($1, $2, $3, $4, $5, $6, $7)' + USING v_resource_id, p_logical_resource_id, p_version, p_payload, p_last_updated, p_is_deleted, p_resource_payload_key; + + IF v_new_resource = 0 THEN + -- As this is an existing logical resource, we need to update the xx_logical_resource values to match + -- the values of the current resource. For new resources, these are added by the insert so we don't + -- need to update them here. + EXECUTE 'UPDATE ' || v_schema_name || '.' || p_resource_type || '_logical_resources SET current_resource_id = $1, is_deleted = $2, last_updated = $3, version_id = $4 WHERE logical_resource_id = $5' + USING v_resource_id, p_is_deleted, p_last_updated, p_version, p_logical_resource_id; + + -- For V0014 we now also store is_deleted and last_updated values at the whole-system logical_resources level + EXECUTE 'UPDATE ' || v_schema_name || '.logical_resources SET is_deleted = $1, last_updated = $2, parameter_hash = $3 WHERE logical_resource_id = $4' + USING p_is_deleted, p_last_updated, p_parameter_hash_b64, p_logical_resource_id; + END IF; + + -- Finally, write a record to RESOURCE_CHANGE_LOG which records each event + -- related to resources changes (issue-1955) + IF p_is_deleted = 'Y' + THEN + v_change_type := 'D'; + ELSE + IF v_new_resource = 0 AND v_currently_deleted = 'N' + THEN + v_change_type := 'U'; + ELSE + v_change_type := 'C'; + END IF; + END IF; + + INSERT INTO {{SCHEMA_NAME}}.resource_change_log(resource_id, change_tstamp, resource_type_id, logical_resource_id, version_id, change_type) + 
VALUES (v_resource_id, p_last_updated, p_resource_type_id, p_logical_resource_id, p_version, v_change_type); + +END $$; diff --git a/fhir-persistence-schema/src/main/resources/citus/add_logical_resource_ident.sql b/fhir-persistence-schema/src/main/resources/citus/add_logical_resource_ident.sql new file mode 100644 index 00000000000..4ea33574c96 --- /dev/null +++ b/fhir-persistence-schema/src/main/resources/citus/add_logical_resource_ident.sql @@ -0,0 +1,80 @@ +------------------------------------------------------------------------------- +-- (C) Copyright IBM Corp. 2022 +-- +-- SPDX-License-Identifier: Apache-2.0 +------------------------------------------------------------------------------- + +-- ---------------------------------------------------------------------------- +-- Procedure to either create or select for update a logical_resource_ident +-- record. For Citus, this part of the "add_any_resource" logic is split +-- off into its own function here which allows us to distribute the function +-- on logical_id, which is used in all the SQL/DML executed by this +-- function. This allows Citus to push execution of the entire function +-- down to the worker node. +-- +-- implNote - Conventions: +-- p_... prefix used to represent input parameters +-- v_... prefix used to represent declared variables +-- t_... prefix used to represent temp variables +-- o_... 
prefix used to represent output parameters +-- Parameters: +-- p_resource_type_id: the resource type id from resource_types +-- p_logical_id: the logical id given to the resource by the FHIR server +-- o_logical_resource_id: output field returning the existing or newly assigned logical_resource_id value +-- +-- ---------------------------------------------------------------------------- + ( IN p_resource_type_id INT, + IN p_logical_id VARCHAR(64), + OUT o_logical_resource_id BIGINT) + LANGUAGE plpgsql + AS $$ + + DECLARE + v_schema_name VARCHAR(128); + v_logical_resource_id BIGINT := NULL; + t_logical_resource_id BIGINT := NULL; + + -- We don't update any existing key, so use FOR NO KEY UPDATE to achieve better concurrency. + lock_cur CURSOR (t_resource_type_id INT, t_logical_id VARCHAR(1024)) FOR SELECT logical_resource_id FROM {{SCHEMA_NAME}}.logical_resource_ident WHERE resource_type_id = t_resource_type_id AND logical_id = t_logical_id FOR NO KEY UPDATE; + +BEGIN + + -- LOADED ON: {{DATE}} + v_schema_name := '{{SCHEMA_NAME}}'; + + -- Get a lock on the logical resource identity record + OPEN lock_cur(t_resource_type_id := p_resource_type_id, t_logical_id := p_logical_id); + FETCH lock_cur INTO v_logical_resource_id; + CLOSE lock_cur; + + -- Create the resource ident record if we don't have it already + IF v_logical_resource_id IS NULL + THEN + -- allocate the new logical_resource_id value + SELECT nextval('{{SCHEMA_NAME}}.fhir_sequence') INTO v_logical_resource_id; + + -- remember that we have a concurrent system...so there is a possibility + -- that another thread snuck in before us and created the ident record. To + -- handle this in PostgreSQL, we INSERT...ON CONFLICT DO NOTHING, then turn + -- around and read again to check that the logical_resource_id in the table + -- matches the value we tried to insert. 
+ INSERT INTO {{SCHEMA_NAME}}.logical_resource_ident (resource_type_id, logical_id, logical_resource_id) + VALUES (p_resource_type_id, p_logical_id, v_logical_resource_id) ON CONFLICT DO NOTHING; + + -- Do a read so that we can verify that *we* did the insert + OPEN lock_cur(t_resource_type_id := p_resource_type_id, t_logical_id := p_logical_id); + FETCH lock_cur INTO t_logical_resource_id; + CLOSE lock_cur; + + IF v_logical_resource_id != t_logical_resource_id + THEN + -- logical_resource_ident record was created by another thread...so use that id instead + v_logical_resource_id := t_logical_resource_id; + END IF; + END IF; + + -- Hand back the id of the logical resource ident record we found or created above. In the new R4 schema + -- only the logical_resource_id is the target of any FK, so there's no need to return + -- the resource_id (which is now private to the _resources tables). + o_logical_resource_id := v_logical_resource_id; +END $$;