Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for History Iterator in Replayer #980

Merged
merged 7 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,8 @@ public DecisionEvents next() {
}
decisionEvents.add(events.next());
}
DecisionEvents result =
new DecisionEvents(
newEvents,
decisionEvents,
replay,
replayCurrentTimeMilliseconds,
nextDecisionEventId);
return result;
return new DecisionEvents(
newEvents, decisionEvents, replay, replayCurrentTimeMilliseconds, nextDecisionEventId);
}
}

Expand Down
32 changes: 10 additions & 22 deletions src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ private void processEvent(HistoryEvent event) {
context.handleChildWorkflowExecutionTimedOut(event);
break;
case DecisionTaskCompleted:
// NOOP
break;
case DecisionTaskScheduled:
case WorkflowExecutionTimedOut:
case WorkflowExecutionTerminated:
// NOOP
break;
case DecisionTaskStarted:
Expand All @@ -208,12 +208,6 @@ private void processEvent(HistoryEvent event) {
case WorkflowExecutionStarted:
handleWorkflowExecutionStarted(event);
break;
case WorkflowExecutionTerminated:
// NOOP
break;
case WorkflowExecutionTimedOut:
// NOOP
break;
case ActivityTaskScheduled:
decisionsHelper.handleActivityTaskScheduled(event);
break;
Expand All @@ -227,11 +221,8 @@ private void processEvent(HistoryEvent event) {
context.handleMarkerRecorded(event);
break;
case WorkflowExecutionCompleted:
break;
case WorkflowExecutionFailed:
break;
case WorkflowExecutionCanceled:
break;
case WorkflowExecutionContinuedAsNew:
break;
case TimerStarted:
Expand Down Expand Up @@ -410,7 +401,7 @@ private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQue
return queries
.entrySet()
.stream()
.collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, q -> queryWorkflow(q.getValue())));
}

private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
Expand Down Expand Up @@ -632,9 +623,9 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis
private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
private Duration decisionTaskStartToCloseTimeout;
private final Duration decisionTaskStartToCloseTimeout;

private final Duration decisionTaskRemainingTime() {
private Duration decisionTaskRemainingTime() {
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
return decisionTaskStartToCloseTimeout.minus(passed);
}
Expand Down Expand Up @@ -715,14 +706,11 @@ public HistoryEvent next() {
}
if (!current.hasNext()) {
log.error(
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:"
+ request.execution.workflowId
+ ", runID:"
+ request.execution.runId
+ ", domain:"
+ request.domain
+ " token:"
+ Arrays.toString(request.getNextPageToken()));
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:{}, runID:{}, domain:{} token:{}",
request.execution.workflowId,
request.execution.runId,
request.domain,
Arrays.toString(request.getNextPageToken()));
throw new Error(
"GetWorkflowExecutionHistory return empty history, maybe a bug in server");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.replay;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.uber.cadence.*;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.serviceclient.IWorkflowService;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;

import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class ReplaceDeciderDecisionTaskWithHistoryIteratorTest {
@Mock
private IWorkflowService mockService;

@Mock
private DecisionContextImpl mockContext;

@Mock
private DecisionsHelper mockedHelper;

private static final int MAXIMUM_PAGE_SIZE = 10000;
private final String WORKFLOW_ID = "testWorkflowId";
private final String RUN_ID = "testRunId";
private final String DOMAIN = "testDomain";
private final String START_PAGE_TOKEN = "testPageToken";
private final WorkflowExecution WORKFLOW_EXECUTION =
new WorkflowExecution().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID);
private final HistoryEvent START_EVENT =
new HistoryEvent()
.setWorkflowExecutionStartedEventAttributes(new WorkflowExecutionStartedEventAttributes())
.setEventId(1);
private final History HISTORY = new History().setEvents(Collections.singletonList(START_EVENT));
private final PollForDecisionTaskResponse task =
new PollForDecisionTaskResponse()
.setWorkflowExecution(WORKFLOW_EXECUTION)
.setHistory(HISTORY)
.setNextPageToken(START_PAGE_TOKEN.getBytes());

private Object iterator;

private void setupDecisionTaskWithHistoryIteratorImpl() {
try {
// Find the inner class first
Class<?> innerClass = findDecisionTaskWithHistoryIteratorImplClass();

// Get the constructor with the specific parameter types
Constructor<?> constructor =
innerClass.getDeclaredConstructor(
ReplayDecider.class, PollForDecisionTaskResponse.class, Duration.class);
constructor.setAccessible(true);
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved

when(mockedHelper.getTask()).thenReturn(task);
when(mockContext.getDomain()).thenReturn(DOMAIN);

// Create an instance of the outer class
ReplayDecider outerInstance =
new ReplayDecider(
mockService,
DOMAIN,
new WorkflowType().setName("testWorkflow"),
null,
mockedHelper,
SingleWorkerOptions.newBuilder()
.setMetricsScope(WorkflowClientOptions.defaultInstance().getMetricsScope())
.build(),
null);

// Create the instance
iterator = constructor.newInstance(outerInstance, task, Duration.ofSeconds(10));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Failed to set up test: " + e.getMessage(), e);
}
}

// Helper method to find the inner class
private Class<?> findDecisionTaskWithHistoryIteratorImplClass() {
for (Class<?> declaredClass : ReplayDecider.class.getDeclaredClasses()) {
if (declaredClass.getSimpleName().equals("DecisionTaskWithHistoryIteratorImpl")) {
return declaredClass;
}
}
throw new RuntimeException("Could not find DecisionTaskWithHistoryIteratorImpl inner class");
}

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
setupDecisionTaskWithHistoryIteratorImpl();
}

@Test
public void testGetHistoryWithSinglePageOfEvents()
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// Arrange
List<HistoryEvent> events = Arrays.asList(createMockHistoryEvent(2), createMockHistoryEvent(3));
History mockHistory = new History().setEvents(events);
when(mockService.GetWorkflowExecutionHistory(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(mockHistory));

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
assertTrue(historyIterator.hasNext());
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
assertTrue(historyIterator.hasNext());
assertEquals(events.get(0).getEventId(), historyIterator.next().getEventId());
assertTrue(historyIterator.hasNext());
assertEquals(events.get(1).getEventId(), historyIterator.next().getEventId());
assertFalse(historyIterator.hasNext());
}

@Test
public void testGetHistoryWithMultiplePages()
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// First page events
List<HistoryEvent> firstPageEvents =
Arrays.asList(createMockHistoryEvent(1), createMockHistoryEvent(2));
History firstHistory = new History().setEvents(firstPageEvents);
String firstPageToken = "firstPageToken";
when(mockService.GetWorkflowExecutionHistory(
eq(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
.thenReturn(
new GetWorkflowExecutionHistoryResponse()
.setHistory(firstHistory)
.setNextPageToken(firstPageToken.getBytes()));

// Second page events
List<HistoryEvent> secondPageEvents =
Arrays.asList(createMockHistoryEvent(3), createMockHistoryEvent(4));
History secondHistory = new History().setEvents(secondPageEvents);
when(mockService.GetWorkflowExecutionHistory(
eq(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(firstPageToken.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(secondHistory));

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
// Check first page events
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
assertEquals(firstPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
assertEquals(firstPageEvents.get(1).getEventId(), historyIterator.next().getEventId());

// Check second page events
assertEquals(secondPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
assertEquals(secondPageEvents.get(1).getEventId(), historyIterator.next().getEventId());

assertFalse(historyIterator.hasNext());
}

@Test(expected = Error.class)
public void testGetHistoryFailure()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a spin lock :( retryOptions does not have a backoff coefficient. So it tries around 100 times in 4 seconds until it gives up. Our tests are rather slow, but I'm unsure how this should be resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've switched to usage of validateBuildWithDefaults since I think it should be used instead of manual build()

throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
when(mockService.GetWorkflowExecutionHistory(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
.thenThrow(new TException());

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
historyIterator.next();

historyIterator.next(); // This should throw an Error due to timeout
}

@Test(expected = Error.class)
public void testEmptyHistory()
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
when(mockService.GetWorkflowExecutionHistory(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
.thenReturn(
new GetWorkflowExecutionHistoryResponse()
.setHistory(new History().setEvents(new ArrayList<>())));

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
historyIterator.next();

historyIterator.next(); // This should throw an Error due to timeout
}

// Helper method to create mock HistoryEvent
private HistoryEvent createMockHistoryEvent(int eventId) {
return new HistoryEvent().setEventId(eventId);
}
}