Skip to content

Commit

Permalink
fix(indexing): Fix processing batch of updates to the same instance (#…
Browse files Browse the repository at this point in the history
…748)

* fix(indexing): Fix processing batch of updates to the same instance

- Process only consumer record with latest timestamp

Closes: MSEARCH-951
  • Loading branch information
viacheslavkol authored Feb 17, 2025
1 parent 202333a commit 1ab99c3
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 13 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* Fix documentation not being updated ([MSEARCH-937](https://folio-org.atlassian.net/browse/MSEARCH-937))
* Fix "Fix dead link" api-docs workflow step for release builds ([MSEARCH-947](https://folio-org.atlassian.net/browse/MSEARCH-947))
* Fix issue with using member tenant postgres schema on reindexing ([MSEARCH-957](https://folio-org.atlassian.net/browse/MSEARCH-957))
* Fix processing batch of updates to the same instance ([MSEARCH-951](https://folio-org.atlassian.net/browse/MSEARCH-951))

### Tech Dept
* Recreate upload ranges each upload execution ([MSEARCH-934](https://folio-org.atlassian.net/browse/MSEARCH-934))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ConsumerRecords<String, ResourceEvent> intercept(ConsumerRecords<String,
if (list.size() > 1) {
list.sort(Comparator.comparingLong(ConsumerRecord::timestamp));
}
consumerRecords.add(list.get(0).value());
consumerRecords.add(list.get(list.size() - 1).value());
}
populate(consumerRecords);
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.service.InstanceChildrenResourceService;
import org.folio.search.service.consortium.ConsortiumTenantExecutor;
import org.folio.search.service.reindex.jdbc.ItemRepository;
import org.folio.search.service.reindex.jdbc.MergeInstanceRepository;
import org.folio.spring.exception.SystemUserAuthorizationException;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -39,16 +43,17 @@ class PopulateInstanceBatchInterceptorTest {
@Mock
private InstanceChildrenResourceService instanceChildrenResourceService;
@Mock
private ItemRepository itemRepository;
private MergeInstanceRepository instanceRepository;
@Mock
private Consumer<String, ResourceEvent> consumer;

private PopulateInstanceBatchInterceptor populateInstanceBatchInterceptor;

@BeforeEach
void setUp() {
when(instanceRepository.entityType()).thenCallRealMethod();
populateInstanceBatchInterceptor = new PopulateInstanceBatchInterceptor(
List.of(itemRepository),
List.of(instanceRepository),
executionService,
systemUserScopedExecutionService,
instanceChildrenResourceService
Expand Down Expand Up @@ -76,15 +81,7 @@ void shouldHandleSystemUserAuthorizationExceptionInIntercept() {
@Test
void shouldProcessRecordsSuccessfullyInIntercept() {
// Arrange
doAnswer(invocation -> {
Supplier<?> operation = invocation.getArgument(0);
return operation.get();
}).when(executionService).execute(any(Supplier.class));

doAnswer(invocation -> {
Callable<?> action = invocation.getArgument(1);
return action.call();
}).when(systemUserScopedExecutionService).executeSystemUserScoped(any(String.class), any(Callable.class));
mockExecutionServices();

var resourceEvent = new ResourceEvent().tenant(TENANT_ID).resourceName("instance");
var consumerRecord = new ConsumerRecord<>("topic", 0, 0L, "key", resourceEvent);
Expand All @@ -97,4 +94,45 @@ void shouldProcessRecordsSuccessfullyInIntercept() {
verify(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any());
verify(executionService).execute(any());
}

@Test
void shouldProcessOnlyLatestRecordsSuccessfullyInIntercept() {
// Arrange
mockExecutionServices();

var now = System.currentTimeMillis();
var expected = Map.<String, Object>of("id", 3);
var consumerRecord1 = createConsumerRecord(Map.of("id", 1), now);
var consumerRecord2 = createConsumerRecord(Map.of("id", 2), now + 1);
var consumerRecord3 = createConsumerRecord(expected, now + 2);
var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 0),
List.of(consumerRecord3, consumerRecord2, consumerRecord1)));

// Act
populateInstanceBatchInterceptor.intercept(records, consumer);

// Assert
verify(instanceRepository).saveEntities(TENANT_ID, List.of(expected));
}

private void mockExecutionServices() {
doAnswer(invocation -> {
Supplier<?> operation = invocation.getArgument(0);
return operation.get();
}).when(executionService).execute(any(Supplier.class));

doAnswer(invocation -> {
Callable<?> action = invocation.getArgument(1);
return action.call();
}).when(systemUserScopedExecutionService).executeSystemUserScoped(any(String.class), any(Callable.class));
}

private ConsumerRecord<String, ResourceEvent> createConsumerRecord(Map<String, Object> resourceNew, long timestamp) {
var resourceEvent = new ResourceEvent()
.tenant(TENANT_ID)
.resourceName("instance")
._new(resourceNew);
return new ConsumerRecord<>("topic", 0, 0L, timestamp, TimestampType.CREATE_TIME, 0, 0, "key",
resourceEvent, new RecordHeaders(), Optional.empty());
}
}

0 comments on commit 1ab99c3

Please sign in to comment.