Skip to content

Commit

Permalink
issue #3437 add instanceIdentifier to remote index messages and other…
Browse files Browse the repository at this point in the history
… review fixes

Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious committed Jun 20, 2022
1 parent 5aa9325 commit c446a15
Show file tree
Hide file tree
Showing 22 changed files with 343 additions and 88 deletions.
3 changes: 3 additions & 0 deletions docs/src/pages/guides/FHIRServerUsersGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,7 @@ This section contains reference information about each of the configuration prop
|`fhirServer/operations/membermatch/extendedProps`|object|The extended options for the extended member match implementation|
|`fhirServer/operations/everything/includeTypes`|list|The list of related resource types to include alongside the patient compartment resource types. Instances of these resource types will only be returned when they are referenced from one or more resource instances from the target patient compartment. Example values are like `Location`, `Medication`, `Organization`, and `Practitioner`|
|`fhirServer/remoteIndexService/type`|string| The type of service used to send remote index messages. Only `kafka` is currently supported|
|`fhirServer/remoteIndexService/instanceIdentifier`|string| A UUID or other identifier unique to this cluster of IBM FHIR Servers |
|`fhirServer/remoteIndexService/kafka/mode`|string| Current operation mode of the service. Specify `ACTIVE` to use the service|
|`fhirServer/remoteIndexService/kafka/topicName`|string| The Kafka topic name. Typically `FHIR_REMOTE_INDEX` |
|`fhirServer/remoteIndexService/kafka/connectionProperties/bootstrap.servers`|string| Bootstrap servers for the Kafka service |
Expand Down Expand Up @@ -2421,6 +2422,7 @@ This section contains reference information about each of the configuration prop
|`fhirServer/operations/membermatch/extendedProps`|empty|
|`fhirServer/operations/everything/includeTypes`|null|
|`fhirServer/remoteIndexService/type`|null|
|`fhirServer/remoteIndexService/instanceIdentifier`|null|
|`fhirServer/remoteIndexService/kafka/mode`|null|
|`fhirServer/remoteIndexService/kafka/topicName`|null|
|`fhirServer/remoteIndexService/kafka/connectionProperties/bootstrap.servers`|null|
Expand Down Expand Up @@ -2613,6 +2615,7 @@ Cases where that behavior is not supported are marked below with an `N` in the `
|`fhirServer/operations/membermatch/extendedProps`|Y|Y|Y|
|`fhirServer/operations/everything/includeTypes`|Y|Y|N|
|`fhirServer/remoteIndexService/type`|N|N|N|
|`fhirServer/remoteIndexService/instanceIdentifier`|N|N|N|
|`fhirServer/remoteIndexService/kafka/mode`|N|N|N|
|`fhirServer/remoteIndexService/kafka/topicName`|N|N|N|
|`fhirServer/remoteIndexService/kafka/connectionProperties/bootstrap.servers`|N|N|N|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class FHIRConfiguration {

// Configuration properties for the Kafka-based async index service
public static final String PROPERTY_REMOTE_INDEX_SERVICE_TYPE = "fhirServer/remoteIndexService/type";
public static final String PROPERTY_REMOTE_INDEX_SERVICE_INSTANCEIDENTIFIER = "fhirServer/remoteIndexService/instanceIdentifier";
public static final String PROPERTY_KAFKA_INDEX_SERVICE_TOPICNAME = "fhirServer/remoteIndexService/kafka/topicName";
public static final String PROPERTY_KAFKA_INDEX_SERVICE_CONNECTIONPROPS = "fhirServer/remoteIndexService/kafka/connectionProperties";
public static final String PROPERTY_KAFKA_INDEX_SERVICE_MODE = "fhirServer/remoteIndexService/kafka/mode";
Expand Down
52 changes: 52 additions & 0 deletions fhir-core/src/main/java/com/ibm/fhir/core/util/LogSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.core.util;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Static support functions related to logging
*/
public class LogSupport {
private static final String MASK = "*****";
private static final Pattern PASSWORD_EQ_PATTERN = Pattern.compile("[^\"]password[= ]*\"([^\"]*)\"", Pattern.CASE_INSENSITIVE);
private static final Pattern PASSWORD_PATTERN = Pattern.compile("\"password\"[: ]*\"([^\"]*)\"", Pattern.CASE_INSENSITIVE);

/**
* Hide any text in quotes following the token "password" to avoid writing secrets to log files
* @param input
* @return
*/
public static String hidePassword(String input) {
String result = hidePassword(input, PASSWORD_EQ_PATTERN);
result = hidePassword(result, PASSWORD_PATTERN);
return result;
}

/**
* Replace any text matching the given pattern with the MASK value
* @param input
* @param pattern
* @return
*/
private static String hidePassword(String input, Pattern pattern) {
final Matcher m = pattern.matcher(input);
final StringBuffer result = new StringBuffer();
while (m.find()) {
final String match = m.group();
final int start = m.start();
m.appendReplacement(result,
match.substring(0,
m.start(1) - start)
+ MASK
+ match.substring(m.end(1) - start, m.end() - start));
}
m.appendTail(result);
return result.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.core.util.test;

import static org.testng.Assert.assertEquals;

import org.testng.annotations.Test;

import com.ibm.fhir.core.util.LogSupport;

/**
* Unit tests for {@link LogSupport} methods
*/
public class LogSupportTest {

@Test
public void testPassReplaceEquals() {
assertEquals(LogSupport.hidePassword("something password=\"change-password\" something else"), "something password=\"*****\" something else");
}

@Test
public void testPassReplaceEqualsWithSpace() {
assertEquals(LogSupport.hidePassword("something password = \"change-password\" something else"), "something password = \"*****\" something else");
}

@Test
public void testPassReplaceJson() {
assertEquals(LogSupport.hidePassword("something \"password\": \"change-password\" something else"), "something \"password\": \"*****\" something else");
}

@Test
public void testPassReplaceJsonCompact() {
assertEquals(LogSupport.hidePassword("something \"password\":\"change-password\" something else"), "something \"password\":\"*****\" something else");
}

@Test
public void testPassReplaceMixed() {
final String src = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"change-password-\";";
final String tgt = src.replace("change-password-", "*****");
assertEquals(LogSupport.hidePassword(src), tgt);
}
}
Binary file modified fhir-persistence-schema/docs/physical_schema_V0027.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions fhir-persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
<artifactId>fhir-search</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>fhir-model</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.persistence.helper;

import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.logging.Logger;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializer;
import com.ibm.fhir.persistence.index.RemoteIndexMessage;

/**
* Utility methods supporting the fhir-remote-index consumer
*/
public class RemoteIndexSupport {
private static final Logger logger = Logger.getLogger(RemoteIndexSupport.class.getName());
private static final DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;

/**
* Get an instance of Gson configured to support serialization/deserialization of
* remote index messages (sent through Kafka as strings)
* @return
*/
public static Gson getGson() {
Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, (JsonSerializer<Instant>) (value, type, context) ->
new JsonPrimitive(formatter.format(value))
)
.registerTypeAdapter(Instant.class, (JsonDeserializer<Instant>) (jsonElement, type, context) ->
formatter.parse(jsonElement.getAsString(), Instant::from)
)
.create();

return gson;
}

/**
* Unmarshall the JSON payload parameter as a RemoteIndexMessage
* @param jsonPayload
* @return
*/
public static RemoteIndexMessage unmarshall(String jsonPayload) {
try {
Gson gson = getGson();
return gson.fromJson(jsonPayload, RemoteIndexMessage.class);
} catch (Throwable t) {
// We need to sink this error to avoid poison messages from
// blocking the queues.
// TODO. Perhaps push this to a dedicated error topic
logger.severe("Not a RemoteIndexMessage. Ignoring: '" + jsonPayload + "'");
}
return null;

}

/**
* Marshall the RemoteIndexMessage to a JSON string
* @param message
* @return
*/
public static String marshallToString(RemoteIndexMessage message) {
Gson gson = getGson();
return gson.toJson(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class RemoteIndexMessage {
private String tenantId;
private int messageVersion;
private String instanceIdentifier;
private SearchParametersTransport data;

@Override
Expand Down Expand Up @@ -65,5 +66,19 @@ public int getMessageVersion() {
public void setMessageVersion(int messageVersion) {
this.messageVersion = messageVersion;
}

/**
* @return the instanceIdentifier
*/
public String getInstanceIdentifier() {
return instanceIdentifier;
}

/**
* @param instanceIdentifier the instanceIdentifier to set
*/
public void setInstanceIdentifier(String instanceIdentifier) {
this.instanceIdentifier = instanceIdentifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.ibm.fhir.persistence.index;

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -563,19 +564,28 @@ public void setParameterHash(String parameterHash) {
}

/**
* @return the lastUpdated
* @return the lastUpdated (UTC)
*/
public Instant getLastUpdated() {
return lastUpdated;
}

/**
* @param lastUpdated the lastUpdated to set
* @param lastUpdated the lastUpdated to set.
*/
public void setLastUpdated(Instant lastUpdated) {
this.lastUpdated = lastUpdated;
}

/**
* Convenience function to get the lastUpdated time as an Instant. All our times are
* always UTC.
* @return
*/
public Instant getLastUpdatedInstant() {
return Instant.from(lastUpdated.atOffset(ZoneOffset.UTC));
}

/**
* @return the refValues
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.remote.index;
package com.ibm.fhir.persistence.helper;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -67,16 +67,16 @@ public void testRoundtrip() throws Exception {
adapter.tokenValue("token-param", valueSystem, valueCode, compositeId);

sent.setData(adapter.build());
final String payload = marshallToString(sent);
final String payload = RemoteIndexSupport.marshallToString(sent);
// Now unmarshall the payload and check everything matches
RemoteIndexMessage rcvd = unmarshallPayload(payload);
RemoteIndexMessage rcvd = RemoteIndexSupport.unmarshall(payload);
assertNotNull(rcvd);
assertEquals(rcvd.getMessageVersion(), RemoteIndexConstants.MESSAGE_VERSION);

SearchParametersTransport data = rcvd.getData();
assertNotNull(data);
assertEquals(data.getParameterHash(), parameterHash);
assertEquals(data.getLastUpdated(), lastUpdated);
assertEquals(data.getLastUpdatedInstant(), lastUpdated);
assertEquals(data.getLogicalResourceId(), logicalResourceId);
assertEquals(data.getResourceType(), resourceType);
assertEquals(data.getLogicalId(), logicalId);
Expand Down Expand Up @@ -115,17 +115,14 @@ public void testRoundtrip() throws Exception {
assertEquals(data.getTokenValues().get(0).getValueCode(), valueCode);
}

/**
* Marshall the message to a string
* @param message
* @return
*/
private String marshallToString(RemoteIndexMessage message) {
final Gson gson = new Gson();
return gson.toJson(message);
}
private RemoteIndexMessage unmarshallPayload(String jsonPayload) throws Exception {
Gson gson = new Gson();
return gson.fromJson(jsonPayload, RemoteIndexMessage.class);
@Test
public void testInstant() {
Gson gson = RemoteIndexSupport.getGson();
Instant x = Instant.now();
String value = gson.toJson(x);

// now try and convert the other way
Instant x2 = gson.fromJson(value, Instant.class);
assertEquals(x, x2);
}
}
7 changes: 5 additions & 2 deletions fhir-remote-index/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ To enable remote indexing of search parameters, add the following `remoteIndexSe
...
"remoteIndexService": {
"type": "kafka",
"instanceIdenfier": "a-random-uuid-value",
"kafka": {
"mode": "ACTIVE",
"topicName": "FHIR_REMOTE_INDEX",
Expand Down Expand Up @@ -78,7 +79,8 @@ java -Djava.util.logging.config.file=logging.properties \
--database-properties database.properties \
--kafka-properties kafka.properties \
--topic-name FHIR_REMOTE_INDEX \
--consumer-count 3
--consumer-count 3 \
--instance-identifier "a-random-uuid-value"
```

Logging uses standard `java.util.logging` (JUL) and can be configured as follows:
Expand Down Expand Up @@ -157,6 +159,7 @@ Note: Citus configuration is the same as PostgreSQL.
| --db-type {type} | The type of database. One of `postgresql`, `derby`, `db2` or `citus`. |
| --database-properties {properties-file} | A Java properties file containing connection details for the downstream IBM FHIR Server database. |
| --topic-name {topic} | The name of the Kafka topic to consume. Default `FHIR_REMOTE_INDEX`. |
| --instance-identifier {uuid} | Each IBM FHIR Server cluster should be allocated a unique instance identifier. This identifier is added to each message sent over Kafka. The consumer will ignore messages unless they include the same instance identifier value. This helps to ensure that messages are processed from only intended sources. |
| --consumer-group {grp} | Override the default Kafka consumer group (`group.id` value) for this application. Default `remote-index-service-cg`. |
| --schema-type {type} | Set the schema type. One of `PLAIN` or `DISTRIBUTED`. Default is `PLAIN`. The schema type `DISTRIBUTED` is for use with Citus databases. |
| --max-ready-time-ms {milliseconds} | The maximum number of milliseconds to wait for the database to contain the correct data for a particular set of consumed messages. Should be slightly longer than the configured Liberty transaction timeout value. |
Expand All @@ -169,4 +172,4 @@ Because messages are sent to Kafka before the transaction is committed, it is po

1. If the resource version doesn't yet exist in the database, the consumer will pause and wait for the transaction to be committed. The consumer will only wait up to the maximum transaction timeout window, at which point it will assume the transaction has failed and the message will be discarded.
2. If the resource version matches, but the lastUpdated time does not match, it assumes the message came from an IBM FHIR Server which failed before the transaction was committed, but the request was processed successfully by another server. The message will be discarded because there will be another message waiting in the queue from the second attempt.
3. If the resource version in the database already exceeds the version in the message, the message will be discarded because the information is already out of date. There will be another message waiting in the queue containing the search parameter values from the most recent resource.
3. If the resource version in the database already exceeds the version in the message, the message will be discarded because the information is already out of date. There will be another message waiting in the queue containing the search parameter values from the most recent resource.
Loading

0 comments on commit c446a15

Please sign in to comment.