Skip to content

Commit

Permalink
Added generic event data to audit trail
Browse files Browse the repository at this point in the history
  • Loading branch information
NyashaMuusha committed Feb 28, 2024
1 parent 41d3841 commit cc889c1
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ CREATE TABLE IF NOT EXISTS audit_trail (
interactionID VARCHAR(64),
goldenID VARCHAR(64),
event VARCHAR(256),
score FLOAT DEFAULT -1.0,
linkingRule VARCHAR(14) DEFAULT 'UNMATCHED',
eventData VARCHAR(256),
CONSTRAINT PKEY_AUDIT_TRAIL PRIMARY KEY (id)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.logging.log4j.Logger;
import org.jembi.jempi.AppConfig;
import org.jembi.jempi.shared.models.AuditEvent;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.sql.SQLException;
import java.util.Locale;
Expand All @@ -12,6 +13,7 @@

final class PsqlAuditTrail {
private static final Logger LOGGER = LogManager.getLogger(PsqlAuditTrail.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final PsqlClient psqlClient;

PsqlAuditTrail() {
Expand Down Expand Up @@ -55,17 +57,18 @@ CONSTRAINT PKEY_AUDIT_TRAIL PRIMARY KEY (id)

void addAuditEvent(final AuditEvent event) {
psqlClient.connect(AppConfig.POSTGRESQL_AUDIT_DB);
try (var preparedStatement = psqlClient.prepareStatement(String.format(Locale.ROOT, """
INSERT INTO %s (createdAt, interactionID, goldenID, event, score, linkingRule)
VALUES (?, ?, ?, ?, ?, ?);

try (
var preparedStatement = psqlClient.prepareStatement(String.format(Locale.ROOT, """
INSERT INTO %s (createdAt, interactionID, goldenID, event, eventData)
VALUES (?, ?, ?, ?, ?);
""", PSQL_TABLE_AUDIT_TRAIL)
.stripIndent())) {
preparedStatement.setTimestamp(1, event.createdAt());
preparedStatement.setString(2, event.interactionID());
preparedStatement.setString(3, event.goldenID());
preparedStatement.setString(4, event.event());
preparedStatement.setFloat(5, event.score());
preparedStatement.setString(6, event.linkingRule().name());
preparedStatement.setString(5, event.eventData());
preparedStatement.executeUpdate();
} catch (SQLException e) {
LOGGER.error(e.getLocalizedMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jembi.jempi.shared.models.AuditEvent;
import org.jembi.jempi.shared.models.LinkingRule;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

import static org.jembi.jempi.shared.models.GlobalConstants.PSQL_TABLE_AUDIT_TRAIL;

Expand Down Expand Up @@ -44,10 +42,8 @@ List<AuditEvent> goldenRecordAuditTrail(final String uid) {
final var interactionID = rs.getString(4);
final var goldenID = rs.getString(5);
final var event = rs.getString(6);
final var score = rs.getFloat(7);
final var linkingRuleName = Optional.ofNullable(rs.getString(8));
final var linkingRule = linkingRuleName.map(LinkingRule::valueOf).orElse(LinkingRule.UNMATCHED);
list.add(new AuditEvent(createdAt, insertedAt, interactionID, goldenID, event, score, linkingRule));
final var eventData = rs.getString(7);
list.add(new AuditEvent(createdAt, insertedAt, interactionID, goldenID, event, eventData));
}
} catch (Exception e) {
LOGGER.error(e);
Expand All @@ -72,10 +68,8 @@ List<AuditEvent> interactionRecordAuditTrail(final String uid) {
final var interactionID = rs.getString(4);
final var goldenID = rs.getString(5);
final var event = rs.getString(6);
final var score = rs.getFloat(7);
final var linkingRuleName = Optional.ofNullable(rs.getString(8));
final var linkingRule = linkingRuleName.map(LinkingRule::valueOf).orElse(LinkingRule.UNMATCHED);
list.add(new AuditEvent(createdAt, insertedAt, interactionID, goldenID, event, score, linkingRule));
final var eventData = rs.getString(7);
list.add(new AuditEvent(createdAt, insertedAt, interactionID, goldenID, event, eventData));
}
} catch (Exception e) {
LOGGER.error(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jembi.jempi.libmpi;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.vavr.control.Either;
import io.vavr.control.Option;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -17,6 +18,8 @@
import java.util.List;
import java.util.Locale;

import static org.jembi.jempi.shared.utils.AppUtils.OBJECT_MAPPER;

public final class LibMPI {

private static final Logger LOGGER = LogManager.getLogger(LibMPI.class);
Expand Down Expand Up @@ -54,26 +57,34 @@ public LibMPI(
client = new LibPostgresql(URL, USR, PSW);
}

private void sendAuditEvent(
final String interactionID,
final String goldenID,
final String event,
final Float score,
final LinkingRule linkingRule) {
topicAuditEvents.produceAsync(goldenID,
new AuditEvent(new Timestamp(System.currentTimeMillis()),
null,
interactionID,
goldenID,
event,
score,
linkingRule),
((metadata, exception) -> {
if (exception != null) {
LOGGER.error(exception.getLocalizedMessage(), exception);
}
}));
private <T> void sendAuditEvent(
final String interactionID,
final String goldenID,
final String event,
final T eventData) {

var serializedEventData = getSerializedEventData(eventData);
topicAuditEvents.produceAsync(goldenID,
new AuditEvent(new Timestamp(System.currentTimeMillis()),
null,
interactionID,
goldenID,
event,
serializedEventData),
(metadata, exception) -> {
if (exception != null) {
LOGGER.error(exception.getMessage(), exception);
}
});
}

private <T> String getSerializedEventData(final T eventData) {
try {
return eventData != null ? OBJECT_MAPPER.writeValueAsString(eventData) : null;
} catch(JsonProcessingException e) {
LOGGER.error("Failed to serialize event data", e);
return null;
}
}

/*
Expand Down Expand Up @@ -240,9 +251,9 @@ public boolean setScore(
final float newScore) {
final var result = client.setScore(interactionID, goldenID, newScore);
if (result) {
sendAuditEvent(interactionID, goldenID, String.format(Locale.ROOT, "score: %.5f -> %.5f", oldScore, newScore), newScore, LinkingRule.UNMATCHED);
sendAuditEvent(interactionID, goldenID, String.format(Locale.ROOT, "score: %.5f -> %.5f", oldScore, newScore), new LinkingAuditDetails(newScore, LinkingRule.UNMATCHED));
} else {
sendAuditEvent(interactionID, goldenID, String.format(Locale.ROOT, "set score error: %.5f -> %.5f", oldScore, newScore), newScore, LinkingRule.UNMATCHED);
sendAuditEvent(interactionID, goldenID, String.format(Locale.ROOT, "set score error: %.5f -> %.5f", oldScore, newScore), new LinkingAuditDetails(newScore, LinkingRule.UNMATCHED));

}
return result;
Expand All @@ -264,11 +275,11 @@ public boolean updateGoldenRecordField(
final String newValue) {
final var result = client.updateGoldenRecordField(goldenId, fieldName, newValue);
if (result) {
sendAuditEvent(interactionId, goldenId, String.format(Locale.ROOT, "%s: '%s' -> '%s'", fieldName, oldValue, newValue), -1.0f, LinkingRule.UNMATCHED);
sendAuditEvent(interactionId, goldenId, String.format(Locale.ROOT, "%s: '%s' -> '%s'", fieldName, oldValue, newValue), null);
} else {
sendAuditEvent(interactionId,
goldenId,
String.format(Locale.ROOT, "%s: error updating '%s' -> '%s'", fieldName, oldValue, newValue), -1.0f, LinkingRule.UNMATCHED);
String.format(Locale.ROOT, "%s: error updating '%s' -> '%s'", fieldName, oldValue, newValue), null);
}
return result;
}
Expand All @@ -285,11 +296,11 @@ public Either<MpiGeneralError, LinkInfo> linkToNewGoldenRecord(
"Interaction -> new GoldenID: old(%s) new(%s) [%f]",
currentGoldenId,
result.get().goldenUID(),
score), score, LinkingRule.UNMATCHED);
score), new LinkingAuditDetails(score, LinkingRule.UNMATCHED));
} else {
sendAuditEvent(interactionId,
currentGoldenId,
String.format(Locale.ROOT, "Interaction -> update GoldenID error: old(%s) [%f]", currentGoldenId, score), score, LinkingRule.UNMATCHED);
String.format(Locale.ROOT, "Interaction -> update GoldenID error: old(%s) [%f]", currentGoldenId, score), new LinkingAuditDetails(score, LinkingRule.UNMATCHED));
}
return result;
}
Expand All @@ -307,26 +318,25 @@ public Either<MpiGeneralError, LinkInfo> updateLink(
"Interaction -> update GoldenID: old(%s) new(%s) [%f]",
goldenID,
newGoldenID,
score), score, LinkingRule.UNMATCHED);
score), new LinkingAuditDetails(score, LinkingRule.UNMATCHED));
} else {
sendAuditEvent(interactionID,
newGoldenID,
String.format(Locale.ROOT,
"Interaction -> update GoldenID error: old(%s) new(%s) [%f]",
goldenID,
newGoldenID,
score), score, LinkingRule.UNMATCHED);
score), new LinkingAuditDetails(score, LinkingRule.UNMATCHED));
}
return result;
return result;
}

public LinkInfo createInteractionAndLinkToExistingGoldenRecord(
final Interaction interaction,
final LibMPIClientInterface.GoldenIdScore goldenIdScore,
final boolean deterministicValidation,
final float probabilisticValidation,
final LinkingRule linkingRule
) {
final LinkingRule linkingRule) {
final var result = client.createInteractionAndLinkToExistingGoldenRecord(interaction, goldenIdScore);
if (result != null) {
sendAuditEvent(result.interactionUID(),
Expand All @@ -336,14 +346,13 @@ public LinkInfo createInteractionAndLinkToExistingGoldenRecord(
+ "Probabilistic(%.3f)",
result.score(),
deterministicValidation,
probabilisticValidation), result.score(),
linkingRule);
probabilisticValidation), new LinkingAuditDetails(result.score(), linkingRule));
} else {
sendAuditEvent(interaction.interactionId(),
goldenIdScore.goldenId(),
String.format(Locale.ROOT,
"Interaction -> error linking to existing GoldenRecord (%.5f)",
goldenIdScore.score()), goldenIdScore.score(), LinkingRule.UNMATCHED);
goldenIdScore.score()), new LinkingAuditDetails(goldenIdScore.score(), linkingRule));
}
return result;

Expand All @@ -356,11 +365,11 @@ public LinkInfo createInteractionAndLinkToClonedGoldenRecord(
if (result != null) {
sendAuditEvent(result.interactionUID(),
result.goldenUID(),
String.format(Locale.ROOT, "Interaction -> New GoldenRecord (%f)", score), score, LinkingRule.UNMATCHED);
String.format(Locale.ROOT, "Interaction -> New GoldenRecord (%f)", score), new LinkingAuditDetails(score, LinkingRule.UNMATCHED));
} else {
sendAuditEvent(interaction.interactionId(),
null,
String.format(Locale.ROOT, "Interaction -> error linking to new GoldenRecord (%f)", score), score, LinkingRule.UNMATCHED);
String.format(Locale.ROOT, "Interaction -> error linking to new GoldenRecord (%f)", score), new LinkingAuditDetails(score, LinkingRule.UNMATCHED));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,7 @@ public static ApiAuditTrail fromAuditTrail(final List<AuditEvent> trail) {
x.interactionID(),
x.goldenID(),
x.event(),
new ApiLinkingRule(
String.format("Matched with score %s", x.score()),
x.linkingRule().name())
toApiModel(getDeserializedEventData(x.eventData()))
))
.toList());
}
Expand Down Expand Up @@ -362,4 +360,22 @@ public record ApiLinkingRule(
) {
}

private static LinkingAuditDetails getDeserializedEventData(final String eventData) {
try {
return OBJECT_MAPPER.readValue(eventData, LinkingAuditDetails.class);
} catch (JsonProcessingException e) {
LOGGER.error("Failed to deserialize event data", e);
return null;
}
}

private static ApiLinkingRule toApiModel(final LinkingAuditDetails linkingAuditDetails) {
if (linkingAuditDetails == null) {
return null;
}
return new ApiLinkingRule(
String.format("Matched with score %s", linkingAuditDetails.score()),
linkingAuditDetails.linkingRule().name()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ public record AuditEvent(
String interactionID,
String goldenID,
String event,
Float score,
LinkingRule linkingRule) {
String eventData) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.jembi.jempi.shared.models;

public record LinkingAuditDetails(
float score,
LinkingRule linkingRule
) {
}

0 comments on commit cc889c1

Please sign in to comment.