Skip to content

Commit

Permalink
MODSOURCE-752: draft design
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jul 24, 2024
1 parent b1179d5 commit ac09b6c
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

Expand Down Expand Up @@ -189,10 +190,10 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* Saves {@link Record} to the db
*
* @param record Record to save
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with saved Record
*/
Future<Record> saveRecord(Record record, String tenantId);
Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor}
Expand All @@ -201,7 +202,7 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param record Record to save
* @return future with saved Record
*/
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record);
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link RecordCollection} to the db
Expand All @@ -216,10 +217,10 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* Updates {{@link Record} in the db
*
* @param record Record to update
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated Record
*/
Future<Record> updateRecord(Record record, String tenantId);
Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders);

/**
* Increments generation in case a record with the same matchedId exists
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,42 @@
package org.folio.dao;

import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static org.folio.dao.util.AdvisoryLockUtil.acquireLock;
import static org.folio.dao.util.ErrorRecordDaoUtil.ERROR_RECORD_CONTENT;
import static org.folio.dao.util.ParsedRecordDaoUtil.PARSED_RECORD_CONTENT;
import static org.folio.dao.util.RawRecordDaoUtil.RAW_RECORD_CONTENT;
import static org.folio.dao.util.RecordDaoUtil.RECORD_NOT_FOUND_TEMPLATE;
import static org.folio.dao.util.RecordDaoUtil.ensureRecordForeignKeys;
import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalIdNonNull;
import static org.folio.dao.util.RecordDaoUtil.filterRecordByState;
import static org.folio.dao.util.RecordDaoUtil.filterRecordByType;
import static org.folio.dao.util.RecordDaoUtil.getExternalHrid;
import static org.folio.dao.util.RecordDaoUtil.getExternalId;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB;
import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB;
import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING;
import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB;
import static org.folio.rest.jooq.Tables.RECORDS_LB;
import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB;
import static org.folio.rest.jooq.enums.RecordType.MARC_BIB;
import static org.folio.rest.util.QueryParamUtil.toRecordType;
import static org.jooq.impl.DSL.condition;
import static org.jooq.impl.DSL.countDistinct;
import static org.jooq.impl.DSL.exists;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.inline;
import static org.jooq.impl.DSL.max;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.primaryKey;
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.selectDistinct;
import static org.jooq.impl.DSL.table;
import static org.jooq.impl.DSL.trueCondition;

import com.google.common.collect.Lists;
import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.github.jklingsporn.vertx.jooq.shared.internal.QueryResult;
Expand All @@ -11,6 +48,24 @@
import io.vertx.reactivex.pgclient.PgPool;
import io.vertx.reactivex.sqlclient.SqlConnection;
import io.vertx.sqlclient.Row;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotFoundException;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.BinaryExpression;
import net.sf.jsqlparser.expression.Expression;
Expand Down Expand Up @@ -60,6 +115,7 @@
import org.folio.rest.jooq.tables.records.RecordsLbRecord;
import org.folio.rest.jooq.tables.records.SnapshotsLbRecord;
import org.folio.services.RecordSearchParameters;
import org.folio.services.domainevent.RecordDomainEventPublisher;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
Expand Down Expand Up @@ -90,50 +146,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotFoundException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static org.folio.dao.util.AdvisoryLockUtil.acquireLock;
import static org.folio.dao.util.ErrorRecordDaoUtil.ERROR_RECORD_CONTENT;
import static org.folio.dao.util.ParsedRecordDaoUtil.PARSED_RECORD_CONTENT;
import static org.folio.dao.util.RawRecordDaoUtil.RAW_RECORD_CONTENT;
import static org.folio.dao.util.RecordDaoUtil.RECORD_NOT_FOUND_TEMPLATE;
import static org.folio.dao.util.RecordDaoUtil.ensureRecordForeignKeys;
import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalIdNonNull;
import static org.folio.dao.util.RecordDaoUtil.filterRecordByState;
import static org.folio.dao.util.RecordDaoUtil.filterRecordByType;
import static org.folio.dao.util.RecordDaoUtil.getExternalHrid;
import static org.folio.dao.util.RecordDaoUtil.getExternalId;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB;
import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB;
import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING;
import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB;
import static org.folio.rest.jooq.Tables.RECORDS_LB;
import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB;
import static org.folio.rest.jooq.enums.RecordType.MARC_BIB;
import static org.folio.rest.util.QueryParamUtil.toRecordType;
import static org.jooq.impl.DSL.*;

@Component
public class RecordDaoImpl implements RecordDao {

Expand Down Expand Up @@ -216,13 +228,16 @@ public class RecordDaoImpl implements RecordDao {
public static final Field<UUID> MARC_INDEXERS_MARC_ID = field(TABLE_FIELD_TEMPLATE, UUID.class, field(MARC_INDEXERS), field(MARC_ID));

private final PostgresClientFactory postgresClientFactory;
private final RecordDomainEventPublisher recordDomainEventPublisher;

@org.springframework.beans.factory.annotation.Value("${srs.record.matching.fallback-query.enable:false}")
private boolean enableFallbackQuery;

@Autowired
public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) {
public RecordDaoImpl(final PostgresClientFactory postgresClientFactory,
final RecordDomainEventPublisher recordDomainEventPublisher) {
this.postgresClientFactory = postgresClientFactory;
this.recordDomainEventPublisher = recordDomainEventPublisher;
}

@Override
Expand Down Expand Up @@ -695,15 +710,19 @@ public Future<Optional<Record>> getRecordByCondition(ReactiveClassicGenericQuery
}

@Override
public Future<Record> saveRecord(Record record, String tenantId) {
public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record));
return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders))
.onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders));
}

@Override
public Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) {
public Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record,
Map<String, String> okapiHeaders) {
LOG.trace("saveRecord:: Saving {} record {}", record.getRecordType(), record.getId());
return insertOrUpdateRecord(txQE, record);
return insertOrUpdateRecord(txQE, record)
.onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders));
}

@Override
Expand Down Expand Up @@ -921,11 +940,12 @@ public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollectio
}

@Override
public Future<Record> updateRecord(Record record, String tenantId) {
public Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId())
.compose(optionalRecord -> optionalRecord
.map(r -> saveRecord(txQE, record))
.map(r -> saveRecord(txQE, record, okapiHeaders))
.orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId()))))));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void postSourceStoragePopulateTestMarcRecords(TestMarcRecordsCollection e
}
return record;
})
.forEach(marcRecord -> futures.add(recordService.saveRecord(marcRecord, tenantId)));
.forEach(marcRecord -> futures.add(recordService.saveRecord(marcRecord, okapiHeaders)));

GenericCompositeFuture.all(futures).onComplete(result -> {
if (result.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void postSourceStorageRecords(Record entity, Map<String, String> okapiHea
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
recordService.saveRecord(entity, tenantId)
recordService.saveRecord(entity, okapiHeaders)
.map((Response) PostSourceStorageRecordsResponse.respond201WithApplicationJson(entity, PostSourceStorageRecordsResponse.headersFor201()))
.otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler);
} catch (Exception e) {
Expand Down Expand Up @@ -88,7 +88,7 @@ public void putSourceStorageRecordsById(String id, Record entity, Map<String, St
vertxContext.runOnContext(v -> {
try {
entity.setId(id);
recordService.updateRecord(entity, tenantId)
recordService.updateRecord(entity, okapiHeaders)
.map(updated -> PutSourceStorageRecordsByIdResponse.respond200WithApplicationJson(entity))
.map(Response.class::cast).otherwise(ExceptionHelper::mapExceptionToResponse)
.onComplete(asyncResultHandler);
Expand All @@ -103,7 +103,7 @@ public void putSourceStorageRecordsGenerationById(String matchedId, Record entit
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
recordService.updateRecordGeneration(matchedId, entity, tenantId)
recordService.updateRecordGeneration(matchedId, entity, okapiHeaders)
.map(updated -> PutSourceStorageRecordsGenerationByIdResponse.respond200WithApplicationJson(entity))
.map(Response.class::cast).otherwise(ExceptionHelper::mapExceptionToResponse)
.onComplete(asyncResultHandler);
Expand All @@ -119,7 +119,7 @@ public void deleteSourceStorageRecordsById(String id, String idType, Map<String,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
recordService.deleteRecordById(id, toExternalIdType(idType), tenantId).map(r -> true)
recordService.deleteRecordById(id, toExternalIdType(idType), okapiHeaders).map(r -> true)
.map(updated -> DeleteSourceStorageRecordsByIdResponse.respond204()).map(Response.class::cast)
.otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.vertx.sqlclient.Row;
Expand Down Expand Up @@ -68,10 +69,10 @@ public interface RecordService {
* Saves record
*
* @param record record to save
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with saved Record
*/
Future<Record> saveRecord(Record record, String tenantId);
Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders);

/**
* Saves collection of records
Expand All @@ -86,20 +87,20 @@ public interface RecordService {
* Updates record with given id
*
* @param record record to update
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated Record
*/
Future<Record> updateRecord(Record record, String tenantId);
Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders);

/**
* Updates record generation with given matched id
*
* @param matchedId matched id
* @param record record to update
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated Record generation
*/
Future<Record> updateRecordGeneration(String matchedId, Record record, String tenantId);
Future<Record> updateRecordGeneration(String matchedId, Record record, Map<String, String> okapiHeaders);

/**
* Searches for {@link SourceRecord} by {@link Condition} and ordered by order fields with offset and limit
Expand Down Expand Up @@ -267,5 +268,5 @@ public interface RecordService {
*/
Future<Void> updateRecordsState(String matchedId, RecordState state, RecordType recordType, String tenantId);

Future<Void> deleteRecordById(String id, IdType idType, String tenantId);
Future<Void> deleteRecordById(String id, IdType idType, Map<String, String> okapiHeaders);
}
Loading

0 comments on commit ac09b6c

Please sign in to comment.