diff --git a/docs/src/pages/guides/FHIRServerUsersGuide.md b/docs/src/pages/guides/FHIRServerUsersGuide.md index dda5dd4569c..8c60804ebc0 100644 --- a/docs/src/pages/guides/FHIRServerUsersGuide.md +++ b/docs/src/pages/guides/FHIRServerUsersGuide.md @@ -2266,6 +2266,7 @@ This section contains reference information about each of the configuration prop |`fhirServer/operations/membermatch/extendedProps`|object|The extended options for the extended member match implementation| |`fhirServer/operations/everything/includeTypes`|list|The list of related resource types to include alongside the patient compartment resource types. Instances of these resource types will only be returned when they are referenced from one or more resource instances from the target patient compartment. Example values are like `Location`, `Medication`, `Organization`, and `Practitioner`| |`fhirServer/remoteIndexService/type`|string| The type of service used to send remote index messages. Only `kafka` is currently supported| +|`fhirServer/remoteIndexService/instanceIdentifier`|string| A UUID or other identifier unique to this cluster of IBM FHIR Servers | |`fhirServer/remoteIndexService/kafka/mode`|string| Current operation mode of the service. Specify `ACTIVE` to use the service| |`fhirServer/remoteIndexService/kafka/topicName`|string| The Kafka topic name. 
Typically `FHIR_REMOTE_INDEX` | |`fhirServer/remoteIndexService/kafka/connectionProperties/bootstrap.servers`|string| Bootstrap servers for the Kafka service | @@ -2421,6 +2422,7 @@ This section contains reference information about each of the configuration prop |`fhirServer/operations/membermatch/extendedProps`|empty| |`fhirServer/operations/everything/includeTypes`|null| |`fhirServer/remoteIndexService/type`|null| +|`fhirServer/remoteIndexService/instanceIdentifier`|null| |`fhirServer/remoteIndexService/kafka/mode`|null| |`fhirServer/remoteIndexService/kafka/topicName`|null| |`fhirServer/remoteIndexService/kafka/connectionProperties/bootstrap.servers`|null| @@ -2613,6 +2615,7 @@ Cases where that behavior is not supported are marked below with an `N` in the ` |`fhirServer/operations/membermatch/extendedProps`|Y|Y|Y| |`fhirServer/operations/everything/includeTypes`|Y|Y|N| |`fhirServer/remoteIndexService/type`|N|N|N| +|`fhirServer/remoteIndexService/instanceIdentifier`|N|N|N| |`fhirServer/remoteIndexService/kafka/mode`|N|N|N| |`fhirServer/remoteIndexService/kafka/topicName`|N|N|N| |`fhirServer/remoteIndexService/kafka/connectionProperties/bootstrap.servers`|N|N|N| diff --git a/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java b/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java index d70fdcb2e39..48cbf4e6010 100644 --- a/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java +++ b/fhir-config/src/main/java/com/ibm/fhir/config/FHIRConfiguration.java @@ -115,6 +115,7 @@ public class FHIRConfiguration { // Configuration properties for the Kafka-based async index service public static final String PROPERTY_REMOTE_INDEX_SERVICE_TYPE = "fhirServer/remoteIndexService/type"; + public static final String PROPERTY_REMOTE_INDEX_SERVICE_INSTANCEIDENTIFIER = "fhirServer/remoteIndexService/instanceIdentifier"; public static final String PROPERTY_KAFKA_INDEX_SERVICE_TOPICNAME = "fhirServer/remoteIndexService/kafka/topicName"; 
/**
 * Static support functions related to logging
 */
public class LogSupport {
    // Text substituted for every secret value that is found
    private static final String MASK = "*****";

    // Matches the key/value form: password="secret" or password = "secret". The negative
    // lookbehind rejects a key that is itself wrapped in quotes (handled by PASSWORD_PATTERN)
    // without requiring a character before the token, so a value which begins with
    // "password=" is masked too.
    private static final Pattern PASSWORD_EQ_PATTERN = Pattern.compile("(?<!\")password[= ]*\"([^\"]*)\"", Pattern.CASE_INSENSITIVE);

    // Matches the JSON form: "password": "secret" or "password":"secret"
    private static final Pattern PASSWORD_PATTERN = Pattern.compile("\"password\"[: ]*\"([^\"]*)\"", Pattern.CASE_INSENSITIVE);

    /**
     * Hide any text in quotes following the token "password" to avoid writing secrets to log files
     * @param input the text to be logged; may be null
     * @return the input with each quoted password value replaced by the mask, or null
     *         if the input is null
     */
    public static String hidePassword(String input) {
        if (input == null) {
            return null;
        }
        String result = hidePassword(input, PASSWORD_EQ_PATTERN);
        result = hidePassword(result, PASSWORD_PATTERN);
        return result;
    }

    /**
     * Replace the text captured by group 1 of each match of the given pattern
     * with the MASK value
     * @param input the text to scan
     * @param pattern a pattern whose capture group 1 identifies the secret
     * @return the input with every captured secret replaced by MASK
     */
    private static String hidePassword(String input, Pattern pattern) {
        final Matcher m = pattern.matcher(input);
        final StringBuilder result = new StringBuilder(input.length());

        // Copy the input verbatim around each captured secret. The text is never passed
        // through Matcher.appendReplacement, so '$' and '\' in the input are not
        // interpreted as group references or escapes.
        int copiedTo = 0;
        while (m.find()) {
            result.append(input, copiedTo, m.start(1)).append(MASK);
            copiedTo = m.end(1);
        }
        result.append(input, copiedTo, input.length());
        return result.toString();
    }
}
Binary files a/fhir-persistence-schema/docs/physical_schema_V0027.png and b/fhir-persistence-schema/docs/physical_schema_V0027.png differ diff --git a/fhir-persistence/pom.xml b/fhir-persistence/pom.xml index 6d22adfc846..0ba50f2d2cf 100644 --- a/fhir-persistence/pom.xml +++ b/fhir-persistence/pom.xml @@ -36,6 +36,10 @@ fhir-search ${project.version} + + com.google.code.gson + gson + ${project.groupId} fhir-model diff --git a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/helper/RemoteIndexSupport.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/helper/RemoteIndexSupport.java new file mode 100644 index 00000000000..caa35512fca --- /dev/null +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/helper/RemoteIndexSupport.java @@ -0,0 +1,73 @@ +/* + * (C) Copyright IBM Corp. 2022 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.persistence.helper; + +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.logging.Logger; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializer; +import com.ibm.fhir.persistence.index.RemoteIndexMessage; + +/** + * Utility methods supporting the fhir-remote-index consumer + */ +public class RemoteIndexSupport { + private static final Logger logger = Logger.getLogger(RemoteIndexSupport.class.getName()); + private static final DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT; + + /** + * Get an instance of Gson configured to support serialization/deserialization of + * remote index messages (sent through Kafka as strings) + * @return + */ + public static Gson getGson() { + Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, (JsonSerializer) (value, type, context) -> + new JsonPrimitive(formatter.format(value)) + ) + .registerTypeAdapter(Instant.class, (JsonDeserializer) (jsonElement, type, 
context) -> + formatter.parse(jsonElement.getAsString(), Instant::from) + ) + .create(); + + return gson; + } + + /** + * Unmarshall the JSON payload parameter as a RemoteIndexMessage + * @param jsonPayload + * @return + */ + public static RemoteIndexMessage unmarshall(String jsonPayload) { + try { + Gson gson = getGson(); + return gson.fromJson(jsonPayload, RemoteIndexMessage.class); + } catch (Throwable t) { + // We need to sink this error to avoid poison messages from + // blocking the queues. + // TODO. Perhaps push this to a dedicated error topic + logger.severe("Not a RemoteIndexMessage. Ignoring: '" + jsonPayload + "'"); + } + return null; + + } + + /** + * Marshall the RemoteIndexMessage to a JSON string + * @param message + * @return + */ + public static String marshallToString(RemoteIndexMessage message) { + Gson gson = getGson(); + return gson.toJson(message); + } +} diff --git a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java index 8cfbdf248fb..663cd5337b3 100644 --- a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java @@ -12,6 +12,7 @@ public class RemoteIndexMessage { private String tenantId; private int messageVersion; + private String instanceIdentifier; private SearchParametersTransport data; @Override @@ -65,5 +66,19 @@ public int getMessageVersion() { public void setMessageVersion(int messageVersion) { this.messageVersion = messageVersion; } + + /** + * @return the instanceIdentifier + */ + public String getInstanceIdentifier() { + return instanceIdentifier; + } + + /** + * @param instanceIdentifier the instanceIdentifier to set + */ + public void setInstanceIdentifier(String instanceIdentifier) { + this.instanceIdentifier = instanceIdentifier; + } } diff --git 
a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java index cd5954b151a..dcd762c7d8a 100644 --- a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java @@ -7,6 +7,7 @@ package com.ibm.fhir.persistence.index; import java.time.Instant; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; @@ -563,19 +564,28 @@ public void setParameterHash(String parameterHash) { } /** - * @return the lastUpdated + * @return the lastUpdated (UTC) */ public Instant getLastUpdated() { return lastUpdated; } /** - * @param lastUpdated the lastUpdated to set + * @param lastUpdated the lastUpdated to set. */ public void setLastUpdated(Instant lastUpdated) { this.lastUpdated = lastUpdated; } + /** + * Convenience function to get the lastUpdated time as an Instant. All our times are + * always UTC. 
+ * @return + */ + public Instant getLastUpdatedInstant() { + return Instant.from(lastUpdated.atOffset(ZoneOffset.UTC)); + } + /** * @return the refValues */ diff --git a/fhir-remote-index/src/test/java/com/ibm/fhir/remote/index/MessageSerializationTest.java b/fhir-persistence/src/test/java/com/ibm/fhir/persistence/helper/MessageSerializationTest.java similarity index 90% rename from fhir-remote-index/src/test/java/com/ibm/fhir/remote/index/MessageSerializationTest.java rename to fhir-persistence/src/test/java/com/ibm/fhir/persistence/helper/MessageSerializationTest.java index c3257eb9fce..01fbb5e39bf 100644 --- a/fhir-remote-index/src/test/java/com/ibm/fhir/remote/index/MessageSerializationTest.java +++ b/fhir-persistence/src/test/java/com/ibm/fhir/persistence/helper/MessageSerializationTest.java @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package com.ibm.fhir.remote.index; +package com.ibm.fhir.persistence.helper; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -67,16 +67,16 @@ public void testRoundtrip() throws Exception { adapter.tokenValue("token-param", valueSystem, valueCode, compositeId); sent.setData(adapter.build()); - final String payload = marshallToString(sent); + final String payload = RemoteIndexSupport.marshallToString(sent); // Now unmarshall the payload and check everything matches - RemoteIndexMessage rcvd = unmarshallPayload(payload); + RemoteIndexMessage rcvd = RemoteIndexSupport.unmarshall(payload); assertNotNull(rcvd); assertEquals(rcvd.getMessageVersion(), RemoteIndexConstants.MESSAGE_VERSION); SearchParametersTransport data = rcvd.getData(); assertNotNull(data); assertEquals(data.getParameterHash(), parameterHash); - assertEquals(data.getLastUpdated(), lastUpdated); + assertEquals(data.getLastUpdatedInstant(), lastUpdated); assertEquals(data.getLogicalResourceId(), logicalResourceId); assertEquals(data.getResourceType(), resourceType); assertEquals(data.getLogicalId(), 
logicalId); @@ -115,17 +115,14 @@ public void testRoundtrip() throws Exception { assertEquals(data.getTokenValues().get(0).getValueCode(), valueCode); } - /** - * Marshall the message to a string - * @param message - * @return - */ - private String marshallToString(RemoteIndexMessage message) { - final Gson gson = new Gson(); - return gson.toJson(message); - } - private RemoteIndexMessage unmarshallPayload(String jsonPayload) throws Exception { - Gson gson = new Gson(); - return gson.fromJson(jsonPayload, RemoteIndexMessage.class); + @Test + public void testInstant() { + Gson gson = RemoteIndexSupport.getGson(); + Instant x = Instant.now(); + String value = gson.toJson(x); + + // now try and convert the other way + Instant x2 = gson.fromJson(value, Instant.class); + assertEquals(x, x2); } } diff --git a/fhir-remote-index/README.md b/fhir-remote-index/README.md index ffbcf835e71..0214b77a358 100644 --- a/fhir-remote-index/README.md +++ b/fhir-remote-index/README.md @@ -47,6 +47,7 @@ To enable remote indexing of search parameters, add the following `remoteIndexSe ... "remoteIndexService": { "type": "kafka", + "instanceIdentifier": "a-random-uuid-value", "kafka": { "mode": "ACTIVE", "topicName": "FHIR_REMOTE_INDEX", @@ -78,7 +79,8 @@ java -Djava.util.logging.config.file=logging.properties \ --database-properties database.properties \ --kafka-properties kafka.properties \ --topic-name FHIR_REMOTE_INDEX \ - --consumer-count 3 + --consumer-count 3 \ + --instance-identifier "a-random-uuid-value" ``` Logging uses standard `java.util.logging` (JUL) and can be configured as follows: @@ -157,6 +159,7 @@ Note: Citus configuration is the same as PostgreSQL. | --db-type {type} | The type of database. One of `postgresql`, `derby`, `db2` or `citus`. | | --database-properties {properties-file} | A Java properties file containing connection details for the downstream IBM FHIR Server database. | | --topic-name {topic} | The name of the Kafka topic to consume. 
Default `FHIR_REMOTE_INDEX`. | +| --instance-identifier {uuid} | Each IBM FHIR Server cluster should be allocated a unique instance identifier. This identifier is added to each message sent over Kafka. The consumer will ignore messages unless they include the same instance identifier value. This helps to ensure that messages are processed from only intended sources. | | --consumer-group {grp} | Override the default Kafka consumer group (`group.id` value) for this application. Default `remote-index-service-cg`. | | --schema-type {type} | Set the schema type. One of `PLAIN` or `DISTRIBUTED`. Default is `PLAIN`. The schema type `DISTRIBUTED` is for use with Citus databases. | | --max-ready-time-ms {milliseconds} | The maximum number of milliseconds to wait for the database to contain the correct data for a particular set of consumed messages. Should be slightly longer than the configured Liberty transaction timeout value. | @@ -169,4 +172,4 @@ Because messages are sent to Kafka before the transaction is committed, it is po 1. If the resource version doesn't yet exist in the database, the consumer will pause and wait for the transaction to be committed. The consumer will only wait up to the maximum transaction timeout window, at which point it will assume the transaction has failed and the message will be discarded. 2. If the resource version matches, but the lastUpdated time does not match, it assumes the message came from an IBM FHIR Server which failed before the transaction was committed, but the request was processed successfully by another server. The message will be discarded because there will be another message waiting in the queue from the second attempt. -3. If the resource version in the database already exceeds the version in the message, the message will be discarded because the information is already out of date. There will be another message waiting in the queue containing the search parameter values from the most recent resource. 
\ No newline at end of file +3. If the resource version in the database already exceeds the version in the message, the message will be discarded because the information is already out of date. There will be another message waiting in the queue containing the search parameter values from the most recent resource. diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java index 9ca6f29b6d1..a7bce7b6362 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java @@ -27,12 +27,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import com.ibm.fhir.core.util.LogSupport; import com.ibm.fhir.database.utils.api.IConnectionProvider; import com.ibm.fhir.database.utils.api.IDatabaseTranslator; import com.ibm.fhir.database.utils.api.SchemaType; import com.ibm.fhir.database.utils.citus.CitusTranslator; import com.ibm.fhir.database.utils.citus.ConfigureConnectionDAO; import com.ibm.fhir.database.utils.common.JdbcConnectionProvider; +import com.ibm.fhir.database.utils.derby.DerbyPropertyAdapter; +import com.ibm.fhir.database.utils.derby.DerbyTranslator; import com.ibm.fhir.database.utils.model.DbType; import com.ibm.fhir.database.utils.postgres.PostgresPropertyAdapter; import com.ibm.fhir.database.utils.postgres.PostgresTranslator; @@ -89,6 +92,9 @@ public class Main { private IDatabaseTranslator translator; private IConnectionProvider connectionProvider; private DbType dbType = DbType.POSTGRESQL; + + // Make sure we process messages sent from only the FHIR servers we are configured for + private String instanceIdentifier; /** * Parse the given command line arguments @@ -127,6 +133,13 @@ public void parseArgs(String[] args) { throw new IllegalArgumentException("Missing value for --topic-name"); } break; + case 
"--instance-identifier": + if (a < args.length && !args[a].startsWith("--")) { + instanceIdentifier = args[a++]; + } else { + throw new IllegalArgumentException("Missing value for --instance-identifier"); + } + break; case "--consumer-group": if (a < args.length && !args[a].startsWith("--")) { consumerGroup = args[a++]; @@ -205,7 +218,7 @@ private String getSchemaName() throws FHIRPersistenceException { public void run() throws FHIRPersistenceException { dumpProperties("kafka", kafkaProperties); dumpProperties("database", databaseProperties); - configureForPostgres(); + configureDatabaseAccess(); initIdentityCache(); // Keep track of how many consumers are still running. If too many fail, @@ -298,6 +311,21 @@ private KafkaConsumer buildConsumer() { return consumer; } + /** + * Set up the database connection + */ + private void configureDatabaseAccess() { + switch (this.dbType) { + case POSTGRESQL: + case CITUS: + configureForPostgres(); + case DERBY: + configureForDerby(); + default: + throw new IllegalArgumentException("Database type not supported: " + this.dbType); + } + } + /** * Set things up to talk to a PostgreSQL database */ @@ -313,6 +341,24 @@ private void configureForPostgres() { connectionProvider = new JdbcConnectionProvider(translator, propertyAdapter); } + /** + * Set things up to talk to a Derby database. Note that the in-memory + * instance of Derby supports only a single JVM and so the FHIR server + * instance would need to be stopped before running this fhir-remote-index + * application. Therefore, this is useful only for development work. 
+ */ + private void configureForDerby() { + this.translator = new DerbyTranslator(); + try { + Class.forName(translator.getDriverClassName()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + + DerbyPropertyAdapter propertyAdapter = new DerbyPropertyAdapter(databaseProperties); + connectionProvider = new JdbcConnectionProvider(translator, propertyAdapter); + } + /** * Instantiate a new message handler for use by a consumer thread. Each handler gets * its own database connection. @@ -331,15 +377,15 @@ private IMessageHandler buildHandler() throws FHIRPersistenceException { switch (schemaType) { case SHARDED: - return new ShardedPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs); + return new ShardedPostgresMessageHandler(instanceIdentifier, c, getSchemaName(), identityCache, maxReadyTimeMs); case PLAIN: if (dbType == DbType.DERBY) { - return new PlainDerbyMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs); + return new PlainDerbyMessageHandler(instanceIdentifier, c, getSchemaName(), identityCache, maxReadyTimeMs); } else { - return new PlainPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs); + return new PlainPostgresMessageHandler(instanceIdentifier, c, getSchemaName(), identityCache, maxReadyTimeMs); } case DISTRIBUTED: - return new DistributedPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs); + return new DistributedPostgresMessageHandler(instanceIdentifier, c, getSchemaName(), identityCache, maxReadyTimeMs); default: throw new FHIRPersistenceException("Schema type not supported: " + schemaType.name()); } @@ -395,6 +441,9 @@ protected void dumpProperties(String which, Properties p) { if (key.toLowerCase().contains("password")) { value = "[*******]"; } + // kill any passwords embedded within a more complex value string + value = LogSupport.hidePassword(value); + if (first) { first = false; } else { @@ -405,7 +454,7 @@ protected void 
dumpProperties(String which, Properties p) { buffer.append("\"").append(value).append("\""); } buffer.append("}"); - logger.info(which + ": " + buffer.toString()); + logger.fine(which + ": " + buffer.toString()); } } diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java index 4a7f54bc09e..b542af4deb6 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java @@ -7,15 +7,16 @@ package com.ibm.fhir.remote.index.database; import java.security.SecureRandom; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; -import com.google.gson.Gson; import com.ibm.fhir.database.utils.thread.ThreadHandler; import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; +import com.ibm.fhir.persistence.helper.RemoteIndexSupport; import com.ibm.fhir.persistence.index.DateParameter; import com.ibm.fhir.persistence.index.LocationParameter; import com.ibm.fhir.persistence.index.NumberParameter; @@ -37,6 +38,7 @@ */ public abstract class BaseMessageHandler implements IMessageHandler { private final Logger logger = Logger.getLogger(BaseMessageHandler.class.getName()); + private static final DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT; private static final int MIN_SUPPORTED_MESSAGE_VERSION = 1; // If we fail 10 times due to deadlocks, then something is seriously wrong @@ -45,11 +47,18 @@ public abstract class BaseMessageHandler implements IMessageHandler { private final long maxReadyWaitMs; + // Process messages only from a known origin + private final String instanceIdentifier; + /** * Protected constructor 
* @param maxReadyWaitMs the max time in ms to wait for the upstream transaction to make the data ready */ - protected BaseMessageHandler(long maxReadyWaitMs) { + protected BaseMessageHandler(String instanceIdentifier, long maxReadyWaitMs) { + if (instanceIdentifier == null || instanceIdentifier.isEmpty()) { + throw new IllegalArgumentException("Must specify an instanceIdentifier value"); + } + this.instanceIdentifier = instanceIdentifier; this.maxReadyWaitMs = maxReadyWaitMs; } @@ -60,10 +69,16 @@ public void process(List messages) throws FHIRPersistenceException { if (logger.isLoggable(Level.FINEST)) { logger.finest("Processing message payload: " + payload); } - RemoteIndexMessage message = unmarshall(payload); + RemoteIndexMessage message = RemoteIndexSupport.unmarshall(payload); if (message != null) { if (message.getMessageVersion() >= MIN_SUPPORTED_MESSAGE_VERSION) { - unmarshalled.add(message); + // check to make sure that the instanceIdentifier matches our configuration. This protects us + // from messages accidentally sent over the same topic from another instance + if (this.instanceIdentifier.equals(message.getInstanceIdentifier())) { + unmarshalled.add(message); + } else { + logger.warning("Message from unknown origin, ignoring payload=[" + payload + "]"); + } } else { logger.warning("Message version [" + message.getMessageVersion() + "] not supported, ignoring payload=[" + payload + "]"); } @@ -136,24 +151,6 @@ private void processWithRetry(List messages) throws FHIRPers */ protected abstract void pushBatch() throws FHIRPersistenceException; - /** - * Unmarshall the json payload string into a RemoteIndexMessage - * @param payload - * @return - */ - private RemoteIndexMessage unmarshall(String jsonPayload) { - Gson gson = new Gson(); - try { - return gson.fromJson(jsonPayload, RemoteIndexMessage.class); - } catch (Throwable t) { - // We need to sink this error to avoid poison messages from - // blocking the queues. - // TODO. 
Perhaps push this to a dedicated error topic - logger.severe("Not a RemoteIndexMessage. Ignoring: '" + jsonPayload + "'"); - } - return null; - } - /** * Process the list of messages * @param messages diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java index 041082adc7c..dd679de3fc2 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java @@ -28,13 +28,15 @@ public class DistributedPostgresMessageHandler extends PlainMessageHandler { /** * Public constructor + * + * @param instanceIdentifier * @param connection * @param schemaName * @param cache * @param maxReadyTimeMs */ - public DistributedPostgresMessageHandler(Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { - super(new PostgresTranslator(), connection, schemaName, cache, maxReadyTimeMs); + public DistributedPostgresMessageHandler(String instanceIdentifier, Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { + super(instanceIdentifier, new PostgresTranslator(), connection, schemaName, cache, maxReadyTimeMs); } @Override diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainDerbyMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainDerbyMessageHandler.java index 3c7ba2c9234..ad5e36052b6 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainDerbyMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainDerbyMessageHandler.java @@ -26,13 +26,14 @@ public class PlainDerbyMessageHandler extends PlainMessageHandler { /** * Public constructor + * @param 
instanceIdentifier * @param connection * @param schemaName * @param cache * @param maxReadyTimeMs */ - public PlainDerbyMessageHandler(Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { - super(new DerbyTranslator(), connection, schemaName, cache, maxReadyTimeMs); + public PlainDerbyMessageHandler(String instanceIdentifier, Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { + super(instanceIdentifier, new DerbyTranslator(), connection, schemaName, cache, maxReadyTimeMs); } @Override diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainMessageHandler.java index fbdf152c39d..5db820a8209 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainMessageHandler.java @@ -119,13 +119,15 @@ public abstract class PlainMessageHandler extends BaseMessageHandler { /** * Public constructor * + * @param instanceIdentifier + * @param translator * @param connection * @param schemaName * @param cache * @param maxReadyTimeMs */ - public PlainMessageHandler(IDatabaseTranslator translator, Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { - super(maxReadyTimeMs); + public PlainMessageHandler(String instanceIdentifier, IDatabaseTranslator translator, Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { + super(instanceIdentifier, maxReadyTimeMs); this.translator = translator; this.connection = connection; this.schemaName = schemaName; @@ -1114,7 +1116,7 @@ protected void checkReady(List messages, List getMessages(long logicalResourceId) { RemoteIndexMessage sent = new RemoteIndexMessage(); sent.setMessageVersion(RemoteIndexConstants.MESSAGE_VERSION); + sent.setInstanceIdentifier(instanceIdentifier); // 
Create an Observation resource with a few parameters SearchParametersTransportAdapter adapter = new SearchParametersTransportAdapter(OBSERVATION, OBSERVATION_LOGICAL_ID, logicalResourceId, @@ -127,23 +129,12 @@ private List getMessages(long logicalResourceId) { adapter.tagValue("tag-param", valueSystem, valueCode, WHOLE_SYSTEM); sent.setData(adapter.build()); - final String payload = marshallToString(sent); - + final String payload = RemoteIndexSupport.marshallToString(sent); final List result = new ArrayList<>(); result.add(payload); return result; } - /** - * Marshall the message to a string - * @param message - * @return - */ - private String marshallToString(RemoteIndexMessage message) { - final Gson gson = new Gson(); - return gson.toJson(message); - } - @Test public void testFill() throws Exception { final long logicalResourceId; @@ -154,7 +145,7 @@ public void testFill() throws Exception { try (Connection c = connectionProvider.getConnection()) { try { - PlainDerbyMessageHandler handler = new PlainDerbyMessageHandler(c, SCHEMA_NAME, identityCache, 1000L); + PlainDerbyMessageHandler handler = new PlainDerbyMessageHandler(instanceIdentifier, c, SCHEMA_NAME, identityCache, 1000L); handler.process(getMessages(logicalResourceId)); checkData(c, logicalResourceId); c.commit(); diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/FHIRRemoteIndexKafkaService.java b/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/FHIRRemoteIndexKafkaService.java index 385a3d671ec..2d96cc375ce 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/FHIRRemoteIndexKafkaService.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/FHIRRemoteIndexKafkaService.java @@ -19,8 +19,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import com.google.gson.Gson; import com.ibm.fhir.config.FHIRRequestContext; +import com.ibm.fhir.persistence.helper.RemoteIndexSupport; 
import com.ibm.fhir.persistence.index.FHIRRemoteIndexService; import com.ibm.fhir.persistence.index.IndexProviderResponse; import com.ibm.fhir.persistence.index.RemoteIndexConstants; @@ -41,6 +41,7 @@ public class FHIRRemoteIndexKafkaService extends FHIRRemoteIndexService { private String topicName = null; private Producer producer; private KafkaPropertyAdapter.Mode mode; + private String instanceIdentifier; /** * Default constructor @@ -55,6 +56,7 @@ public FHIRRemoteIndexKafkaService() { public void init(KafkaPropertyAdapter properties) { this.mode = properties.getMode(); this.topicName = properties.getTopicName(); + this.instanceIdentifier = properties.getInstanceIdentifier(); Properties kafkaProps = new Properties(); properties.putPropertiesTo(kafkaProps); @@ -91,17 +93,6 @@ public void shutdown() { } } - /** - * Render the data value to a JSON string which is the wire format we - * use for remote indexing messages - * @param message - * @return - */ - public String marshallToString(RemoteIndexMessage message) { - final Gson gson = new Gson(); - return gson.toJson(message); - } - @Override public IndexProviderResponse submit(final RemoteIndexData data) { // We rely on the default Kafka partitioner, which in our case will @@ -112,7 +103,7 @@ public IndexProviderResponse submit(final RemoteIndexData data) { msg.setMessageVersion(RemoteIndexConstants.MESSAGE_VERSION); msg.setTenantId(tenantId); msg.setData(data.getSearchParameters()); - final String message = marshallToString(msg); + final String message = RemoteIndexSupport.marshallToString(msg); if (this.mode == Mode.ACTIVE) { ProducerRecord record = new ProducerRecord(topicName, data.getPartitionKey(), message); diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/KafkaPropertyAdapter.java b/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/KafkaPropertyAdapter.java index 97b3900a97e..6db10342a61 100644 --- 
a/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/KafkaPropertyAdapter.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/index/kafka/KafkaPropertyAdapter.java @@ -16,6 +16,7 @@ public class KafkaPropertyAdapter { private final Properties properties; private final String topicName; + private final String instanceIdentifier; private final Mode mode; public static enum Mode { @@ -25,10 +26,14 @@ public static enum Mode { /** * Public constructor + * + * @param instanceIdentifier * @param topicName * @param properties + * @param mode */ - public KafkaPropertyAdapter(String topicName, Properties properties, Mode mode) { + public KafkaPropertyAdapter(String instanceIdentifier, String topicName, Properties properties, Mode mode) { + this.instanceIdentifier = instanceIdentifier; this.topicName = topicName; this.properties = properties; this.mode = mode; @@ -61,4 +66,11 @@ public String getTopicName() { public Mode getMode() { return mode; } + + /** + * @return the instanceIdentifier + */ + public String getInstanceIdentifier() { + return instanceIdentifier; + } } diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/listener/FHIRServletContextListener.java b/fhir-server/src/main/java/com/ibm/fhir/server/listener/FHIRServletContextListener.java index 9e456f3f8c5..98404ebd189 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/listener/FHIRServletContextListener.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/listener/FHIRServletContextListener.java @@ -26,6 +26,7 @@ import static com.ibm.fhir.config.FHIRConfiguration.PROPERTY_NATS_TLS_ENABLED; import static com.ibm.fhir.config.FHIRConfiguration.PROPERTY_NATS_TRUSTSTORE; import static com.ibm.fhir.config.FHIRConfiguration.PROPERTY_NATS_TRUSTSTORE_PW; +import static com.ibm.fhir.config.FHIRConfiguration.PROPERTY_REMOTE_INDEX_SERVICE_INSTANCEIDENTIFIER; import static com.ibm.fhir.config.FHIRConfiguration.PROPERTY_REMOTE_INDEX_SERVICE_TYPE; import static 
com.ibm.fhir.config.FHIRConfiguration.PROPERTY_SERVER_REGISTRY_RESOURCE_PROVIDER_ENABLED; import static com.ibm.fhir.config.FHIRConfiguration.PROPERTY_SERVER_RESOLVE_FUNCTION_ENABLED; @@ -231,6 +232,7 @@ public void contextInitialized(ServletContextEvent event) { if (remoteIndexServiceType != null) { if ("kafka".equals(remoteIndexServiceType)) { String topicName = fhirConfig.getStringProperty(PROPERTY_KAFKA_INDEX_SERVICE_TOPICNAME, DEFAULT_KAFKA_INDEX_SERVICE_TOPICNAME); + String instanceIdentifier = fhirConfig.getStringProperty(PROPERTY_REMOTE_INDEX_SERVICE_INSTANCEIDENTIFIER); String mode = fhirConfig.getStringProperty(PROPERTY_KAFKA_INDEX_SERVICE_MODE, "active"); // Gather up the Kafka connection properties for the async index service @@ -247,7 +249,7 @@ public void contextInitialized(ServletContextEvent event) { log.info("Initializing Kafka async indexing service."); FHIRRemoteIndexKafkaService s = new FHIRRemoteIndexKafkaService(); - s.init(new KafkaPropertyAdapter(topicName, kafkaProps, KafkaPropertyAdapter.Mode.valueOf(mode))); + s.init(new KafkaPropertyAdapter(instanceIdentifier, topicName, kafkaProps, KafkaPropertyAdapter.Mode.valueOf(mode))); // Now the service is ready, we can publish it remoteIndexService = s; FHIRRemoteIndexService.setServiceInstance(remoteIndexService);