From ac09b6c3669ed2bdb7029bc94ea8f0eb87cbeb30 Mon Sep 17 00:00:00 2001 From: pbobylev Date: Wed, 24 Jul 2024 18:11:38 +0500 Subject: [PATCH] MODSOURCE-752: draft design --- .../main/java/org/folio/dao/RecordDao.java | 11 +- .../java/org/folio/dao/RecordDaoImpl.java | 122 ++++++++++-------- ...rceStoragePopulateTestMarcRecordsImpl.java | 2 +- .../rest/impl/SourceStorageRecordsImpl.java | 8 +- .../org/folio/services/RecordService.java | 15 ++- .../org/folio/services/RecordServiceImpl.java | 24 ++-- .../RecordDomainEventPublisher.java | 48 +++++++ .../AbstractPostProcessingEventHandler.java | 27 ++-- .../AbstractUpdateModifyEventHandler.java | 8 +- .../services/util/EventHandlingUtil.java | 26 +++- 10 files changed, 198 insertions(+), 93 deletions(-) create mode 100644 mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java index 3aaefd990..73803aab7 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java @@ -2,6 +2,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -189,10 +190,10 @@ Future getMatchedRecordsIdentifiers(MatchField mat * Saves {@link Record} to the db * * @param record Record to save - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with saved Record */ - Future saveRecord(Record record, String tenantId); + Future saveRecord(Record record, Map okapiHeaders); /** * Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor} @@ -201,7 +202,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat * @param record Record to save * @return future with saved Record */ - Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record); + Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map okapiHeaders); /** * Saves {@link RecordCollection} to the db @@ -216,10 +217,10 @@ Future getMatchedRecordsIdentifiers(MatchField mat * Updates {{@link Record} in the db * * @param record Record to update - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with updated Record */ - Future updateRecord(Record record, String tenantId); + Future updateRecord(Record record, Map okapiHeaders); /** * Increments generation in case a record with the same matchedId exists diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index b50397b67..d7ed8c19f 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -1,5 +1,42 @@ package org.folio.dao; +import static java.lang.String.format; +import static java.util.Collections.emptyList; +import static org.folio.dao.util.AdvisoryLockUtil.acquireLock; +import static org.folio.dao.util.ErrorRecordDaoUtil.ERROR_RECORD_CONTENT; +import static org.folio.dao.util.ParsedRecordDaoUtil.PARSED_RECORD_CONTENT; +import static org.folio.dao.util.RawRecordDaoUtil.RAW_RECORD_CONTENT; +import static org.folio.dao.util.RecordDaoUtil.RECORD_NOT_FOUND_TEMPLATE; +import static 
org.folio.dao.util.RecordDaoUtil.ensureRecordForeignKeys; +import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalIdNonNull; +import static org.folio.dao.util.RecordDaoUtil.filterRecordByState; +import static org.folio.dao.util.RecordDaoUtil.filterRecordByType; +import static org.folio.dao.util.RecordDaoUtil.getExternalHrid; +import static org.folio.dao.util.RecordDaoUtil.getExternalId; +import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE; +import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE; +import static org.folio.okapi.common.XOkapiHeaders.TENANT; +import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB; +import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB; +import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING; +import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB; +import static org.folio.rest.jooq.Tables.RECORDS_LB; +import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB; +import static org.folio.rest.jooq.enums.RecordType.MARC_BIB; +import static org.folio.rest.util.QueryParamUtil.toRecordType; +import static org.jooq.impl.DSL.condition; +import static org.jooq.impl.DSL.countDistinct; +import static org.jooq.impl.DSL.exists; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.inline; +import static org.jooq.impl.DSL.max; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.primaryKey; +import static org.jooq.impl.DSL.select; +import static org.jooq.impl.DSL.selectDistinct; +import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.trueCondition; + import com.google.common.collect.Lists; import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; import io.github.jklingsporn.vertx.jooq.shared.internal.QueryResult; @@ -11,6 +48,24 @@ import io.vertx.reactivex.pgclient.PgPool; import io.vertx.reactivex.sqlclient.SqlConnection; import io.vertx.sqlclient.Row; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.ws.rs.BadRequestException; +import javax.ws.rs.NotFoundException; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.expression.BinaryExpression; import net.sf.jsqlparser.expression.Expression; @@ -60,6 +115,7 @@ import org.folio.rest.jooq.tables.records.RecordsLbRecord; import org.folio.rest.jooq.tables.records.SnapshotsLbRecord; import org.folio.services.RecordSearchParameters; +import org.folio.services.domainevent.RecordDomainEventPublisher; import org.folio.services.util.TypeConnection; import org.folio.services.util.parser.ParseFieldsResult; import org.folio.services.util.parser.ParseLeaderResult; @@ -90,50 +146,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.NotFoundException; -import java.sql.Connection; -import java.sql.SQLException; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; 
-import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static java.lang.String.format; -import static java.util.Collections.emptyList; -import static org.folio.dao.util.AdvisoryLockUtil.acquireLock; -import static org.folio.dao.util.ErrorRecordDaoUtil.ERROR_RECORD_CONTENT; -import static org.folio.dao.util.ParsedRecordDaoUtil.PARSED_RECORD_CONTENT; -import static org.folio.dao.util.RawRecordDaoUtil.RAW_RECORD_CONTENT; -import static org.folio.dao.util.RecordDaoUtil.RECORD_NOT_FOUND_TEMPLATE; -import static org.folio.dao.util.RecordDaoUtil.ensureRecordForeignKeys; -import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalIdNonNull; -import static org.folio.dao.util.RecordDaoUtil.filterRecordByState; -import static org.folio.dao.util.RecordDaoUtil.filterRecordByType; -import static org.folio.dao.util.RecordDaoUtil.getExternalHrid; -import static org.folio.dao.util.RecordDaoUtil.getExternalId; -import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE; -import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE; -import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB; -import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB; -import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING; -import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB; -import static org.folio.rest.jooq.Tables.RECORDS_LB; -import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB; -import static org.folio.rest.jooq.enums.RecordType.MARC_BIB; -import static org.folio.rest.util.QueryParamUtil.toRecordType; -import static org.jooq.impl.DSL.*; - @Component public class RecordDaoImpl implements RecordDao { @@ -216,13 +228,16 @@ public class RecordDaoImpl implements RecordDao { public static final Field MARC_INDEXERS_MARC_ID = field(TABLE_FIELD_TEMPLATE, UUID.class, field(MARC_INDEXERS), field(MARC_ID)); private final PostgresClientFactory postgresClientFactory; + private final RecordDomainEventPublisher recordDomainEventPublisher; @org.springframework.beans.factory.annotation.Value("${srs.record.matching.fallback-query.enable:false}") private boolean enableFallbackQuery; @Autowired - public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) { + public RecordDaoImpl(final PostgresClientFactory postgresClientFactory, + final RecordDomainEventPublisher recordDomainEventPublisher) { this.postgresClientFactory = postgresClientFactory; + this.recordDomainEventPublisher = recordDomainEventPublisher; } @Override @@ -695,15 +710,19 @@ public Future> getRecordByCondition(ReactiveClassicGenericQuery } @Override - public Future saveRecord(Record record, String tenantId) { + public Future saveRecord(Record record, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record)); + return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders)) + .onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders)); } @Override - public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, + Map okapiHeaders) { LOG.trace("saveRecord:: Saving {} record {}", 
record.getRecordType(), record.getId()); - return insertOrUpdateRecord(txQE, record); + return insertOrUpdateRecord(txQE, record) + .onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders)); } @Override @@ -921,11 +940,12 @@ public Future saveRecords(RecordCollection recordCollectio } @Override - public Future updateRecord(Record record, String tenantId) { + public Future updateRecord(Record record, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId()) .compose(optionalRecord -> optionalRecord - .map(r -> saveRecord(txQE, record)) + .map(r -> saveRecord(txQE, record, okapiHeaders)) .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId())))))); } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStoragePopulateTestMarcRecordsImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStoragePopulateTestMarcRecordsImpl.java index fd8ae01ef..82bf9aa61 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStoragePopulateTestMarcRecordsImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStoragePopulateTestMarcRecordsImpl.java @@ -67,7 +67,7 @@ public void postSourceStoragePopulateTestMarcRecords(TestMarcRecordsCollection e } return record; }) - .forEach(marcRecord -> futures.add(recordService.saveRecord(marcRecord, tenantId))); + .forEach(marcRecord -> futures.add(recordService.saveRecord(marcRecord, okapiHeaders))); GenericCompositeFuture.all(futures).onComplete(result -> { if (result.succeeded()) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java index 7b68b1c02..15991393f 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageRecordsImpl.java @@ -53,7 +53,7 @@ public void postSourceStorageRecords(Record entity, Map okapiHea Handler> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { try { - recordService.saveRecord(entity, tenantId) + recordService.saveRecord(entity, okapiHeaders) .map((Response) PostSourceStorageRecordsResponse.respond201WithApplicationJson(entity, PostSourceStorageRecordsResponse.headersFor201())) .otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler); } catch (Exception e) { @@ -88,7 +88,7 @@ public void putSourceStorageRecordsById(String id, Record entity, Map { try { entity.setId(id); - recordService.updateRecord(entity, tenantId) + recordService.updateRecord(entity, okapiHeaders) .map(updated -> PutSourceStorageRecordsByIdResponse.respond200WithApplicationJson(entity)) .map(Response.class::cast).otherwise(ExceptionHelper::mapExceptionToResponse) .onComplete(asyncResultHandler); @@ -103,7 +103,7 @@ public void putSourceStorageRecordsGenerationById(String matchedId, Record entit Handler> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { try { - recordService.updateRecordGeneration(matchedId, entity, tenantId) + recordService.updateRecordGeneration(matchedId, entity, okapiHeaders) 
.map(updated -> PutSourceStorageRecordsGenerationByIdResponse.respond200WithApplicationJson(entity)) .map(Response.class::cast).otherwise(ExceptionHelper::mapExceptionToResponse) .onComplete(asyncResultHandler); @@ -119,7 +119,7 @@ public void deleteSourceStorageRecordsById(String id, String idType, Map> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { try { - recordService.deleteRecordById(id, toExternalIdType(idType), tenantId).map(r -> true) + recordService.deleteRecordById(id, toExternalIdType(idType), okapiHeaders).map(r -> true) .map(updated -> DeleteSourceStorageRecordsByIdResponse.respond204()).map(Response.class::cast) .otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler); } catch (Exception e) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java index f97b0bef8..5bbe2e131 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java @@ -2,6 +2,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import io.vertx.sqlclient.Row; @@ -68,10 +69,10 @@ public interface RecordService { * Saves record * * @param record record to save - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with saved Record */ - Future saveRecord(Record record, String tenantId); + Future saveRecord(Record record, Map okapiHeaders); /** * Saves collection of records @@ -86,20 +87,20 @@ public interface RecordService { * Updates record with given id * * @param record record to update - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with updated Record */ - Future updateRecord(Record record, String tenantId); + Future updateRecord(Record record, Map okapiHeaders); /** * Updates record generation with given matched id * * @param matchedId matched id * @param record record to update - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with updated Record generation */ - Future updateRecordGeneration(String matchedId, Record record, String tenantId); + Future updateRecordGeneration(String matchedId, Record record, Map okapiHeaders); /** * Searches for {@link SourceRecord} by {@link Condition} and ordered by order fields with offset and limit @@ -267,5 +268,5 @@ public interface RecordService { */ Future updateRecordsState(String matchedId, RecordState state, RecordType recordType, String tenantId); - Future deleteRecordById(String id, IdType idType, String tenantId); + Future deleteRecordById(String id, IdType idType, Map okapiHeaders); } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index b47400c15..0d8daae1b 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -13,6 +13,7 @@ import static org.folio.dao.util.RecordDaoUtil.getExternalIdsCondition; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE; +import static 
org.folio.okapi.common.XOkapiHeaders.TENANT; import static org.folio.rest.util.QueryParamUtil.toRecordType; import static org.folio.services.util.AdditionalFieldsUtil.TAG_999; import static org.folio.services.util.AdditionalFieldsUtil.addFieldToMarcRecord; @@ -23,6 +24,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -120,7 +122,8 @@ public Future> getRecordById(String id, String tenantId) { } @Override - public Future saveRecord(Record record, String tenantId) { + public Future saveRecord(Record record, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); LOG.debug("saveRecord:: Saving record with id: {} for tenant: {}", record.getId(), tenantId); ensureRecordHasId(record); ensureRecordHasSuppressDiscovery(record); @@ -145,9 +148,9 @@ public Future saveRecord(Record record, String tenantId) { return recordDao.getRecordByMatchedId(txQE, record.getMatchedId()) .compose(optionalMatchedRecord -> optionalMatchedRecord .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD))) - .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))))); + .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), okapiHeaders))); } else { - return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))); + return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), okapiHeaders); } }), tenantId) .recover(RecordServiceImpl::mapToDuplicateExceptionIfNeeded); @@ -170,21 +173,21 @@ public Future saveRecords(RecordCollection recordCollectio } @Override - public Future updateRecord(Record record, String tenantId) { - return recordDao.updateRecord(ensureRecordForeignKeys(record), tenantId); + public Future updateRecord(Record record, Map okapiHeaders) { + return recordDao.updateRecord(ensureRecordForeignKeys(record), okapiHeaders); } @Override - public Future updateRecordGeneration(String matchedId, Record record, String tenantId) { + public Future updateRecordGeneration(String matchedId, Record record, Map okapiHeaders) { String marcField999s = getFieldFromMarcRecord(record, TAG_999, INDICATOR, INDICATOR, SUBFIELD_S); if (!matchedId.equals(marcField999s)) { return Future.failedFuture(new BadRequestException(format(MATCHED_ID_NOT_EQUAL_TO_999_FIELD, matchedId, marcField999s))); } record.setId(UUID.randomUUID().toString()); - return recordDao.getRecordByMatchedId(matchedId, tenantId) + return recordDao.getRecordByMatchedId(matchedId, okapiHeaders.get(TENANT)) .map(r -> r.orElseThrow(() -> new NotFoundException(format(RECORD_WITH_GIVEN_MATCHED_ID_NOT_FOUND, matchedId)))) - .compose(v -> saveRecord(record, tenantId)) + .compose(v -> saveRecord(record, okapiHeaders)) .recover(throwable -> { if (throwable instanceof DuplicateRecordException) { return Future.failedFuture(new BadRequestException(UPDATE_RECORD_DUPLICATE_EXCEPTION)); @@ -338,7 +341,8 @@ public Future getMatchedRecordsIdentifiers(RecordM } @Override - public Future deleteRecordById(String id, IdType idType, String tenantId) { + public Future deleteRecordById(String id, IdType idType, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); return recordDao.getRecordByExternalId(id, idType, tenantId) .map(recordOptional -> recordOptional.orElseThrow(() -> new 
NotFoundException(format(NOT_FOUND_MESSAGE, Record.class.getSimpleName(), id)))) .map(record -> { @@ -347,7 +351,7 @@ public Future deleteRecordById(String id, IdType idType, String tenantId) ParsedRecordDaoUtil.updateLeaderStatus(record.getParsedRecord(), DELETED_LEADER_RECORD_STATUS); return record; }) - .compose(record -> updateRecord(record, tenantId)).map(r -> null); + .compose(record -> updateRecord(record, okapiHeaders)).map(r -> null); } private Future setMatchedIdForRecord(Record record, String tenantId) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java b/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java new file mode 100644 index 000000000..cc1cf072b --- /dev/null +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java @@ -0,0 +1,48 @@ +package org.folio.services.domainevent; + +import static org.folio.okapi.common.XOkapiHeaders.TENANT; +import static org.folio.okapi.common.XOkapiHeaders.TOKEN; +import static org.folio.okapi.common.XOkapiHeaders.URL; +import static org.folio.services.util.EventHandlingUtil.sendEventToKafka; + +import io.vertx.core.Vertx; +import io.vertx.core.json.Json; +import io.vertx.kafka.client.producer.KafkaHeader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.folio.kafka.KafkaConfig; +import org.folio.rest.jaxrs.model.Record; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class RecordDomainEventPublisher { + + public static final String RECORD_DOMAIN_TOPIC = "srs.source_records"; + public static final String SOURCE_RECORD_CREATED = "SOURCE_RECORD_CREATED"; + public static final String SOURCE_RECORD_UPDATED = "SOURCE_RECORD_UPDATED"; + private static final String RECORD_TYPE = "folio.srs.recordType"; + + @Autowired + private KafkaConfig kafkaConfig; + + public void publishRecordCreated(Record created, Map okapiHeaders) { + Vertx.vertx().executeBlocking(() -> { + var kafkaHeaders = getKafkaHeaders(okapiHeaders, created.getRecordType()); + var key = created.getId(); + return sendEventToKafka(okapiHeaders.get(TENANT), Json.encode(created), SOURCE_RECORD_CREATED, kafkaHeaders, + kafkaConfig, key); + }); + } + + private List getKafkaHeaders(Map okapiHeaders, Record.RecordType recordType) { + return new ArrayList<>(List.of( + KafkaHeader.header(URL, okapiHeaders.get(URL)), + KafkaHeader.header(TENANT, okapiHeaders.get(TENANT)), + KafkaHeader.header(TOKEN, okapiHeaders.get(TOKEN)), + KafkaHeader.header(RECORD_TYPE, recordType.value())) + ); + } + +} diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java index 6c550f296..ece429973 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java @@ -6,6 +6,7 @@ import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; import io.vertx.kafka.client.producer.KafkaHeader; +import java.util.Map; import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; 
@@ -28,6 +29,7 @@ import org.folio.services.SnapshotService; import org.folio.services.caches.MappingParametersSnapshotCache; import org.folio.services.exceptions.PostProcessingException; +import org.folio.services.util.EventHandlingUtil; import org.folio.services.util.TypeConnection; import org.jooq.Condition; @@ -43,6 +45,7 @@ import static org.folio.dao.util.MarcUtil.reorderMarcRecordFields; import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalId; import static org.folio.dao.util.RecordDaoUtil.filterRecordByNotSnapshotId; +import static org.folio.okapi.common.XOkapiHeaders.TENANT; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING; import static org.folio.rest.jaxrs.model.ProfileType.MAPPING_PROFILE; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER; @@ -57,6 +60,7 @@ import static org.folio.services.util.AdditionalFieldsUtil.remove035WithActualHrId; import static org.folio.services.util.AdditionalFieldsUtil.updateLatestTransactionDate; import static org.folio.services.util.EventHandlingUtil.sendEventToKafka; +import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders; import static org.folio.services.util.RestUtil.retrieveOkapiConnectionParams; public abstract class AbstractPostProcessingEventHandler implements EventHandler { @@ -108,7 +112,7 @@ public CompletableFuture handle(DataImportEventPayload d if (centralTenantOperationExists(dataImportEventPayload)) { return saveRecordForCentralTenant(dataImportEventPayload, record, jobExecutionId); } - return saveRecord(record, dataImportEventPayload.getTenant()); + return saveRecord(record, toOkapiHeaders(dataImportEventPayload)); }) .onSuccess(record -> { sendReplyEvent(dataImportEventPayload, record); @@ -247,17 +251,17 @@ private void setSuppressFormDiscovery(Record record, boolean suppressFromDiscove } } - private Future updatePreviousRecordsState(String externalId, String snapshotId, String tenantId) { + private Future updatePreviousRecordsState(String externalId, String snapshotId, Map okapiHeaders) { Condition condition = filterRecordByNotSnapshotId(snapshotId) .and(filterRecordByExternalId(externalId)); - return recordService.getRecords(condition, getDbType(), new ArrayList<>(), 0, 999, tenantId) + return recordService.getRecords(condition, getDbType(), new ArrayList<>(), 0, 999, okapiHeaders.get(TENANT)) .compose(recordCollection -> { Promise result = Promise.promise(); @SuppressWarnings("squid:S3740") List> futures = new ArrayList<>(); recordCollection.getRecords() - .forEach(record -> futures.add(recordService.updateRecord(record.withState(Record.State.OLD), tenantId))); + .forEach(record -> futures.add(recordService.updateRecord(record.withState(Record.State.OLD), okapiHeaders))); GenericCompositeFuture.all(futures).onComplete(ar -> { if (ar.succeeded()) { result.complete(); @@ -308,21 +312,22 @@ private void executeHridManipulation(Record record, JsonObject externalEntity) { * Updates specific record. If it doesn't exist - then just save it. 
* * @param record - target record - * @param tenantId - tenantId + * @param okapiHeaders - okapi headers * @return - Future with Record result */ - private Future saveRecord(Record record, String tenantId) { + private Future saveRecord(Record record, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); return recordService.getRecordById(record.getId(), tenantId) .compose(r -> { if (r.isPresent()) { return recordService.updateParsedRecord(record, tenantId).map(record.withGeneration(r.get().getGeneration())); } else { record.getRawRecord().setId(record.getId()); - return recordService.saveRecord(record, tenantId).map(record); + return recordService.saveRecord(record, okapiHeaders).map(record); } }) .compose(updatedRecord -> - updatePreviousRecordsState(getExternalId(updatedRecord), updatedRecord.getSnapshotId(), tenantId) + updatePreviousRecordsState(getExternalId(updatedRecord), updatedRecord.getSnapshotId(), okapiHeaders) .map(updatedRecord) ); } @@ -348,12 +353,14 @@ private Future saveRecordForCentralTenant(DataImportEventPayload dataImp String centralTenantId = dataImportEventPayload.getContext().get(CENTRAL_TENANT_ID); dataImportEventPayload.getContext().remove(CENTRAL_TENANT_INSTANCE_UPDATED_FLAG); LOG.info("handle:: Processing AbstractPostProcessingEventHandler - saving record by jobExecutionId: {} for the central tenantId: {}", jobExecutionId, centralTenantId); + var okapiHeaders = toOkapiHeaders(dataImportEventPayload); if (centralTenantId != null) { + okapiHeaders.put(TENANT, centralTenantId); return snapshotService.copySnapshotToOtherTenant(record.getSnapshotId(), dataImportEventPayload.getTenant(), centralTenantId) - .compose(f -> saveRecord(record, centralTenantId)); + .compose(f -> saveRecord(record, okapiHeaders)); } else { - return saveRecord(record, dataImportEventPayload.getTenant()); + return saveRecord(record, okapiHeaders); } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractUpdateModifyEventHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractUpdateModifyEventHandler.java index fa37481c7..5b6f38a36 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractUpdateModifyEventHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractUpdateModifyEventHandler.java @@ -38,6 +38,7 @@ import static java.util.Objects.nonNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.folio.ActionProfile.Action.UPDATE; +import static org.folio.okapi.common.XOkapiHeaders.TENANT; import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE; import static org.folio.services.handlers.match.AbstractMarcMatchEventHandler.CENTRAL_TENANT_ID; import static org.folio.services.util.AdditionalFieldsUtil.HR_ID_FROM_FIELD; @@ -48,6 +49,7 @@ import static org.folio.services.util.AdditionalFieldsUtil.remove003FieldIfNeeded; import static org.folio.services.util.AdditionalFieldsUtil.remove035WithActualHrId; import static org.folio.services.util.AdditionalFieldsUtil.updateLatestTransactionDate; +import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders; public abstract class AbstractUpdateModifyEventHandler implements EventHandler { @@ -118,11 +120,13 @@ public CompletableFuture handle(DataImportEventPayload p ) .compose(changedRecord -> { String centralTenantId = payload.getContext().get(CENTRAL_TENANT_ID); + var okapiHeaders = 
toOkapiHeaders(payload); if (centralTenantId != null) { + okapiHeaders.put(TENANT, centralTenantId); return snapshotService.copySnapshotToOtherTenant(changedRecord.getSnapshotId(), payload.getTenant(), centralTenantId) - .compose(snapshot -> recordService.saveRecord(changedRecord, centralTenantId)); + .compose(snapshot -> recordService.saveRecord(changedRecord, okapiHeaders)); } - return recordService.saveRecord(changedRecord, payload.getTenant()); + return recordService.saveRecord(changedRecord, okapiHeaders); }) .onSuccess(savedRecord -> submitSuccessfulEventType(payload, future, marcMappingOption)) .onFailure(throwable -> { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java index e2a4bde97..af8d46b79 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java @@ -1,13 +1,25 @@ package org.folio.services.util; +import static org.folio.okapi.common.XOkapiHeaders.TENANT; +import static org.folio.okapi.common.XOkapiHeaders.URL; +import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER; +import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_TOPIC; +import static org.folio.services.domainevent.RecordDomainEventPublisher.SOURCE_RECORD_CREATED; +import static org.folio.services.domainevent.RecordDomainEventPublisher.SOURCE_RECORD_UPDATED; + import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducer; import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.folio.DataImportEventPayload; import org.folio.kafka.KafkaConfig; import org.folio.kafka.KafkaTopicNameHelper; import org.folio.kafka.SimpleKafkaProducerManager; @@ -17,9 +29,6 @@ import org.folio.rest.jaxrs.model.EventMetadata; import org.folio.rest.tools.utils.ModuleName; -import java.util.List; -import java.util.UUID; - public final class EventHandlingUtil { private static final Logger LOGGER = LogManager.getLogger(); @@ -96,6 +105,9 @@ public static String constructModuleName() { } public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) { + if (SOURCE_RECORD_CREATED.equals(eventType) || SOURCE_RECORD_UPDATED.equals(eventType)) { + return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_TOPIC); + } return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(), tenantId, eventType); } @@ -108,6 +120,14 @@ public static KafkaProducer createProducer(String eventType, Kaf return new SimpleKafkaProducerManager(Vertx.currentContext().owner(), kafkaConfig).createShared(eventType); } + public static Map toOkapiHeaders(DataImportEventPayload eventPayload) { + var okapiHeaders = new HashMap(); + okapiHeaders.put(URL, eventPayload.getOkapiUrl()); + okapiHeaders.put(TENANT, eventPayload.getTenant()); + okapiHeaders.put(OKAPI_TOKEN_HEADER, eventPayload.getToken()); + return okapiHeaders; + } + private static String extractRecordId(List kafkaHeaders) { return 
kafkaHeaders.stream() .filter(header -> header.key().equals(RECORD_ID_HEADER))
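
Design note on the changed signatures (commentary, not part of the patch): RecordDao/RecordService's saveRecord, updateRecord, updateRecordGeneration and deleteRecordById now accept the full okapi header map instead of a bare tenantId. The tenant is resolved inside the implementations via XOkapiHeaders.TENANT, and the data-import handlers build the map from the event payload through the new EventHandlingUtil.toOkapiHeaders(). A minimal call-site sketch; recordService, record and LOG are assumed to be in scope, and the URL/tenant/token literals are made-up examples:

  Map<String, String> okapiHeaders = Map.of(
    XOkapiHeaders.URL, "http://okapi:9130",          // hypothetical okapi URL
    XOkapiHeaders.TENANT, "diku",                    // hypothetical tenant id
    XOkapiHeaders.TOKEN, "token-from-the-request");  // token forwarded from the incoming request

  recordService.saveRecord(record, okapiHeaders)
    .onSuccess(saved -> LOG.info("Saved record {}; SOURCE_RECORD_CREATED was published by the DAO layer",
      saved.getId()))
    .onFailure(e -> LOG.error("Failed to save record {}", record.getId(), e));

The REST endpoints (SourceStorageRecordsImpl, SourceStoragePopulateTestMarcRecordsImpl) already receive this map from the framework, so they pass it through instead of extracting only the tenant.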
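
The new RecordDomainEventPublisher encodes the whole Record as the event payload (Json.encode), uses the record id as the Kafka message key, and forwards the x-okapi-url, x-okapi-tenant and x-okapi-token headers plus a folio.srs.recordType header so consumers can filter by record type without parsing the body. In this draft only SOURCE_RECORD_CREATED is actually published; SOURCE_RECORD_UPDATED is declared (and already routed in createTopicName) but has no publish method yet. A possible counterpart, mirroring publishRecordCreated and meant to sit in the same class, reusing its kafkaConfig field and getKafkaHeaders helper — a sketch only, not part of the patch:

  // Hypothetical update-event counterpart to publishRecordCreated.
  public void publishRecordUpdated(Record updated, Map<String, String> okapiHeaders) {
    Vertx.vertx().executeBlocking(() -> {
      var kafkaHeaders = getKafkaHeaders(okapiHeaders, updated.getRecordType());
      var key = updated.getId();
      return sendEventToKafka(okapiHeaders.get(TENANT), Json.encode(updated),
        SOURCE_RECORD_UPDATED, kafkaHeaders, kafkaConfig, key);
    });
  }

A caller such as RecordDaoImpl.updateRecord could then publish the update event from its onSuccess handler, the same way saveRecord publishes the created event.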
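
Topic routing: createTopicName now short-circuits for the two domain event types and builds the topic from the env id, the tenant id and RECORD_DOMAIN_TOPIC ("srs.source_records"); every other event type keeps the previous envId + default namespace + tenantId + eventType scheme. Assuming formatTopicName joins its segments with dots, a tenant "diku" in env "folio" would end up on something like folio.diku.srs.source_records (verify against KafkaTopicNameHelper before relying on it). A purely illustrative io.vertx.kafka.client.consumer.KafkaConsumer sketch — no consumer ships with this module, the topic name and Kafka settings are assumptions, and vertx is expected to be in scope:

  Map<String, String> consumerConfig = Map.of(
    "bootstrap.servers", "kafka:9092",             // assumed broker address
    "group.id", "source-record-domain-consumers",  // assumed consumer group
    "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerConfig);
  consumer.handler(msg -> {
    String recordType = msg.headers().stream()
      .filter(h -> h.key().equals("folio.srs.recordType"))
      .map(h -> h.value().toString())
      .findFirst().orElse("unknown");
    Record payload = Json.decodeValue(msg.value(), Record.class);
    // react to SOURCE_RECORD_CREATED / SOURCE_RECORD_UPDATED for the given recordType
  });
  consumer.subscribe("folio.diku.srs.source_records");  // assumed topic name format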